<a href="https://colab.research.google.com/github/AsmitaGhoderao/spark-projects/blob/main/Pyspark_Basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [4]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [8]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [9]:
# Downloading and preprocessing Cars Data downloaded origianlly from https://perso.telecom-paristech.fr/eagan/class/igr204/datasets
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

--2023-07-09 09:36:00--  https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobceles.github.io (jacobceles.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobceles.github.io (jacobceles.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv [following]
--2023-07-09 09:36:00--  https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobcelestine.com (jacobcelestine.com)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobcelestine.com (jacobcelestine.com)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22608 (22K) [text/csv]
Saving to: ‘cars.csv.1’


2023-07-09 09:36:00 (49.5 MB/s) - ‘cars.csv.1’ saved [22608/22608]



In [18]:
df = spark.read.csv('cars.csv', header = True, sep = ";")
df.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
|    Ford Galaxie 500|15.0|        8|       429.0|     198.0| 4341.|        10.0|   70|    US|
|    Chevrolet Impala|14.0|        8|       454.0|     220.0| 4354.|         9.0|   70|    US|
|   Plymouth Fury iii|14.0|        8|       440.0|

In [19]:
df.dtypes

[('Car', 'string'),
 ('MPG', 'string'),
 ('Cylinders', 'string'),
 ('Displacement', 'string'),
 ('Horsepower', 'string'),
 ('Weight', 'string'),
 ('Acceleration', 'string'),
 ('Model', 'string'),
 ('Origin', 'string')]

In [21]:
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: string (nullable = true)
 |-- Cylinders: string (nullable = true)
 |-- Displacement: string (nullable = true)
 |-- Horsepower: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Acceleration: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Origin: string (nullable = true)



In [23]:
## implicitly infer the schema while inputing data
df = spark.read.csv('cars.csv', header = True, sep = ";", inferSchema = True)
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: decimal(4,0) (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [24]:
## explicitly defining schemas
from pyspark.sql.types import *
df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

In [25]:
# Creating a list of the schema in the format column_name, data_type
labels = [
     ('Car',StringType()),
     ('MPG',DoubleType()),
     ('Cylinders',IntegerType()),
     ('Displacement',DoubleType()),
     ('Horsepower',DoubleType()),
     ('Weight',DoubleType()),
     ('Acceleration',DoubleType()),
     ('Model',IntegerType()),
     ('Origin',StringType())
]

In [26]:
# Creating the schema that will be passed when reading the csv
custom_schema = StructType([StructField (x[0], x[1], True) for x in labels])
custom_schema

StructType(List(StructField(Car,StringType,true),StructField(MPG,DoubleType,true),StructField(Cylinders,IntegerType,true),StructField(Displacement,DoubleType,true),StructField(Horsepower,DoubleType,true),StructField(Weight,DoubleType,true),StructField(Acceleration,DoubleType,true),StructField(Model,IntegerType,true),StructField(Origin,StringType,true)))

In [27]:
df = spark.read.csv('cars.csv', header=True, sep=";", schema=custom_schema)
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [28]:
df.select(df['car'],df['mpg'])

car,mpg
Chevrolet Chevell...,18.0
Buick Skylark 320,15.0
Plymouth Satellite,18.0
AMC Rebel SST,16.0
Ford Torino,17.0
Ford Galaxie 500,15.0
Chevrolet Impala,14.0
Plymouth Fury iii,14.0
Pontiac Catalina,14.0
AMC Ambassador DPL,15.0


In [29]:
# Column name is case insensitive in this usage
from pyspark.sql.functions import col
df.select(col('car'),col('cylinders')).show(truncate=False)

+--------------------------------+---------+
|car                             |cylinders|
+--------------------------------+---------+
|Chevrolet Chevelle Malibu       |8        |
|Buick Skylark 320               |8        |
|Plymouth Satellite              |8        |
|AMC Rebel SST                   |8        |
|Ford Torino                     |8        |
|Ford Galaxie 500                |8        |
|Chevrolet Impala                |8        |
|Plymouth Fury iii               |8        |
|Pontiac Catalina                |8        |
|AMC Ambassador DPL              |8        |
|Citroen DS-21 Pallas            |4        |
|Chevrolet Chevelle Concours (sw)|8        |
|Ford Torino (sw)                |8        |
|Plymouth Satellite (sw)         |8        |
|AMC Rebel SST (sw)              |8        |
|Dodge Challenger SE             |8        |
|Plymouth 'Cuda 340              |8        |
|Ford Mustang Boss 302           |8        |
|Chevrolet Monte Carlo           |8        |
|Buick Est

In [30]:
## Adding a NEW COLUMN to df
# Column name is case insensitive in this usage
from pyspark.sql.functions import lit
df = df.withColumn("new_col", lit(100))
df.show()

# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.

+--------------------+----+---------+------------+----------+------+------------+-----+------+-------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_col|
+--------------------+----+---------+------------+----------+------+------------+-----+------+-------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|3504.0|        12.0|   70|    US|    100|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|3693.0|        11.5|   70|    US|    100|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|3436.0|        11.0|   70|    US|    100|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|3433.0|        12.0|   70|    US|    100|
|         Ford Torino|17.0|        8|       302.0|     140.0|3449.0|        10.5|   70|    US|    100|
|    Ford Galaxie 500|15.0|        8|       429.0|     198.0|4341.0|        10.0|   70|    US|    100|
|    Chevrolet Impala|14.0|        8|       454.0|     220.0|4354.0|     

In [31]:
df = df.withColumn("new_col", lit(100)) .withColumn("new_col1",lit(200))
df.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+-------+--------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_col|new_col1|
+--------------------+----+---------+------------+----------+------+------------+-----+------+-------+--------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|3504.0|        12.0|   70|    US|    100|     200|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|3693.0|        11.5|   70|    US|    100|     200|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|3436.0|        11.0|   70|    US|    100|     200|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|3433.0|        12.0|   70|    US|    100|     200|
|         Ford Torino|17.0|        8|       302.0|     140.0|3449.0|        10.5|   70|    US|    100|     200|
|    Ford Galaxie 500|15.0|        8|       429.0|     198.0|4341.0|        10.0|   70|    US|    100|  

In [41]:
from pyspark.sql.functions import concat
df = df.withColumn("new_col3", concat(col("origin"), lit("-"), col("model")))
df.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+-------+--------+---------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_col|new_col1| new_col3|
+--------------------+----+---------+------------+----------+------+------------+-----+------+-------+--------+---------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|3504.0|        12.0|   70|    US|    100|     200|    US-70|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|3693.0|        11.5|   70|    US|    100|     200|    US-70|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|3436.0|        11.0|   70|    US|    100|     200|    US-70|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|3433.0|        12.0|   70|    US|    100|     200|    US-70|
|         Ford Torino|17.0|        8|       302.0|     140.0|3449.0|        10.5|   70|    US|    100|     200|    US-70|
|    Ford Galaxie 500|15

In [42]:
df = df.withColumnRenamed("new_col", "qty")
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+---+--------+--------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|qty|new_col1|new_col3|
+--------------------+----+---------+------------+----------+------+------------+-----+------+---+--------+--------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|3504.0|        12.0|   70|    US|100|     200|   US-70|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|3693.0|        11.5|   70|    US|100|     200|   US-70|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|3436.0|        11.0|   70|    US|100|     200|   US-70|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|3433.0|        12.0|   70|    US|100|     200|   US-70|
|         Ford Torino|17.0|        8|       302.0|     140.0|3449.0|        10.5|   70|    US|100|     200|   US-70|
+--------------------+----+---------+------------+----------+---

In [44]:
df.groupBy('origin').count().show()

+------+-----+
|origin|count|
+------+-----+
|Europe|   73|
|    US|  254|
| Japan|   79|
+------+-----+



In [45]:
df.groupBy('origin','model').count().show()

+------+-----+-----+
|origin|model|count|
+------+-----+-----+
|Europe|   71|    5|
|Europe|   80|    9|
|Europe|   79|    4|
| Japan|   75|    4|
|    US|   72|   18|
|    US|   80|    7|
|Europe|   74|    6|
| Japan|   79|    2|
|Europe|   76|    8|
|    US|   75|   20|
| Japan|   77|    6|
|    US|   82|   20|
| Japan|   80|   13|
| Japan|   78|    8|
|    US|   78|   22|
|Europe|   75|    6|
|    US|   71|   20|
|    US|   77|   18|
| Japan|   70|    2|
| Japan|   71|    4|
+------+-----+-----+
only showing top 20 rows



In [48]:
df = df.drop('qty') .drop('new_col1') .drop('new_col3')

In [49]:
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|3504.0|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|3693.0|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|3436.0|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|3433.0|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0|3449.0|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows

