# Choix du jeu de données

### Type de problème 
On a un problème de classification ici c'est à dire que le but de cette étude est de créer un modèle de prédiction de la survie des passagers du Titanic suite à son naufrage.

Lecture du jeu de données (format csv) à partir de HDFS
Ici, nous utilisons le dataset titanic disponible sur kaggle https://www.kaggle.com/competitions/titanic/data

La variable cible sera **Survived** et les variables descriptives seront choisie par la suite dans ce notebook 

###Presentation des données

#####▶Nombre de variables qualitatives :

PassengerId

Survived: Survival (0 = No; 1 = Yes)

Pclass: Passenger Class (1 = 1st; 2 = 2nd; 3 = 3rd)

Sex: Sex

Ticket: Ticket Number

Cabin: Cabin

Embarked: Port of Embarkation (C = Cherbourg; Q =
Queenstown; S = Southampton)

#####▶ Nombre de variables quantitatives :

SlibSp: Number of Siblings/Spouses Aboard

Parch: Number of Parents/Children Aboard

Fare: Passenger Fare (British pound)

### Metrique choisie
La métrique choisie est l' accuracy car elle est une mesure permettant d'évaluer les modèles de classification. 
De manière informelle, elle est la fraction des prédictions que notre modèle a eu raison. 

Accuracy = Nombre de Predictions Correctes/ Nombre Total de Predictions

Pour la classification binaire comme dans notre cas, la précision peut également être calculée en termes de positifs et de négatifs comme suit :

Accuracy = (TP + TN) / (TP + TN + FP + FN)

In [ ]:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}


In [ ]:
//importation du dataset
import org.apache.spark.sql._
val spark = SparkSession.builder().getOrCreate()

//train data
val train = spark.read.format("com.databricks.spark.csv")
              .option("header", "true")
              .option("inferSchema", "true") 
              .load("hdfs://hupi-factory-02-01-01-01/user/uppa_big_data_2022_m1/KEITA_moussa_TAHAKOURT_maya/train.csv")

//test data
val test = spark.read.format("com.databricks.spark.csv")
              .option("header", "true")
              .option("inferSchema", "true") 
              .load("hdfs://hupi-factory-02-01-01-01/user/uppa_big_data_2022_m1/KEITA_moussa_TAHAKOURT_maya/test.csv")

import org.apache.spark.sql._
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6b5e0c0e
train: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 10 more fields]
test: org.apache.spark.sql.DataFrame = [PassengerId: int, Pclass: int ... 9 more fields]


In [ ]:
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [ ]:
train.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [ ]:
train.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

#### Remarque:
nous pouvons voir que 38% de l'ensemble d'entraînement ont survécu au Titanic. Nous pouvons également voir que l'âge des passagers varie de 0,4 à 80 ans. En plus de cela, nous pouvons déjà détecter certaines fonctionnalités contenant des valeurs manquantes, comme la fonctionnalité "Âge" qu'on va gerer par la suite.

# Analyse des données

###Etude des corrélations

Le caclul de la matrice de corrélation est faite avec la méthode Pearson.

Selon la matrice de corrélation ci-dessous (Bitmap), il existe des corrélations relativement fortes/faibles entre plusieurs colonnes de notre dataset qui sont :

SibSp and Parch: corr(correlation coefficient) = 0.41

Pclass and Fare : corr = -0,55 

Survived and Sex : corr = -0,54 

Survived et Pclass : corr = -0,34

On tient à préciser qu'avant de créer la matrice de corrélation (Bitmap) on a fait des modifications au niveau des variables "Sex" et "Embarked" 

Pour "sex" on a affecté la valeur 1 pour "male" et 0 pour "female"

De la meme façon pour "Embarked" on a affecté 0 pour "S" 1 pour "C" et 2 pour "Q"

In [ ]:
<img src="https://i.ibb.co/z89xVV4/correlation.png" />


res11: scala.xml.Elem = <img src="https://i.ibb.co/z89xVV4/correlation.png"/>


### Age et Survie
On remarque quelques relations entre l'âge et la survie. La plupart des passagers de cet ensemble de données sont âgés de 15 à 35 ans.
Le passager survivant le plus âgé avait 80 ans et les enfants de moins de 4 ans avaient un taux de survie élevé.

In [ ]:
<img src="https://i.ibb.co/Q6ChKsr/Age-Survived.png"/>

res13: scala.xml.Elem = <img src="https://i.ibb.co/Q6ChKsr/Age-Survived.png"/>


## Exploration des données

In [ ]:
(train
  .groupBy("Pclass","Embarked")
  .agg(count("*"),avg("Fare"),min("Fare"),max("Fare"),stddev("Fare"))
  .orderBy("Pclass","Embarked")
  .show())

+------+--------+--------+------------------+---------+---------+------------------+
|Pclass|Embarked|count(1)|         avg(Fare)|min(Fare)|max(Fare)| stddev_samp(Fare)|
+------+--------+--------+------------------+---------+---------+------------------+
|     1|    null|       2|              80.0|     80.0|     80.0|               0.0|
|     1|       C|      85|104.71852941176469|    26.55| 512.3292|  99.0939349696501|
|     1|       Q|       2|              90.0|     90.0|     90.0|               0.0|
|     1|       S|     127| 70.36486220472443|      0.0|    263.0|58.811277761795566|
|     2|       C|      17|25.358335294117644|     12.0|  41.5792|11.345067090697457|
|     2|       Q|       3|             12.35|    12.35|    12.35|               0.0|
|     2|       S|     164|20.327439024390245|      0.0|     73.5|13.630741099088103|
|     3|       C|      66|11.214083333333337|   4.0125|  22.3583| 4.871528353625736|
|     3|       Q|      72|11.183393055555557|     6.75|   29.125|

In [ ]:
train.groupBy("Pclass").mean("Fare").show()

+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     3|13.675550101832997|
|     2| 20.66218315217391|
+------+------------------+



In [ ]:
train.groupBy("Pclass","Survived").count().show()

+------+--------+-----+
|Pclass|Survived|count|
+------+--------+-----+
|     1|       0|   80|
|     3|       1|  119|
|     1|       1|  136|
|     2|       1|   87|
|     2|       0|   97|
|     3|       0|  372|
+------+--------+-----+



### Remarque
Nous pouvons clairement voir que les passagers de Pclass 1 ont reçu une très haute priorité lors du sauvetage. Même si le nombre de passagers dans la Pclass 3 était beaucoup plus élevé, leur nombre de survie est toujours très faible.

In [ ]:
train.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [ ]:
train.filter("Survived = 1").select("Name","Pclass").show()

+--------------------+------+
|                Name|Pclass|
+--------------------+------+
|Cumings, Mrs. Joh...|     1|
|Heikkinen, Miss. ...|     3|
|Futrelle, Mrs. Ja...|     1|
|Johnson, Mrs. Osc...|     3|
|Nasser, Mrs. Nich...|     2|
|Sandstrom, Miss. ...|     3|
|Bonnell, Miss. El...|     1|
|Hewlett, Mrs. (Ma...|     2|
|Williams, Mr. Cha...|     2|
|Masselmani, Mrs. ...|     3|
|Beesley, Mr. Lawr...|     2|
|"McGowan, Miss. A...|     3|
|Sloper, Mr. Willi...|     1|
|Asplund, Mrs. Car...|     3|
|"O'Dwyer, Miss. E...|     3|
|Spencer, Mrs. Wil...|     1|
|Glynn, Miss. Mary...|     3|
|    Mamee, Mr. Hanna|     3|
|Nicola-Yarred, Mi...|     3|
|Laroche, Miss. Si...|     2|
+--------------------+------+
only showing top 20 rows



In [ ]:
train.groupBy("Sex").agg(max("Age")).show()

+------+--------+
|   Sex|max(Age)|
+------+--------+
|female|    63.0|
|  male|    80.0|
+------+--------+



# Pré-traitement des données


### Traitement des valeurs nulles

In [ ]:
import org.apache.spark.sql.functions.col

import org.apache.spark.sql.functions.col


In [ ]:
val ageMissingDataCount: Double = train.filter(col("Age").isNull).count()
val total : Double = train.count()
val ratioMissingAge: Double = (ageMissingDataCount / total) * 100

ageMissingDataCount: Double = 177.0
total: Double = 891.0
ratioMissingAge: Double = 19.865319865319865


On voit bien que la variable "Age" contient beaucoup de valeurs nulles, 177 valeurs en tout qui représente 20% du jeu de données,afin de corrigé ça on a completé ces valeurs avec la mediane de la variable Age.

In [ ]:
val Array(medianAge) = train.stat.approxQuantile("Age", Array(0.5), 0.25)
val train_df0 : DataFrame =train.na.fill(medianAge)

//pour test data
val Array(medianAge2) = test.stat.approxQuantile("Age", Array(0.5), 0.25)
val test_df0 : DataFrame =test.na.fill(medianAge2)

train_df0.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|21.0|    0|    0|      

In [ ]:
train.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



Embarked feature n'a que deux valeurs manquantes et la majorité des passagers 
sont montés à partir de S. On peut imputer avec S

In [ ]:
//pour train data
val train_df1 = train_df0.na.fill("S", Seq("Embarked"))
//pour test data
val test_df1 = test_df0.na.fill("S", Seq("Embarked"))

train_df1: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 10 more fields]
test_df1: org.apache.spark.sql.DataFrame = [PassengerId: int, Pclass: int ... 9 more fields]


In [ ]:
train_df1.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|       C|  168|
|       S|  646|
+--------+-----+



In [ ]:
val cabMissingDataCount: Double = train_df1.filter(col("Cabin").isNull).count()
val total : Double = train.count()
val ratioMissingAge: Double = (cabMissingDataCount / total) * 100

cabMissingDataCount: Double = 687.0
total: Double = 891.0
ratioMissingAge: Double = 77.10437710437711


On peux supprimer cabine features car elles contient de nombreuses valeurs nulles

In [ ]:
//pour train data
val train_df2 = train_df1.drop(col("Cabin"))
//pour test data
val test_df2 = test_df1.drop(col("Cabin"))

train_df2: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 9 more fields]
test_df2: org.apache.spark.sql.DataFrame = [PassengerId: int, Pclass: int ... 8 more fields]


######Traitement des valeurs aberrantes et outliers
Nous n'avons trouvé ni de valeur aberrante ni d'outliers

#### Creation d'une variable
Nous pouvons créer une nouvelle variable appelée Family_size et Alone pour enrichir nos données.
Cette caractéristique est la somme de Parch(parents/enfants) et SibSp(frères et sœurs/conjoints).
Cela nous donne des données combinées afin que nous puissions vérifier si le taux de survie a quelque chose à voir avec 
la taille de la famille des passagers

In [ ]:
//pour train data
val train_df3 = train_df2.withColumn("Family_Size",col("SibSp")+col("Parch"))
//pour test data
val test_df3 = test_df2.withColumn("Family_Size",col("SibSp")+col("Parch"))
train_df3.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|Family_Size|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|          1|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|          1|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|          0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|       S|          1|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|       S|          0|
|          6|       0|     3|   

#### Creation d'une variable Alone:
Cela va nous permettre de savoir si un passager avait de la famille ou etait seul lors du nauffrage et peut être un feature important pour la prediction 

In [ ]:
//pour train data
val train_df4 = train_df3.withColumn("Alone",lit(0))
//pour test data
val test_df4 = test_df3.withColumn("Alone",lit(0))

train_df4: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 11 more fields]
test_df4: org.apache.spark.sql.DataFrame = [PassengerId: int, Pclass: int ... 10 more fields]


In [ ]:
//pour train data
val train_df5 = train_df4.withColumn("Alone",when($"Family_Size" === 0, 1).otherwise($"Alone"))
//pour test data
val test_df5 = test_df4.withColumn("Alone",when($"Family_Size" === 0, 1).otherwise($"Alone"))

train_df5: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 11 more fields]
test_df5: org.apache.spark.sql.DataFrame = [PassengerId: int, Pclass: int ... 10 more fields]


In [ ]:
train_df5.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-----------+-----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|Family_Size|Alone|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-----------+-----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|          1|    0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|          1|    0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|          0|    1|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|       S|          1|    0|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|       S|   

##### NB: Si Alone =1 le Passager etait seul sinon il avait une famille

###Traitement des variables catégoriques

Nous avons effectué des modifications au niveau des variables "Sex" et "Embarked" car ces variables sont categorielle, et un algorithme de machine learning exige en entrée des donnéés numeriques pour lui faciliter le calcul.

In [ ]:
//pour train
val train_df6: DataFrame = train_df5.withColumn("sexNew", when($"Sex" === "male",1)
                                             .otherwise(0))

val train_df7: DataFrame = train_df6.withColumn("embarkedNew", when($"Embarked" === "S",0)
                                                .when($"Embarked" === "C",1)
                                             .otherwise(2))

//pour test
val test_df6: DataFrame = test_df5.withColumn("sexNew", when($"Sex" === "male",1)
                                             .otherwise(0))

val test_df7: DataFrame = test_df6.withColumn("embarkedNew", when($"Embarked" === "S",0)
                                                .when($"Embarked" === "C",1)
                                             .otherwise(2))

train_df6: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 12 more fields]
train_df7: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 13 more fields]
test_df6: org.apache.spark.sql.DataFrame = [PassengerId: int, Pclass: int ... 11 more fields]
test_df7: org.apache.spark.sql.DataFrame = [PassengerId: int, Pclass: int ... 12 more fields]


##Première Approche: Modèle construit sans cross-validation

- Dans un premier temps nous allons entamer la creation de notre premier modèle de Machine Learning(Logistc Regression).
Un Simple Modèle sans utilisé la technique de la validation croisée. Ensuite l'importé  en format PMML dans le but de construire notre API par la suite.


- Dans la deuxième approche nous allons créé d'autres modèles, cette fois ci en utlisant la technique de crossvalidation pour avoir un meilleur modèle.


- **NB**: L'idée est de voir la difference et de savoir à quel point il est important d'optimiser un modèle de Machine Learning

In [ ]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer}

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer


In [ ]:
//pour train data
val train_df = train_df7.drop("PassengerId","Name","Ticket","Embarked","Sex",
                        "SibSp","Parch")
//pour test data
val test_df = test_df7.drop("PassengerId","Name","Ticket","Embarked","Sex",
                        "SibSp","Parch")

train_df: org.apache.spark.sql.DataFrame = [Survived: int, Pclass: int ... 6 more fields]
test_df: org.apache.spark.sql.DataFrame = [Pclass: int, Age: double ... 5 more fields]


In [ ]:
train_df.show(5)

+--------+------+----+-------+-----------+-----+------+-----------+
|Survived|Pclass| Age|   Fare|Family_Size|Alone|sexNew|embarkedNew|
+--------+------+----+-------+-----------+-----+------+-----------+
|       0|     3|22.0|   7.25|          1|    0|     1|          0|
|       1|     1|38.0|71.2833|          1|    0|     0|          1|
|       1|     3|26.0|  7.925|          0|    1|     0|          0|
|       1|     1|35.0|   53.1|          1|    0|     0|          0|
|       0|     3|35.0|   8.05|          0|    1|     1|          0|
+--------+------+----+-------+-----------+-----+------+-----------+
only showing top 5 rows



**Vector Assembler**

Nous allons mettre toutes les variables descriptives dans une seule colonne
vectorielle nommée "features" et la colonne de la variable cible doit être renommée "label".

In [ ]:
val assembler = new VectorAssembler()
  .setInputCols(Array("Pclass","Age","Fare","Family_Size","Alone","sexNew","embarkedNew"))
  .setOutputCol("features")

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_10c044176a64


Indexer la variable à expliquer

In [ ]:
val labelIndexer = new StringIndexer()
  .setInputCol("Survived")
  .setOutputCol("indexedLabel")
  .fit(train_df)

labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_ff1cb81d8cba


Indexer les variables explicative

In [ ]:
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)

featureIndexer: org.apache.spark.ml.feature.VectorIndexer = vecIdx_20e2eec504d3


Reconvertir les labels indexées en labels d'origine.

In [ ]:
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")

labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_c25ae5020f56


#### Separation du dataset  en train-set et  validation-set

La raison pour laquelle nous faisons cela est simple. Si nous ne divisons pas les données en différents ensembles, le modèle serait évalué sur les mêmes données qu'il a vues pendant la formation. Nous pourrions donc rencontrer des problèmes tels que le surajustement sans même le savoir.

In [ ]:
val Array(trainingData, validationData) = train_df.randomSplit(Array(0.8, 0.2))

trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Survived: int, Pclass: int ... 6 more fields]
validationData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Survived: int, Pclass: int ... 6 more fields]


In [ ]:
import org.apache.spark.ml.classification.LogisticRegression

import org.apache.spark.ml.classification.LogisticRegression


**Modèle Logistique**

In [ ]:
val lr = new LogisticRegression()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")

lr: org.apache.spark.ml.classification.LogisticRegression = logreg_cae5db3ffb79


### pipeline
Créer la chaîne pipeline avec les différentes étapes stages. Puis entraîner le modèle.

In [ ]:
val pipeline_lr = new Pipeline()
  .setStages(Array(labelIndexer, assembler, featureIndexer, lr))

pipeline_lr: org.apache.spark.ml.Pipeline = pipeline_10d5a3f859e5


In [ ]:
val model_lr = pipeline_lr.fit(trainingData)

model_lr: org.apache.spark.ml.PipelineModel = pipeline_10d5a3f859e5


####Prediction

In [ ]:
val predictions_lr = model_lr.transform(validationData)

predictions_lr: org.apache.spark.sql.DataFrame = [Survived: int, Pclass: int ... 12 more fields]


In [ ]:
display(predictions_lr
  .select("Survived","prediction","features"))

res66: notebook.front.Widget = <Tabs widget>


In [ ]:
display(predictions_lr
  .select("Survived","prediction","probability"))

res61: notebook.front.Widget = <Tabs widget>


**Evaluation du Modele**

In [ ]:

val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("Survived")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_e4a07d8149ee


In [ ]:
val accuracy = evaluator.evaluate(predictions_lr)
println(s"Test Error = ${(1.0 - accuracy)}")

Test Error = 0.2282608695652174
accuracy: Double = 0.7717391304347826


**Conclusion**

La qualité du modèle est pas mal. 

Nous allons essayer d'optimiser d'avantage pour les modèles qui vont suivre en utilisant la technique de la validation croisée pour ajuster les différents paramètres de nos modèles.

###Importation du Modele en format PMML

In [ ]:
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vectors


In [ ]:
val data1 = labelIndexer.transform(trainingData)

data1: org.apache.spark.sql.DataFrame = [Survived: int, Pclass: int ... 7 more fields]


In [ ]:
val data2 = assembler.transform(data1)

data2: org.apache.spark.sql.DataFrame = [Survived: int, Pclass: int ... 8 more fields]


In [ ]:
val ind_Model = featureIndexer.fit(data2)

ind_Model: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_20e2eec504d3


In [ ]:
val data3 = ind_Model.transform(data2)

data3: org.apache.spark.sql.DataFrame = [Survived: int, Pclass: int ... 9 more fields]


In [ ]:
val lr_model = lr.fit(data3)

lr_model: org.apache.spark.ml.classification.LogisticRegressionModel = logreg_cae5db3ffb79


In [ ]:
new LogisticRegressionModel( org.apache.spark.mllib.linalg.Vectors.dense(lr_model.coefficients.toArray ), lr_model.intercept )
.toPMML(sc,"hdfs://hupi-factory-02-01-01-01/user/uppa_big_data_2022_m1/KEITA_moussa_TAHAKOURT_maya/Model_LogReg.xml")

## Deuxième Approche: Modèles construit avec l'utilisation du cross-validation

**Indexés les variables categorielles**

In [ ]:

val catFeatColNames = Seq("Sex", "Embarked")
val stringIndexers = catFeatColNames.map { colName =>
  new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "Indexed")
    .fit(train_df5)
}

catFeatColNames: Seq[String] = List(Sex, Embarked)
stringIndexers: Seq[org.apache.spark.ml.feature.StringIndexerModel] = List(strIdx_4c3d27b643da, strIdx_2edab79efbea)


#### Traitement de la variable reponse(Survived)

In [ ]:
//Indexés target feature
val labelIndexer = new StringIndexer()
.setInputCol("Survived")
.setOutputCol("SurvivedIndexed")
.fit(train_df5)

labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_03cdfc283627


 Nous allons mettre tous les variables explicative qui vont contribuer à la prediction de notre model dans un seul vecteur

In [ ]:
//
val numFeatColNames = Seq("Pclass","Age","Fare","Family_Size","Alone") //les variables numeriques
val idxdCatFeatColName = catFeatColNames.map(_ + "Indexed") //les variables categorielles
val allIdxdFeatColNames = numFeatColNames ++ idxdCatFeatColName // rassembler les deux (Num + Cat)

val assembler = new VectorAssembler()
  .setInputCols(Array(allIdxdFeatColNames: _*))
  .setOutputCol("Features")

numFeatColNames: Seq[String] = List(Pclass, Age, Fare, Family_Size, Alone)
idxdCatFeatColName: Seq[String] = List(SexIndexed, EmbarkedIndexed)
allIdxdFeatColNames: Seq[String] = List(Pclass, Age, Fare, Family_Size, Alone, SexIndexed, EmbarkedIndexed)
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_1cca4678caed


#### Separation du dataset  en train-set et  validation-set

In [ ]:
val Array(trainingData, validationData) = train_df5.randomSplit(Array(0.8, 0.2))

trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [PassengerId: int, Survived: int ... 11 more fields]
validationData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [PassengerId: int, Survived: int ... 11 more fields]


##Implementation de Modèles

Nous allons implementer deux modèles d'algorithmes qui sont le : 

**Random Forest**

**Gradient boosted tree**

### Construction du modèle Random Forest


In [ ]:
//Randomforest classifier
val randomforest = new RandomForestClassifier()
  .setLabelCol("SurvivedIndexed")
 .setFeaturesCol("indexedFeatures")
  .setFeaturesCol("Features")


randomforest: org.apache.spark.ml.classification.RandomForestClassifier = rfc_7ec65a6ca259


In [ ]:
//Reconvertir les labels indexées en labels d'origine.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
 .setLabels(labelIndexer.labels)

labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_badf186c34a4


###Construction du Pipeline

In [ ]:
import org.apache.spark.ml.{Pipeline, PipelineModel}

//Creation du pipeline
val pipeline = new Pipeline().setStages(
  (stringIndexers :+ labelIndexer :+ assembler:+ randomforest :+ labelConverter).toArray)

import org.apache.spark.ml.{Pipeline, PipelineModel}
pipeline: org.apache.spark.ml.Pipeline = pipeline_83ea876efd85


#####Selection du Meilleur modèle

In [ ]:
import org.apache.spark.ml.tuning.{CrossValidator,ParamGridBuilder}

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}


### Cross validation (Strategie employé pour eviter le surajustement)
L'objectif est d'utiliser la méthode de **Cross-validation** pour ajuster les différents paramètres de notre modèle.
Les paramètres à ajuster sont les suivants :
- maxBins
- maxDepth
- impurity

Une fois que la grille est créée, on va ensuite créer le modèle de cross-validation, puis pour terminer l'appliquer à l'ensemble d'entraînement.

Enfin tester à nouveau le nouveau modèle `Model` sur l'ensemble de test.

In [ ]:
val paramGrid = new ParamGridBuilder()
  .addGrid(randomforest.maxBins, Array(25, 28, 31))
  .addGrid(randomforest.maxDepth, Array(4, 6, 8))
  .addGrid(randomforest.impurity, Array("entropy", "gini"))
  .build()

paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_7ec65a6ca259-impurity: entropy,
	rfc_7ec65a6ca259-maxBins: 25,
	rfc_7ec65a6ca259-maxDepth: 4
}, {
	rfc_7ec65a6ca259-impurity: entropy,
	rfc_7ec65a6ca259-maxBins: 28,
	rfc_7ec65a6ca259-maxDepth: 4
}, {
	rfc_7ec65a6ca259-impurity: entropy,
	rfc_7ec65a6ca259-maxBins: 31,
	rfc_7ec65a6ca259-maxDepth: 4
}, {
	rfc_7ec65a6ca259-impurity: gini,
	rfc_7ec65a6ca259-maxBins: 25,
	rfc_7ec65a6ca259-maxDepth: 4
}, {
	rfc_7ec65a6ca259-impurity: gini,
	rfc_7ec65a6ca259-maxBins: 28,
	rfc_7ec65a6ca259-maxDepth: 4
}, {
	rfc_7ec65a6ca259-impurity: gini,
	rfc_7ec65a6ca259-maxBins: 31,
	rfc_7ec65a6ca259-maxDepth: 4
}, {
	rfc_7ec65a6ca259-impurity: entropy,
	rfc_7ec65a6ca259-maxBins: 25,
	rfc_7ec65a6ca259-maxDepth: 6
}, {
	rfc_7ec65a6ca259-impu...

In [ ]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator


In [ ]:
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("Survived")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_cb69ce3b7505


**Cross validator avec 10 fold**

In [ ]:

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(10)

cv: org.apache.spark.ml.tuning.CrossValidator = cv_d089f317167d


In [ ]:
//Entrainement du model avec cross validation
val crossValidatorModel = cv.fit(trainingData)

crossValidatorModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_d089f317167d


In [ ]:
//predictions sur validation data
val predictions = crossValidatorModel.transform(validationData)

predictions: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 19 more fields]


###prediction sur test data
Pour savoir si notre modele a été bien entrainé on lui passe les données test qu'on avait mit à côté pour l'evaluer.

Nous allons donc devoir utiliser la fonction .transform, qui permet d'appliquer le modèle, avec les différentes étapes du pipeline qui sont nécessaires, automatiquement.

Ensuite, évaluer les prédictions effectuées

In [ ]:
val predictions_test = crossValidatorModel.transform(test_df5)

predictions_test: org.apache.spark.sql.DataFrame = [PassengerId: int, Pclass: int ... 17 more fields]


In [ ]:
display(predictions_test
  .withColumn("Survived", col("predictedLabel"))
  .select("PassengerId", "Survived","prediction","probability"))

res112: notebook.front.Widget = <Tabs widget>


##### Evalution des performances du modèle

In [ ]:
//Accuracy
val accuracy = evaluator.evaluate(predictions)
println("Test Error DT= " + (1.0 - accuracy))

Test Error DT= 0.15204678362573099
accuracy: Double = 0.847953216374269


Remarque: Le score s'est beaucoup amelioré par rapport au précedent Modèle avec l'utilisation de la validation croisée

### Construction du modèle Gradient boosted tree 

In [ ]:
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}

import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}


In [ ]:
val gbt = new GBTClassifier()
  .setLabelCol("SurvivedIndexed")
  .setFeaturesCol("Features")
  .setMaxIter(10)

gbt: org.apache.spark.ml.classification.GBTClassifier = gbtc_b6f1f854e6f9


In [ ]:
//Creating pipeline
val pipeline = new Pipeline().setStages(
  (stringIndexers :+ labelIndexer :+ assembler :+ gbt :+ labelConverter).toArray)

pipeline: org.apache.spark.ml.Pipeline = pipeline_75729298334c


In [ ]:
val model = pipeline.fit(trainingData)

model: org.apache.spark.ml.PipelineModel = pipeline_75729298334c


In [ ]:
val predictions = model.transform(validationData)

predictions: org.apache.spark.sql.DataFrame = [PassengerId: int, Survived: int ... 17 more fields]


In [ ]:
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("Survived")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_9c08cdac1b48


In [ ]:
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

Test Error = 0.15204678362573099
accuracy: Double = 0.847953216374269


###prediction sur les données de  test 

In [ ]:
val predictions_test = crossValidatorModel.transform(test_df5)

predictions_test: org.apache.spark.sql.DataFrame = [PassengerId: int, Pclass: int ... 17 more fields]


In [ ]:
display(predictions_test
  .withColumn("Survived", col("predictedLabel"))
  .select("PassengerId", "Survived","prediction","probability"))

res123: notebook.front.Widget = <Tabs widget>


#FIN !