# **Projet 2 BDLE**
# Mohammad Afandi, Moïne Satouri



# Decision Tree in Spark ML

## Objectif

Réaliser une pipeline ML sur les données "Housing" (https://www.kaggle.com/austinreese/usa-housing-listings)

## Préparation

Vérifier que des ressources de calcul sont allouées à votre notebook est connecté (cf RAM  de disque indiqués en haut à droite) . Sinon cliquer sur le bouton connecter pour obtenir des ressources.




Pour accéder directement aux fichiers stockées sur votre google drive. Renseigner le code d'authentification lorsqu'il est demandé

Ajuster le nom de votre dossier : MyDrive/ens/bdle/dir. **Remplacer dir **

In [1]:
import os
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

drive_dir = "/content/drive/MyDrive/BDLE"
os.makedirs(drive_dir, exist_ok=True)
os.listdir(drive_dir)

Mounted at /content/drive


['1.ipynb',
 'data',
 'DecisionTreesSparkML_afandi_satouri.ipynb',
 'DecisionTreesSparkML_afandi_satouri(1).ipynb',
 'DecisionTreesSparkML_afandi_satouri(1)(1)(1).ipynb']

Télécharger le jeu de données

In [None]:
# os.environ['KAGGLE_CONFIG_DIR'] = "/content/drive/MyDrive/BDLE"
# !kaggle datasets download --force -d austinreese/usa-housing-listings

Downloading usa-housing-listings.zip to /content
 97% 102M/106M [00:01<00:00, 61.9MB/s] 
100% 106M/106M [00:01<00:00, 64.3MB/s]


In [None]:
# !unzip usa-housing-listings.zip

Archive:  usa-housing-listings.zip
replace housing.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: housing.csv             


Installer pyspark et findspark :


In [2]:
!pip install -q pyspark
!pip install -q findspark

Démarrer la session spark

In [3]:
import os
# !find /usr/local -name "pyspark"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.7/dist-packages/pyspark"
os.environ["JAVA_HOME"] = "/usr"

In [4]:
# Principaux import
import findspark
from pyspark.sql import SparkSession 
from pyspark import SparkConf  

# pour les dataframe et udf
from pyspark.sql import *  
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *

# pour le chronomètre
import time

# initialise les variables d'environnement pour spark
findspark.init()

# Démarrage session spark 
# --------------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory")
  
  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")
  
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

  # On ajuste l'environnement d'exécution des requêtes à la taille du cluster (4 coeurs)
  spark.conf.set("spark.sql.shuffle.partitions","4")    
  print("session démarrée, son id est ", sc.applicationId)
  return spark
spark = demarrer_spark()

session démarrée, son id est  local-1637533430421


In [5]:
# on utilise 8 partitions au lieu de 200 par défaut
spark.conf.set("spark.sql.shuffle.partitions", "8")
print("Nombre de partitions utilisées : ", spark.conf.get("spark.sql.shuffle.partitions"))

Nombre de partitions utilisées :  8


## Chargement des données

In [6]:
#df = spark.read.options(mode='FAILFAST', multiLine=True, escape='"').option("header","true").option("inferSchema", True).csv("./housing.csv")
df = spark.read.options(mode='FAILFAST', multiLine=True, escape='"').option("header","true").option("inferSchema", True).csv("drive/MyDrive/BDLE/data/housing.csv")
df.printSchema()
df.show()

root
 |-- id: long (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: long (nullable = true)
 |-- type: string (nullable = true)
 |-- sqfeet: integer (nullable = true)
 |-- beds: integer (nullable = true)
 |-- baths: double (nullable = true)
 |-- cats_allowed: integer (nullable = true)
 |-- dogs_allowed: integer (nullable = true)
 |-- smoking_allowed: integer (nullable = true)
 |-- wheelchair_access: integer (nullable = true)
 |-- electric_vehicle_charge: integer (nullable = true)
 |-- comes_furnished: integer (nullable = true)
 |-- laundry_options: string (nullable = true)
 |-- parking_options: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- state: string (nullable = true)

+----------+--------------------+------------+--------------------+-----+---------+--

###Exploration des données

In [7]:
#nombre de lignes
data_len = df.count()
data_len

384977

In [8]:
df.summary().show()

+-------+-------------------+--------------------+--------------------+--------------------+-----------------+---------+------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-----------------------+-------------------+---------------+---------------+--------------------+--------------------+-----------------+------------------+------+
|summary|                 id|                 url|              region|          region_url|            price|     type|            sqfeet|              beds|             baths|      cats_allowed|       dogs_allowed|    smoking_allowed| wheelchair_access|electric_vehicle_charge|    comes_furnished|laundry_options|parking_options|           image_url|         description|              lat|              long| state|
+-------+-------------------+--------------------+--------------------+--------------------+-----------------+---------+------------------+------------------+------

#### Description des variables

On dispose des variables suivantes : 
- *id* (long) : identifiant du logement
- *url* (string) : url de l'annonce
- *region* (string) : région dans laquelle se situe le logement
- *region_rul* (string) : url de la région
- *type* (string) : type du logement
- ***price* (long) : prix du logement (variable à expliquer)**
- *sqfeet* (integer) : superficie du logement en pieds carrés
- *beds* (integer) : nombre de lits
- *baths* (integer) : nombre de salle de bains
- *cats_allowed* (integer 0-1) : les chats sont-il acceptés dans le logement ?
- *dosg_allowed* (integer 0-1) : les chiens sont-ils acceptés dans le logement ?
- *smoking_allowed* (integer 0-1) : peut-on fumer dans le logement ?
- *wheelchair_access* (integer 0-1) : peut-on accéder au logement en fauteuil roulant ?
- *electric_vehicle_charge* (integer 0-1) : peut-on charger notre véhicule électrique ?
- *comes_furnished* (integer 0-1) : le logement est-il meublé ?
- *laundry_options* (string) : les options pour la lessive
- *parking_options* (string) : les options pour le parking
- *image_url* (string) : url de la photo du logement
- *description* (string) : description du logement
- *lat* (double) : latitude
- *long* (double) : longitude
- *state* (string) : état des Etats-Unis dans lequel se situe le logement



#### Valeurs manquantes

On peut déjà remarquer que certaines colonnes (*laundry_options*, *parking_options*, *lat* et *long*) présentent des valeurs manquantes. Dans un premier temps nous allons ignorer ce problème.

In [9]:
from pyspark.sql.functions import col,isnan, when, count
# Nombre de valeurs manquantes pour chaque variable
df_missing = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
df_missing.show()

+---+---+------+----------+-----+----+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+---------------+---------+-----------+----+----+-----+
| id|url|region|region_url|price|type|sqfeet|beds|baths|cats_allowed|dogs_allowed|smoking_allowed|wheelchair_access|electric_vehicle_charge|comes_furnished|laundry_options|parking_options|image_url|description| lat|long|state|
+---+---+------+----------+-----+----+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+---------------+---------+-----------+----+----+-----+
|  0|  0|     0|         0|    0|   0|     0|   0|    0|           0|           0|              0|                0|                      0|              0|          79026|         140687|        0|          2|1918|1918|    0|
+---+---+------+----------+-----+----+------+----+-----+------------+------------+----------

In [10]:
# Taux de valeurs manquantes pour chaque variable
df_missing_rate = df_missing.select([(col(c)/data_len).alias(c) for c in df.columns])
df_missing_rate.show()

+---+---+------+----------+-----+----+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+-------------------+-----------------+---------+--------------------+--------------------+--------------------+-----+
| id|url|region|region_url|price|type|sqfeet|beds|baths|cats_allowed|dogs_allowed|smoking_allowed|wheelchair_access|electric_vehicle_charge|comes_furnished|    laundry_options|  parking_options|image_url|         description|                 lat|                long|state|
+---+---+------+----------+-----+----+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+-------------------+-----------------+---------+--------------------+--------------------+--------------------+-----+
|0.0|0.0|   0.0|       0.0|  0.0| 0.0|   0.0| 0.0|  0.0|         0.0|         0.0|            0.0|              0.0|                    0.0|            0.0|0.20527460082030874|0.

#### Valeurs distinctes

On peut aussi dénombrer le nombre de valeurs distinctes pour chaque variable :

In [11]:
from pyspark.sql.functions import col, countDistinct
# Nombre de valeurs distinctes pour chaque variable
df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)).show()

+------+------+------+----------+-----+----+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+---------------+---------+-----------+-----+-----+-----+
|    id|   url|region|region_url|price|type|sqfeet|beds|baths|cats_allowed|dogs_allowed|smoking_allowed|wheelchair_access|electric_vehicle_charge|comes_furnished|laundry_options|parking_options|image_url|description|  lat| long|state|
+------+------+------+----------+-----+----+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+---------------+---------+-----------+-----+-----+-----+
|384977|384977|   404|       413| 3961|  12|  3277|  11|   20|           2|           2|              2|                2|                      2|              2|              5|              7|   181068|     280836|56772|54035|   51|
+------+------+------+----------+-----+----+------+----+----

## Stratégie 1

Cette première stratégie consistera à appliquer le modèle de régression sur les données d'origine encodées, sans se préoccuper des valeurs manquantes ou aberrantes.

### Variables trivialement non-pertinentes

On se permettra de s'affranchir des variables *id*, *url*, *image_url* et *description* présentant un très grand nombre de valeurs distinctes afin d'alléger les calculs. Intuitivement ce choix fait sens car a priori ces données ne devraient pas influer sur le prix du logement. L'url de chaque région est caractérisée par l'attribut région, on peut donc également retirer cette colonne redondante.


In [12]:
df1 = df.drop('id', 'url', 'image_url', 'description', 'region_url')

In [13]:
df1.show()

+------------+-----+---------+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+------------------+-------+--------+-----+
|      region|price|     type|sqfeet|beds|baths|cats_allowed|dogs_allowed|smoking_allowed|wheelchair_access|electric_vehicle_charge|comes_furnished|laundry_options|   parking_options|    lat|    long|state|
+------------+-----+---------+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+------------------+-------+--------+-----+
|reno / tahoe| 1148|apartment|  1078|   3|  2.0|           1|           1|              0|                0|                      0|              0|    w/d in unit|           carport|39.5483|-119.796|   ca|
|reno / tahoe| 1200|    condo|  1001|   2|  2.0|           0|           0|              0|                0|                      0|              0|    w/d hookups|        

### Encodage des variables

Commençons par avoir un aperçu du type des données pour chaque variable :

In [14]:
df1.dtypes

[('region', 'string'),
 ('price', 'bigint'),
 ('type', 'string'),
 ('sqfeet', 'int'),
 ('beds', 'int'),
 ('baths', 'double'),
 ('cats_allowed', 'int'),
 ('dogs_allowed', 'int'),
 ('smoking_allowed', 'int'),
 ('wheelchair_access', 'int'),
 ('electric_vehicle_charge', 'int'),
 ('comes_furnished', 'int'),
 ('laundry_options', 'string'),
 ('parking_options', 'string'),
 ('lat', 'double'),
 ('long', 'double'),
 ('state', 'string')]

On définit nos variables continues, puis les variables qualitatives par disjonction :

In [15]:
# Variables continues
cont_vars = ['price','sqfeet','beds','baths','lat','long']
# Variables qualitatives
qual_vars = list(set(df1.columns)^set(cont_vars))

Il faut ensuite choisir le nombre maximal de catégories *maxCategories* pour l'encodage de manière judicieuse. En effet, si le nombre de valeurs distinctes pour une variable est inférieur à ce paramètre, celle-ci variable sera considérée comme qualitative.

In [16]:
# Nombres de valeurs distinctes pour les variables continues
distinctCont = df1.agg(*(countDistinct(col(c)).alias(c) for c in cont_vars))
distinctCont.show()

+-----+------+----+-----+-----+-----+
|price|sqfeet|beds|baths|  lat| long|
+-----+------+----+-----+-----+-----+
| 3961|  3277|  11|   20|56772|54035|
+-----+------+----+-----+-----+-----+



In [17]:
# Nombres de valeurs distinctes pour les variables qualitatives
distinctQual = df1.agg(*(countDistinct(col(c)).alias(c) for c in qual_vars))
distinctQual.show()

+----+-----+-----------------+------+------------+---------------+---------------+---------------+-----------------------+------------+---------------+
|type|state|wheelchair_access|region|dogs_allowed|parking_options|smoking_allowed|comes_furnished|electric_vehicle_charge|cats_allowed|laundry_options|
+----+-----+-----------------+------+------------+---------------+---------------+---------------+-----------------------+------------+---------------+
|  12|   51|                2|   404|           2|              7|              2|              2|                      2|           2|              5|
+----+-----+-----------------+------+------------+---------------+---------------+---------------+-----------------------+------------+---------------+



Il suffit de choisir le nombre minimal de valeurs distinctes parmi les variables continues auquel on soustrait 1 pour ne pas avoir de problème. Ici, 10 convient donc.

In [18]:
import numpy as np
maxCategories = np.min(distinctCont.collect()) - 1
maxCategories

10

### Arbre de décision

#### Régression

In [19]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler


def get_features(s):
  if 'Index' in s:
    return True
  elif 'scaled' in s:
    return True
  else:
    return False


#string_index = StringIndexer().setInputCol("type").setOutputCol('typeIndex')
#md = string_index.fit(df1).transform(df1)
pipeline=Pipeline(stages=[StringIndexer().setInputCol(c[0]).setOutputCol(c[0]+'Index') for c in df1.dtypes if c[1]=='string']+\
                  [VectorAssembler(inputCols=[c[0] for c in df1.dtypes if c[1] == 'double'] , outputCol='features').setHandleInvalid("skip")])
                  

model = pipeline.fit(df1).transform(df1)
model.dtypes
dataframe1 = model.select("features","price")

In [20]:
dataframe1.show()

+--------------------+-----+
|            features|price|
+--------------------+-----+
|[2.0,39.5483,-119...| 1148|
|[2.0,39.5026,-119...| 1200|
|[2.0,39.6269,-119...| 1813|
|[1.0,39.4477,-119...| 1095|
|[1.0,39.5357,-119...|  289|
|[1.0,39.4572,-119...| 1093|
|[1.0,39.5118,-119...|  935|
|[1.0,39.4477,-119...| 1095|
|[2.0,39.6185,-119...| 1525|
|[2.0,39.5193,-119...| 1295|
|[1.0,39.4572,-119...| 1086|
|[3.0,39.5987,-119...| 2195|
|[2.0,39.5384,-119...| 1159|
|[1.0,39.5393,-119...| 1349|
|[1.0,39.5502,-119...| 1449|
|[2.0,39.5617,-119...| 1295|
|[2.0,39.5483,-119...|  982|
|[2.0,39.4015,-119...| 1895|
|[1.0,39.6369,-119...| 1495|
|[2.0,39.5614,-119...| 1899|
+--------------------+-----+
only showing top 20 rows



In [21]:
dataframe1.count()

383059

In [22]:
train, validation = dataframe1.randomSplit([0.9, 0.1])
train.show()

+--------------------+-----+
|            features|price|
+--------------------+-----+
|[0.0,-24.9558,-94...|  600|
|[0.0,21.3575,-157...|  775|
|[0.0,21.3576,-157...|  775|
|[0.0,21.3581,-157...|  775|
|[0.0,21.4063,-157...| 1900|
|[0.0,26.1323,-98....|  650|
|[0.0,26.1798,-98....|  765|
|[0.0,26.2152,-81....| 2400|
|[0.0,26.2746,-81....| 1818|
|[0.0,26.3446,-80....| 1799|
|[0.0,26.3464,-80....| 1300|
|[0.0,26.3464,-80....| 1300|
|[0.0,26.3464,-80....| 1300|
|[0.0,26.3464,-80....| 1300|
|[0.0,26.3464,-80....| 1394|
|[0.0,26.3464,-80....| 1695|
|[0.0,26.3464,-80....| 1695|
|[0.0,26.3464,-80....| 1695|
|[0.0,26.3464,-80....| 1725|
|[0.0,26.3481,-80....| 6999|
+--------------------+-----+
only showing top 20 rows



In [23]:
# from pyspark.ml.regression import LinearRegression
# lr = LinearRegression(featuresCol = 'features', labelCol='price', maxIter=1000)
from pyspark.ml.regression import DecisionTreeRegressor
dtree = DecisionTreeRegressor(featuresCol = 'features', labelCol='price')

In [24]:
import pyspark.ml.tuning as tune
import numpy as np 
# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameters
grid = grid.addGrid(dtree.maxBins, [16,32])
grid = grid.addGrid(dtree.minInstancesPerNode, [100,1000])
grid = grid.addGrid(dtree.maxDepth, [2,5,10])


# Build the grid
grid = grid.build()

In [25]:
from pyspark.ml.evaluation import RegressionEvaluator

dtree_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="price",metricName="rmse")
# Create the CrossValidator (3 folds)
cv = tune.CrossValidator(estimator=dtree,
               estimatorParamMaps=grid,
               evaluator=dtree_evaluator,
               numFolds=3,
               parallelism=8
               )

In [26]:
# Fit cross validation models
models = cv.fit(train)

# Extract the best model
best_dtree = models.bestModel

In [27]:
# Use the model to predict the test set
validation_results_dtree = best_dtree.transform(validation)

In [28]:
# Evaluate the predictions
print('Best RMSE:',dtree_evaluator.evaluate(validation_results_dtree))

Best RMSE: 32854.28298911264


In [29]:
# Best tree parameters
models.getEstimatorParamMaps()[np.argmin(models.avgMetrics)]

{Param(parent='DecisionTreeRegressor_c6f64c60d96b', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 16,
 Param(parent='DecisionTreeRegressor_c6f64c60d96b', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
 Param(parent='DecisionTreeRegressor_c6f64c60d96b', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 1000}

On constante qu'on obtient une RMSE très élevée. Cela est peut-être dû à la disparité entre les prix des logements.

#### Architecture de l'arbre


In [30]:
print(best_dtree.toDebugString)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_c6f64c60d96b, depth=2, numNodes=7, numFeatures=3
  If (feature 1 <= 33.3874)
   If (feature 1 <= 32.05625)
    Predict: 1685.8335991236972
   Else (feature 1 > 32.05625)
    Predict: 125147.20276683381
  Else (feature 1 > 33.3874)
   If (feature 2 <= -75.60155)
    Predict: 1556.31335290114
   Else (feature 2 > -75.60155)
    Predict: 3100.964449404169



#### Features importance

In [31]:
best_featureImportances = best_dtree.featureImportances

In [32]:
best_featureImportances

SparseVector(3, {1: 0.9998, 2: 0.0002})

In [71]:
best_featureImportances

SparseVector(3, {1: 0.9998, 2: 0.0002})

## Stratégie 2

Pour cette stratégie nous allons traiter les données de manière plus approfondie, en essayant d'imputer les valeurs manquantes, identifier les valeurs aberrantes et nous restreindre aux attributs les plus pertinents.

### Attributs non pertinents

On s'affranchit déjà des attributs que nous avons ignorés dans la stratégie 1 : *id*, *url*, *image_url*, *description* et *region_url*.  
On peut aussi supprimer les variables *long* et *lat* car elles sont redondantes avec la variable *region* qui est bien plus pertinente. Autre argument : ces deux variables présentent un grand nombre de valeurs distinctes (respectivement 56772 et 54035) et quelques valeurs manquantes (1918 pour les deux).

In [33]:
df2 = df.drop('id', 'url', 'image_url', 'description', 'region_url', 'lat', 'long')

### Valeurs manquantes

In [34]:
# Nombre de valeurs manquantes pour chaque variable
df2_missing = df2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df2.columns])
df2_missing.show()

+------+-----+----+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+---------------+-----+
|region|price|type|sqfeet|beds|baths|cats_allowed|dogs_allowed|smoking_allowed|wheelchair_access|electric_vehicle_charge|comes_furnished|laundry_options|parking_options|state|
+------+-----+----+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+---------------+-----+
|     0|    0|   0|     0|   0|    0|           0|           0|              0|                0|                      0|              0|          79026|         140687|    0|
+------+-----+----+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+---------------+---------------+-----+



Seules les variables *laundry_options* et *parking_options* présentent des valeurs manquantes.

In [35]:
# Valeurs distinctes pour "laundry_options"
df2.select("laundry_options").distinct().show()

+------------------+
|   laundry_options|
+------------------+
|   laundry in bldg|
|no laundry on site|
|              null|
|       w/d hookups|
|   laundry on site|
|       w/d in unit|
+------------------+



In [36]:
# Valeurs distinctes pour "parking_options"
df2.select("parking_options").distinct().show()

+------------------+
|   parking_options|
+------------------+
|           carport|
|        no parking|
|    street parking|
|   attached garage|
|              null|
|   detached garage|
|     valet parking|
|off-street parking|
+------------------+



On fait l'hypothèse raisonnable qu'une absence de valeur correspond simplement à "no laundry on site" pour *laundry_options* et "no parking" pour *parking_options*.

In [37]:
df2 = df2.fillna({'laundry_options': "no laundry on site", 'parking_options': "no parking"})

### Valeurs aberrantes

Commençons déjà par refédinir nos variables continues et qualitatives :

In [38]:
df2.dtypes

[('region', 'string'),
 ('price', 'bigint'),
 ('type', 'string'),
 ('sqfeet', 'int'),
 ('beds', 'int'),
 ('baths', 'double'),
 ('cats_allowed', 'int'),
 ('dogs_allowed', 'int'),
 ('smoking_allowed', 'int'),
 ('wheelchair_access', 'int'),
 ('electric_vehicle_charge', 'int'),
 ('comes_furnished', 'int'),
 ('laundry_options', 'string'),
 ('parking_options', 'string'),
 ('state', 'string')]

In [39]:
# Variables continues
cont_vars2 = ['price','sqfeet','beds','baths']
# Variables qualitatives
qual_vars2 = list(set(df2.columns)^set(cont_vars2))

Intéressons-nous aux variables continues :

In [40]:
df2.select(cont_vars2).summary().show()

+-------+-----------------+------------------+------------------+------------------+
|summary|            price|            sqfeet|              beds|             baths|
+-------+-----------------+------------------+------------------+------------------+
|  count|           384977|            384977|            384977|            384977|
|   mean|8825.722318476168| 1059.899565428584|1.9053449946360432|1.4807183286274246|
| stddev|4462199.876539068|19150.757244214397| 3.494571619825234|0.6180605193011444|
|    min|                0|                 0|                 0|               0.0|
|    25%|              806|               750|                 1|               1.0|
|    50%|             1037|               949|                 2|               1.0|
|    75%|             1395|              1150|                 2|               2.0|
|    max|       2768307249|           8388607|              1100|              75.0|
+-------+-----------------+------------------+------------------+

Investigons variable par variable.

#### *price*

On remarque immédiatement que l'écart-type est énorme : certains logements sont à des prix faramineux (avec un maximum à 2768307249) tandis que d'autres sont à 0. La moyenne s'en retrouve fortement biaisée (8826 environ), alors que le 3ème quartile n'est qu'à 1395, la médiane à 1037 et le 1er quartile à 806. 
Il semble donc adéquat de retirer les valeurs trop extrêmes.  
Le choix des seuils se fera de manière arbitraire (mais avec un peu de bons sens tout de même) en l'absence d'informations supplémentaires.

In [41]:
# Nombre de logements pour lesquels le prix est supérieur à 5000
df2.select('price').where("price > 5000").count()

951

In [42]:
# Nombre de logements pour lesquels le prix est inférieur à 50
df2.select('price').where("price < 50").count()

2147

In [43]:
(951+2147)/data_len * 100

0.8047233990602035

In [44]:
# On coupe aux seuils définis juste au-dessus
temp = df2.withColumn('price', when((col('price')<50)|(col('price')>5000), None).otherwise(col('price')))
temp.select('price').summary().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            381879|
|   mean|1177.6544533739745|
| stddev| 549.5856456756536|
|    min|                50|
|    25%|               810|
|    50%|              1040|
|    75%|              1395|
|    max|              5000|
+-------+------------------+



In [45]:
temp.show()

+------------+-----+---------+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+------------------+------------------+-----+
|      region|price|     type|sqfeet|beds|baths|cats_allowed|dogs_allowed|smoking_allowed|wheelchair_access|electric_vehicle_charge|comes_furnished|   laundry_options|   parking_options|state|
+------------+-----+---------+------+----+-----+------------+------------+---------------+-----------------+-----------------------+---------------+------------------+------------------+-----+
|reno / tahoe| 1148|apartment|  1078|   3|  2.0|           1|           1|              0|                0|                      0|              0|       w/d in unit|           carport|   ca|
|reno / tahoe| 1200|    condo|  1001|   2|  2.0|           0|           0|              0|                0|                      0|              0|       w/d hookups|           carport|   ca|
|reno / tahoe| 1813|apartment|  168

Le troncage nous a permis de nous ramener à des valeurs moins extravagantes, et ce en ne perdant que 3098 données, soit 0,8% des données totales.

#### *sqfeet*

Comme pour *price*, l'écart-type est très élevé (même si c'est, relativement, dans une moindre mesure). Procédons de manière analogue en retirant les valeurs extrêmes.  
Par exemple, on peut seuiller à 9m2 (97sqft) pour un logement habitable et 1000m2 (10764 sqft) pour un logement beaucoup plus grand qu'une villa.

In [46]:
# Nombre de logements pour lesquels la surface est supérieure à sqft (m^2) 
df2.select('sqfeet').where("sqfeet > 10764").count()

74

In [47]:
# Nombre de logements pour lesquels la surface est inférieure à 97 sqft (9 m^2) 
df2.select('sqfeet').where("sqfeet < 97").count()

973

In [48]:
(74+973)/data_len * 100

0.27196429916592424

In [49]:
# On coupe aux seuils définis juste au-dessus
temp = temp.withColumn('sqfeet', when((col('sqfeet')<97)|(col('sqfeet')>10764), None).otherwise(col('sqfeet')))
temp.select('sqfeet').summary().show()

+-------+-----------------+
|summary|           sqfeet|
+-------+-----------------+
|  count|           383930|
|   mean|996.8494777693851|
| stddev|401.7042515406062|
|    min|               97|
|    25%|              750|
|    50%|              950|
|    75%|             1150|
|    max|            10740|
+-------+-----------------+



#### *beds*

La variable *beds* présente une valeur maximale bien supérieure au reste ; or d'après ce qu'on a vu auparavant, cette variable ne présente que 11 valeurs distinctes.  
Intéressons-nous à la répartition de ces valeurs :

In [50]:
df2.select('beds').groupBy('beds').count().sort('beds').show()

+----+------+
|beds| count|
+----+------+
|   0| 10978|
|   1|117226|
|   2|175513|
|   3| 67037|
|   4| 11575|
|   5|  2324|
|   6|   240|
|   7|    49|
|   8|    31|
|1000|     2|
|1100|     2|
+----+------+



On constate que 2 logements comptent 1000 lits et 2 autres en comptent 1100. Ces valeurs sont tout à fait loufoques, on s'en débarasse donc.

In [51]:
temp = temp.withColumn('beds', when(col('beds') > 200, None).otherwise(col('beds')))
temp.select('beds').summary().show()

+-------+------------------+
|summary|              beds|
+-------+------------------+
|  count|            384973|
|   mean|1.8944549358006926|
| stddev|0.8787611042200295|
|    min|                 0|
|    25%|                 1|
|    50%|                 2|
|    75%|                 2|
|    max|                 8|
+-------+------------------+



#### *baths*

De manière analogue à la variable *beds* :

In [52]:
df2.select('baths').groupBy('baths').count().sort('baths').show()

+-----+------+
|baths| count|
+-----+------+
|  0.0|  3107|
|  1.0|198184|
|  1.5| 27363|
|  2.0|134649|
|  2.5| 13162|
|  3.0|  5549|
|  3.5|  1007|
|  4.0|  1495|
|  4.5|   231|
|  5.0|   131|
|  5.5|    57|
|  6.0|    26|
|  6.5|     4|
|  7.0|     4|
|  7.5|     2|
|  8.0|     1|
|  8.5|     1|
| 25.0|     1|
| 35.0|     1|
| 75.0|     2|
+-----+------+



Ici on constate que 2 logements comptent 75 salles de bain, 1 en compte 35 et 1 en compte 25. On pourrait aussi s'interroger sur la présence de nombres décimaux ; peut-être que 0.5 correspond à une petite salle d'eau avec seulement un lavabo.  
On se débarasse encore des valeurs aberrantes :

In [53]:
temp = temp.withColumn('baths', when(col('baths') > 20, None).otherwise(col('baths')))
temp.select('baths').summary().show()

+-------+------------------+
|summary|             baths|
+-------+------------------+
|  count|            384973|
|   mean|1.4801882209921216|
| stddev| 0.591241668191792|
|    min|               0.0|
|    25%|               1.0|
|    50%|               1.0|
|    75%|               2.0|
|    max|               8.5|
+-------+------------------+



#### Imputation

Reste maintenant à imputer les valeurs aberrantes dont nous nous sommes débarassés précédemment. Une stratégie basique est de remplacer ces valeurs par la médiane (en général robuste aux valeurs aberrantes).

In [54]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=cont_vars2,outputCols=cont_vars2)
df2 = imputer.setStrategy("median").setMissingValue(None).fit(temp).transform(temp)

In [55]:
df2.select(cont_vars2).summary().show()

+-------+------------------+------------------+------------------+------------------+
|summary|             price|            sqfeet|              beds|             baths|
+-------+------------------+------------------+------------------+------------------+
|  count|            384977|            384977|            384977|            384977|
|   mean|1176.5467157778257| 996.7220639155066| 1.951488010972084|1.4882538437361192|
| stddev| 547.5080018827741|401.16505191394344|0.8166609185782979| 0.577649496422139|
|    min|                50|                97|                 1|               1.0|
|    25%|               815|               750|                 1|               1.0|
|    50%|              1040|               950|                 2|               1.0|
|    75%|              1395|              1150|                 2|               2.0|
|    max|              5000|             10740|                 8|               8.5|
+-------+------------------+------------------+-------

### Encodage des variables

In [56]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler


pipeline=Pipeline(stages=[StringIndexer().setInputCol(c[0]).setOutputCol(c[0]+'Index') for c in df2.dtypes if c[1]=='string']+\
                  [VectorAssembler(inputCols=[c[0] for c in df2.dtypes if c[1] in ['int', 'double']], outputCol='features')])
                  

model2 = pipeline.fit(df2).transform(df2)
model2.dtypes
dataframe2 = model2.select("features","price")


In [57]:
dataframe2.show()

+--------------------+-----+
|            features|price|
+--------------------+-----+
|[1078.0,3.0,2.0,1...| 1148|
|(9,[0,1,2],[1001....| 1200|
|[1683.0,2.0,2.0,1...| 1813|
|[708.0,1.0,1.0,1....| 1095|
|[250.0,2.0,1.0,1....|  289|
|[720.0,1.0,1.0,1....| 1093|
|[661.0,1.0,1.0,1....|  935|
|[708.0,1.0,1.0,1....| 1095|
|[1053.0,2.0,2.0,1...| 1525|
|(9,[0,1,2],[930.0...| 1295|
|(9,[0,1,2,5],[720...| 1086|
|(9,[0,1,2],[3115....| 2195|
|[1181.0,3.0,2.0,1...| 1159|
|[766.0,1.0,1.0,1....| 1349|
|[764.0,1.0,1.0,1....| 1449|
|(9,[0,1,2],[951.0...| 1295|
|[919.0,2.0,2.0,1....|  982|
|[1193.0,2.0,2.0,1...| 1895|
|[1192.0,3.0,1.0,1...| 1495|
|(9,[0,1,2],[1600....| 1899|
+--------------------+-----+
only showing top 20 rows



In [58]:
train2, validation2 = dataframe2.randomSplit([0.8, 0.2])
train2.count()


308160

### Arbre de décision

#### Régression

In [59]:
# from pyspark.ml.regression import LinearRegression
# lr2 = LinearRegression(featuresCol = 'features', labelCol='price', maxIter=1000)
from pyspark.ml.regression import DecisionTreeRegressor
dtree2 = DecisionTreeRegressor(featuresCol = 'features', labelCol='price')

In [60]:
import pyspark.ml.tuning as tune
import numpy as np 
# Create the parameter grid
grid2 = tune.ParamGridBuilder()

# Add the hyperparameters
grid2 = grid2.addGrid(dtree2.maxBins, [16,32])
grid2 = grid2.addGrid(dtree2.minInstancesPerNode, [100,1000])
grid2 = grid2.addGrid(dtree2.maxDepth, [2,5,10])


# Build the grid
grid2 = grid2.build()

In [61]:
from pyspark.ml.evaluation import RegressionEvaluator

dtree_evaluator2 = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="price",metricName="rmse")
# Create the CrossValidator (3 folds)
cv2 = tune.CrossValidator(estimator=dtree2,
               estimatorParamMaps=grid2,
               evaluator=dtree_evaluator2,
               numFolds=3,
               parallelism=8
               )

In [62]:
# Fit cross validation models
models2 = cv2.fit(train2)

# Extract the best model
best_dtree2 = models2.bestModel

In [63]:
# Use the model to predict the test set
validation_results_dtree2 = best_dtree2.transform(validation2)

# Evaluate the predictions
print('Best RMSE:', dtree_evaluator2.evaluate(validation_results_dtree2))

Best RMSE: 480.9715651452944


La RMSE est désormais beaucoup plus faible par rapport à la stratégie 1. Le traitement des données avant l'application du modèle de régression a donc bien porté ses fruits.

#### Architecture de l'arbre

In [64]:
print(best_dtree2.toDebugString)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_e34ad12c1773, depth=10, numNodes=655, numFeatures=9
  If (feature 0 <= 1000.5)
   If (feature 7 <= 0.5)
    If (feature 0 <= 700.5)
     If (feature 5 <= 0.5)
      If (feature 0 <= 484.0)
       If (feature 8 <= 0.5)
        If (feature 4 <= 0.5)
         If (feature 1 <= 1.5)
          Predict: 877.7690677966102
         Else (feature 1 > 1.5)
          If (feature 3 <= 0.5)
           Predict: 906.3776435045318
          Else (feature 3 > 0.5)
           Predict: 966.6454545454545
        Else (feature 4 > 0.5)
         If (feature 1 <= 1.5)
          Predict: 940.3458823529412
         Else (feature 1 > 1.5)
          If (feature 6 <= 0.5)
           Predict: 1146.2707275803723
          Else (feature 6 > 0.5)
           Predict: 1081.5052083333333
       Else (feature 8 > 0.5)
        If (feature 6 <= 0.5)
         If (feature 1 <= 1.5)
          If (feature 4 <= 0.5)
           Predict: 856.6808510638298
          Else (featu

On constate que l'architecture de l'arbre est bien plus compliquée par rapport à notre première itération.

In [None]:
best_dtree2.featureImportances.toArray()

Un exemple d'implémentation avec une random forest :

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

rfm_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="price",metricName="rmse")
best_score = -5
for x in list(np.arange(10, 40, 10)):
  for y in list(np.arange(10,40,10)):
    rfm = RandomForestRegressor(featuresCol = 'features', labelCol='price', maxDepth = x, numTrees = y) 
    rfm_fit = rfm.fit(train2)
    validation_results_rfm2 = rfm_fit.transform(validation2)
    score = rfm_evaluator.evaluate(validation_results_rfm2)

    if score > best_score:
      best_score = score
      best_rfm = rfm_fit

    print('rmse : ',score)

## Conclusion

Nous avons pu constaté une nette amélioration des résultats entre la stratégie 1 et 2. Cela montre bien qu'il est primordial de bien traiter les données en amont d'un modèle de machine learning.
Dans la stratégie 1, 1e modèle de l'arbre était plus simple à calculer mais l'erreur de prédiction était très élevée, tandis que le modèle de l'arbre dans la stratégie 2 était plus compliqué mais donnait de bien meilleurs résultats. 
Ces résultats sont néanmoins biaisés par les choix effectués lors du traitement des données, c'est pourquoi il faut pouvoir les justifier d'après les connaissances dont nous disposons sur les données.