In [0]:
#importing modules

import numpy as np
import pandas as pd 
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from pyspark.sql.functions import round
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [0]:
url = "https://raw.githubusercontent.com/Cyndie-Matinou/fuel_consumption/master/cars_one.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)

df = spark.read.csv("file://"+SparkFiles.get("cars_one.csv"), header=True, inferSchema= True)

In [0]:
#performing some data cleansing
cols = ("car","_c9","_c10","_c11","_c12","_c13")

df=df.drop(*cols) 
#.printSchema()
df.show()

+----+---------+------------+----------+------+------------+-----+------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model|origin|
+----+---------+------------+----------+------+------------+-----+------+
|18.0|        8|         307|       130|  3504|        12.0|   70|     1|
|15.0|        8|         350|       165|  3693|        11.5|   70|     1|
|18.0|        8|         318|       150|  3436|        11.0|   70|     1|
|16.0|        8|         304|       150|  3433|        12.0|   70|     1|
|17.0|        8|         302|       140|  3449|        10.5|   70|     1|
|15.0|        8|         429|       198|  4341|        10.0|   70|     1|
|14.0|        8|         454|       220|  4354|         9.0|   70|     1|
|14.0|        8|         440|       215|  4312|         8.5|   70|     1|
|14.0|        8|         455|       225|  4425|        10.0|   70|     1|
|15.0|        8|         390|       190|  3850|         8.5|   70|     1|
|15.0|        8|         383|       17

In [0]:
#data cleansing, getting rid of duplicates
df.drop_duplicates()

Out[23]: DataFrame[mpg: double, cylinders: int, displacement: int, horsepower: string, weight: int, acceleration: double, model: int, origin: int]

In [0]:
df.dropna()

Out[24]: DataFrame[mpg: double, cylinders: int, displacement: int, horsepower: string, weight: int, acceleration: double, model: int, origin: int]

In [0]:
#visualising the dataframe
display(df)

#let's visualize the distribution of the features of the cars
#i decided to use histograms

mpg,cylinders,displacement,horsepower,weight,acceleration,model,origin
18.0,8,307,130,3504,12.0,70,1
15.0,8,350,165,3693,11.5,70,1
18.0,8,318,150,3436,11.0,70,1
16.0,8,304,150,3433,12.0,70,1
17.0,8,302,140,3449,10.5,70,1
15.0,8,429,198,4341,10.0,70,1
14.0,8,454,220,4354,9.0,70,1
14.0,8,440,215,4312,8.5,70,1
14.0,8,455,225,4425,10.0,70,1
15.0,8,390,190,3850,8.5,70,1


Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks

In [0]:
display(df)

mpg,cylinders,displacement,horsepower,weight,acceleration,model,origin
18.0,8,307,130,3504,12.0,70,1
15.0,8,350,165,3693,11.5,70,1
18.0,8,318,150,3436,11.0,70,1
16.0,8,304,150,3433,12.0,70,1
17.0,8,302,140,3449,10.5,70,1
15.0,8,429,198,4341,10.0,70,1
14.0,8,454,220,4354,9.0,70,1
14.0,8,440,215,4312,8.5,70,1
14.0,8,455,225,4425,10.0,70,1
15.0,8,390,190,3850,8.5,70,1


In [0]:
#this is where the fun part starts
#I am going to visualize the relationships between the Mileage Per Galon(mpg) of a car and the other features.

#1 I start with cylinders
from pyspark.sql.functions import corr
df.select(corr("mpg", "cylinders")).show()

+--------------------+
|corr(mpg, cylinders)|
+--------------------+
| -0.8667867876973869|
+--------------------+



In [0]:
#2 mpg of a car and displacement
df.select(corr("mpg", "displacement")).show()

+-----------------------+
|corr(mpg, displacement)|
+-----------------------+
|    -0.8851367055258322|
+-----------------------+



In [0]:
#3 mpg and horsepower
df.select(corr("mpg", "horsepower")).show()

+---------------------+
|corr(mpg, horsepower)|
+---------------------+
|  -0.8154545895904027|
+---------------------+



In [0]:
#4 mpg and weight
df.select(corr("mpg", "weight")).show()

+-------------------+
|  corr(mpg, weight)|
+-------------------+
|-0.9042371479265466|
+-------------------+



In [0]:
#5 mpg and acceleration
df.select(corr("mpg","acceleration")).show()

+-----------------------+
|corr(mpg, acceleration)|
+-----------------------+
|     0.5318790511742024|
+-----------------------+



In [0]:
#6 mpg and model
df.select(corr("mpg","model")).show()

+-------------------+
|   corr(mpg, model)|
+-------------------+
|0.18179123717662585|
+-------------------+



In [0]:
#7 mpg and origin
df.select(corr("mpg","origin")).show()

+-----------------+
|corr(mpg, origin)|
+-----------------+
|0.657652196695826|
+-----------------+



In [0]:

#train/test split
traindf, testdf = df.randomSplit([.8, .2], seed=56)

In [0]:
#Split the dataset into train and test set 
(trainRepartitiondf, testRepartitiondf) = (df
                                           .repartition(24)
                                           .randomSplit([.8, .2], seed=56))

print(trainRepartitiondf.count())

157


In [0]:
#I am starting with mpg and acceleration. I want to predict the mpg based on acceleration
display(df.select("mpg", "acceleration").summary())

summary,mpg,acceleration
count,198.0,198.0
mean,19.71969696969697,15.005555555555556
stddev,5.814253963002064,2.8723819586167947
min,9.0,8.0
25%,15.0,13.0
50%,19.0,15.0
75%,24.5,16.9
max,35.0,23.5


Output can only be rendered in Databricks

In [0]:
# Create a VectorAssembler object to assemble the mpgusage into one vector column 
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["acceleration"], outputCol="mpgUsage")

vecTraindf = vecAssembler.transform(traindf)

vecTraindf.select("acceleration","mpgUsage","mpg").show(10)


+------------+--------+----+
|acceleration|mpgUsage| mpg|
+------------+--------+----+
|        15.0|  [15.0]|10.0|
|        14.0|  [14.0]|10.0|
|        13.5|  [13.5]|11.0|
|        11.0|  [11.0]|11.0|
|        11.0|  [11.0]|11.0|
|        13.5|  [13.5]|12.0|
|        12.5|  [12.5]|12.0|
|        11.5|  [11.5]|12.0|
|        12.5|  [12.5]|12.0|
|        11.5|  [11.5]|12.0|
+------------+--------+----+
only showing top 10 rows



In [0]:
display(vecTraindf)

mpg,cylinders,displacement,horsepower,weight,acceleration,model,origin,mpgUsage/Weight
10.0,8,307,200,4376,15.0,70,1,"Map(vectorType -> dense, length -> 1, values -> List(4376.0))"
10.0,8,360,215,4615,14.0,70,1,"Map(vectorType -> dense, length -> 1, values -> List(4615.0))"
11.0,8,318,210,4382,13.5,70,1,"Map(vectorType -> dense, length -> 1, values -> List(4382.0))"
11.0,8,350,180,3664,11.0,73,1,"Map(vectorType -> dense, length -> 1, values -> List(3664.0))"
11.0,8,429,208,4633,11.0,72,1,"Map(vectorType -> dense, length -> 1, values -> List(4633.0))"
12.0,8,350,160,4456,13.5,72,1,"Map(vectorType -> dense, length -> 1, values -> List(4456.0))"
12.0,8,350,180,4499,12.5,73,1,"Map(vectorType -> dense, length -> 1, values -> List(4499.0))"
12.0,8,383,180,4955,11.5,71,1,"Map(vectorType -> dense, length -> 1, values -> List(4955.0))"
12.0,8,400,167,4906,12.5,73,1,"Map(vectorType -> dense, length -> 1, values -> List(4906.0))"
12.0,8,429,198,4952,11.5,73,1,"Map(vectorType -> dense, length -> 1, values -> List(4952.0))"


Output can only be rendered in Databricks

In [0]:
#linear regression 
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="mpgUsage", labelCol="mpg")
lrModel = lr.fit(vecTraindf)

In [0]:
#Leverage Pipeline class from ML package
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(traindf)

In [0]:
#now applying to my test set
preddf = pipelineModel.transform(testdf)

preddf.select("acceleration","mpgUsage","mpg", "prediction").show(20)

+------------+--------+----+----------+
|acceleration|mpgUsage| mpg|prediction|
+------------+--------+----+----------+
|        18.5|   [9.0]| 9.0|       9.0|
|        14.0|  [11.0]|11.0|      11.0|
|        12.0|  [13.0]|13.0|      13.0|
|        12.5|  [13.0]|13.0|      13.0|
|        14.5|  [14.0]|14.0|      14.0|
|        12.0|  [14.0]|14.0|      14.0|
|        13.5|  [14.0]|14.0|      14.0|
|        12.0|  [14.0]|14.0|      14.0|
|        11.5|  [14.0]|14.0|      14.0|
|        21.0|  [15.0]|15.0|      15.0|
|        12.5|  [15.0]|15.0|      15.0|
|        13.5|  [15.0]|15.0|      15.0|
|        14.0|  [16.0]|16.0|      16.0|
|        13.0|  [16.0]|16.0|      16.0|
|        11.5|  [17.0]|17.0|      17.0|
|        14.5|  [18.0]|18.0|      18.0|
|        19.0|  [18.0]|18.0|      18.0|
|        14.5|  [18.0]|18.0|      18.0|
|        13.5|  [19.0]|19.0|      19.0|
|        16.0|  [20.0]|20.0|      20.0|
+------------+--------+----+----------+
only showing top 20 rows



In [0]:
display(preddf)

mpg,cylinders,displacement,horsepower,weight,acceleration,model,origin,mpgUsage,prediction
9.0,8,304,193,4732,18.5,70,1,"Map(vectorType -> dense, length -> 1, values -> List(9.0))",9.0
11.0,8,400,150,4997,14.0,73,1,"Map(vectorType -> dense, length -> 1, values -> List(11.0))",11.0
13.0,8,400,170,4746,12.0,71,1,"Map(vectorType -> dense, length -> 1, values -> List(13.0))",13.0
13.0,8,400,190,4422,12.5,72,1,"Map(vectorType -> dense, length -> 1, values -> List(13.0))",13.0
14.0,8,302,137,4042,14.5,73,1,"Map(vectorType -> dense, length -> 1, values -> List(14.0))",14.0
14.0,8,350,165,4209,12.0,71,1,"Map(vectorType -> dense, length -> 1, values -> List(14.0))",14.0
14.0,8,351,148,4657,13.5,75,1,"Map(vectorType -> dense, length -> 1, values -> List(14.0))",14.0
14.0,8,400,175,4385,12.0,72,1,"Map(vectorType -> dense, length -> 1, values -> List(14.0))",14.0
14.0,8,400,175,4464,11.5,71,1,"Map(vectorType -> dense, length -> 1, values -> List(14.0))",14.0
15.0,6,250,72,3432,21.0,75,1,"Map(vectorType -> dense, length -> 1, values -> List(15.0))",15.0


Output can only be rendered in Databricks

Output can only be rendered in Databricks

In [0]:
#let us try to predict the mpg usage with the weight of the car
display(df.select("mpg", "weight").summary())

summary,mpg,weight
count,198.0,198.0
mean,19.71969696969697,3177.8888888888887
stddev,5.814253963002064,934.7837328417526
min,9.0,1613.0
25%,15.0,2300.0
50%,19.0,3021.0
75%,24.5,4082.0
max,35.0,5140.0


In [0]:
# Create a VectorAssembler object to assemble the features into one vector column 
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["weight"], outputCol="mpgUsage/Weight")

vecTraindf = vecAssembler.transform(traindf)

vecTraindf.select("weight","mpg","mpgUsage/Weight").show(5)

+------+----+---------------+
|weight| mpg|mpgUsage/Weight|
+------+----+---------------+
|  4376|10.0|       [4376.0]|
|  4615|10.0|       [4615.0]|
|  4382|11.0|       [4382.0]|
|  3664|11.0|       [3664.0]|
|  4633|11.0|       [4633.0]|
+------+----+---------------+
only showing top 5 rows



In [0]:
#linear regression 
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="mpgUsage/Weight", labelCol="mpg")
lrModel = lr.fit(vecTraindf)

In [0]:
#Leverage Pipeline class from ML package
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(traindf)

In [0]:
#now applying to my test set
preddf = pipelineModel.transform(testdf)

preddf.select("weight","mpgUsage/Weight", "mpg","prediction").show(10)

+------+---------------+----+------------------+
|weight|mpgUsage/Weight| mpg|        prediction|
+------+---------------+----+------------------+
|  4732|       [4732.0]| 9.0|10.800139650289928|
|  4997|       [4997.0]|11.0| 9.304862765427291|
|  4746|       [4746.0]|13.0|10.721143890334918|
|  4422|       [4422.0]|13.0|12.549331477865088|
|  4042|       [4042.0]|14.0|14.693502105215284|
|  4209|       [4209.0]|14.0|13.751195540037699|
|  4657|       [4657.0]|14.0|11.223331221477466|
|  4385|       [4385.0]|14.0|12.758105986317606|
|  4464|       [4464.0]|14.0|12.312344198000066|
|  3432|       [3432.0]|15.0|  18.1354602175406|
+------+---------------+----+------------------+
only showing top 10 rows



In [0]:
display(preddf)

mpg,cylinders,displacement,horsepower,weight,acceleration,model,origin,mpgUsage/Weight,prediction
9.0,8,304,193,4732,18.5,70,1,"Map(vectorType -> dense, length -> 1, values -> List(4732.0))",10.800139650289928
11.0,8,400,150,4997,14.0,73,1,"Map(vectorType -> dense, length -> 1, values -> List(4997.0))",9.304862765427291
13.0,8,400,170,4746,12.0,71,1,"Map(vectorType -> dense, length -> 1, values -> List(4746.0))",10.721143890334918
13.0,8,400,190,4422,12.5,72,1,"Map(vectorType -> dense, length -> 1, values -> List(4422.0))",12.549331477865088
14.0,8,302,137,4042,14.5,73,1,"Map(vectorType -> dense, length -> 1, values -> List(4042.0))",14.693502105215284
14.0,8,350,165,4209,12.0,71,1,"Map(vectorType -> dense, length -> 1, values -> List(4209.0))",13.7511955400377
14.0,8,351,148,4657,13.5,75,1,"Map(vectorType -> dense, length -> 1, values -> List(4657.0))",11.223331221477466
14.0,8,400,175,4385,12.0,72,1,"Map(vectorType -> dense, length -> 1, values -> List(4385.0))",12.758105986317606
14.0,8,400,175,4464,11.5,71,1,"Map(vectorType -> dense, length -> 1, values -> List(4464.0))",12.312344198000066
15.0,6,250,72,3432,21.0,75,1,"Map(vectorType -> dense, length -> 1, values -> List(3432.0))",18.1354602175406


Output can only be rendered in Databricks

Output can only be rendered in Databricks