# Machine Learning in PySpark (Spark MLlib)
In this notebook we use the same data set used in the data processing notebook; this time our goal is to build a prediction model for rental price in Lausanne. 

## Import libraries

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import re
from pyspark.sql.functions import col
import pyspark.sql.functions as F

In [2]:
spark = SparkSession.builder.getOrCreate()

## Import Data

In [3]:
df = spark.read.csv('../../data/lausanne_rental.csv', header = True)

## Preprocessing

### Insight

**Initial peek**

In [8]:
pd.DataFrame(df.head(5), columns=df.columns)

Unnamed: 0,Id,SurfaceArea,NumRooms,Type,Address,Description,Rent,Bookmark,Link
0,5286965.0,52.0,2.5,flat,"Route Aloys-Fauquez 124, 1018 Lausanne, VD",Agréable 2 pièces proche de toutes les commodités,1460.0,New,/en/d/flat-rent-lausanne/5286965?s=2&t=1&l=202...
1,5277530.0,80.0,3.0,flat,"Avenue de Béthusy 53, 1012 Lausanne, VD",Proche du centre ville,2100.0,,/en/d/flat-rent-lausanne/5277530?s=2&t=1&l=202...
2,5249019.0,74.0,3.5,stepped-apartment,"Route des Plaines-du-Loup 42, 1018 Lausanne, VD","Magnifique 3,5 pièces avec jardin privé -Immeu...",2150.0,Highlight,/en/d/stepped-apartment-rent-lausanne/5249019?...
3,5274375.0,108.0,3.5,duplex-maisonette,"Route de Prilly 12, 1004 Lausanne, VD",Magnifique logement en duplex entièrement neuf,3160.0,,/en/d/duplex-maisonette-rent-lausanne/5274375?...
4,5274374.0,80.0,3.5,flat,"Route de Prilly 12, 1004 Lausanne, VD",Nouveaux logements,2085.0,,/en/d/flat-rent-lausanne/5274374?s=2&t=1&l=202...


**Size**

In [4]:
df.count()

594

**Feature size**

In [11]:
len(df.columns)

9

Removing `id` and `link` the feature size is **7**. 

**Summary Statistics**

In [15]:
df.select(['SurfaceArea', 'NumRooms', 'Rent']).describe().toPandas()

Unnamed: 0,summary,SurfaceArea,NumRooms,Rent
0,count,533.0,491.0,588
1,mean,74.62851782363977,3.230142566191446,1984.3287904599658
2,stddev,44.51662793438704,1.21721821532338,989.8851120494071
3,min,1.0,1.5,"VD"""
4,max,99.0,8.5,990.0


Note that strangely, the maximum Rent is reported to be **990.0**, while the average is around **1984.30**! We fix this when we change data types below.

**Data types**

In [16]:
df.dtypes

[('Id', 'string'),
 ('SurfaceArea', 'string'),
 ('NumRooms', 'string'),
 ('Type', 'string'),
 ('Address', 'string'),
 ('Description', 'string'),
 ('Rent', 'string'),
 ('Bookmark', 'string'),
 ('Link', 'string')]

Well, PySpark is not as smart as Pandas in getting data types! 

### Transformations, modifications

**Converting data types** (A)

In [4]:
df = df.withColumn('SurfaceArea', df['SurfaceArea'].astype('double'))
df = df.withColumn('NumRooms', df['NumRooms'].astype('int'))
df = df.withColumn('Rent', df['Rent'].astype('double'))

**Summary Statistics again**

In [30]:
df.select(['SurfaceArea', 'NumRooms', 'Rent']).describe().toPandas()

Unnamed: 0,summary,SurfaceArea,NumRooms,Rent
0,count,533.0,491.0,587.0
1,mean,74.62851782363977,2.920570264765784,1984.3287904599656
2,stddev,44.51662793438704,1.183131645448193,989.8851120494072
3,min,1.0,1.0,300.0
4,max,540.0,11.0,8900.0


Just out of curosity, can you have a look at the apartment(s) with the highes Rent?

In [33]:
df[df['Rent'] == 8900.00].show()

+---------+-----------+--------+----+--------------------+--------------------+------+--------+--------------------+
|       Id|SurfaceArea|NumRooms|Type|             Address|         Description|  Rent|Bookmark|                Link|
+---------+-----------+--------+----+--------------------+--------------------+------+--------+--------------------+
|4984514.0|      423.0|      11|flat|Chemin de Praz-Bu...|En bordure de Gol...|8900.0|    null|/en/d/flat-rent-l...|
|5225916.0|      540.0|      11|flat|   1000 Lausanne, VD|Duplex d'exceptio...|8900.0|    null|/en/d/flat-rent-l...|
+---------+-----------+--------+----+--------------------+--------------------+------+--------+--------------------+



**Dealing with the outliers**

In [42]:
def get_whiskers(df, colname, WHIS):
    q1 = df.approxQuantile(colname, [0.25], 0.01)[0]
    q3 = df.approxQuantile(colname, [0.75], 0.01)[0]
    iqn = q3 - q1
    return [q1 - WHIS*iqn, q3 + WHIS*iqn]

`SurfaceArea`

In [43]:
get_whiskers(df, 'SurfaceArea', 1.5)

[-22.5, 165.5]

`NumRooms`

In [44]:
get_whiskers(df, 'NumRooms', 1.5)

[0.5, 4.5]

`Rent`

In [45]:
get_whiskers(df, 'Rent', 1.5)

[-285.0, 4075.0]

`RentPerArea` (A)

In [5]:
df = df.withColumn('RentPerArea', df['Rent']/df['SurfaceArea'])

In [58]:
get_whiskers(dftmp, 'RentPerArea', 1.5)

[13.451926414439434, 43.649774383894474]

In [60]:
out = df[df['RentPerArea'] < 13.45]

In [55]:
pd.DataFrame(out.head(20), columns=df.columns)

Unnamed: 0,Id,SurfaceArea,NumRooms,Type,Address,Description,Rent,Link,RentPerArea
0,5198531.0,129.0,,flat,"1010 Lausanne, VD",Appartements de 1 à 4.5 pièces avec vues panor...,1190.0,/en/d/flat-rent-lausanne/5198531?s=2&t=1&l=202...,9.224806
1,5242058.0,204.0,3.0,flat,"Chemin de Pierrefleur 3, 1004 Lausanne, VD",Magnifique logement n° 204 de 3.5 pièces avec ...,2510.0,/en/d/flat-rent-lausanne/5242058?s=2&t=1&l=202...,12.303922
2,5285252.0,80.0,4.0,flat,"1007 Lausanne, VD",Sous location noël et ou nouvel an,400.0,/en/d/flat-rent-lausanne/5285252?s=2&t=1&l=202...,5.0
3,5254423.0,50.0,2.0,flat,"1004 Lausanne, VDClose### CHF 500.—Favourite##...",2 piece in Lausanne until 11th JanuaryA louer ...,500.0,/en/d/flat-rent-lausanne/5254423?s=2&t=1&l=202...,10.0
4,5284570.0,50.0,,flat,"Chemin de Longeraie 9, 1006 Lausanne, VDClose#...",Appartement meublé en plein LausanneJolie cham...,500.0,/en/d/flat-rent-lausanne/5284570?s=2&t=1&l=202...,10.0
5,5175721.0,70.0,3.0,single-room,"Village 8, 1012 Lausanne, VD",Chambre à louer dans 3 pcs à Lausanne 12 min d...,700.0,/en/d/single-room-rent-lausanne/5175721?s=2&t=...,10.0
6,5283201.0,70.0,3.0,single-room,"Trabandan, 1006 Lausanne, VD","Lausanne, jolie chambre à louer dans appart 3.5",770.0,/en/d/single-room-rent-lausanne/5283201?s=2&t=...,11.0
7,5269223.0,60.0,3.0,single-room,"1010 Lausanne, VD","Chambre dans appartement entre le Chuv, la Sal...",790.0,/en/d/single-room-rent-lausanne/5269223?s=2&t=...,13.166667
8,4987213.0,100.0,5.0,single-room,"1007 Lausanne, VDClose### CHF 800.—Favourite##...",Chambre meublée à louer proche EPFL et UNIlCha...,800.0,/en/d/single-room-rent-lausanne/4987213?s=2&t=...,8.0
9,5272411.0,125.0,5.0,single-room,"1010 Lausanne, VD",Collocation Lausanne,800.0,/en/d/single-room-rent-lausanne/5272411?s=2&t=...,6.4


Note that `Rent` column by itself seems reasonable, also `SurfaceArea` by itself, however, when you look at `RentPerArea` you see something is wrong. 

**Remove** `RentPerArea` **outliers: (A)**

In [6]:
df = df.where(col('RentPerArea') > 13.45)

In [67]:
df.count()

507

**Missing data**

In [31]:
for c in df.columns:
    print(c, ' ', df.where(col(c).isNull()).count())

Id   0
SurfaceArea   60
NumRooms   102
Type   0
Address   0
Description   8
Rent   6
Bookmark   525
Link   0


**Dealing with the missing data**

Question what whould you do with the missing data? What would you do with `NumRooms`? with `Bookmark`? `Rent`? 

`Bookmark` (A)

In [7]:
df = df.drop('Bookmark')

`Rent` (A)

In [8]:
rentMedian = df.approxQuantile('Rent', [0.5], 0.01)[0]

In [9]:
df = df.fillna(rentMedian, subset=['Rent'])

`NumRooms` (A)

In [10]:
numRoomsMedian = df.approxQuantile('NumRooms', [0.5], 0.01)[0]

In [11]:
df = df.fillna(numRoomsMedian, subset=['NumRooms'])

`SurfaceArea` (A)

In [12]:
areaMedian = df.approxQuantile('SurfaceArea', [0.5], 0.01)[0]

In [13]:
df = df.fillna(areaMedian, subset=['SurfaceArea'])

**Check missing data again**

In [84]:
for c in df.columns:
    print(c, ' ', df.where(col(c).isNull()).count())

Id   0
SurfaceArea   0
NumRooms   0
Type   0
Address   0
Description   8
Rent   0
Link   0
RentPerArea   0


## Building prediction model

### Feature engineering

Can you get `ZipCode` from the `Address`?

In [30]:
df.select('Address').withColumn('ZipCode', F.regexp_extract('Address', '(\d+) Lausanne', 1)).show()

+--------------------+-------+
|             Address|ZipCode|
+--------------------+-------+
|Route Aloys-Fauqu...|   1018|
|Avenue de Béthusy...|   1012|
|Route des Plaines...|   1018|
|Route de Prilly 1...|   1004|
|Route de Prilly 1...|   1004|
|Route Aloys-Fauqu...|   1018|
|Rue du Bourg 15, ...|   1003|
|Avenue Druey 13, ...|   1018|
|Chemin de Pierref...|   1004|
|Av. de Chailly 52...|   1012|
|Av. de Chailly 52...|   1012|
|Av. de Riant-Mont...|   1004|
|Av. de Tivoli 19B...|   1007|
|Isabelle de Monto...|   1010|
|Rue St-Martin 28,...|   1005|
|Avenue Sainte-Luc...|   1003|
|Chemin de Montoli...|   1006|
|Ch. du Devin 64, ...|   1012|
|Chemin de Boston ...|   1004|
|Avenue de Menthon...|   1005|
+--------------------+-------+
only showing top 20 rows



In [31]:
df = df.withColumn('ZipCode', F.regexp_extract('Address', '(\d+) Lausanne', 1))

In [32]:
pd.DataFrame(df.select('ZipCode').head(5), columns=['ZipCode'])

Unnamed: 0,ZipCode
0,1018
1,1012
2,1018
3,1004
4,1004


**Onehot encoding?**

Question: when do we need onehot encoding?

In [94]:
df.select(['Type']).groupby(['Type']).mean().show()

+--------------------+
|                Type|
+--------------------+
|         single-room|
|   duplex-maisonette|
|                loft|
|           penthouse|
|          attic-flat|
|              studio|
|furnished-residen...|
|                flat|
|   stepped-apartment|
+--------------------+



In [84]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator

`Type`:

In [76]:
indexer = StringIndexer(inputCol='Type', outputCol='Type_numeric').fit(df)
df = indexer.transform(df)
encoder = OneHotEncoderEstimator(inputCols=['Type_numeric'], outputCols=['Type_vector']).fit(df)
df = encoder.transform(df)

`ZipCode`:

In [85]:
indexer = StringIndexer(inputCol='ZipCode', outputCol='ZipCode_numeric').fit(df)
df = indexer.transform(df)
encoder = OneHotEncoderEstimator(inputCols=['ZipCode_numeric'], outputCols=['ZipCode_vector']).fit(df)
df = encoder.transform(df)

### Prediction

In [262]:
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler

In [62]:
features = ['SurfaceArea', 'NumRooms']

In [51]:
assembler = VectorAssembler(inputCols = features, outputCol='features')

In [52]:
lr = LinearRegression(featuresCol = 'features', labelCol = 'Rent')

In [53]:
pipeline = Pipeline(stages=[assembler, lr])

**Train test split**

In [54]:
(train, test) = df.randomSplit(weights=[0.8, 0.2], seed=1)

In [55]:
model = pipeline.fit(train)

In [56]:
pred = model.transform(test)

**Let's put all these steps into a function:**

In [184]:
features = ['SurfaceArea', 'NumRooms', 'Type_vector', 'ZipCode_vector']

**ML Algorithms:**

In [199]:
lr = LinearRegression(featuresCol = 'features', labelCol = 'Rent')

In [384]:
rf = RandomForestRegressor(featuresCol = 'features', labelCol = 'Rent')

In [385]:
gb = GBTRegressor(featuresCol = 'features', labelCol = 'Rent', maxDepth = 3)

In [386]:
def fit(trainData, features, alg):
    assembler = VectorAssembler(inputCols = features, outputCol='features')
    pipeline = Pipeline(stages=[assembler, alg])
    model = pipeline.fit(trainData)
    return model

In [387]:
(train, test) = df.randomSplit(weights=[0.8, 0.2], seed=1)

In [388]:
model = fit(train, features, rf)
pred = model.transform(test)

In [389]:
pred.select(['Rent', 'Prediction']).show()

+------+------------------+
|  Rent|        Prediction|
+------+------------------+
|2377.0|1763.6753111305966|
|2650.0| 2303.923768993243|
|1155.0|1299.4327805698963|
|1280.0|1293.6998648744807|
| 780.0| 851.8083832017655|
|2110.0| 2133.529778195488|
|2244.0|3605.6230436341307|
|2250.0|2119.0173115892785|
|2850.0| 3051.605046771757|
|3220.0| 4022.473543218022|
|1590.0|1590.2329286565023|
|1660.0| 1581.095886103378|
|2950.0|3008.7022365017365|
|1690.0|1744.6811435213301|
|4050.0| 6193.814872923774|
|1635.0|1584.2747554628493|
|2210.0| 2354.084451267648|
|2380.0|2851.2713042252244|
|1380.0| 1563.949921392174|
|1490.0|1576.1641439573152|
+------+------------------+
only showing top 20 rows



You can now determine **value for money** apartments!

### Evaluation

In [390]:
from pyspark.ml.evaluation import RegressionEvaluator

In [391]:
regEval = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'Rent', metricName = 'rmse')
regEval.evaluate(pred)

442.8079270040139

**Relative error:**

In [311]:
pred = pred.select(['Rent', 'Prediction']).withColumn('RelError', F.abs((pred['Rent'] - pred['Prediction']) / pred['Rent']))
pred.select(F.mean(pred['RelError'])).collect()

[Row(avg(RelError)=0.14103161091722985)]

Less than **15%** relative error on average. Not bad for such a simple model! 

### Cross validation and GridsearchCV

Some hints on how to do Cross validation and Grid search with PySpark.

In [299]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [403]:
assembler = VectorAssembler(inputCols = features, outputCol='features')
rf = RandomForestRegressor(featuresCol = 'features', labelCol = 'Rent')
pipeline = Pipeline(stages=[assembler, rf])
evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'Rent', metricName = 'rmse')

In [439]:
paramGrid = ParamGridBuilder().addGrid(rf.maxDepth, [5, 6]).build()

In [440]:
crossval = CrossValidator(estimator = pipeline, estimatorParamMaps = paramGrid, evaluator = evaluator, numFolds = 4)

In [441]:
cvModel = crossval.fit(train)

In [442]:
pred = cvModel.transform(test)

In [443]:
evaluator.evaluate(pred)

434.80359030041603