# 0 - Import the required modules

In [154]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, StringIndexerModel, VectorAssembler, VectorIndexer}
import org.apache.spark.ml.Pipeline

import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, StringIndexerModel, VectorAssembler, VectorIndexer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator


# 1 - Start a Spark session

In [9]:
// Single-threaded local Spark session named "auto"; getOrCreate() reuses a running session if any.
val spark = SparkSession.builder()
  .master("local")
  .appName("auto")
  .getOrCreate()

// Brings the $"col" syntax and Dataset encoders into scope for the cells below.
import spark.implicits._

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@638b8cfe
import spark.implicits._


# 2 - Load Data

In [10]:
// Directory that holds the input CSV; the file path below is built from it instead of
// repeating the directory as a separate hard-coded literal.
val workingDir = "data/"

// Read autos.csv: the first line provides column names, Spark infers the column types,
// and timestamp columns are parsed with the pattern given below.
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .load(workingDir + "autos.csv")

// Quick look at the raw data: first 5 rows, then the inferred schema.
df.show(5)
df.printSchema()

+-------------------+--------------------+------+---------+-----+------+-----------+------------------+---------+-------+-----+---------+-------------------+--------+----------+-----------------+-------------------+------------+----------+-------------------+
|        dateCrawled|                name|seller|offerType|price|abtest|vehicleType|yearOfRegistration|  gearbox|powerPS|model|kilometer|monthOfRegistration|fuelType|     brand|notRepairedDamage|        dateCreated|nrOfPictures|postalCode|           lastSeen|
+-------------------+--------------------+------+---------+-----+------+-----------+------------------+---------+-------+-----+---------+-------------------+--------+----------+-----------------+-------------------+------------+----------+-------------------+
|2016-03-24 11:52:17|          Golf_3_1.6|privat|  Angebot|  480|  test|       null|              1993|  manuell|      0| golf|   150000|                  0|  benzin|volkswagen|             null|2016-03-24 00:00:00|     

workingDir: String = data/
df: org.apache.spark.sql.DataFrame = [dateCrawled: timestamp, name: string ... 18 more fields]


# 3 - Data exploration and preprocessing

## 3.1 - Null ratio for each column

In [11]:
import org.apache.spark.sql.{functions => F}

// Total number of rows; the denominator of every null ratio below.
val nbRows = df.count()

// Null count of every column in ONE aggregation job (instead of one filter + count job per column).
// F.when(cond, 1) yields null when cond is false and F.count skips nulls, so each aggregate
// counts exactly the rows in which that column is null. Results are in df.columns order.
val nullCounts = df
  .select(df.columns.map(a => F.count(F.when(F.col(a).isNull, 1))): _*)
  .head()

// Column name -> fraction of rows where that column is null (Float, as before).
val nullAttsStats = df.columns.zipWithIndex.map { case (a, i) =>
  (a, nullCounts.getLong(i) / nbRows.toFloat)
}.toMap

nbRows: Long = 371824
nullAttsStats: scala.collection.immutable.Map[String,Float] = Map(lastSeen -> 2.6894445E-6, name -> 0.0, yearOfRegistration -> 2.6894445E-6, model -> 0.05513092, abtest -> 2.6894445E-6, powerPS -> 2.6894445E-6, fuelType -> 0.089870475, notRepairedDamage -> 0.1939735, price -> 2.6894445E-6, vehicleType -> 0.10192995, dateCrawled -> 0.0, offerType -> 2.6894445E-6, gearbox -> 0.054391325, monthOfRegistration -> 2.6894445E-6, brand -> 2.6894445E-6, postalCode -> 2.6894445E-6, dateCreated -> 2.6894445E-6, nrOfPictures -> 2.6894445E-6, kilometer -> 2.6894445E-6, seller -> 2.6894445E-6)


In [12]:
// Print every column whose null ratio exceeds 1%, from the least to the most affected,
// with the ratio expressed as a percentage.
nullAttsStats
  .filter { case (_, ratio) => ratio > 0.01 }
  .toSeq
  .sortBy { case (_, ratio) => ratio }
  .foreach { case (att, ratio) => println(s"Ratio of nulls in $att: ${ratio * 100}%") }

Ratio of nulls in gearbox: 5.4391327%
Ratio of nulls in model: 5.513092%
Ratio of nulls in fuelType: 8.987047%
Ratio of nulls in vehicleType: 10.192995%
Ratio of nulls in notRepairedDamage: 19.39735%


## 3.2 - Distinct values for each column

In [13]:
// Column name -> number of distinct values in that column.
// distinct() keeps null as a value of its own, so a column containing nulls counts one extra
// value (countDistinct would not, hence it is not used here). Runs one Spark job per column.
val attDistValsList: Map[String, Long] = df.columns.map { att =>
  att -> df.select(att).distinct().count()
}.toMap

attDistValsList: scala.collection.immutable.Map[String,Long] = Map(lastSeen -> 182905, name -> 233698, yearOfRegistration -> 156, model -> 252, abtest -> 3, powerPS -> 795, fuelType -> 8, notRepairedDamage -> 3, price -> 5598, vehicleType -> 9, dateCrawled -> 280652, offerType -> 3, gearbox -> 3, monthOfRegistration -> 14, brand -> 41, postalCode -> 8152, dateCreated -> 115, nrOfPictures -> 2, kilometer -> 14, seller -> 3)


In [14]:
// Print each column with its number of distinct values, fewest first.
attDistValsList.toSeq
  .sortBy { case (_, countDist) => countDist }
  .foreach { case (att, countDist) => println(s"column $att has $countDist distinct values") }

column nrOfPictures has 2 distinct values
column abtest has 3 distinct values
column notRepairedDamage has 3 distinct values
column offerType has 3 distinct values
column gearbox has 3 distinct values
column seller has 3 distinct values
column fuelType has 8 distinct values
column vehicleType has 9 distinct values
column monthOfRegistration has 14 distinct values
column kilometer has 14 distinct values
column brand has 41 distinct values
column dateCreated has 115 distinct values
column yearOfRegistration has 156 distinct values
column model has 252 distinct values
column powerPS has 795 distinct values
column price has 5598 distinct values
column postalCode has 8152 distinct values
column lastSeen has 182905 distinct values
column name has 233698 distinct values
column dateCrawled has 280652 distinct values


## 3.3 - Remove outliers in the target value (price)

In [74]:
// Default summary statistics (count, mean, stddev, min, quartiles, max) of the raw price.
df.select($"price").summary().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            371823|
|   mean|17286.338865535483|
| stddev|3586530.1840677853|
|    min|                 0|
|    25%|              1150|
|    50%|              2950|
|    75%|              7200|
|    max|        2147483647|
+-------+------------------+



In [129]:
// Approximate 5th, 25th and 75th percentiles of price (relative error 0.01).
// Every approxQuantile call scans the data, so all three are requested in a single call.
val Array(c5, c25, c75) = df.stat.approxQuantile("price", Array(0.05, 0.25, 0.75), 0.01)

// 1.5 x the interquartile range: offset of the upper outlier fence above the third quartile.
val IR = (c75 - c25) * 1.5

// Keep prices in [5th percentile, Q3 + 1.5*IQR]. The lower bound is a percentile rather than
// Q1 - 1.5*IQR, which is negative for this data and would therefore keep the 0-priced rows.
// Rows with a null price fail both comparisons and are dropped too.
// NOTE(review): monthOfRegistration is cast to string, but every feature is string-indexed later
// anyway; confirm whether this cast is still needed.
val df_bis = df
  .filter($"price" >= c5 && $"price" <= c75 + IR)
  .withColumn("monthOfRegistration", col("monthOfRegistration").cast("string"))

c75: Double = 6999.0
c25: Double = 1150.0
c5: Double = 200.0
IR: Double = 8773.5
df_bis: org.apache.spark.sql.DataFrame = [dateCrawled: timestamp, name: string ... 18 more fields]


In [130]:
// Same summary statistics as for the raw data, now after outlier removal.
df_bis.select($"price").summary().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            324157|
|   mean| 4116.705710504478|
| stddev|3735.9490031849205|
|    min|               200|
|    25%|              1200|
|    50%|              2800|
|    75%|              5999|
|    max|             15755|
+-------+------------------+



In [131]:
// ##### price histogram
// 10 buckets evenly spaced between the minimum and maximum price; the result is
// (bucket boundaries, number of rows per bucket).
df_bis
  .select("price")
  .where("price is not null")
  .rdd
  .map(row => row.getInt(0))
  .histogram(10)

res50: (Array[Double], Array[Long]) = (Array(200.0, 1755.5, 3311.0, 4866.5, 6422.0, 7977.5, 9533.0, 11088.5, 12644.0, 14199.5, 15755.0),Array(116150, 65393, 39205, 28052, 22021, 17412, 12473, 8639, 8247, 6565))


## 3.4 Remove some attributes

### Attributes with fewer than 8 distinct values

In [132]:
// Names of the columns with fewer than 8 distinct values (null counted as a value, as in
// attDistValsList); these columns are dropped before modelling.
val rmv_1 = attDistValsList.collect { case (att, countDist) if countDist < 8 => att }.toSeq

rmv_1: Seq[String] = List(abtest, notRepairedDamage, offerType, gearbox, nrOfPictures, seller)


### Timestamp and high-cardinality text attributes

In [133]:
// Columns dropped explicitly: the three timestamp columns, plus the free-text listing name and
// the car model, which have many distinct values (see the distinct-count exploration).
val rmv_2 = List(
  "dateCrawled",
  "dateCreated",
  "lastSeen",
  "name",
  "model"
)

rmv_2: List[String] = List(dateCrawled, dateCreated, lastSeen, name, model)


In [134]:
// All columns excluded from the model: low-cardinality ones, timestamp/text ones, and postalCode.
val attToRemove = (rmv_1 ++ rmv_2) :+ "postalCode"

// Remove those columns, then discard every row that still has a null (or NaN) in any remaining column.
val data = df_bis
  .drop(attToRemove: _*)
  .na
  .drop()

attToRemove: Seq[String] = List(abtest, notRepairedDamage, offerType, gearbox, nrOfPictures, seller, dateCrawled, dateCreated, lastSeen, name, model, postalCode)
data: org.apache.spark.sql.DataFrame = [price: int, vehicleType: string ... 6 more fields]


# 4 - Final Dataset

In [135]:
// Preview the cleaned modelling dataset: the first 5 rows, then its column names and types.
data.show(5)
data.printSchema()

+-----+-----------+------------------+-------+---------+-------------------+--------+----------+
|price|vehicleType|yearOfRegistration|powerPS|kilometer|monthOfRegistration|fuelType|     brand|
+-----+-----------+------------------+-------+---------+-------------------+--------+----------+
| 9800|        suv|              2004|    163|   125000|                  8|  diesel|      jeep|
| 1500| kleinwagen|              2001|     75|   150000|                  6|  benzin|volkswagen|
| 3600| kleinwagen|              2008|     69|    90000|                  7|  diesel|     skoda|
|  650|  limousine|              1995|    102|   150000|                 10|  benzin|       bmw|
| 2200|     cabrio|              2004|    109|   150000|                  8|  benzin|   peugeot|
+-----+-----------+------------------+-------+---------+-------------------+--------+----------+
only showing top 5 rows



In [137]:
// Rows left after outlier filtering, column pruning and null-row removal.
val nbRowsClean: Long = data.count()

res53: Long = 279516


# 5 - Feature indexing, target indexing and feature-vector assembly

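## 5.1 - Target indexing (not needed for regression: the model is trained on the numeric price, cast to double, as its label)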

In [138]:

// Name of the regression target column.
val label = "price"

// NOTE(review): a StringIndexer maps the numeric price to a frequency-ordered category index,
// which carries no meaning for a regression target. Its output column (indexed_price) is never
// selected later: training uses `label`, i.e. price cast to double.
val labelIndexer = new StringIndexer().setInputCol(label).setOutputCol(s"indexed_$label")

label: String = price
labelIndexer: org.apache.spark.ml.feature.StringIndexer = strIdx_4e8982b772d7


## 5.2 - String-index every feature column

In [139]:
// Every column except the target is a model input. Names are compared exactly: a substring test
// would also exclude any other column whose name merely contains the label.
val attributes = data.columns.filterNot(_ == label)

// One StringIndexer per input column, writing its index to "indexed_<column>".
// NOTE(review): this also indexes the numeric columns (yearOfRegistration, powerPS, kilometer),
// so they are treated as unordered categories rather than ordered numbers — confirm intended.
val catFeatIndexer = attributes.map { att =>
  new StringIndexer()
    .setInputCol(att)
    .setOutputCol("indexed_" + att)
}


attributes: Array[String] = Array(vehicleType, yearOfRegistration, powerPS, kilometer, monthOfRegistration, fuelType, brand)
catFeatIndexer: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_832fc318fbf3, strIdx_5bedcec6d6e7, strIdx_561bb6722621, strIdx_b5cc0e680908, strIdx_eba1a5f203b5, strIdx_cfbea2803dbc, strIdx_4c3c76ceb65b)


## 5.3 - Map each indexed feature column to its position in the feature vector

In [140]:
// (indexed column name, position) pairs. The assembler receives the columns in this same order,
// so position i here is "feature i" in the tree's debug output.
val indcatFeatIndexer = catFeatIndexer.zipWithIndex.map { case (indexer, pos) => (indexer.getOutputCol, pos) }

indcatFeatIndexer: Array[(String, Int)] = Array((indexed_vehicleType,0), (indexed_yearOfRegistration,1), (indexed_powerPS,2), (indexed_kilometer,3), (indexed_monthOfRegistration,4), (indexed_fuelType,5), (indexed_brand,6))


## 5.4 - Assemble the indexed features

In [141]:
// Indexed column names, in attribute order; these become the assembler's inputs.
val features: Array[String] = catFeatIndexer.map(indexer => indexer.getOutputCol)

// Concatenate the indexed columns into a single vector column named "assembled".
val vectorAssemb = new VectorAssembler()
  .setOutputCol("assembled")
  .setInputCols(features)

features: Array[String] = Array(indexed_vehicleType, indexed_yearOfRegistration, indexed_powerPS, indexed_kilometer, indexed_monthOfRegistration, indexed_fuelType, indexed_brand)
vectorAssemb: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_0e9ca661a809, handleInvalid=error, numInputCols=7


## 5.5 - Index the vectors

In [142]:
// VectorIndexer marks a vector slot as categorical when it has at most maxCat distinct values.
// 800 is above every column's distinct count seen during exploration (largest input: powerPS, 795),
// so all indexed features are expected to be treated as categorical. The tree reuses maxCat as
// maxBins because Spark requires maxBins to be at least the largest category count.
val maxCat = 800
val vecIndexer = new VectorIndexer()
  .setMaxCategories(maxCat)
  .setInputCol(vectorAssemb.getOutputCol)
  .setOutputCol("features")


maxCat: Int = 800
vecIndexer: org.apache.spark.ml.feature.VectorIndexer = vecIdx_b9a54d5e493c


# 6 - Build and fit the pipeline on data

In [143]:
// Feature-preparation pipeline: string-index every input column, assemble the indices into one
// vector, then let VectorIndexer flag the categorical slots.
// labelIndexer is intentionally not a stage: the target is used as a numeric label (price cast
// to double), and its indexed_price output was never selected, so fitting it only cost an extra
// pass over the data.
val pipeline = new Pipeline()
  .setStages(catFeatIndexer ++ Array(vectorAssemb, vecIndexer))

pipeline: org.apache.spark.ml.Pipeline = pipeline_122f799c50c7


In [144]:
import org.apache.spark.sql.types.DoubleType

// Fit the feature pipeline on `data`, apply it, and keep only what the regressor consumes:
// the "features" vector and the target as a double column named "label".
// NOTE(review): the pipeline is fitted on the full dataset before the train/test split, so the
// indexers also see the test rows — consider fitting on the training split only.
val fittedPipeline = pipeline.fit(data)
val ftdata = fittedPipeline
  .transform(data)
  .withColumn("label", col(label).cast(DoubleType))
  .select("features", "label")


import org.apache.spark.sql.types.DoubleType
ftdata: org.apache.spark.sql.DataFrame = [features: vector, label: double]


## Check final dataset

In [145]:
// Inspect the model-ready dataset: schema first, then the first 20 rows (long values truncated).
ftdata.printSchema()
ftdata.show(20)

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)

+--------------------+-------+
|            features|  label|
+--------------------+-------+
|[6.0,3.0,13.0,1.0...| 9800.0|
|[1.0,5.0,1.0,0.0,...| 1500.0|
|[1.0,10.0,22.0,3....| 3600.0|
|[0.0,16.0,12.0,0....|  650.0|
|[4.0,3.0,17.0,0.0...| 2200.0|
|[3.0,24.0,10.0,9....|14500.0|
|[0.0,3.0,8.0,0.0,...| 2000.0|
|[2.0,2.0,5.0,0.0,...| 2799.0|
|[2.0,16.0,29.0,0....|  999.0|
|[1.0,69.0,0.0,10....|  450.0|
|[1.0,3.0,1.0,0.0,...| 1750.0|
|[3.0,8.0,11.0,0.0...| 7550.0|
|[3.0,3.0,12.0,0.0...| 1850.0|
|[5.0,11.0,34.0,2....|10400.0|
|[0.0,7.0,38.0,0.0...| 3699.0|
|[1.0,12.0,20.0,0....|  450.0|
|[0.0,23.0,49.0,0....|  500.0|
|[2.0,7.0,30.0,0.0...| 2500.0|
|[0.0,10.0,77.0,6....| 6900.0|
|[3.0,36.0,20.0,10...| 1990.0|
+--------------------+-------+
only showing top 20 rows



# 7 - Train/Test Split

In [146]:
// 70% / 30% random split into training and test sets. A fixed seed makes the split, and therefore
// the reported test metrics, reproducible from one run to the next.
val Array(trainingData, testData) = ftdata.randomSplit(Array(0.7, 0.3), 42L)

trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: double]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: double]


# 8 - Decision tree: build, train, evaluate

In [147]:
// ####### build the model
// Regression tree predicting "label" from the "features" vector.
// maxBins = maxCat so every categorical feature's categories fit into the bins; splits that would
// leave a child node with fewer than 100 training rows are rejected.
val dt = new DecisionTreeRegressor()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setMinInstancesPerNode(100)
  .setMaxBins(maxCat)

// ####### train the model
val model: DecisionTreeRegressionModel = dt.fit(trainingData)


import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator
dt: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_b0a23103d84b
model: org.apache.spark.ml.regression.DecisionTreeRegressionModel = DecisionTreeRegressionModel: uid=dtr_b0a23103d84b, depth=5, numNodes=63, numFeatures=7


In [148]:
// DecisionTreeRegressor.fit already returns a DecisionTreeRegressionModel, so a type ascription
// is enough; no runtime cast is needed.
val treeModel: DecisionTreeRegressionModel = model

// Print the learned tree. "feature i" in the output is position i of the assembled feature vector.
println(s"Learned regression tree model:\n ${treeModel.toDebugString}")

Learned classification tree model :
 DecisionTreeRegressionModel: uid=dtr_b0a23103d84b, depth=5, numNodes=63, numFeatures=7
  If (feature 1 in {0.0,1.0,3.0,5.0,6.0,7.0,9.0,12.0,13.0,16.0,18.0,19.0,21.0,22.0,23.0,25.0,26.0,28.0,29.0,30.0,32.0,69.0,71.0,75.0,81.0,83.0,84.0,87.0})
   If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,10.0,11.0,12.0,14.0,15.0,17.0,18.0,19.0,20.0,21.0,22.0,23.0,24.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,36.0,40.0,42.0,43.0,44.0,46.0,47.0,48.0,49.0,50.0,55.0,56.0,57.0,58.0,59.0,60.0,61.0,62.0,63.0,64.0,65.0,66.0,67.0,68.0,69.0,70.0,71.0,72.0,76.0,77.0,79.0,82.0,83.0,84.0,85.0,88.0,89.0,90.0,91.0,92.0,93.0,95.0,96.0,98.0,99.0,100.0,101.0,103.0,104.0,105.0,106.0,107.0,108.0,109.0,113.0,115.0,116.0,118.0,120.0,121.0,124.0,125.0,126.0,128.0,130.0,131.0,136.0,137.0,140.0,141.0,143.0,144.0,145.0,146.0,147.0,149.0,150.0,153.0,156.0,158.0,159.0,160.0,162.0,164.0,166.0,167.0,170.0,172.0,173.0,176.0,179.0,182.0,184.0,188.0,190.0,191.0,194.0,198.0,200.0,201.0,202.0,204

     If (feature 2 in {3.0,5.0,6.0,8.0,10.0,11.0,12.0,19.0,24.0,28.0,41.0,43.0,47.0,55.0,57.0,63.0,70.0,77.0,80.0,81.0,84.0,91.0,109.0,114.0,116.0,121.0,134.0,135.0,139.0,142.0,151.0,157.0,163.0,167.0,171.0,172.0,175.0,176.0,181.0,182.0,184.0,185.0,190.0,191.0,200.0,202.0,209.0,214.0,222.0,237.0,239.0,242.0,258.0,263.0,268.0,272.0,280.0,284.0,289.0,294.0,306.0,313.0,319.0,324.0,341.0,353.0,358.0,452.0,468.0,506.0,577.0})
      If (feature 1 in {10.0,11.0,48.0,51.0,55.0})
       Predict: 8016.618432239415
      Else (feature 1 not in {10.0,11.0,48.0,51.0,55.0})
       Predict: 10830.611919989115
     Else (feature 2 not in {3.0,5.0,6.0,8.0,10.0,11.0,12.0,19.0,24.0,28.0,41.0,43.0,47.0,55.0,57.0,63.0,70.0,77.0,80.0,81.0,84.0,91.0,109.0,114.0,116.0,121.0,134.0,135.0,139.0,142.0,151.0,157.0,163.0,167.0,171.0,172.0,175.0,176.0,181.0,182.0,184.0,185.0,190.0,191.0,200.0,202.0,209.0,214.0,222.0,237.0,239.0,242.0,258.0,263.0,268.0,272.0,280.0,284.0,289.0,294.0,306.0,313.0,319.0,324.0,341.0,353.0

treeModel: org.apache.spark.ml.regression.DecisionTreeRegressionModel = DecisionTreeRegressionModel: uid=dtr_b0a23103d84b, depth=5, numNodes=63, numFeatures=7


## Model evaluation on test data

In [150]:
// ####### make predictions
// Apply the trained tree to the held-out rows; this adds a "prediction" column.
val predictions = model.transform(testData)

// ####### compare predictions with the true labels
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")

// ####### RMSE: square root of the mean squared error, in the label's units
evaluator.setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
// `mse` is kept as an alias for code that reads it; it holds the RMSE, not the MSE.
val mse = rmse
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

// ####### MAE: mean absolute error, in the label's units
evaluator.setMetricName("mae")
val mae = evaluator.evaluate(predictions)
println(s"Mean Absolute Error (MAE) on test data = $mae")

Root Mean Squared Error (MSE) on test data = 1984.7886254687126
Root Mean Squared Error (MAE) on test data = 1387.3779224349873


evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_ab624d830fe7, metricName=mae, throughOrigin=false
mse: Double = 1984.7886254687126
mae: Double = 1387.3779224349873


# Feature importances

In [153]:
// Importance of each input column in the trained tree, printed from highest to lowest.
// The importance vector is indexed like the assembled features, which follow `attributes` order,
// so zipping the two pairs each column name with its own importance.
val featureImportances = model.featureImportances
attributes
  .zip(featureImportances.toArray)
  .sortBy { case (_, importance) => -importance }
  .foreach(println)

(yearOfRegistration,0.686443082572499)
(powerPS,0.27985940288872335)
(brand,0.018680900227790103)
(vehicleType,0.0150166143109876)
(kilometer,0.0)
(monthOfRegistration,0.0)
(fuelType,0.0)


featureImportances: org.apache.spark.ml.linalg.Vector = (7,[0,1,2,6],[0.0150166143109876,0.686443082572499,0.27985940288872335,0.018680900227790103])
