# Uso di Pipeline di Machine Learning in Spark

Parteciperemo "virtualmente" alla competizione sul Titanic data set organizzata sul portale [Kaggle](https://www.kaggle.com/c/titanic/overview) da cui trarremo i due data set di addestramento e test. Nel resto dell'esempio i dati si troveranno all'interno del file system Hadoop.

Il training set consta di 891 righe e il test set di 418 e non ha la colonna dei sopravvissuti. Addestreremo un classificatore Random Forests su una griglia di iperparametri valutati con una procedura di 10-fold crossvalidation.

Dapprima ci preoccuperemo di gestire le feature mancanti e, successivamente, costruiremo la Pipeline Spark di addestramento e valutazione.

In [1]:
# inizializziamo la SparkSession e importiamo le librerie
import findspark

location = findspark.find()
findspark.init(location)

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import IndexToString
from pyspark.ml.classification import RandomForestClassifier 
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession \
    .builder \
    .appName("Spark ML example on titanic data ") \
    .getOrCreate()

23/05/10 13:56:57 WARN Utils: Your hostname, deeplearning resolves to a loopback address: 127.0.1.1; using 147.163.26.113 instead (on interface enp6s0)
23/05/10 13:56:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/10 13:56:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Carichiamo  i data set e mostriamo il loro schema

In [2]:

trainDF = spark \
    .read \
    .csv('/home/rpirrone/data/train.csv',header = 'True', inferSchema='True')

testDF = spark \
    .read \
    .csv('/home/rpirrone/data/test.csv',header = 'True', inferSchema='True')


                                                                                

In [3]:
print("training set")
trainDF.printSchema()

print("\n\ntest set")
testDF.printSchema()

training set
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



test set
root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



Per osservare il nostro data set, inziamo contando imbarcati e sopravvisuti ed estraendo le statistiche di base delle feature numeriche.

In [4]:


passengers_count = trainDF.count()
survived_count = trainDF.filter("Survived == 1").count()

print(f"Passeggeri: {passengers_count}\nDi cui sopravvissuti: {survived_count}")

trainDF.describe()['summary','Pclass','Age','SibSp','Parch','Fare'].show()

print(f"Passeggeri: {testDF.count()}")
testDF.describe()['summary','Pclass','Age','SibSp','Parch','Fare'].show()


Passeggeri: 891
Di cui sopravvissuti: 342


                                                                                

+-------+------------------+------------------+------------------+-------------------+-----------------+
|summary|            Pclass|               Age|             SibSp|              Parch|             Fare|
+-------+------------------+------------------+------------------+-------------------+-----------------+
|  count|               891|               714|               891|                891|              891|
|   mean| 2.308641975308642| 29.69911764705882|0.5230078563411896|0.38159371492704824| 32.2042079685746|
| stddev|0.8360712409770491|14.526497332334035|1.1027434322934315| 0.8060572211299488|49.69342859718089|
|    min|                 1|              0.42|                 0|                  0|              0.0|
|    max|                 3|              80.0|                 8|                  6|         512.3292|
+-------+------------------+------------------+------------------+-------------------+-----------------+

Passeggeri: 418


[Stage 16:>                                                         (0 + 1) / 1]

+-------+------------------+------------------+------------------+------------------+------------------+
|summary|            Pclass|               Age|             SibSp|             Parch|              Fare|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|               418|               332|               418|               418|               417|
|   mean|2.2655502392344498|30.272590361445783|0.4473684210526316|0.3923444976076555|  35.6271884892086|
| stddev|0.8418375519640503|14.181209235624424|0.8967595611217135|0.9814288785371694|55.907576179973844|
|    min|                 1|              0.17|                 0|                 0|               0.0|
|    max|                 3|              76.0|                 8|                 9|          512.3292|
+-------+------------------+------------------+------------------+------------------+------------------+



                                                                                

Contiamo i valori nulli nelle diverse colonne tramite una UDF

In [5]:
def null_value_count(df):
  null_columns_counts = []
  for k in df.columns:
    nullRows = df.where(col(k).isNull()).count()
    if(nullRows > 0):
      temp = k,nullRows
      null_columns_counts.append(temp)
  return(null_columns_counts)

null_columns_count_list_train = null_value_count(trainDF)
null_columns_count_list_test = null_value_count(testDF)

spark.createDataFrame(null_columns_count_list_train, ['Column_With_Null_Value', 'Null_Values_Count']).show()

spark.createDataFrame(null_columns_count_list_test, ['Column_With_Null_Value', 'Null_Values_Count']).show()

                                                                                

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                   Age|              177|
|                 Cabin|              687|
|              Embarked|                2|
+----------------------+-----------------+

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                   Age|               86|
|                  Fare|                1|
|                 Cabin|              327|
+----------------------+-----------------+



### Gestione dei valori nulli

- Age: calcoleremo l'età media dei paseggeri raggruppati per 'titolo' nel nome (Mr, Mrs, Miss, ...) poiché questo corrisponde a delle fasce di età piuttosto precise
- Fare (solo test): calcoleremo il valor medio della tariffa
- Cabin: faremo il drop della feature perché non interessa
- Embarked: non è molto rilevante ai fini della classificazione, ma faremo imputazione con il valore più frequente e cioé 'S'


In [6]:
# estraiamo la lista dei titoli su entrambi i dataframe e aggiungiamo una nuova colonna con l'iniziale

trainDF=trainDF.withColumn("Initial",regexp_extract(col("Name"),".*, (.*?)\\..*",1))
testDF=testDF.withColumn("Initial",regexp_extract(col("Name"),".*, (.*?)\\..*",1))

trainDF.select("Initial").distinct().show()
testDF.select("Initial").distinct().show()


+------------+
|     Initial|
+------------+
|         Don|
|        Miss|
|         Col|
|         Rev|
|        Lady|
|      Master|
|         Mme|
|        Capt|
|          Mr|
|          Dr|
|         Mrs|
|         Sir|
|    Jonkheer|
|        Mlle|
|       Major|
|          Ms|
|the Countess|
+------------+

+-------+
|Initial|
+-------+
|   Dona|
|   Miss|
|    Col|
|    Rev|
| Master|
|     Mr|
|     Dr|
|    Mrs|
|     Ms|
+-------+



Ispezioniamo esplicitamente le iniziali 'Dona' e ' Master' per capire a che fascia di età appartiene

In [7]:
testDF.select(testDF.Name,testDF.Age).where(testDF.Initial=='Dona').collect()

[Row(Name='Oliva y Ocana, Dona. Fermina', Age=39.0)]

In [8]:
testDF.select(testDF.Name,testDF.Age).where(testDF.Initial=='Master').collect()

[Row(Name='Olsen, Master. Artur Karl', Age=9.0),
 Row(Name='Rice, Master. Albert', Age=10.0),
 Row(Name='Ryerson, Master. John Borie', Age=13.0),
 Row(Name='Boulos, Master. Akar', Age=6.0),
 Row(Name='Wells, Master. Ralph Lester', Age=2.0),
 Row(Name='Asplund, Master. Filip Oscar', Age=13.0),
 Row(Name='Touma, Master. Georges Youssef', Age=7.0),
 Row(Name='van Billiard, Master. Walter John', Age=11.5),
 Row(Name='Drew, Master. Marshall Brines', Age=8.0),
 Row(Name='Spedden, Master. Robert Douglas', Age=6.0),
 Row(Name='Danbom, Master. Gilbert Sigvard Emanuel', Age=0.33),
 Row(Name='Johnston, Master. William Arthur Willie""""', Age=None),
 Row(Name='Peacock, Master. Alfred Edward', Age=0.75),
 Row(Name='Aks, Master. Philip Frank', Age=0.83),
 Row(Name='Betros, Master. Seman', Age=None),
 Row(Name='van Billiard, Master. James William', Age=None),
 Row(Name='Sage, Master. William Henry', Age=14.5),
 Row(Name='Asplund, Master. Carl Edgar', Age=5.0),
 Row(Name='Palsson, Master. Paul Folke',

I titoli sono molto vari e, dall'analisi diretta del dataframe, emerge la necessità di mapparne alcuni in un set standard

In [9]:

trainDF = trainDF.replace(\
    ['Mlle','Mme', 'Ms', 'Major','Lady','the Countess','Jonkheer','Capt'],\
    ['Miss','Miss','Miss','Col',  'Mrs',  'Mrs',  'Sir',  'Col',])

trainDF.select("Initial").distinct().show()

testDF = testDF.replace(['Dona','Ms'],['Mrs','Miss'])

testDF.select("Initial").distinct().show()



+-------+
|Initial|
+-------+
|    Don|
|   Miss|
|    Col|
|    Rev|
| Master|
|     Mr|
|     Dr|
|    Mrs|
|    Sir|
+-------+

+-------+
|Initial|
+-------+
|   Miss|
|    Col|
|    Rev|
| Master|
|     Mr|
|     Dr|
|    Mrs|
+-------+



creiamo un unico dataframe per gestire i valori medi di 'Age' e 'Fare' nonché il valore maggiormente occorrente di 'Embarked'

In [10]:
from math import floor


pivotDF = trainDF['Initial','Age','Fare','Embarked'].unionByName(testDF['Initial','Age','Fare','Embarked'])

pivotDF.show()



+-------+----+-------+--------+
|Initial| Age|   Fare|Embarked|
+-------+----+-------+--------+
|     Mr|22.0|   7.25|       S|
|    Mrs|38.0|71.2833|       C|
|   Miss|26.0|  7.925|       S|
|    Mrs|35.0|   53.1|       S|
|     Mr|35.0|   8.05|       S|
|     Mr|null| 8.4583|       Q|
|     Mr|54.0|51.8625|       S|
| Master| 2.0| 21.075|       S|
|    Mrs|27.0|11.1333|       S|
|    Mrs|14.0|30.0708|       C|
|   Miss| 4.0|   16.7|       S|
|   Miss|58.0|  26.55|       S|
|     Mr|20.0|   8.05|       S|
|     Mr|39.0| 31.275|       S|
|   Miss|14.0| 7.8542|       S|
|    Mrs|55.0|   16.0|       S|
| Master| 2.0| 29.125|       Q|
|     Mr|null|   13.0|       S|
|    Mrs|31.0|   18.0|       S|
|    Mrs|null|  7.225|       C|
+-------+----+-------+--------+
only showing top 20 rows



In [11]:
avg_age_list=pivotDF.groupby('Initial').avg('Age').collect()

print(avg_age_list)

[Row(Initial='Don', avg(Age)=40.0), Row(Initial='Miss', avg(Age)=21.834532710280374), Row(Initial='Col', avg(Age)=54.714285714285715), Row(Initial='Rev', avg(Age)=41.25), Row(Initial='Master', avg(Age)=5.482641509433963), Row(Initial='Mr', avg(Age)=32.25215146299484), Row(Initial='Dr', avg(Age)=43.57142857142857), Row(Initial='Mrs', avg(Age)=37.04624277456647), Row(Initial='Sir', avg(Age)=43.5)]


In [12]:
for row in avg_age_list:
    trainDF=trainDF.withColumn("Age",when((trainDF["Initial"]==row[0]) & \
        (trainDF["Age"].isNull()),floor(row[1]+0.5)).otherwise(trainDF["Age"]))
    testDF=testDF.withColumn("Age",when((testDF["Initial"]==row[0]) & \
        (testDF["Age"].isNull()),floor(row[1]+0.5)).otherwise(testDF["Age"]))

avg_fare = pivotDF.select(avg(pivotDF.Fare)).collect()[0][0]

avg_fare

33.29547928134553

In [13]:
testDF.withColumn("Fare",when(testDF["Fare"].isNull(),avg_fare).otherwise(testDF["Fare"]))

DataFrame[PassengerId: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string, Initial: string]

In [14]:
# Completiamo l'imputazione Verificando il luogo di imbarco della maggior parte dei passeggeri

pivotDF.groupBy("Embarked").count().show()



+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|  123|
|    null|    2|
|       C|  270|
|       S|  914|
+--------+-----+



In [15]:
# Imputiamo il valore 'S' per il campo 'Embarked'

trainDF = trainDF.na.fill({"Embarked" : 'S'})

Creiamo una colonna "FamilySize" che somma "Parch" (Parents/children) e "Sibsp" (Sibling/spouses) + 1

In [16]:
trainDF = trainDF.withColumn("FamilySize",col('SibSp')+col('Parch')+1)
testDF = testDF.withColumn("FamilySize",col('SibSp')+col('Parch')+1)

trainDF.groupBy("FamilySize").count().show()
testDF.groupBy("FamilySize").count().show()

+----------+-----+
|FamilySize|count|
+----------+-----+
|         1|  537|
|         6|   22|
|         3|  102|
|         5|   15|
|         4|   29|
|         8|    6|
|         7|   12|
|        11|    7|
|         2|  161|
+----------+-----+

+----------+-----+
|FamilySize|count|
+----------+-----+
|         1|  253|
|         6|    3|
|         3|   57|
|         5|    7|
|         4|   14|
|         8|    2|
|         7|    4|
|        11|    4|
|         2|   74|
+----------+-----+



Vista la grande prevalenza di passeggeri che viaggiano da soli, Creiamo anche una colonna binaria apposita "Alone" per indicar coloro che hanno "FamilySize = 1"

In [17]:
trainDF = trainDF.withColumn('Alone',lit(0))
trainDF = trainDF.withColumn("Alone",when(trainDF["FamilySize"] == 1, 1).otherwise(trainDF["Alone"]))

testDF = testDF.withColumn('Alone',lit(0))
testDF = testDF.withColumn("Alone",when(testDF["FamilySize"] == 1, 1).otherwise(testDF["Alone"]))


### Pipeline di addestramento del classificatore

Come già sappiamo, una pipeline è una sequenza ordinata di:

- Transformers: algoritmi che trasformano effettivamente un dataframe (metodo `transform()`)
- Estimators: algoritmi che si addestrano sui dati per generare un Transformer (metodo `fit()` che usa la "eager execution" ovvero l'esecuzione immediata)
- Evaluators: algoritmi di calcolo dei criteri di valutazione delle performance

Sono Transformer anche gli algoritmi di gestione delle feature in ingresso e uscita. La pipeline può contenere anche algoritmi per il tuning degli iperparametri.

Eseguiremo il `fit` degli Estimators (incluso quindi l'addestramento vero e proprio) sul trainDF. mentre il `transform` del modello addestrato (che è un Transformer) e la valutazione sul testDF.

Indicizziamo i dati categorici (tranne "Survived") su un unico dataframe contenente training e test set. Questa soluzione si giustifica con il fatto che lo StringIndexer crea degli indici numerici con le frequenze di occorrenza delle etichette catgoriche, quindi è più opportuno farne il "fit" su tutti i dati.

In [18]:
labelDF = trainDF["Pclass","Sex","Embarked","Initial","Alone"].\
          unionByName(testDF["Pclass","Sex","Embarked","Initial","Alone"])

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(labelDF) for column in ["Pclass","Sex","Embarked","Initial","Alone"]]

# Trasformiamo il data set per ottenere realmente le colonne indicizzate e utilizzarle poi come feature
for indexer in indexers:
    trainDF=indexer.transform(trainDF)
    testDF=indexer.transform(testDF)

labelIndexer = StringIndexer(inputCol='Survived',outputCol='Survived_index').fit(trainDF)



Creiamo un mapping per le label predette dall'algoritmo che, dopo l'addestramento, restituirà una colonna "prediction" indicizzata.

In [19]:
labelConverter = IndexToString(inputCol='prediction',outputCol='predictedLabel').setLabels(labelIndexer.labels)


Assembliamo le feature con un VectorAssembler cioè un Transformer che crea il vettore di tutte le feature concatenate. Il classificatore richiederà il vettore delle feature in un'unica colonna.

In [20]:
assembler = VectorAssembler()\
        .setInputCols(["Age", "SibSp", "Parch", "Fare", "FamilySize", "Pclass_index","Sex_index","Embarked_index","Initial_index","Alone_index"])\
        .setOutputCol("Features")\
        .setHandleInvalid("keep")

Creiamo il classificatore come Evaluator impostando le colonne delle feature e delle label Il "fit" di questo Evaluator restituisce un Transformer di tipo RandomForestClassificationModel il cui metodo "transform" possiamo usare per predire i sopravvissuti sul testDF.

In [21]:
randomForest = RandomForestClassifier().setFeaturesCol("Features").setLabelCol("Survived_index")

Creiamo la pipeline per una singola classificazione. La pipeline, nel suo complesso, è un Evaluator il cui metodo "fit" genererà un Transformer per il test set.

In [22]:
pipeline = Pipeline().setStages([\
                                labelIndexer,\
                                assembler,\
                                randomForest,\
                                labelConverter])

Costruiamo l'algoritmo di addestramento, come una 10-fold Cross-validation che si addestra su una griglia di iperparametri:

- l'indice di Gini, e l'entropia per controllare la purezza dei nodi foglia
- il numero di bin cioè di categorie da generare per ogni feature categorica
- la profondità massima dell'albero

In [23]:
paramGrid = ParamGridBuilder().addGrid(randomForest.maxBins,[25, 28, 31])\
                              .addGrid(randomForest.maxDepth,[4,6,8])\
                              .addGrid(randomForest.impurity,["entropy","gini"])\
                              .build()

Generiamo l'evaluator che inseriremo nell'algoritmo di addestramento. Questo utilizzerà la metrica Area Under Precision-Recall Curve (AUPRC) che si adatta meglio ad una classificazione binaria con classi sbilanciate, com'è il nostro caso in cui i sopravvissuti sono pochi.

La AUPRC va esplicitamente confrontata con una baseline di riferimento definita come $\frac{P}{P+N}$ che ha valori diversi se la classe positiva contiene pochi campioni. 

In [24]:
evaluator = BinaryClassificationEvaluator().setLabelCol("Survived_index")\
                                           .setMetricName("areaUnderPR")
                                           

Infine costruiamo l'Estimator che implementa la 10-fold cross-validation, inserendo la pipeline come Estimator dei dati, l'evaluator e la griglia degli iperparametri per l'addestramento.

In [25]:
cv = CrossValidator().setEstimator(pipeline)\
                     .setEvaluator(evaluator)\
                     .setEstimatorParamMaps(paramGrid)\
                     .setNumFolds(10)

In [26]:
# Addesrtiamo sul training set
cvModel = cv.fit(trainDF)

### Predizione e valutazione dei risultati

Facciamo la predizione sul test set e analizziamo l'accuratezza sul training set. Confronteremo la misura AUPRC con la baseline per la classe dei sopravvisuti.

In [27]:
predictions = cvModel.transform(testDF)

23/05/10 14:03:15 WARN StringIndexerModel: Input column Survived does not exist during transformation. Skip StringIndexerModel for this column.


In [31]:
performance = cvModel.transform(trainDF)
auprc = evaluator.evaluate(performance)
print(f"Area Under PR Curve: {(100*auprc):05.2f}%\nBaseline: {100*survived_count/passengers_count:05.2f}%")

Area Under PR Curve: 90.31%
Baseline: 38.38%


Aggiungiamo anche la misura di AUC richiedendola al nostro `evaluator`

In [35]:
auroc = evaluator.evaluate(performance,{evaluator.metricName: 'areaUnderROC'})
print(f"Area Under ROC: {(100*auroc):05.2f}%")

Area Under ROC: 91.42%


In [32]:
# Creiamo il file dei risultati per sottometterlo sul sito della competizione

# Salviamo il modello addestrato

cvModel.write().overwrite().save('/home/rpirrone/data/RF_10xfold_cv_model')

In [33]:
# Salviamo in csv con le colonne "PassengerId" "Survived"

predictions\
  .withColumn("Survived", col("predictedLabel"))\
  .select("PassengerId", "Survived")\
  .coalesce(1)\
  .write\
  .csv('/home/rpirrone/data/titanic_predictions.csv',\
    header=True, mode='overwrite')


In [None]:
titanic3 = spark \
    .read \
    .csv('/home/rpirrone/data/titanic3.csv',header = 'True', inferSchema='True')

validation = 
