# Task 1 : 
### Installing Java, Spark3+ , and a compatible pyspark Python lib and making it work.


In [3]:
# Testing pyspark installation

import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\spark-3.2.0-bin-hadoop3.2'

In [7]:
# Initiate spark context
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('SparkApp').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark =SparkSession(sc)


In [4]:
#Example Test code
numeric_val = sc.parallelize([1,2,3,4])
numeric_val.map(lambda x: x*x*x).collect()


[1, 8, 27, 64]

In [174]:
# Stop the spark session
sc.stop()

# Task 2 
### Read "Car details v3.csv" data with spark

In [198]:
df = spark.read.option("header", "true").csv ('Car details v3.csv')
df.show(5)

+--------------------+----+-------------+---------+------+-----------+------------+------------+----------+-------+----------+--------------------+-----+
|                name|year|selling_price|km_driven|  fuel|seller_type|transmission|       owner|   mileage| engine| max_power|              torque|seats|
+--------------------+----+-------------+---------+------+-----------+------------+------------+----------+-------+----------+--------------------+-----+
|Maruti Swift Dzir...|2014|       450000|   145500|Diesel| Individual|      Manual| First Owner| 23.4 kmpl|1248 CC|    74 bhp|      190Nm@ 2000rpm|    5|
|Skoda Rapid 1.5 T...|2014|       370000|   120000|Diesel| Individual|      Manual|Second Owner|21.14 kmpl|1498 CC|103.52 bhp| 250Nm@ 1500-2500rpm|    5|
|Honda City 2017-2...|2006|       158000|   140000|Petrol| Individual|      Manual| Third Owner| 17.7 kmpl|1497 CC|    78 bhp|12.7@ 2,700(kgm@ ...|    5|
|Hyundai i20 Sport...|2010|       225000|   127000|Diesel| Individual|      

In [199]:
df.limit(10).toPandas()

Unnamed: 0,name,year,selling_price,km_driven,fuel,seller_type,transmission,owner,mileage,engine,max_power,torque,seats
0,Maruti Swift Dzire VDI,2014,450000,145500,Diesel,Individual,Manual,First Owner,23.4 kmpl,1248 CC,74 bhp,190Nm@ 2000rpm,5
1,Skoda Rapid 1.5 TDI Ambition,2014,370000,120000,Diesel,Individual,Manual,Second Owner,21.14 kmpl,1498 CC,103.52 bhp,250Nm@ 1500-2500rpm,5
2,Honda City 2017-2020 EXi,2006,158000,140000,Petrol,Individual,Manual,Third Owner,17.7 kmpl,1497 CC,78 bhp,"12.7@ 2,700(kgm@ rpm)",5
3,Hyundai i20 Sportz Diesel,2010,225000,127000,Diesel,Individual,Manual,First Owner,23.0 kmpl,1396 CC,90 bhp,22.4 kgm at 1750-2750rpm,5
4,Maruti Swift VXI BSIII,2007,130000,120000,Petrol,Individual,Manual,First Owner,16.1 kmpl,1298 CC,88.2 bhp,"11.5@ 4,500(kgm@ rpm)",5
5,Hyundai Xcent 1.2 VTVT E Plus,2017,440000,45000,Petrol,Individual,Manual,First Owner,20.14 kmpl,1197 CC,81.86 bhp,113.75nm@ 4000rpm,5
6,Maruti Wagon R LXI DUO BSIII,2007,96000,175000,LPG,Individual,Manual,First Owner,17.3 km/kg,1061 CC,57.5 bhp,"7.8@ 4,500(kgm@ rpm)",5
7,Maruti 800 DX BSII,2001,45000,5000,Petrol,Individual,Manual,Second Owner,16.1 kmpl,796 CC,37 bhp,59Nm@ 2500rpm,4
8,Toyota Etios VXD,2011,350000,90000,Diesel,Individual,Manual,First Owner,23.59 kmpl,1364 CC,67.1 bhp,170Nm@ 1800-2400rpm,5
9,Ford Figo Diesel Celebration Edition,2013,200000,169000,Diesel,Individual,Manual,First Owner,20.0 kmpl,1399 CC,68.1 bhp,160Nm@ 2000rpm,5


# Task 3
### Creating a model to predict the selling price from the other variables using Sparks'mlib 

In [188]:
spark = SparkSession.builder.appName('car_price_predictor').getOrCreate()

In [197]:
spark

In [200]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- year: string (nullable = true)
 |-- selling_price: string (nullable = true)
 |-- km_driven: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- max_power: string (nullable = true)
 |-- torque: string (nullable = true)
 |-- seats: string (nullable = true)



All the columns are string type.
The selling_price is our target and the remainder are our features we want to predict the target with.

Let's select some columns to see how the dataframe look like.

## Pre-processing the data

### Drop the name and the torque column

In [201]:
to_drop = ["name","torque"]
for x in to_drop:
    df = df.drop(x)
df.show(5)

+----+-------------+---------+------+-----------+------------+------------+----------+-------+----------+-----+
|year|selling_price|km_driven|  fuel|seller_type|transmission|       owner|   mileage| engine| max_power|seats|
+----+-------------+---------+------+-----------+------------+------------+----------+-------+----------+-----+
|2014|       450000|   145500|Diesel| Individual|      Manual| First Owner| 23.4 kmpl|1248 CC|    74 bhp|    5|
|2014|       370000|   120000|Diesel| Individual|      Manual|Second Owner|21.14 kmpl|1498 CC|103.52 bhp|    5|
|2006|       158000|   140000|Petrol| Individual|      Manual| Third Owner| 17.7 kmpl|1497 CC|    78 bhp|    5|
|2010|       225000|   127000|Diesel| Individual|      Manual| First Owner| 23.0 kmpl|1396 CC|    90 bhp|    5|
|2007|       130000|   120000|Petrol| Individual|      Manual| First Owner| 16.1 kmpl|1298 CC|  88.2 bhp|    5|
+----+-------------+---------+------+-----------+------------+------------+----------+-------+----------

### Replacing the year column by an age column (2021 - year )

In [205]:
df = df.withColumn('Age', ( 2021 - df['year'] ) ).drop('year')
df.show(5)

+-------------+---------+------+-----------+------------+------------+----------+-------+----------+-----+----+
|selling_price|km_driven|  fuel|seller_type|transmission|       owner|   mileage| engine| max_power|seats| Age|
+-------------+---------+------+-----------+------------+------------+----------+-------+----------+-----+----+
|       450000|   145500|Diesel| Individual|      Manual| First Owner| 23.4 kmpl|1248 CC|    74 bhp|    5| 7.0|
|       370000|   120000|Diesel| Individual|      Manual|Second Owner|21.14 kmpl|1498 CC|103.52 bhp|    5| 7.0|
|       158000|   140000|Petrol| Individual|      Manual| Third Owner| 17.7 kmpl|1497 CC|    78 bhp|    5|15.0|
|       225000|   127000|Diesel| Individual|      Manual| First Owner| 23.0 kmpl|1396 CC|    90 bhp|    5|11.0|
|       130000|   120000|Petrol| Individual|      Manual| First Owner| 16.1 kmpl|1298 CC|  88.2 bhp|    5|14.0|
+-------------+---------+------+-----------+------------+------------+----------+-------+----------+----

### removing units from mileage, engine and max_power columns 

In [206]:
df= df.withColumn("mileage_clean", regexp_extract("mileage", "[+-]?([0-9]*[.])?[0-9]+", 0)).drop('mileage')
df= df.withColumn("engine_clean", regexp_extract("engine", "[+-]?([0-9]*[.])?[0-9]+", 0)).drop('engine')
df= df.withColumn("mpower_clean", regexp_extract("max_power", "[+-]?([0-9]*[.])?[0-9]+", 0)).drop('max_power')
df.show(10)

+-------------+---------+------+-----------+------------+------------+-----+----+-------------+------------+------------+
|selling_price|km_driven|  fuel|seller_type|transmission|       owner|seats| Age|mileage_clean|engine_clean|mpower_clean|
+-------------+---------+------+-----------+------------+------------+-----+----+-------------+------------+------------+
|       450000|   145500|Diesel| Individual|      Manual| First Owner|    5| 7.0|         23.4|        1248|          74|
|       370000|   120000|Diesel| Individual|      Manual|Second Owner|    5| 7.0|        21.14|        1498|      103.52|
|       158000|   140000|Petrol| Individual|      Manual| Third Owner|    5|15.0|         17.7|        1497|          78|
|       225000|   127000|Diesel| Individual|      Manual| First Owner|    5|11.0|         23.0|        1396|          90|
|       130000|   120000|Petrol| Individual|      Manual| First Owner|    5|14.0|         16.1|        1298|        88.2|
|       440000|    45000

### Casting the numerical values (String  to Integer)

In [207]:
df.printSchema()

root
 |-- selling_price: string (nullable = true)
 |-- km_driven: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- seats: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- mileage_clean: string (nullable = true)
 |-- engine_clean: string (nullable = true)
 |-- mpower_clean: string (nullable = true)



In [208]:
from pyspark.sql.types import DoubleType

numCols=["Age","selling_price","km_driven","seats","mileage_clean","engine_clean",
"mpower_clean"]
for x in numCols:
    df = df.withColumn(x,df[x].cast(DoubleType()))

df.printSchema()


root
 |-- selling_price: double (nullable = true)
 |-- km_driven: double (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- seats: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- mileage_clean: double (nullable = true)
 |-- engine_clean: double (nullable = true)
 |-- mpower_clean: double (nullable = true)



### counting null values in the dataFrame

In [209]:
from pyspark.sql.functions import col,sum
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

+-------------+---------+----+-----------+------------+-----+-----+---+-------------+------------+------------+
|selling_price|km_driven|fuel|seller_type|transmission|owner|seats|Age|mileage_clean|engine_clean|mpower_clean|
+-------------+---------+----+-----------+------------+-----+-----+---+-------------+------------+------------+
|            0|        0|   0|          0|           0|    0|  221|  0|          221|         221|         216|
+-------------+---------+----+-----------+------------+-----+-----+---+-------------+------------+------------+



### Replacing the null values with corresponding strategy

mileage=mean
engine=mean
max_power=mean

In [210]:
from pyspark.sql.functions import avg

def fill_with_mean(this_df, exclude=set()):
    stats = this_df.agg(*(avg(c).alias(c) for c in this_df.columns if c not in exclude))
    return this_df.na.fill(stats.first().asDict())

df = fill_with_mean(df, ["year", "selling_price", "km_driven", "fuel" , "seller_type" , "transmission","owner","seats"])

df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

+-------------+---------+----+-----------+------------+-----+-----+---+-------------+------------+------------+
|selling_price|km_driven|fuel|seller_type|transmission|owner|seats|Age|mileage_clean|engine_clean|mpower_clean|
+-------------+---------+----+-----------+------------+-----+-----+---+-------------+------------+------------+
|            0|        0|   0|          0|           0|    0|  221|  0|            0|           0|           0|
+-------------+---------+----+-----------+------------+-----+-----+---+-------------+------------+------------+



### Dealing with seats null values 

In [165]:
df.first()

Row(year=2014, selling_price=450000, km_driven=145500, fuel='Diesel', seller_type='Individual', transmission='Manual', owner='First Owner', seats=5, mileage_clean=23, engine_clean=1248, mpower_clean=74)

### Dealing with categorical variables

Let's identify the unique value for the string columns

In [27]:

df.select('seller_type').distinct().collect()

[Row(seller_type='Individual'),
 Row(seller_type='Dealer'),
 Row(seller_type='Trustmark Dealer')]

In [28]:
df.select('fuel').distinct().collect()

[Row(fuel='Diesel'), Row(fuel='CNG'), Row(fuel='LPG'), Row(fuel='Petrol')]

In [29]:
df.select('transmission').distinct().collect() 

[Row(transmission='Automatic'), Row(transmission='Manual')]

In [160]:
df.select('owner').distinct().collect()

[Row(owner='Third Owner'),
 Row(owner='Fourth & Above Owner'),
 Row(owner='Second Owner'),
 Row(owner='First Owner'),
 Row(owner='Test Drive Car')]

In [164]:
df.select('seats').distinct().collect()

[Row(seats=None),
 Row(seats=6),
 Row(seats=5),
 Row(seats=9),
 Row(seats=4),
 Row(seats=8),
 Row(seats=7),
 Row(seats=10),
 Row(seats=14),
 Row(seats=2)]

In [162]:
df.groupby('seats').agg(func.expr('count(seats)').alias('Distinct_Stores')).show()

NameError: name 'func' is not defined

In [113]:
from pyspark.ml.feature import (OneHotEncoder, StringIndexer)
string_indexer = [ StringIndexer(inputCol = x, outputCol = x + "_StringIndexer", handleInvalid='skip')
                  for x in ["fuel","seller_type","transmission","owner"]]
string_indexer

[StringIndexer_e4e5c1b4e2b0,
 StringIndexer_2e5049f6080f,
 StringIndexer_37a458af9051,
 StringIndexer_b78e8ed689fd]

In [126]:
One_Hot_Encoder = [ 
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in ["fuel","seller_type","transmission","owner"]],
        outputCols=[f"{x}_OneHotEncoder" for x in ["fuel","seller_type","transmission","owner"]],
    )
]

In [127]:
One_Hot_Encoder

[OneHotEncoder_0ba8bd00bfa8]

## Vector Assembling

In [112]:
from pyspark.ml.feature import VectorAssembler

In [118]:
catCols=["fuel","seller_type","transmission","owner"]
numCols=["year","selling_price","km_driven","seats","mileage_clean","engine_clean",
"mpower_clean"]

In [121]:
assemblerInput=[x for x in numCols]
assemblerInput+=[f"{x}_OneHotEncoder" for x in catCols]

In [122]:
assemblerInput

['year',
 'selling_price',
 'km_driven',
 'seats',
 'mileage_clean',
 'engine_clean',
 'mpower_clean',
 'fuel_OneHotEncoder',
 'seller_type_OneHotEncoder',
 'transmission_OneHotEncoder',
 'owner_OneHotEncoder']

In [128]:
vector_assembler = VectorAssembler(inputCols = assemblerInput, outputCol="VectorAssembler_features")

In [131]:
stages  = []
stages += string_indexer
stages += One_Hot_Encoder
stages += [vector_assembler]

In [132]:
stages

[StringIndexer_e4e5c1b4e2b0,
 StringIndexer_2e5049f6080f,
 StringIndexer_37a458af9051,
 StringIndexer_b78e8ed689fd,
 OneHotEncoder_0ba8bd00bfa8,
 VectorAssembler_34a890c8d233]

In [133]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(df)



Wall time: 4.85 s


In [134]:
model

PipelineModel_a76ac71b14d1