<a href="https://colab.research.google.com/github/apoorvaec1030/Data-Science-projects/blob/main/pyspark_basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install Dependencies:

Java 8
Apache Spark, prebuilt for Hadoop 3
Findspark (used to locate the Spark installation on the system)

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

Set Environment Variables:

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [3]:
!ls

sample_data  spark-3.5.1-bin-hadoop3  spark-3.5.1-bin-hadoop3.tgz


In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Render DataFrames as formatted tables when they are the last expression in a notebook cell
spark

Exploring the Dataset

In [None]:

# Download the Cars dataset, originally from https://perso.telecom-paristech.fr/eagan/class/igr204/datasets
# Many of these datasets have been cleaned up by Petra Isenberg, Pierre Dragicevic and Yvonne Jansen
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

In [6]:
!ls

cars.csv  sample_data  spark-3.5.1-bin-hadoop3	spark-3.5.1-bin-hadoop3.tgz


In [41]:
# Load data from a CSV file into a DataFrame.
# header=True means the first row is a header
# sep=';' means the columns are separated by semicolons

df=spark.read.csv('cars.csv', header=True, sep=';')
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



Viewing the Dataframe
There are several ways to view a DataFrame (DF) in PySpark:

1. df.take(5) returns a list of (at most) five Row objects.
2. df.collect() brings all of the data in the entire DataFrame to the driver. Be careful with it: on a large dataset it can easily exhaust the driver's memory and crash the driver node.
3. df.show() is the most commonly used method to view a DataFrame. It accepts a few parameters, such as the number of rows to print and whether to truncate long values.
4. df.show(5, False) or df.show(5, truncate=False) shows the first five rows with full, untruncated column values.
5. df.limit(5) returns a new DataFrame containing the first 5 rows. Because Spark is distributed, there is no guarantee that df.limit() returns the same rows each time.


In [8]:
df.show(5,False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+-------------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504. |12.0        |70   |US    |
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693. |11.5        |70   |US    |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436. |11.0        |70   |US    |
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433. |12.0        |70   |US    |
|Ford Torino              |17.0|8        |302.0       |140.0     |3449. |10.5        |70   |US    |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



In [9]:
df.limit(5)

Car,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Model,Origin
Chevrolet Chevell...,18.0,8,307.0,130.0,3504.0,12.0,70,US
Buick Skylark 320,15.0,8,350.0,165.0,3693.0,11.5,70,US
Plymouth Satellite,18.0,8,318.0,150.0,3436.0,11.0,70,US
AMC Rebel SST,16.0,8,304.0,150.0,3433.0,12.0,70,US
Ford Torino,17.0,8,302.0,140.0,3449.0,10.5,70,US


Viewing Dataframe Columns

In [10]:
df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

**Dataframe Schema**

There are two methods commonly used to view the data types of a DataFrame: df.dtypes and df.printSchema().



In [11]:
df.dtypes

[('Car', 'string'),
 ('MPG', 'string'),
 ('Cylinders', 'string'),
 ('Displacement', 'string'),
 ('Horsepower', 'string'),
 ('Weight', 'string'),
 ('Acceleration', 'string'),
 ('Model', 'string'),
 ('Origin', 'string')]

In [12]:
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: string (nullable = true)
 |-- Cylinders: string (nullable = true)
 |-- Displacement: string (nullable = true)
 |-- Horsepower: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Acceleration: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Origin: string (nullable = true)



**Inferring Schema Implicitly**

We can pass the parameter inferSchema=True to infer the input schema automatically while loading the data. An example is shown below:

In [13]:
df=spark.read.csv('cars.csv', header=True, sep=';', inferSchema=True)

In [14]:
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: decimal(4,0) (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



As you can see, Spark has inferred the data types automatically, even choosing a decimal type with a fitting precision, decimal(4,0), for Weight, whose values (such as 3504.) have no fractional digits. Inference has drawbacks, though: it requires an extra pass over the data, and when you have to read multiple files with different schemas, implicit inference can cause problems such as null values in some columns. Therefore, let us also see how to define schemas explicitly.


**Defining Schema Explicitly**

In [15]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

In [17]:
# Creating a list of the schema in the format column_name, data_type

labels = [('Car', StringType()),
 ('MPG', DoubleType()),
 ('Cylinders', IntegerType()),
 ('Displacement', DoubleType()),
 ('Horsepower', DoubleType()),
 ('Weight', DoubleType()),
 ('Acceleration', DoubleType()),
 ('Model', IntegerType()),
 ('Origin', StringType())]

In [18]:
schema = StructType([StructField(name, dtype, True) for name, dtype in labels])
schema

StructType([StructField('Car', StringType(), True), StructField('MPG', DoubleType(), True), StructField('Cylinders', IntegerType(), True), StructField('Displacement', DoubleType(), True), StructField('Horsepower', DoubleType(), True), StructField('Weight', DoubleType(), True), StructField('Acceleration', DoubleType(), True), StructField('Model', IntegerType(), True), StructField('Origin', StringType(), True)])

In [19]:
df = spark.read.csv('cars.csv', header=True, sep=';', schema=schema)

In [20]:
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [21]:
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|3504.0|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|3693.0|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|3436.0|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|3433.0|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0|3449.0|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



**DataFrame Operations on Columns**
We will go over the following in this section:

1. Selecting Columns
2. Selecting Multiple Columns
3. Adding New Columns
4. Renaming Columns
5. Grouping By Columns
6. Removing Columns



**Selecting Columns**
There are multiple ways to select a column in PySpark. Each is shown below:

In [25]:
# Method 1: attribute access on the DataFrame
df.select(df.Car).show(5)

+--------------------+
|                 Car|
+--------------------+
|Chevrolet Chevell...|
|   Buick Skylark 320|
|  Plymouth Satellite|
|       AMC Rebel SST|
|         Ford Torino|
+--------------------+
only showing top 5 rows



In [28]:
# Method 2: indexing by name ('car' matches 'Car' because column lookup is case-insensitive by default)
df.select(df['car']).show(5, truncate=False)

+-------------------------+
|car                      |
+-------------------------+
|Chevrolet Chevelle Malibu|
|Buick Skylark 320        |
|Plymouth Satellite       |
|AMC Rebel SST            |
|Ford Torino              |
+-------------------------+
only showing top 5 rows



In [30]:
# Method 3: the col() function
from pyspark.sql.functions import col

df.select(col('car')).show(5, truncate=False)

+-------------------------+
|car                      |
+-------------------------+
|Chevrolet Chevelle Malibu|
|Buick Skylark 320        |
|Plymouth Satellite       |
|AMC Rebel SST            |
|Ford Torino              |
+-------------------------+
only showing top 5 rows



**Multiple Columns**

In [31]:
df.select(df.Car, df.Acceleration).show(5)

+--------------------+------------+
|                 Car|Acceleration|
+--------------------+------------+
|Chevrolet Chevell...|        12.0|
|   Buick Skylark 320|        11.5|
|  Plymouth Satellite|        11.0|
|       AMC Rebel SST|        12.0|
|         Ford Torino|        10.5|
+--------------------+------------+
only showing top 5 rows



In [33]:
df.select(df['car'], df['Acceleration']).show(5)

+--------------------+------------+
|                 car|Acceleration|
+--------------------+------------+
|Chevrolet Chevell...|        12.0|
|   Buick Skylark 320|        11.5|
|  Plymouth Satellite|        11.0|
|       AMC Rebel SST|        12.0|
|         Ford Torino|        10.5|
+--------------------+------------+
only showing top 5 rows



In [34]:
df.select(col('car'),col('acceleration')).show(5)

+--------------------+------------+
|                 car|acceleration|
+--------------------+------------+
|Chevrolet Chevell...|        12.0|
|   Buick Skylark 320|        11.5|
|  Plymouth Satellite|        11.0|
|       AMC Rebel SST|        12.0|
|         Ford Torino|        10.5|
+--------------------+------------+
only showing top 5 rows



**Adding new Columns**

We will take a look at three cases here:

1. Adding a new column
2. Adding multiple columns
3. Deriving a new column from an existing one


In [35]:
from pyspark.sql.functions import lit, concat

In [44]:
#Case-1 Adding single col
df=df.withColumn('first_col',lit(1))
# lit means literal: it fills every row with the given constant value.
# withColumn expects a Column, so wrap static data / constant values in lit().
df.show(5,truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+---------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|first_col|
+-------------------------+----+---------+------------+----------+------+------------+-----+------+---------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504. |12.0        |70   |US    |1        |
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693. |11.5        |70   |US    |1        |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436. |11.0        |70   |US    |1        |
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433. |12.0        |70   |US    |1        |
|Ford Torino              |17.0|8        |302.0       |140.0     |3449. |10.5        |70   |US    |1        |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+---------+
only showing top 5 rows

In [45]:
#Case-2 Adding multiple cols
df=df.withColumn('Sec col',lit(2)) \
      .withColumn('Third col', lit('Third Column'))
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+---------+-------+------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|first_col|Sec col|   Third col|
+--------------------+----+---------+------------+----------+------+------------+-----+------+---------+-------+------------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|        1|      2|Third Column|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|        1|      2|Third Column|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|        1|      2|Third Column|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|        1|      2|Third Column|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|        1|      2|Third 

In [46]:
#Case-3 Deriving a new col
df=df.withColumn('car_model',concat(col('car'), lit(" "), col('MPG')))
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+---------+-------+------------+--------------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|first_col|Sec col|   Third col|           car_model|
+--------------------+----+---------+------------+----------+------+------------+-----+------+---------+-------+------------+--------------------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|        1|      2|Third Column|Chevrolet Chevell...|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|        1|      2|Third Column|Buick Skylark 320...|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|        1|      2|Third Column|Plymouth Satellit...|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|        1|      2|Third 

**Renaming Columns**

In [47]:
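# Note: 'Sec_col' (underscore) does not match the actual column 'Sec col' (space).
# withColumnRenamed is a no-op for a missing column, so that rename is silently skipped,
# which is why 'Sec col' still appears in the output below.
df=df.withColumnRenamed('first_col','new_1st_col') \
      .withColumnRenamed('Sec_col', 'new_2nd_col') \
      .withColumnRenamed('Third col', 'new_3rd_col')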

df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+-----------+-------+------------+--------------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_1st_col|Sec col| new_3rd_col|           car_model|
+--------------------+----+---------+------------+----------+------+------------+-----+------+-----------+-------+------------+--------------------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|          1|      2|Third Column|Chevrolet Chevell...|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|          1|      2|Third Column|Buick Skylark 320...|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|          1|      2|Third Column|Plymouth Satellit...|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|          1|

**Grouping by columns**


Here we use the DataFrame API to group rows. We will discuss how to:

1. Group By a single column
2. Group By multiple columns

In [48]:
df.groupBy('Origin').count().show(5)

+------+-----+
|Origin|count|
+------+-----+
|Europe|   73|
|    US|  254|
| Japan|   79|
+------+-----+



In [49]:
df.groupBy('origin','model').count().show(5)

+------+-----+-----+
|origin|model|count|
+------+-----+-----+
| Japan|   76|    4|
|    US|   81|   13|
|    US|   80|    7|
|    US|   76|   22|
| Japan|   70|    2|
+------+-----+-----+
only showing top 5 rows



**Removing columns**

In [51]:
df=df.drop('new_1st_col')
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+-------+------------+--------------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|Sec col| new_3rd_col|           car_model|
+--------------------+----+---------+------------+----------+------+------------+-----+------+-------+------------+--------------------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|      2|Third Column|Chevrolet Chevell...|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|      2|Third Column|Buick Skylark 320...|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|      2|Third Column|Plymouth Satellit...|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|      2|Third Column|  AMC Rebel SST 16.0|
|         Ford Torino|17.0|        8|    

In [52]:
df=df.drop('Sec col') \
      .drop('new_3rd_col')
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|           car_model|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|Chevrolet Chevell...|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|Buick Skylark 320...|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|Plymouth Satellit...|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|  AMC Rebel SST 16.0|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|    Ford Torino 17.0|
+--------------------+----+---------+------------+----------+------+----