# Uso di Pipeline di Machine Learning in Spark

Parteciperemo "virtualmente" alla competizione sul Titanic data set organizzata sul portale [Kaggle](https://www.kaggle.com/c/titanic/overview) da cui trarremo i due data set di addestramento e test. Nel resto dell'esempio i dati si troveranno all'interno del file system Hadoop.

Il training set consta di 891 righe e il test set di 418 e non ha la colonna dei sopravvissuti. Addestreremo un classificatore Random Forests su una griglia di iperparametri valutati con una procedura di 10-fold crossvalidation.

Dapprima ci preoccuperemo di gestire le feature mancanti e, successivamente, costruiremo la Pipeline Spark di addestramento e valutazione.

In [1]:
# inizializziamo la SparkSession e importiamo le librerie
import findspark

location = findspark.find()
findspark.init(location)

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import IndexToString
from pyspark.ml.classification import RandomForestClassifier 
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession \
    .builder \
    .appName("Spark ML example on titanic data ") \
    .getOrCreate()

In [2]:
# Carichiamo il data set

trainDF = spark \
    .read \
    .csv('hdfs://localhost:9099/user/pirrone/spark/ml/input/train.csv',header = 'True', inferSchema='True')

testDF = spark \
    .read \
    .csv('hdfs://localhost:9099/user/pirrone/spark/ml/input/test.csv',header = 'True', inferSchema='True')


In [8]:
# Mostriamo un il data set po' di informazioni sulle feature

display(trainDF)
display(testDF)

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

DataFrame[PassengerId: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [9]:
# Mostriamo lo schema
print("training set")
trainDF.printSchema()

print("\n\ntest set")
testDF.printSchema()

training set
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



test set
root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [3]:
# Contiamo imbarcati e sopravvissuti e verifichiamo se ci sono valori mancanti
# nelle righe

passengers_count = trainDF.count()
survived_count = trainDF.filter("Survived == 1").count()

print(f"Passeggeri: {passengers_count}\nDi cui sopravvissuti: {survived_count}")

trainDF.describe()['summary','Fare','Age','Embarked','Cabin'].show()

print(f"Passeggeri: {testDF.count()}")
testDF.describe()['summary','Fare','Age','Embarked','Cabin'].show()

Passeggeri: 891
Di cui sopravvissuti: 342
+-------+-----------------+------------------+--------+-----+
|summary|             Fare|               Age|Embarked|Cabin|
+-------+-----------------+------------------+--------+-----+
|  count|              891|               714|     889|  204|
|   mean| 32.2042079685746| 29.69911764705882|    null| null|
| stddev|49.69342859718089|14.526497332334035|    null| null|
|    min|              0.0|              0.42|       C|  A10|
|    max|         512.3292|              80.0|       S|    T|
+-------+-----------------+------------------+--------+-----+

Passeggeri: 418
+-------+------------------+------------------+--------+-----+
|summary|              Fare|               Age|Embarked|Cabin|
+-------+------------------+------------------+--------+-----+
|  count|               417|               332|     418|   91|
|   mean|  35.6271884892086|30.272590361445783|    null| null|
| stddev|55.907576179973844|14.181209235624424|    null| null|
|    

In [4]:
# contiamo i valori nulli nelle diverse colonne tramite una UDF

def null_value_count(df):
  null_columns_counts = []
  for k in df.columns:
    nullRows = df.where(col(k).isNull()).count()
    if(nullRows > 0):
      temp = k,nullRows
      null_columns_counts.append(temp)
  return(null_columns_counts)

null_columns_count_list_train = null_value_count(trainDF)
null_columns_count_list_test = null_value_count(testDF)

spark.createDataFrame(null_columns_count_list_train, ['Column_With_Null_Value', 'Null_Values_Count']).show()

spark.createDataFrame(null_columns_count_list_test, ['Column_With_Null_Value', 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                   Age|              177|
|                 Cabin|              687|
|              Embarked|                2|
+----------------------+-----------------+

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                   Age|               86|
|                  Fare|                1|
|                 Cabin|              327|
+----------------------+-----------------+



In [5]:
"""
Gestione dei valori nulli:

-- Age: calcoleremo l'età media dei paseggeri raggruppati per 'titolo' nel nome (Mr, Mrs, Miss, ...) poiché questo corrisponde a delle precise fasce di età
-- Fare (solo test): calcoleremo il valor medio della tariffa
-- Cabin: faremo il drop della feature perché non interessa
-- Embarked: non è molto rilevante ai fini della classificazione, ma faremo imputazione con il valore più frequente e cioé 'S'

"""

# estraiamo la lista dei titoli su entrambi i dataframe
trainDF=trainDF.withColumn("Initial",regexp_extract(col("Name"),".*, (.*?)\\..*",1))
testDF=testDF.withColumn("Initial",regexp_extract(col("Name"),".*, (.*?)\\..*",1))

trainDF.select("Initial").distinct().show()
testDF.select("Initial").distinct().show()


+------------+
|     Initial|
+------------+
|         Don|
|        Miss|
|         Col|
|         Rev|
|        Lady|
|      Master|
|         Mme|
|        Capt|
|          Mr|
|          Dr|
|         Mrs|
|         Sir|
|    Jonkheer|
|        Mlle|
|       Major|
|          Ms|
|the Countess|
+------------+

+-------+
|Initial|
+-------+
|   Dona|
|   Miss|
|    Col|
|    Rev|
| Master|
|     Mr|
|     Dr|
|    Mrs|
|     Ms|
+-------+



In [6]:
testDF.select(testDF.Name).where(testDF.Initial=='Dona').collect()

[Row(Name='Oliva y Ocana, Dona. Fermina')]

In [7]:
# I titoli sono molto vari e, dall'analisi diretta del dataframe, emerge la necessità di mapparne alcuni in un set standard

trainDF = trainDF.replace(\
    ['Mlle','Mme', 'Ms', 'Major','Lady','the Countess','Jonkheer','Capt'],\
    ['Miss','Miss','Miss','Col',  'Mrs',  'Mrs',  'Sir',  'Col',])

trainDF.select("Initial").distinct().show()

testDF = testDF.replace(['Dona','Ms'],['Mrs','Miss'])

testDF.select("Initial").distinct().show()



+-------+
|Initial|
+-------+
|    Don|
|   Miss|
|    Col|
|    Rev|
| Master|
|     Mr|
|     Dr|
|    Mrs|
|    Sir|
+-------+

+-------+
|Initial|
+-------+
|   Miss|
|    Col|
|    Rev|
| Master|
|     Mr|
|     Dr|
|    Mrs|
+-------+



In [8]:
# calcoliamo l'età media per i vari gruppi di titoli e imputiamo i valori nulli di età
from math import floor

# creiamo un unico dataframe per gestire i valori medi di 'Age' e 'Fare' nonché il valore maggiormente 
# occorrente di 'Embarked'

pivotDF = trainDF['Initial','Age','Fare','Embarked'].unionByName(testDF['Initial','Age','Fare','Embarked'])

pivotDF.show()



+-------+----+-------+--------+
|Initial| Age|   Fare|Embarked|
+-------+----+-------+--------+
|     Mr|22.0|   7.25|       S|
|    Mrs|38.0|71.2833|       C|
|   Miss|26.0|  7.925|       S|
|    Mrs|35.0|   53.1|       S|
|     Mr|35.0|   8.05|       S|
|     Mr|null| 8.4583|       Q|
|     Mr|54.0|51.8625|       S|
| Master| 2.0| 21.075|       S|
|    Mrs|27.0|11.1333|       S|
|    Mrs|14.0|30.0708|       C|
|   Miss| 4.0|   16.7|       S|
|   Miss|58.0|  26.55|       S|
|     Mr|20.0|   8.05|       S|
|     Mr|39.0| 31.275|       S|
|   Miss|14.0| 7.8542|       S|
|    Mrs|55.0|   16.0|       S|
| Master| 2.0| 29.125|       Q|
|     Mr|null|   13.0|       S|
|    Mrs|31.0|   18.0|       S|
|    Mrs|null|  7.225|       C|
+-------+----+-------+--------+
only showing top 20 rows



In [14]:
avg_age_lis=pivotDF.groupby('Initial').avg('Age').collect()

In [15]:
for row in avg_age_list:
    trainDF=trainDF.withColumn("Age",when((trainDF["Initial"]==row[0]) & \
        (trainDF["Age"].isNull()),floor(row[1]+0.5)).otherwise(trainDF["Age"]))
    testDF=testDF.withColumn("Age",when((testDF["Initial"]==row[0]) & \
        (testDF["Age"].isNull()),floor(row[1]+0.5)).otherwise(testDF["Age"]))

avg_fare = pivotDF.select(avg(pivotDF.Fare)).collect()[0][0]

avg_fare

33.29547928134553

In [16]:
testDF.withColumn("Fare",when(testDF["Fare"].isNull(),avg_fare).otherwise(testDF["Fare"]))

DataFrame[PassengerId: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string, Initial: string]

In [17]:
# Completiamo l'imputazione Verificando il luogo di imbarco della maggior parte dei passeggeri

pivotDF.groupBy("Embarked").count().show()



+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|  123|
|    null|    2|
|       C|  270|
|       S|  914|
+--------+-----+



In [18]:
# Imputiamo il valore 'S' per il campo 'Embarked'

trainDF = trainDF.na.fill({"Embarked" : 'S'})

In [19]:
# Creiamo una colonna "FamilySize" che somma "Parch" (Parents/children) e "Sibsp" (Sibling/spouses) + 1

trainDF = trainDF.withColumn("FamilySize",col('SibSp')+col('Parch')+1)
testDF = testDF.withColumn("FamilySize",col('SibSp')+col('Parch')+1)

trainDF.groupBy("FamilySize").count().show()
testDF.groupBy("FamilySize").count().show()

+----------+-----+
|FamilySize|count|
+----------+-----+
|         1|  537|
|         6|   22|
|         3|  102|
|         5|   15|
|         4|   29|
|         8|    6|
|         7|   12|
|        11|    7|
|         2|  161|
+----------+-----+

+----------+-----+
|FamilySize|count|
+----------+-----+
|         1|  253|
|         6|    3|
|         3|   57|
|         5|    7|
|         4|   14|
|         8|    2|
|         7|    4|
|        11|    4|
|         2|   74|
+----------+-----+



In [14]:
# Creiamo anche una colonna binaria "Alone" per indicar coloro che viaggiano soli ("FamilySize = 1")

trainDF = trainDF.withColumn('Alone',lit(0))
trainDF = trainDF.withColumn("Alone",when(trainDF["FamilySize"] == 1, 1).otherwise(trainDF["Alone"]))

testDF = testDF.withColumn('Alone',lit(0))
testDF = testDF.withColumn("Alone",when(testDF["FamilySize"] == 1, 1).otherwise(testDF["Alone"]))


In [15]:
# Cominciamo a creare la pipeline di addestramento e test
# Una pipeline è una sequenza ordinata di:
#
# Transformers: algoritmi che trasformano effettivamente un dataframe (metodo transform())
# Estimators: algoritmi che si addestrano sui dati per generare un Transformer (metodo fit() che usa 
#   la "eager execution" ovvero l'esecuzione immediata)
# Evaluators: algoritmi di calcolo dei criteri di valutazione delle performance
#
# Ad es. un algoritmo di machine learning è un Estimator che, opportunamente parametrizzato e 
# addestrato, genera un Transformer cioè il modello che trasforma feature in predizioni
#
# Sono Transformer anche gli algoritmi di gestione delle feature in ingresso e uscita
#
# La Pipeline può contenere anche algoritmi per il tuning degli iperparametri

# Eseguiremo il fit degli Estimators (incluso quindi l'addestramento vero e proprio) sul trainDF
# Eseguiremo il transform del modello addestrato (che è un Transformer) e la valutazione sul testDF

# Indicizziamo i dati categorici (tranne "Survived") su un unico dataframe contenente training e test 
# set. Questa soluzione si giustifica con il fatto che lo StringIndexer crea degli indici numerici con 
# le frequenze di occorrenza delle etichette catgoriche, quindi è più opportuno farne il "fit" su 
# tutti i dati

labelDF = trainDF["Pclass","Sex","Embarked","Initial","Alone"].\
          unionByName(testDF["Pclass","Sex","Embarked","Initial","Alone"])

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(labelDF) for column in ["Pclass","Sex","Embarked","Initial","Alone"]]

# Trasformiamo il data set per ottenere realmente le colonne indicizzate e utilizzarle poi come feature
for indexer in indexers:
    trainDF=indexer.transform(trainDF)
    testDF=indexer.transform(testDF)

labelIndexer = StringIndexer(inputCol='Survived',outputCol='Survived_index').fit(trainDF)



In [16]:
# Creiamo un mapping per le label predette dall'algoritmo che, dopo l'addestramento, restituirà una 
# colonna "prediction" indicizzata

labelConverter = IndexToString(inputCol='prediction',outputCol='predictedLabel').setLabels(labelIndexer.labels)


In [28]:
# Assembliamo le feature con un VectorAssembler cioè un Transformer che crea il vettore di tutte le 
# feature concatenate

assembler = VectorAssembler().setInputCols(["Age", "SibSp", "Parch", "Fare", "FamilySize", "Pclass_index","Sex_index","Embarked_index","Initial_index","Alone_index"]).setOutputCol("Features").setHandleInvalid("keep")

In [29]:
# Creiamo il classificatore come Evaluator impostando le colonne delle feature e delle label
# Il "fit" di questo Evaluator restituisce un Transformer di tipo RandomForestClassificationModel
# il cui metodo "transform" possiamo usare per predire i sopravvissuti sul testDF

randomForest = RandomForestClassifier().setFeaturesCol("Features").setLabelCol("Survived_index")

In [30]:
# Creiamo la pipeline per una singola classificazione
# La pipeline, nel suo complesso, è un Evaluator il cui metodo "fit" genererà un Transformer per il 
# test set

pipeline = Pipeline().setStages([\
                                labelIndexer,\
                                assembler,\
                                randomForest,\
                                labelConverter])

In [31]:
# Costruiamo l'algoritmo di addestramento, come una 10-fold Cross-validation che si addestra su una 
# griglia di iperparametri:
# -- l'indice di Gini, e l'entropia per controlla la purezza dei nodi foglia
# -- il numero di bin cioè di categorie da generare per ogni feature categorica
# -- la profondità massima dell'albero

# Generiamo la griglia
paramGrid = ParamGridBuilder().addGrid(randomForest.maxBins,[25, 28, 31])\
                              .addGrid(randomForest.maxDepth,[4,6,8])\
                              .addGrid(randomForest.impurity,["entropy","gini"])\
                              .build()

In [32]:
# Generiamo l'evaluator che inseriremo nell'algoritmo di addestramento. Questo utilizzerà la metrica 
# Area Under Precision-Recall curve che si adattameglio ad una classificazione binaria con classi 
# sbilanciate, com'è il nostro caso in cui i sopravvissuti sono pochi

evaluator = BinaryClassificationEvaluator().setLabelCol("Survived_index")\
                                           .setMetricName("areaUnderPR")
                                           

In [33]:
# Infine costruiamo l'Estimator che implementa la 10-fold cross-validation, inserendo la pipeline come 
# Estimator dei dati, l'evaluator e la griglia degli iperparametri per l'addestramento

cv = CrossValidator().setEstimator(pipeline)\
                     .setEvaluator(evaluator)\
                     .setEstimatorParamMaps(paramGrid)\
                     .setNumFolds(10)

In [34]:
# Addesrtiamo sul training set
cvModel = cv.fit(trainDF)

In [35]:
# Facciamo la predizione sul test set

predictions = cvModel.transform(testDF)

In [36]:
# Analizziamo l'accuratezza sul training set

performance = cvModel.transform(trainDF)
auprc = evaluator.evaluate(performance)
print(f"Area Under PR Curve: {(100*auprc):05.2f}%")

Area Under PR Curve: 93.29%


In [40]:
# Creiamo il file dei risultati per sottometterlo sul sito della competizione

# Salviamo il modello addestrato

cvModel.write().overwrite().save('hdfs://localhost:9099/user/pirrone/spark/ml/output/RF_10xfold_cv_model')

In [39]:
# Salviamo in csv con le colonne "PassengerId" "Survived"

predictions\
  .withColumn("Survived", col("predictedLabel"))\
  .select("PassengerId", "Survived")\
  .coalesce(1)\
  .write\
  .csv('hdfs://localhost:9099/user/pirrone/spark/ml/output/titanic_predictions.csv',\
    header=True, mode='overwrite')
