# Spark Machine Learning (MLIb) 

MLlib est une bibliothèque qui permet de faire du Machile Learning dans Spark. Son objectif est de rendre l'apprentissage automatique pratique évolutif et facile. Cette API fournit un ensemble outils tels que:

* **Algorithmes ML**: la plupart des algorithmes d'apprentissage courants tels que la classification, la régression, le clustering et le filtrage collaboratif
* **Featurization**: l'extraction de features, la transformation, la réduction de dimension et la selection des features
* **Pipelines**: outils de construction, d'évaluation et de tunning des pipelines Machine Learning
* **Persistance**: enregistrement et chargement d'algorithmes, de modèles et de pipelines
* **Utilitaires**: algèbre linéaire, statistiques, traitement des données, etc.

**Plus de details sur Apache Spark MLIb voir la [documentation](https://spark.apache.org/docs/latest/ml-guide.html)**

## Description des données
Airbnb est un service de plateforme communautaire payant de location de vacances, de logements et d'hotels. Dans ce use case, nous allons utiliser les de données de location SF Airbnb de [Inside Airbnb](http://insideairbnb.com/get-the-data.html) pour prédire eventuellement le prix d'un logement d'un client. La figure ci-dessous montre l'activité immobiliere de San Francisco, selon Airbnb.
 
 <img src="http://insideairbnb.com/images/insideairbnb_graphic_site_1200px.png" style="width:800px"/>

Chargeons l'ensemble de données SF Airbnb. Les données ci-dessous concernent essentiellement la ville de San Francisco.

## Data Cleaning
Dans cette partie nous allons faire l'exploration et le nettoyage des données.

la listes de features ou attributs du datasets

In [1]:
!head -1 "datasets/sf-airbnb/sf-airbnb.csv"

id,listing_url,scrape_id,last_scraped,name,summary,space,description,experiences_offered,neighborhood_overview,notes,transit,access,interaction,house_rules,thumbnail_url,medium_url,picture_url,xl_picture_url,host_id,host_url,host_name,host_since,host_location,host_about,host_response_time,host_response_rate,host_acceptance_rate,host_is_superhost,host_thumbnail_url,host_picture_url,host_neighbourhood,host_listings_count,host_total_listings_count,host_verifications,host_has_profile_pic,host_identity_verified,street,neighbourhood,neighbourhood_cleansed,neighbourhood_group_cleansed,city,state,zipcode,market,smart_location,country_code,country,latitude,longitude,is_location_exact,property_type,room_type,accommodates,bathrooms,bedrooms,beds,bed_type,amenities,square_feet,price,weekly_price,monthly_price,security_deposit,cleaning_fee,guests_included,extra_people,minimum_nights,maximum_nights,minimum_minimum_nights,maximum_minimum_nights,minimum_maximum_nights,maximum_maximum_nights,minimum_ni

Chargement des données dans un DataFrame Spark

In [2]:
val filePath = "datasets/sf-airbnb/sf-airbnb.csv"

val rawDF = spark.read
  .option("header", "true")
  .option("multiLine", "true")
  .option("inferSchema", "true")
  .option("escape", "\"")
  .csv(filePath)

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.115.226:4043
SparkContext available as 'sc' (version = 3.0.1, master = local[*], app id = local-1659197725312)
SparkSession available as 'spark'


filePath: String = datasets/sf-airbnb/sf-airbnb.csv
rawDF: org.apache.spark.sql.DataFrame = [id: int, listing_url: string ... 104 more fields]


In [3]:
rawDF.printSchema

root
 |-- id: integer (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- name: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- space: string (nullable = true)
 |-- description: string (nullable = true)
 |-- experiences_offered: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- transit: string (nullable = true)
 |-- access: string (nullable = true)
 |-- interaction: string (nullable = true)
 |-- house_rules: string (nullable = true)
 |-- thumbnail_url: string (nullable = true)
 |-- medium_url: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- xl_picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable

In [13]:
rawDF.columns

res9: Array[String] = Array(id, listing_url, scrape_id, last_scraped, name, summary, space, description, experiences_offered, neighborhood_overview, notes, transit, access, interaction, house_rules, thumbnail_url, medium_url, picture_url, xl_picture_url, host_id, host_url, host_name, host_since, host_location, host_about, host_response_time, host_response_rate, host_acceptance_rate, host_is_superhost, host_thumbnail_url, host_picture_url, host_neighbourhood, host_listings_count, host_total_listings_count, host_verifications, host_has_profile_pic, host_identity_verified, street, neighbourhood, neighbourhood_cleansed, neighbourhood_group_cleansed, city, state, zipcode, market, smart_location, country_code, country, latitude, longitude, is_location_exact, property_type, room_type, accommod...


In [14]:
rawDF.columns.length

res10: Int = 106


In [15]:
val df = rawDF.select("id","name","host_id","host_name"
                      ,"neighbourhood",
                      "latitude","longitude","room_type","price",
                      "minimum_nights","number_of_reviews","last_review",
                      "reviews_per_month","calculated_host_listings_count","availability_365")
df.write.csv("datasets/sf-airbnb/sf_airbnb_prep.csv") 

org.apache.spark.sql.AnalysisException:  path file:/home/dmboup/Desktop/apache_spark_data_processing/apache_spark_data_processing/datasets/sf-airbnb/sf_airbnb_prep.csv already exists.;

**Note** Dans ce notebook nous avons sauter la partie feature extration et feature selection.  
Pour simplifier la tache, nous avons choisi de garder que les colonnes utiles du datasets apres l'étape de feature engineering.

In [None]:
val baseDF = rawDF.select(
  "host_is_superhost",
  "cancellation_policy",
  "instant_bookable",
  "host_total_listings_count",
  "neighbourhood_cleansed",
  "latitude",
  "longitude",
  "property_type",
  "room_type",
  "accommodates",
  "bathrooms",
  "bedrooms",
  "beds",
  "bed_type",
  "minimum_nights",
  "number_of_reviews",
  "review_scores_rating",
  "review_scores_accuracy",
  "review_scores_cleanliness",
  "review_scores_checkin",
  "review_scores_communication",
  "review_scores_location",
  "review_scores_value",
  "price")

baseDF.cache().count

Avec le nombre colonnes élevé, l'affichage devient illisible

In [16]:
baseDF.show(1)

+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-------+
|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude| longitude|property_type|      room_type|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|  price|
+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+-

Nous avons choisi quelques colonnes du datasets pour avoir une bonne rendue

In [17]:
baseDF.select(
  "host_is_superhost",
  "instant_bookable",
  "latitude",
  "longitude",
  "accommodates",
  "bathrooms",
  "bedrooms",
  "beds",
  "bed_type",
  "minimum_nights",
  "number_of_reviews",
  "price").show

+-----------------+----------------+--------+----------+------------+---------+--------+----+--------+--------------+-----------------+-------+
|host_is_superhost|instant_bookable|latitude| longitude|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|  price|
+-----------------+----------------+--------+----------+------------+---------+--------+----+--------+--------------+-----------------+-------+
|                t|               t|37.76931|-122.43386|           3|      1.0|       1|   2|Real Bed|             1|              180|$170.00|
|                f|               f|37.74511|-122.42102|           5|      1.0|       2|   3|Real Bed|            30|              111|$235.00|
|                f|               f|37.76669| -122.4525|           2|      4.0|       1|   1|Real Bed|            32|               17| $65.00|
|                f|               f|37.76487|-122.45183|           2|      4.0|       1|   1|Real Bed|            32|                8| 

Dans le schéma ci-dessus. Vous remarquerez que le champ `price` a été sélectionné sous forme de chaîne. Dans cette tache, nous avons besoin qu'il soit champ numérique (type double).

In [18]:
import org.apache.spark.sql.functions.translate

val fixedPriceDF = baseDF.withColumn("price", translate($"price", "$", "").cast("double"))

import org.apache.spark.sql.functions.translate
fixedPriceDF: org.apache.spark.sql.DataFrame = [host_is_superhost: string, cancellation_policy: string ... 22 more fields]


In [19]:
fixedPriceDF.select(
  "host_is_superhost",
  "instant_bookable",
  "latitude",
  "longitude",
  "accommodates",
  "bathrooms",
  "bedrooms",
  "beds",
  "bed_type",
  "minimum_nights",
  "number_of_reviews",
  "price").show

+-----------------+----------------+--------+----------+------------+---------+--------+----+--------+--------------+-----------------+-----+
|host_is_superhost|instant_bookable|latitude| longitude|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|price|
+-----------------+----------------+--------+----------+------------+---------+--------+----+--------+--------------+-----------------+-----+
|                t|               t|37.76931|-122.43386|           3|      1.0|       1|   2|Real Bed|             1|              180|170.0|
|                f|               f|37.74511|-122.42102|           5|      1.0|       2|   3|Real Bed|            30|              111|235.0|
|                f|               f|37.76669| -122.4525|           2|      4.0|       1|   1|Real Bed|            32|               17| 65.0|
|                f|               f|37.76487|-122.45183|           2|      4.0|       1|   1|Real Bed|            32|                8| 65.0|
|     

**Quelques statistiques sur les données avec Describe & Summary**

In [20]:
val desc = fixedPriceDF.describe()

desc: org.apache.spark.sql.DataFrame = [summary: string, host_is_superhost: string ... 23 more fields]


In [12]:
desc.select(
"summary",
"host_is_superhost",
  "latitude",
  "longitude",
  "beds",
  "minimum_nights",
  "number_of_reviews",
  "price").show()

+-------+-----------------+--------------------+--------------------+------------------+------------------+-----------------+------------------+
|summary|host_is_superhost|            latitude|           longitude|              beds|    minimum_nights|number_of_reviews|             price|
+-------+-----------------+--------------------+--------------------+------------------+------------------+-----------------+------------------+
|  count|             7151|                7151|                7151|              7144|              7151|             7151|              7059|
|   mean|             null|   37.76580945042649| -122.43052552230478|1.7648376259798433|14000.302335337716|43.52915676129213|191.33914152146195|
| stddev|             null|0.022527191846014046|0.026791775802673057| 1.176852628831775|1182541.9078980184|72.51922886627213|141.76871042769946|
|    min|                f|            37.70743|          -122.51306|                 0|                 1|                0|     

In [21]:
val stats = fixedPriceDF.summary()

stats: org.apache.spark.sql.DataFrame = [summary: string, host_is_superhost: string ... 23 more fields]


In [22]:
stats.select(
"summary",
"host_is_superhost",
  "latitude",
  "longitude",
  "beds",
  "minimum_nights",
  "number_of_reviews",
  "price").show()

+-------+-----------------+--------------------+--------------------+------------------+------------------+-----------------+------------------+
|summary|host_is_superhost|            latitude|           longitude|              beds|    minimum_nights|number_of_reviews|             price|
+-------+-----------------+--------------------+--------------------+------------------+------------------+-----------------+------------------+
|  count|             7151|                7151|                7151|              7144|              7151|             7151|              7059|
|   mean|             null|   37.76580945042649| -122.43052552230478|1.7648376259798433|14000.302335337716|43.52915676129213|191.33914152146195|
| stddev|             null|0.022527191846014046|0.026791775802673057| 1.176852628831775|1182541.9078980184|72.51922886627213|141.76871042769946|
|    min|                f|            37.70743|          -122.51306|                 0|                 1|                0|     

### Imputation des valeurs nulles

Il existe de nombreuses façons de gérer les valeurs nulles. Parfois, **null** peut, en fait, être un indicateur clé de ce que vous essayez de prédire (par exemple, si vous ne remplissez pas certaines parties d'un formulaire, la probabilité qu'il soit approuvé diminue).
 
  Quelques façons de gérer les valeurs nulles:
  * Supprimez tous les enregistrements contenant des valeurs nulles
  * Numérique:
    * Remplacez-les par moyenne/médiane/zéro/etc.
  * Catégorique:
    * Remplacez-les par le mode
    * Créez une catégorie spéciale pour null
  * Utilisez d'autres techniques pour imputer les valeurs manquantes
   
**Si vous appliquez TOUTES les techniques d'imputation pour les attributs de type catégoriques/numériques, vous DEVEZ inclure un champ supplémentaire spécifiant que ce champ a été imputé (pensez à la raison pour laquelle cela est nécessaire)**

Il y a quelques valeurs nulles dans l'attribut catégorique `host_is_superhost`. Débarrassons-nous des lignes où l'une de ces colonnes est nulle.
 
L'option d'imputation de SparkML ne prend pas en charge l'imputation pour les attributs catégoriques, c'est donc l'approche la plus simple pour le moment.

In [15]:
val noNullsDF = fixedPriceDF.na.drop(cols = Seq("host_is_superhost"))

noNullsDF: org.apache.spark.sql.DataFrame = [host_is_superhost: string, cancellation_policy: string ... 22 more fields]


#### Casting des attributs de Integer vers le type Double

In [16]:
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType


In [17]:
val integerColumns = for (x <- baseDF.schema.fields if (x.dataType == IntegerType)) yield x.name  
var doublesDF = noNullsDF

for (c <- integerColumns)
  doublesDF = doublesDF.withColumn(c, col(c).cast("double"))

val columns = integerColumns.mkString("\n - ")
println(s"Colonnes convertis de Integer vers Double:\n - $columns \n")
println("*-"*80)

Colonnes convertis de Integer vers Double:
 - host_total_listings_count
 - accommodates
 - bedrooms
 - beds
 - minimum_nights
 - number_of_reviews
 - review_scores_rating
 - review_scores_accuracy
 - review_scores_cleanliness
 - review_scores_checkin
 - review_scores_communication
 - review_scores_location
 - review_scores_value 

*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-


integerColumns: Array[String] = Array(host_total_listings_count, accommodates, bedrooms, beds, minimum_nights, number_of_reviews, review_scores_rating, review_scores_accuracy, review_scores_cleanliness, review_scores_checkin, review_scores_communication, review_scores_location, review_scores_value)
doublesDF: org.apache.spark.sql.DataFrame = [host_is_superhost: string, cancellation_policy: string ... 22 more fields]
columns: String =
host_total_listings_count
 - accommodates
 - bedrooms
 - beds
 - minimum_nights
 - number_of_reviews
 - review_scores_rating
 - review_scores_accuracy
 - review_scores_cleanliness
 - review_scores_checkin
 - review_scores_communication
 - review_scores_location
 - review_scores_value


Nous allons ajouter une variable factive si nous allons imputer une valeur.

In [18]:
import org.apache.spark.sql.functions.when

val imputeCols = Array(
  "bedrooms",
  "bathrooms",
  "beds", 
  "review_scores_rating",
  "review_scores_accuracy",
  "review_scores_cleanliness",
  "review_scores_checkin",
  "review_scores_communication",
  "review_scores_location",
  "review_scores_value"
)

for (c <- imputeCols)
  doublesDF = doublesDF.withColumn(c + "_na", when(col(c).isNull, 1.0).otherwise(0.0))

import org.apache.spark.sql.functions.when
imputeCols: Array[String] = Array(bedrooms, bathrooms, beds, review_scores_rating, review_scores_accuracy, review_scores_cleanliness, review_scores_checkin, review_scores_communication, review_scores_location, review_scores_value)


In [19]:
doublesDF.select(
"host_is_superhost",
  "instant_bookable",
  "accommodates",
  "bathrooms",
  "bedrooms",
  "beds",
  "beds_na",
  "bed_type",
  "minimum_nights",
  "number_of_reviews",
  "review_scores_rating",
  "review_scores_accuracy",
  "price").show

+-----------------+----------------+------------+---------+--------+----+-------+--------+--------------+-----------------+--------------------+----------------------+-----+
|host_is_superhost|instant_bookable|accommodates|bathrooms|bedrooms|beds|beds_na|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|price|
+-----------------+----------------+------------+---------+--------+----+-------+--------+--------------+-----------------+--------------------+----------------------+-----+
|                t|               t|         3.0|      1.0|     1.0| 2.0|    0.0|Real Bed|           1.0|            180.0|                97.0|                  10.0|170.0|
|                f|               f|         5.0|      1.0|     2.0| 3.0|    0.0|Real Bed|          30.0|            111.0|                98.0|                  10.0|235.0|
|                f|               f|         2.0|      4.0|     1.0| 1.0|    0.0|Real Bed|          32.0|             17.0|       

In [20]:
import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
  .setStrategy("median")
  .setInputCols(imputeCols)
  .setOutputCols(imputeCols)

val imputedDF = imputer.fit(doublesDF).transform(doublesDF)

import org.apache.spark.ml.feature.Imputer
imputer: org.apache.spark.ml.feature.Imputer = imputer_a805529d38e9
imputedDF: org.apache.spark.sql.DataFrame = [host_is_superhost: string, cancellation_policy: string ... 32 more fields]


Il faut jeter un coup d'œil sur les valeurs *min* et *max* de la colonne `price`:

In [21]:
imputedDF.select("price").describe().show

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|              7059|
|   mean|191.33914152146195|
| stddev|141.76871042769946|
|    min|               0.0|
|    max|             999.0|
+-------+------------------+



Il existe des annonces qui sont très chères. Mais c'est le travail du Data Scientist de décider quoi en faire. Nous pouvons certainement filtrer les Airbnbs "gratuits".
 
Voyons d'abord, combien d'annonces  dont leur *prix* est nul

In [22]:
imputedDF.filter($"price" === 0).count

res13: Long = 1


maintenant, nous allons seulement garder les lignes dont les prix sont strictement positifs.

In [23]:
val posPricesDF = imputedDF.filter($"price" > 0)

posPricesDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host_is_superhost: string, cancellation_policy: string ... 32 more fields]


Voyons le min max de la colonne *minimum_nights*

In [24]:
posPricesDF.select("minimum_nights").describe().show

+-------+------------------+
|summary|    minimum_nights|
+-------+------------------+
|  count|              7058|
|   mean| 14184.45820345707|
| stddev|1190307.3162499827|
|    min|               1.0|
|    max|             1.0E8|
+-------+------------------+



In [25]:
posPricesDF
  .groupBy("minimum_nights").count()
  .orderBy($"count".desc, $"minimum_nights").show

+--------------+-----+
|minimum_nights|count|
+--------------+-----+
|          30.0| 2744|
|           2.0| 1433|
|           1.0| 1232|
|           3.0|  808|
|           4.0|  258|
|           5.0|  170|
|          31.0|  132|
|           7.0|   72|
|           6.0|   31|
|          32.0|   31|
|          60.0|   31|
|         180.0|   28|
|          90.0|   27|
|          45.0|    7|
|         120.0|    6|
|         365.0|    6|
|          14.0|    4|
|          10.0|    3|
|          40.0|    3|
|          28.0|    2|
+--------------+-----+
only showing top 20 rows



Un séjour minimum d'un an semble être une limite raisonnable ici. Filtrons les enregistrements où le *minimum_nights* est supérieur à 365 jours:

In [26]:
val cleanDF = posPricesDF.filter($"minimum_nights" <= 365)

cleanDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host_is_superhost: string, cancellation_policy: string ... 32 more fields]


In [27]:
cleanDF.select(
  "host_is_superhost",
  "instant_bookable",
  "latitude",
  "longitude",
  "accommodates",
  "bathrooms",
  "bedrooms",
  "beds",
  "bed_type",
  "minimum_nights",
  "number_of_reviews",
  "price").show

+-----------------+----------------+--------+----------+------------+---------+--------+----+--------+--------------+-----------------+-----+
|host_is_superhost|instant_bookable|latitude| longitude|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|price|
+-----------------+----------------+--------+----------+------------+---------+--------+----+--------+--------------+-----------------+-----+
|                t|               t|37.76931|-122.43386|         3.0|      1.0|     1.0| 2.0|Real Bed|           1.0|            180.0|170.0|
|                f|               f|37.74511|-122.42102|         5.0|      1.0|     2.0| 3.0|Real Bed|          30.0|            111.0|235.0|
|                f|               f|37.76669| -122.4525|         2.0|      4.0|     1.0| 1.0|Real Bed|          32.0|             17.0| 65.0|
|                f|               f|37.76487|-122.45183|         2.0|      4.0|     1.0| 1.0|Real Bed|          32.0|              8.0| 65.0|
|     

OK, nos données sont maintenant nettoyées. Maintenant, nous pouvons sauvegarder ce DataFrame dans un fichier afin de pouvoir l'utiliser pour faire des modèle ML.

In [28]:
val outputPath = "datasets/sf-airbnb/sf-airbnb-clean.parquet"

cleanDF.write.mode("overwrite").parquet(outputPath)

outputPath: String = datasets/sf-airbnb/sf-airbnb-clean.parquet


### Machine Learning modèles
Nous rappelons, notre objectif est de developper un modèle ML de prédire les prix de location à partir des données de Airbnb à San Francisco.

Maintenant, nous allons utiliser les données que nous avons nettoyé et souvegardé.

In [29]:
val filePath = "datasets/sf-airbnb/sf-airbnb-clean.parquet"
val airbnbDF = spark.read.parquet(filePath)

filePath: String = datasets/sf-airbnb/sf-airbnb-clean.parquet
airbnbDF: org.apache.spark.sql.DataFrame = [host_is_superhost: string, cancellation_policy: string ... 32 more fields]


In [30]:
airbnbDF.show(2)

+-----------------+--------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----+-----------+------------+-------+-----------------------+-------------------------+----------------------------+------------------------+------------------------------+-------------------------+----------------------+
|host_is_superhost| cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude| longitude|property_type|      room_type|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|

### Train/Test Split

Lorsqu'on construit un modèle Machine Learning, pourquoi on utilise pas les données de test ?
 
Nous allons prendre 80% des données pour le train et 20% pour le test. Nous utiliserons la méthode `randomSplit` [Python](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.sql.Dataset).

In [31]:
val Array(trainDF, testDF) = airbnbDF.randomSplit(Array(.8, .2), seed=42)
println(f" Il y a ${trainDF.cache().count()} lignes dans le  training set, et ${testDF.cache().count()} dans le test set")

 Il y a 5708 lignes dans le  training set, et 1347 dans le test set


trainDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host_is_superhost: string, cancellation_policy: string ... 32 more fields]
testDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host_is_superhost: string, cancellation_policy: string ... 32 more fields]


In [32]:
val Array(trainRepartitionDF, testRepartitionDF) = airbnbDF
  .repartition(24)
  .randomSplit(Array(.8, .2), seed=42)

println(trainRepartitionDF.count())

5658


trainRepartitionDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host_is_superhost: string, cancellation_policy: string ... 32 more fields]
testRepartitionDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host_is_superhost: string, cancellation_policy: string ... 32 more fields]


Le *spliting* de train/test 80/20 n'est pas exacte, il est juste «approximatif». Lorsque le partitionnement des données change, la taille des donnée est différent dans le train/test ainsi que les data ponts différents.
 
Nous recommandation est de partitionner vos données une fois, puis de les écrire dans un dossier train/test afin d'éviter ces problèmes de reproductibilité.

Nous allons construire un modèle de régression linéaire très simple prédisant le **"prix"** juste compte tenu du nombre de **"chambres"**.
 
**Question**: Quelles sont hypothèses du modèle de régression linéaire?

In [33]:
trainDF.select("price", "bedrooms").summary().show

+-------+------------------+------------------+
|summary|             price|          bedrooms|
+-------+------------------+------------------+
|  count|              5708|              5708|
|   mean|191.64453398738613| 1.326208829712684|
| stddev|142.23860298327358|0.9083263732042874|
|    min|              10.0|               0.0|
|    25%|             100.0|               1.0|
|    50%|             150.0|               1.0|
|    75%|             230.0|               2.0|
|    max|             999.0|              14.0|
+-------+------------------+------------------+



Il y a des valeurs aberrantes dans les données pour le prix (Par exemple 10 000$ la nuité ??). Faite attention à cela, lorsque vous construisez vos modèles :).

### Modèle 1

Pour ce premier modèle, nous allons  prédire le prix du loyer en fonctions du nombre de chambres

### Vector Assembler
La régression linéaire prend une colonne de type Vector comme entrée.
 
 Nous pouvons facilement obtenir les valeurs de la colonne `chambres` dans un seul vecteur en utilisant `VectorAssembler` [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark. ml.feature.VectorAssembler)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.feature.VectorAssembler). VectorAssembler est un exemple de **transformer**. Les *transformers* prennent un DataFrame et renvoient un nouveau DataFrame avec une ou plusieurs colonnes qui lui sont ajoutées. Ils n'apprennent pas de vos données, mais appliquent des transformations basées sur des règles.

In [34]:
import org.apache.spark.ml.feature.VectorAssembler

val vecAssembler = new VectorAssembler()
  .setInputCols(Array("bedrooms"))
  .setOutputCol("features")

val vecTrainDF = vecAssembler.transform(trainDF)

vecTrainDF.select("bedrooms", "features", "price").show(10)

+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
|     1.0|   [1.0]|115.0|
|     1.0|   [1.0]|105.0|
|     1.0|   [1.0]| 86.0|
|     1.0|   [1.0]|100.0|
|     2.0|   [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows



import org.apache.spark.ml.feature.VectorAssembler
vecAssembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_4e00ae925b87, handleInvalid=error, numInputCols=1
vecTrainDF: org.apache.spark.sql.DataFrame = [host_is_superhost: string, cancellation_policy: string ... 33 more fields]


### Regression Linéaire

Maintenant que nous avons préparé les données, nous pouvons utiliser l'estimateur `LinearRegression` pour construire notre premier modèle [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark .ml.regression.LinearRegression)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.regression.LinearRegression). Les estimateurs acceptent un DataFrame comme entrée et retournent un modèle. Ils ont une méthode `.fit ()` pour faire le fitting ou l'entrainement du modèle.

In [35]:
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression()
  .setFeaturesCol("features")
  .setLabelCol("price")

val lrModel = lr.fit(vecTrainDF)

import org.apache.spark.ml.regression.LinearRegression
lr: org.apache.spark.ml.regression.LinearRegression = linReg_9fe2eb78a95b
lrModel: org.apache.spark.ml.regression.LinearRegressionModel = LinearRegressionModel: uid=linReg_9fe2eb78a95b, numFeatures=1


### La droite de regression price= a* bedrooms +b

In [36]:
val m = lrModel.coefficients(0)
val b = lrModel.intercept

println(f"La droite de Regression price = $m%1.2f*bedrooms + $b%1.2f")
println("*-"*80)


La droite de Regression price = 90.94*bedrooms + 71.04
*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-


m: Double = 90.93678677027269
b: Double = 71.0433644269509


### Pipeline

In [37]:
import org.apache.spark.ml.Pipeline

val pipeline = new Pipeline().setStages(Array(vecAssembler, lr))
val pipelineModel = pipeline.fit(trainDF)

import org.apache.spark.ml.Pipeline
pipeline: org.apache.spark.ml.Pipeline = pipeline_32375b7f4dee
pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_32375b7f4dee


### Test 

In [38]:
val predDF = pipelineModel.transform(testDF)

predDF.select("bedrooms", "features", "price", "prediction").show(10)

+--------+--------+-----+------------------+
|bedrooms|features|price|        prediction|
+--------+--------+-----+------------------+
|     1.0|   [1.0]| 85.0| 161.9801511972236|
|     1.0|   [1.0]| 45.0| 161.9801511972236|
|     1.0|   [1.0]| 70.0| 161.9801511972236|
|     1.0|   [1.0]|128.0| 161.9801511972236|
|     1.0|   [1.0]|159.0| 161.9801511972236|
|     2.0|   [2.0]|250.0|252.91693796749627|
|     1.0|   [1.0]| 99.0| 161.9801511972236|
|     1.0|   [1.0]| 95.0| 161.9801511972236|
|     1.0|   [1.0]|100.0| 161.9801511972236|
|     1.0|   [1.0]|270.0| 161.9801511972236|
+--------+--------+-----+------------------+
only showing top 10 rows



predDF: org.apache.spark.sql.DataFrame = [host_is_superhost: string, cancellation_policy: string ... 34 more fields]


Calculons le «prix» moyen sur les données d'entraînement, et utilisons-le comme colonne de prédiction sur les données de test, puis évaluons le résultat.

In [39]:
import org.apache.spark.sql.functions.{avg, lit}

import org.apache.spark.sql.functions.{avg, lit}


In [54]:
val avgPrice = trainDF.select(avg("price")).first().getDouble(0)

val predDF = testDF.withColumn("avgPrediction", lit(avgPrice))

avgPrice: Double = 191.64453398738613
predDF: org.apache.spark.sql.DataFrame = [host_is_superhost: string, cancellation_policy: string ... 33 more fields]


Le *Root Mean Square Error* (RMSE) ou Erreur quadratique moyenne définit ci-dessous nous permet d'évaluer le modèle.  
$$RMSE = \sqrt{\frac{1}{n}\Sigma_{i=1}^{n}(y_i- \hat{y})^2}$$

In [55]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

val regressionMeanEvaluator = new RegressionEvaluator()
  .setPredictionCol("avgPrediction")
  .setLabelCol("price")
  .setMetricName("rmse")

import org.apache.spark.ml.evaluation.RegressionEvaluator
regressionMeanEvaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_17e8f40d60fd, metricName=rmse, throughOrigin=false


In [57]:
val rmse = regressionMeanEvaluator.evaluate(predDF)
println (f"The RMSE pour la prediction du prix moyen de loyer: $rmse%1.2f")

The RMSE pour la prediction du prix moyen de loyer: 139.82


rmse: Double = 139.81513140129226


### Modèle 2.
Pour prèdire le prix du loyer nous allons ajouter d'autres features supplementaires, en faisant OneHoteEncoding pour les attributs categoriques.

* Nous allons faire du OneHotEncode (OHE) des variables catégoriques. La première approche que nous allons utiliser combinera `StringIndexer`, `OneHotEncoder` et `VectorAssembler`.
* Nous devons d'abord utiliser `StringIndexer` pour mapper la colonne en string des labels à une colonne d'index de labels [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.StringIndexer)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.feature.StringIndexer).
* Ensuite, nous pouvons appliquer le `OneHotEncoder` à la sortie du `StringIndexer` [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoder )/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.feature.OneHotEncoder).

In [58]:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val categoricalCols = trainDF.dtypes.filter(_._2 == "StringType").map(_._1)
val indexOutputCols = categoricalCols.map(_ + "Index")
val oheOutputCols = categoricalCols.map(_ + "OHE")

val stringIndexer = new StringIndexer()
  .setInputCols(categoricalCols)
  .setOutputCols(indexOutputCols)
  .setHandleInvalid("skip")

val oheEncoder = new OneHotEncoder()
  .setInputCols(indexOutputCols)
  .setOutputCols(oheOutputCols)

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
categoricalCols: Array[String] = Array(host_is_superhost, cancellation_policy, instant_bookable, neighbourhood_cleansed, property_type, room_type, bed_type)
indexOutputCols: Array[String] = Array(host_is_superhostIndex, cancellation_policyIndex, instant_bookableIndex, neighbourhood_cleansedIndex, property_typeIndex, room_typeIndex, bed_typeIndex)
oheOutputCols: Array[String] = Array(host_is_superhostOHE, cancellation_policyOHE, instant_bookableOHE, neighbourhood_cleansedOHE, property_typeOHE, room_typeOHE, bed_typeOHE)
stringIndexer: org.apache.spark.ml.feature.StringIndexer = strIdx_8adf69b2bb91
oheEncoder: org.apache.spark.ml.feature.OneHotEncoder = oneHotEncoder_1aa4a8e7119c


Nous pouvons désormais combiner les attributs catégoriques OHE avec les attributs numériques.

In [59]:
import org.apache.spark.ml.feature.VectorAssembler

val numericCols = trainDF.dtypes.filter{ case (field, dataType) => 
                      dataType == "DoubleType" && field != "price"}.map(_._1)

import org.apache.spark.ml.feature.VectorAssembler
numericCols: Array[String] = Array(host_total_listings_count, latitude, longitude, accommodates, bathrooms, bedrooms, beds, minimum_nights, number_of_reviews, review_scores_rating, review_scores_accuracy, review_scores_cleanliness, review_scores_checkin, review_scores_communication, review_scores_location, review_scores_value, bedrooms_na, bathrooms_na, beds_na, review_scores_rating_na, review_scores_accuracy_na, review_scores_cleanliness_na, review_scores_checkin_na, review_scores_communication_na, review_scores_location_na, review_scores_value_na)


In [60]:
val assemblerInputs = oheOutputCols ++ numericCols
val vecAssembler = new VectorAssembler()
  .setInputCols(assemblerInputs)
  .setOutputCol("features")

assemblerInputs: Array[String] = Array(host_is_superhostOHE, cancellation_policyOHE, instant_bookableOHE, neighbourhood_cleansedOHE, property_typeOHE, room_typeOHE, bed_typeOHE, host_total_listings_count, latitude, longitude, accommodates, bathrooms, bedrooms, beds, minimum_nights, number_of_reviews, review_scores_rating, review_scores_accuracy, review_scores_cleanliness, review_scores_checkin, review_scores_communication, review_scores_location, review_scores_value, bedrooms_na, bathrooms_na, beds_na, review_scores_rating_na, review_scores_accuracy_na, review_scores_cleanliness_na, review_scores_checkin_na, review_scores_communication_na, review_scores_location_na, review_scores_value_na)
vecAssembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_f9fb...


### Linear Regression
Maintenant nous avons tous les features pour construire le modèle.

In [61]:
import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()
  .setLabelCol("price")
  .setFeaturesCol("features")

import org.apache.spark.ml.regression.LinearRegression
lr: org.apache.spark.ml.regression.LinearRegression = linReg_d9aa8d73cc1b


In [62]:
import org.apache.spark.ml.Pipeline

val stages = Array(stringIndexer, oheEncoder, vecAssembler,  lr)

val pipeline = new Pipeline()
  .setStages(stages)

val pipelineModel = pipeline.fit(trainDF)
val predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)

+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(98,[0,3,6,22,43,...| 85.0|109.20433611042972|
|(98,[0,3,6,22,43,...| 45.0| 39.69910013655317|
|(98,[0,3,6,22,43,...| 70.0| 44.60628120705951|
|(98,[0,3,6,12,42,...|128.0|-41.91602285458066|
|(98,[0,3,6,12,43,...|159.0|106.31505350096995|
+--------------------+-----+------------------+
only showing top 5 rows



import org.apache.spark.ml.Pipeline
stages: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable}}] = Array(strIdx_8adf69b2bb91, oneHotEncoder_1aa4a8e7119c, VectorAssembler: uid=vecAssembler_f9fb04a610f7, handleInvalid=error, numInputCols=33, linReg_d9aa8d73cc1b)
pipeline: org.apache.spark.ml.Pipeline = pipeline_c57c6a047f24
pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_c57c6a047f24
predDF: org.apache.spark.sql.DataFrame = [host_is_superhost: string, cancellatio...


## Option 2 : RFormula
* Au lieu de spécifier manuellement quelles colonnes sont catégoriques pour StringIndexer et OneHotEncoder, RFormula peut le faire automatiquement pour vous [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.RFormula)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.feature.RFormula).
* Avec RFormula, si vous avez des colonnes de type String, il la traite comme un *attribut* catégorique et crée les indexes et le one hot encoding pour nous. Sinon, il laisse tel quel. Ensuite, il combine toutes les features encodées et  numériques en un seul vecteur, appelé «features».

In [63]:
import org.apache.spark.ml.feature.RFormula

import org.apache.spark.ml.feature.RFormula


In [64]:

val rFormula = new RFormula()
  .setFormula("price ~ .")
  .setFeaturesCol("features")
  .setLabelCol("price")
  .setHandleInvalid("skip")

rFormula: org.apache.spark.ml.feature.RFormula = RFormula: uid=rFormula_7d8c6db5b56f, formula = price ~ .


In [65]:
import org.apache.spark.ml.Pipeline

val pipeline = new Pipeline().setStages(Array(rFormula, lr))

val pipelineModel = pipeline.fit(trainDF)
val predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)

+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(98,[0,3,6,7,23,4...| 85.0|109.20433653512737|
|(98,[0,3,6,7,23,4...| 45.0| 39.69909986710991|
|(98,[0,3,6,7,23,4...| 70.0|  44.6062804524845|
|(98,[0,3,6,7,13,4...|128.0| -41.9160226271124|
|(98,[0,3,6,7,13,4...|159.0|106.31505335881957|
+--------------------+-----+------------------+
only showing top 5 rows



import org.apache.spark.ml.Pipeline
pipeline: org.apache.spark.ml.Pipeline = pipeline_2a199c55cc36
pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_2a199c55cc36
predDF: org.apache.spark.sql.DataFrame = [host_is_superhost: string, cancellation_policy: string ... 34 more fields]


### Evaluation du modèle

In [66]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

val regressionEvaluator = new RegressionEvaluator()
  .setPredictionCol("prediction")
  .setLabelCol("price")
  .setMetricName("rmse")

val rmse = regressionEvaluator.evaluate(predDF)
println(f"RMSE est égale à $rmse%1.2f")

RMSE est égale à 97.75


import org.apache.spark.ml.evaluation.RegressionEvaluator
regressionEvaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_04344a9f6760, metricName=rmse, throughOrigin=false
rmse: Double = 97.7482812471497


**R2** représente le coefficient de l'adéquation des valeurs par rapport aux valeurs d'origine. La valeur de 0 à 1 interprétée comme des pourcentages. Plus la valeur est élevée, meilleur est le modèle.

$$R2 = 1- \frac{\sum_{i=1}^{n}(y_{i}-{\hat{y_{i}}})^{2}}{\sum_{i=1}^{n}(y_{i}-{\bar{y}})^{2}} = 1 - \frac{RMSE^2}{\sigma^2}$$

In [67]:
val r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
println(f"R2 est égale à $r2%1.2f")

R2 est égale à 0.51


r2: Double = 0.5113945481435695


### Sauvegarde du Modèle

In [69]:
val pipelinePath = "datasets/sf-airbnb/lr-pipeline-model"
pipelineModel.write.overwrite().save(pipelinePath)

pipelinePath: String = datasets/sf-airbnb/lr-pipeline-model
