# Setting up Spark

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz
!tar xf spark-3.4.2-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark


Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Fetched 3,626 B in 2s (1,912 B/s)
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
36 packages can be upgraded. Run 'apt list --upgradable' to see them.


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.2-bin-hadoop3"

In [None]:
import findspark
findspark.init()
findspark.find()

'/content/spark-3.4.2-bin-hadoop3'

In [None]:
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [None]:
spark = SparkSession \
       .builder \
       .appName("Our First Spark example") \
       .getOrCreate()

In [None]:
spark

# Loading Data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import split, count, when, isnan, col, regexp_replace
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
# Define a schema
schema = StructType([StructField('mpg', FloatType(), nullable = True),
                     StructField('cylinders', IntegerType(), nullable = True),
                     StructField('displacement', FloatType(), nullable = True),
                     StructField('horsepower', StringType(), nullable = True),
                     StructField('weight', IntegerType(), nullable = True),
                     StructField('acceleration', FloatType(), nullable = True),
                     StructField('model year', IntegerType(), nullable = True),
                     StructField('origin', IntegerType(), nullable = True),
                     StructField('car name', StringType(), nullable = True)])

file_path = '/content/auto-mpg.csv'

In [None]:
df = spark.read.csv(file_path, header = True, inferSchema = True, nanValue = '?')
df.show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|18.0|        8|       307.0|     130.0|  3504|        12.0|        70|     1|chevrolet chevell...|
|15.0|        8|       350.0|     165.0|  3693|        11.5|        70|     1|   buick skylark 320|
|18.0|        8|       318.0|     150.0|  3436|        11.0|        70|     1|  plymouth satellite|
|16.0|        8|       304.0|     150.0|  3433|        12.0|        70|     1|       amc rebel sst|
|17.0|        8|       302.0|     140.0|  3449|        10.5|        70|     1|         ford torino|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
only showing top 5 rows



In [None]:
# Check for missing values
def check_missing(dataframe):

  return dataframe.select([count(when(isnan(c) | col(c).isNull(), c)). \
                           alias(c) for c in dataframe.columns]).show()

check_missing(df)

+---+---------+------------+----------+------+------------+----------+------+--------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|car name|
+---+---------+------------+----------+------+------------+----------+------+--------+
|  0|        0|           0|         6|     0|           0|         0|     0|       0|
+---+---------+------------+----------+------+------------+----------+------+--------+



In [None]:
# Handling Missing Values
df = df.na.drop()

df = df.withColumn("horsepower", df["horsepower"].cast(IntegerType()))   # Convert Horsepower from str to int

df.show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|  plymouth satellite|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|       amc rebel sst|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|         ford torino|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
only showing top 5 rows



# PySpark DataFrame Basics

In [None]:
df.columns

['mpg',
 'cylinders',
 'displacement',
 'horsepower',
 'weight',
 'acceleration',
 'model year',
 'origin',
 'car name']

In [None]:
df.printSchema()

root
 |-- mpg: double (nullable = true)
 |-- cylinders: integer (nullable = true)
 |-- displacement: double (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- model year: integer (nullable = true)
 |-- origin: integer (nullable = true)
 |-- car name: string (nullable = true)



In [None]:
# Renaming Columns
df = df.withColumnRenamed('model year', 'model_year')

df = df.withColumnRenamed('car name', 'car_name')

In [None]:
df.show(3)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|  plymouth satellite|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
only showing top 3 rows



In [None]:
# Get info from first 4 rows
for car in df.head(4):
    print(car, '\n')

Row(mpg=18.0, cylinders=8, displacement=307.0, horsepower=130, weight=3504, acceleration=12.0, model_year=70, origin=1, car_name='chevrolet chevelle malibu') 

Row(mpg=15.0, cylinders=8, displacement=350.0, horsepower=165, weight=3693, acceleration=11.5, model_year=70, origin=1, car_name='buick skylark 320') 

Row(mpg=18.0, cylinders=8, displacement=318.0, horsepower=150, weight=3436, acceleration=11.0, model_year=70, origin=1, car_name='plymouth satellite') 

Row(mpg=16.0, cylinders=8, displacement=304.0, horsepower=150, weight=3433, acceleration=12.0, model_year=70, origin=1, car_name='amc rebel sst') 



In [None]:
# statistical summary of dataframe
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+--------------------+
|summary|              mpg|         cylinders|      displacement|        horsepower|            weight|      acceleration|       model_year|            origin|            car_name|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+--------------------+
|  count|              392|               392|               392|               392|               392|               392|              392|               392|                 392|
|   mean|23.44591836734694| 5.471938775510204|194.41198979591837|104.46938775510205|2977.5841836734694|15.541326530612228| 75.9795918367347|1.5765306122448979|                null|
| stddev|7.805007486571802|1.7057832474527845|104.64400390890465| 38.49115993282846| 849.402560

In [None]:
# describe with specific variables
df.describe(['mpg', 'horsepower']).show()

+-------+-----------------+------------------+
|summary|              mpg|        horsepower|
+-------+-----------------+------------------+
|  count|              392|               392|
|   mean|23.44591836734694|104.46938775510205|
| stddev|7.805007486571802| 38.49115993282846|
|    min|              9.0|                46|
|    max|             46.6|               230|
+-------+-----------------+------------------+



In [None]:
# Describe with numerical columns
def get_num_cols(dataframe):

  num_cols = [col for col in dataframe.columns if dataframe.select(col). \
              dtypes[0][1] in ['double', 'int']]

  return num_cols

num_cols = get_num_cols(df)

df.describe(num_cols).show()

+-------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|              mpg|         cylinders|      displacement|        horsepower|            weight|      acceleration|       model_year|            origin|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|              392|               392|               392|               392|               392|               392|              392|               392|
|   mean|23.44591836734694| 5.471938775510204|194.41198979591837|104.46938775510205|2977.5841836734694|15.541326530612228| 75.9795918367347|1.5765306122448979|
| stddev|7.805007486571802|1.7057832474527845|104.64400390890465| 38.49115993282846| 849.4025600429486|  2.75886411918808|3.683736543577868|0.8055181834183057|
|    min|              9.0|             

# Spark DataFrame Basic Operations

### Filtering and Sorting

In [None]:
# Get the cars with mpg more than 23
df.filter(df['mpg'] > 23).show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|24.0|        4|       113.0|        95|  2372|        15.0|        70|     3|toyota corona mar...|
|27.0|        4|        97.0|        88|  2130|        14.5|        70|     3|        datsun pl510|
|26.0|        4|        97.0|        46|  1835|        20.5|        70|     2|volkswagen 1131 d...|
|25.0|        4|       110.0|        87|  2672|        17.5|        70|     2|         peugeot 504|
|24.0|        4|       107.0|        90|  2430|        14.5|        70|     2|         audi 100 ls|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
only showing top 5 rows



In [None]:
# Multiple Conditions
df.filter((df['horsepower'] > 80) &
          (df['weight'] > 2000)).select('car_name').show(5)

+--------------------+
|            car_name|
+--------------------+
|chevrolet chevell...|
|   buick skylark 320|
|  plymouth satellite|
|       amc rebel sst|
|         ford torino|
+--------------------+
only showing top 5 rows



In [None]:
# Sorting
df.filter((df['mpg'] > 25) & (df['origin'] == 2)). \
orderBy('mpg', ascending= False).show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|44.3|        4|        90.0|        48|  2085|        21.7|        80|     2|vw rabbit c (diesel)|
|44.0|        4|        97.0|        52|  2130|        24.6|        82|     2|           vw pickup|
|43.4|        4|        90.0|        48|  2335|        23.7|        80|     2|  vw dasher (diesel)|
|43.1|        4|        90.0|        48|  1985|        21.5|        78|     2|volkswagen rabbit...|
|41.5|        4|        98.0|        76|  2144|        14.7|        80|     2|           vw rabbit|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
only showing top 5 rows



In [None]:
# Get the cars with 'volkswagen' in their names, and sort them by model year and horsepower
df.filter(df['car_name'].contains('volkswagen')). \
orderBy(['model_year', 'horsepower'], ascending=[False, False]).show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|36.0|        4|       105.0|        74|  1980|        15.3|        82|     2| volkswagen rabbit l|
|33.0|        4|       105.0|        74|  2190|        14.2|        81|     2|    volkswagen jetta|
|31.5|        4|        89.0|        71|  1990|        14.9|        78|     2| volkswagen scirocco|
|43.1|        4|        90.0|        48|  1985|        21.5|        78|     2|volkswagen rabbit...|
|29.0|        4|        97.0|        78|  1940|        14.5|        77|     2|volkswagen rabbit...|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
only showing top 5 rows



### Filtering with SQL

In [None]:
# Get the cars with Toyota in their names
df.filter("car_name like '%toyota%'").show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|24.0|        4|       113.0|        95|  2372|        15.0|        70|     3|toyota corona mar...|
|25.0|        4|       113.0|        95|  2228|        14.0|        71|     3|       toyota corona|
|31.0|        4|        71.0|        65|  1773|        19.0|        71|     3| toyota corolla 1200|
|24.0|        4|       113.0|        95|  2278|        15.5|        72|     3|toyota corona har...|
|27.0|        4|        97.0|        88|  2100|        16.5|        72|     3|toyota corolla 16...|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
only showing top 5 rows



In [None]:
df.filter('mpg > 22').show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|24.0|        4|       113.0|        95|  2372|        15.0|        70|     3|toyota corona mar...|
|27.0|        4|        97.0|        88|  2130|        14.5|        70|     3|        datsun pl510|
|26.0|        4|        97.0|        46|  1835|        20.5|        70|     2|volkswagen 1131 d...|
|25.0|        4|       110.0|        87|  2672|        17.5|        70|     2|         peugeot 504|
|24.0|        4|       107.0|        90|  2430|        14.5|        70|     2|         audi 100 ls|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
only showing top 5 rows



In [None]:
#Multiple Conditions
df.filter('mpg > 22 and acceleration < 15').show(5)

+----+---------+------------+----------+------+------------+----------+------+-------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|     car_name|
+----+---------+------------+----------+------+------------+----------+------+-------------+
|27.0|        4|        97.0|        88|  2130|        14.5|        70|     3| datsun pl510|
|24.0|        4|       107.0|        90|  2430|        14.5|        70|     2|  audi 100 ls|
|26.0|        4|       121.0|       113|  2234|        12.5|        70|     2|     bmw 2002|
|27.0|        4|        97.0|        88|  2130|        14.5|        71|     3| datsun pl510|
|25.0|        4|       113.0|        95|  2228|        14.0|        71|     3|toyota corona|
+----+---------+------------+----------+------+------------+----------+------+-------------+
only showing top 5 rows



In [None]:
df.filter('horsepower == 88 and weight between 2600 and 3000') \
.select(['horsepower', 'weight', 'car_name']).show()

+----------+------+--------------------+
|horsepower|weight|            car_name|
+----------+------+--------------------+
|        88|  2957|         peugeot 504|
|        88|  2740|pontiac sunbird c...|
|        88|  2720| ford fairmont (man)|
|        88|  2890|     ford fairmont 4|
|        88|  2870|       ford fairmont|
|        88|  2605|  chevrolet cavalier|
|        88|  2640|chevrolet cavalie...|
+----------+------+--------------------+



# GroupBy and Aggregate Operations

In [None]:
# Brands
df.createOrReplaceTempView('auto_mpg')

df = df.withColumn('brand', split(df['car_name'], ' ').getItem(0)).drop('car_name')

# Replacing Misspelled Brands
auto_misspelled = {'chevroelt': 'chevrolet',
                   'chevy': 'chevrolet',
                   'vokswagen': 'volkswagen',
                   'vw': 'volkswagen',
                   'hi': 'harvester',
                   'maxda': 'mazda',
                   'toyouta': 'toyota',
                   'mercedes-benz': 'mercedes'}

for key in auto_misspelled.keys():

  df = df.withColumn('brand', regexp_replace('brand', key, auto_misspelled[key]))

df.show(5)

+----+---------+------------+----------+------+------------+----------+------+---------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|    brand|
+----+---------+------------+----------+------+------------+----------+------+---------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|    buick|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1| plymouth|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|      amc|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|     ford|
+----+---------+------------+----------+------+------------+----------+------+---------+
only showing top 5 rows



In [None]:
# Average acceleration by car brands
df.groupBy('brand').agg({'acceleration': 'mean'}).show(5)

+--------+------------------+
|   brand| avg(acceleration)|
+--------+------------------+
|   buick|14.700000000000003|
| pontiac|14.081249999999999|
|mercedes| 19.53333333333333|
|  toyota| 16.03846153846154|
|    saab|            15.175|
+--------+------------------+
only showing top 5 rows



In [None]:
# Max MPG by car brands
df.groupBy('brand').agg({'mpg': 'max'}).show(5)

+--------+--------+
|   brand|max(mpg)|
+--------+--------+
|   buick|    30.0|
| pontiac|    33.5|
|mercedes|    30.0|
|  toyota|    39.1|
|    saab|    25.0|
+--------+--------+
only showing top 5 rows



In [None]:
# End Session
spark.stop()