# Predicting used-car prices in Germany with linear regression

In this notebook, we apply linear regression to predict the price of a used car in Germany.

Dataset: https://www.kaggle.com/orgesleka/used-cars-database

In [2]:
# File location and type
file_location = "/FileStore/tables/autos.csv"
file_type = "csv"

# CSV options
infer_schema = "false"  # read every column as a string; numeric columns are cast explicitly later
first_row_is_header = "true"
delimiter = ","
# Decoding the file as UTF-8 (Spark's default) turns some characters in the vehicle
# names into the replacement character U+FFFD (e.g. "3T�RER"), which means the file
# is not UTF-8 encoded. Decode it explicitly instead.
# NOTE(review): presumably Latin-1 / Windows-1252 — confirm against the source file.
file_encoding = "ISO-8859-1"

# The applied options are for CSV files. For other file types, these will be ignored.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .option("encoding", file_encoding)
    .load(file_location)
)

display(df)

dateCrawled,name,seller,offerType,price,abtest,vehicleType,yearOfRegistration,gearbox,powerPS,model,kilometer,monthOfRegistration,fuelType,brand,notRepairedDamage,dateCreated,nrOfPictures,postalCode,lastSeen
2016-03-24 11:52:17,Golf_3_1.6,privat,Angebot,480,test,,1993,manuell,0,golf,150000,0,benzin,volkswagen,,2016-03-24 00:00:00,0,70435,2016-04-07 03:16:57
2016-03-24 10:58:45,A5_Sportback_2.7_Tdi,privat,Angebot,18300,test,coupe,2011,manuell,190,,125000,5,diesel,audi,ja,2016-03-24 00:00:00,0,66954,2016-04-07 01:46:50
2016-03-14 12:52:21,"Jeep_Grand_Cherokee_""Overland""",privat,Angebot,9800,test,suv,2004,automatik,163,grand,125000,8,diesel,jeep,,2016-03-14 00:00:00,0,90480,2016-04-05 12:47:46
2016-03-17 16:54:04,GOLF_4_1_4__3T�RER,privat,Angebot,1500,test,kleinwagen,2001,manuell,75,golf,150000,6,benzin,volkswagen,nein,2016-03-17 00:00:00,0,91074,2016-03-17 17:40:17
2016-03-31 17:25:20,Skoda_Fabia_1.4_TDI_PD_Classic,privat,Angebot,3600,test,kleinwagen,2008,manuell,69,fabia,90000,7,diesel,skoda,nein,2016-03-31 00:00:00,0,60437,2016-04-06 10:17:21
2016-04-04 17:36:23,BMW_316i___e36_Limousine___Bastlerfahrzeug__Export,privat,Angebot,650,test,limousine,1995,manuell,102,3er,150000,10,benzin,bmw,ja,2016-04-04 00:00:00,0,33775,2016-04-06 19:17:07
2016-04-01 20:48:51,Peugeot_206_CC_110_Platinum,privat,Angebot,2200,test,cabrio,2004,manuell,109,2_reihe,150000,8,benzin,peugeot,nein,2016-04-01 00:00:00,0,67112,2016-04-05 18:18:39
2016-03-21 18:54:38,VW_Derby_Bj_80__Scheunenfund,privat,Angebot,0,test,limousine,1980,manuell,50,andere,40000,7,benzin,volkswagen,nein,2016-03-21 00:00:00,0,19348,2016-03-25 16:47:58
2016-04-04 23:42:13,Ford_C___Max_Titanium_1_0_L_EcoBoost,privat,Angebot,14500,control,bus,2014,manuell,125,c_max,30000,8,benzin,ford,,2016-04-04 00:00:00,0,94505,2016-04-04 23:42:13
2016-03-17 10:53:50,VW_Golf_4_5_tuerig_zu_verkaufen_mit_Anhaengerkupplung,privat,Angebot,999,test,kleinwagen,1998,manuell,101,golf,150000,0,,volkswagen,,2016-03-17 00:00:00,0,27472,2016-03-31 17:17:06


In [3]:
# Show the column names and types as a tree (all strings here, since inferSchema is "false").
df.printSchema()

In [4]:
# How often each value of notRepairedDamage occurs (groupBy keeps a null group too).
df.groupBy("notRepairedDamage").count().show()

In [5]:
# Row count per A/B-test group.
df.groupBy("abtest").count().show()

Both the 'control' and 'test' groups are well represented, so we keep the 'abtest' column.

In [7]:
# (number of rows, number of columns)
df.count(), len(df.columns)

The DataFrame has 371,824 rows and 20 columns.

In [9]:
# Explore the column data types

In [10]:
from collections import Counter

# Tally how many columns share each Spark type; df.dtypes yields (name, type) pairs.
dtype_counts = Counter(type_name for _name, type_name in df.dtypes)
print(dtype_counts)

In [11]:
# Column names, in schema order.
list(df.columns)

In [12]:
import pyspark.sql.functions as f

# Null values in each column. f.count() ignores nulls, so counting a non-null
# marker that is emitted only for rows where the column is null gives the null count.
null_counts = [f.count(f.when(f.col(name).isNull(), True)).alias(name) for name in df.columns]
data_agg = df.agg(*null_counts)
data_agg.toPandas().T

Unnamed: 0,0
dateCrawled,0
name,0
seller,1
offerType,1
price,1
abtest,1
vehicleType,37900
yearOfRegistration,1
gearbox,20224
powerPS,1


In [13]:
# Drop every row that has a null in any column.
# NOTE(review): this also discards rows that are missing only columns the model never
# uses (e.g. vehicleType, gearbox); consider dropna(subset=[...]) on the modelling columns.
df = df.na.drop()

Changing Data Types

In [15]:
# Casting the price target variable to double is done together with the other
# numeric casts in the following cell; a separate cast here would be overwritten.

In [16]:
# The CSV was read without schema inference, so cast the numeric columns explicitly.
new_df = df.withColumn("price", df["price"].cast("double"))
numeric_casts = [("kilometer", "double"), ("nrOfPictures", "double"), ("powerPS", "int")]
for column_name, target_type in numeric_casts:
    new_df = new_df.withColumn(column_name, df[column_name].cast(target_type))

In [17]:
#describe the numeric variables:
# Summary statistics of the numeric variables. Column names are spelled exactly as in
# the schema ("nrOfPictures") so this does not depend on case-insensitive resolution.
new_df.select("price", "kilometer", "nrOfPictures", "powerPS").describe().show()

Interpretation:

For `price`, the standard deviation is very high compared with the mean, which means the values are widely spread out.
This is confirmed by the minimum value of 0 and the maximum value of 99,999,999.
We remove these extreme values using quantiles (see below).
`nrOfPictures` only ever takes the value 0, so it carries no information and we drop this column.

In [19]:
# nrOfPictures is constant (always 0), so it carries no information.
# Use the schema's exact spelling: drop() silently ignores unknown column names, so a
# misspelled name would leave the column in place when resolution is case-sensitive.
new_df = new_df.drop('nrOfPictures')

In [20]:
# (column, type) pairs after the casts.
list(new_df.dtypes)

In [21]:
from pyspark.sql.functions import col

# How many listings share each price, in ascending price order.
price_counts = new_df.groupBy("price").count()
price_counts.orderBy(col("price")).show()

In [22]:
# Raw price column before outlier removal.
display(new_df.select(['price']))

price
1500.0
3600.0
650.0
2200.0
0.0
2000.0
2799.0
17999.0
1750.0
7550.0


In [23]:
# Quantiles of the target (price) and of engine power (powerPS), used to cut outliers.
# relativeError=0 requests exact quantiles (more expensive on large data).
QUANTILE_PROBS = [0.05, 0.25, 0.5, 0.75, 0.95]
QUANTILE_LABELS = ['quantile_05', 'quantile_25', 'quantile_50', 'quantile_75', 'quantile_95']
quantile_price = new_df.approxQuantile(['price'], QUANTILE_PROBS, 0)
quantile_power = new_df.approxQuantile(['powerPS'], QUANTILE_PROBS, 0)

# approxQuantile was given a list of columns, so it returns one list of quantiles per column.
# price quantiles
quantile_05, quantile_25, quantile_50, quantile_75, quantile_95 = quantile_price[0]

# powerPS bounds: 5th percentile (quantile_1) and 95th percentile (quantile_2)
quantile_1 = quantile_power[0][0]
quantile_2 = quantile_power[0][4]

# Print each column's quantiles once, labelled by column.
for label, value in zip(QUANTILE_LABELS, quantile_price[0]):
    print('price ' + label + ': ' + str(value))
for label, value in zip(QUANTILE_LABELS, quantile_power[0]):
    print('powerPS ' + label + ': ' + str(value))
print('quantile_1: ' + str(quantile_1))
print('quantile_2: ' + str(quantile_2))



We remove extremely high prices (above quantile_95) and very low prices (below quantile_05). The same 5% / 95% cut is applied to `powerPS` (quantile_1 and quantile_2).

In [25]:
import pyspark.sql.functions as f

# Keep rows whose price and powerPS lie strictly between their 5th and 95th percentiles.
price_in_range = (f.col("price") > quantile_05) & (f.col("price") < quantile_95)
power_in_range = (f.col("powerPS") > quantile_1) & (f.col("powerPS") < quantile_2)
h = new_df.filter(price_in_range & power_in_range)

In [26]:
# Rows remaining after outlier removal.
filtered_row_count = h.count()
filtered_row_count

In [27]:
# powerPS after outlier removal.
display(h.select(['powerPS']))

powerPS
75
69
102
109
105
140
190
75
136
102


In [28]:
# price after outlier removal.
display(h.select(['price']))

price
1500.0
3600.0
650.0
2200.0
2000.0
2799.0
17999.0
1750.0
7550.0
1850.0


The features we use are:
- Categorical:
 - brand
 - fuelType
 - abtest
 - notRepairedDamage
- Numeric:
 - kilometer
 - powerPS

# Data Preprocessing

Encode String features

Basic Steps :
- StringIndexer
- OneHotEncoderEstimator
- VectorAssembler

In [33]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler

In [34]:
# Stages 1-4: map each categorical string column to a numeric category index.
stage_1 = StringIndexer(inputCol='brand', outputCol='brand_index')
stage_2 = StringIndexer(inputCol='fuelType', outputCol='fuelType_index')
stage_3 = StringIndexer(inputCol='notRepairedDamage', outputCol='notRepairedDamage_index')
stage_4 = StringIndexer(inputCol='abtest', outputCol='abtest_index')

# Stage 5: one-hot encode the four index columns produced by stages 1-4.
# NOTE(review): OneHotEncoderEstimator was renamed to OneHotEncoder in Spark 3.0;
# update this name (and the import) if the cluster runs Spark 3.x.
stage_5 = OneHotEncoderEstimator(
    inputCols=[stage.getOutputCol() for stage in (stage_1, stage_2, stage_3, stage_4)],
    outputCols=['brand_encoded', 'fuelType_encoded', 'notRepairedDamage_encoded', 'abtest_encoded'],
)

# Stage 6: assemble the numeric columns and the one-hot vectors into a single 'features' vector.
stage_6 = VectorAssembler(inputCols=['kilometer', 'powerPS'] + stage_5.getOutputCols(), outputCol='features')


In [35]:
# setup the pipeline
from pyspark.ml import Pipeline

# Chain the four indexers, the one-hot encoder and the vector assembler, in that order.
pipeline_stages = [stage_1, stage_2, stage_3, stage_4, stage_5, stage_6]
my_pipeline = Pipeline(stages=pipeline_stages)

In [36]:
# Fit all pipeline stages on the filtered data, then apply the fitted stages to it.
# NOTE(review): this fit happens before the train/test split, so the category
# vocabularies are learned from rows that later end up in the test set.
fitted_model = my_pipeline.fit(dataset=h)
transformed_model = fitted_model.transform(dataset=h)


In [37]:
# Keep only the label and the assembled feature vector.
data = transformed_model.select(['price', 'features'])

Training data

In [39]:
# 70% / 30% train/test split; the fixed seed keeps the split repeatable for the same input.
split_weights = [0.7, 0.3]
train, test = data.randomSplit(split_weights, seed=12345)

In [40]:
from pyspark.ml.regression import LinearRegression

# Regularized linear regression: regParam sets the penalty strength and
# elasticNetParam mixes the L1 and L2 penalties (see the Spark ML docs).
lr = LinearRegression(
    featuresCol='features',
    labelCol='price',
    maxIter=50,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train)
print("Coefficients: %s" % (lr_model.coefficients,))
print("Intercept: %s" % (lr_model.intercept,))

In [41]:
# Fit quality on the training split, taken from the fitted model's summary.
trainingSummary = lr_model.summary
print("RMSE: {:f}".format(trainingSummary.rootMeanSquaredError))
print("r2: {:f}".format(trainingSummary.r2))

An R² of 0.58 means the model explains approximately 58% of the variability in 'price' on the training data.

In [43]:
# Predict on the held-out split and show a sample next to the true price.
lr_predictions = lr_model.transform(test)
lr_predictions.select("prediction", "price", "features").show(n=20)

from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator that scores predictions against the true price with R².
lr_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="price",
    metricName="r2",
)



In [44]:
# R² on the held-out test split.
r2_test = lr_evaluator.evaluate(lr_predictions)
print("R Squared (R2) on test data = %g" % r2_test)