# Connexion au cluster

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Airplane ML") \
    .config('spark.executor.memory', '6g') \
    .getOrCreate()

# Chargement des données

In [2]:
flights = spark.read.parquet('/data/parquet/flights')

In [3]:
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

flights = flights.where((flights.Year == '2008') & (flights.Origin == 'JFK')) \
    .select('Year', 'Month', 'DayOfMonth', 'DayOfWeek', 'CRSDepTime', 'UniqueCarrier', 'FlightNum', 'Origin', 'Dest', 'ArrDelay') \
    .na.drop() \
    .withColumn('Delayed', (F.when(flights.ArrDelay == 'NA', 0).otherwise(flights.ArrDelay).cast('integer') > 20).cast('string')) \
    .withColumn('Year', flights.Year.cast('integer')) \
    .withColumn('Month', flights.Month.cast('integer')) \
    .withColumn('DayOfMonth', flights.DayOfMonth.cast('integer')) \
    .withColumn('DayOfWeek', flights.DayOfWeek.cast('integer')) \
    .withColumn('DepTime', flights.CRSDepTime.cast('integer'))
    
features = ['Year', 'Month', 'DayOfMonth', 'DayOfWeek', 'DepTime', 'UniqueCarrierIndex', 'FlightNumIndex', 'OriginIndex', 'DestIndex']

In [4]:
flights.limit(10) \
    .toPandas()

Unnamed: 0,Year,Month,DayOfMonth,DayOfWeek,CRSDepTime,UniqueCarrier,FlightNum,Origin,Dest,ArrDelay,Delayed,DepTime
0,2008,4,4,5,815,AA,1165,JFK,MIA,15.0,False,815
1,2008,4,4,5,840,AA,1323,JFK,ORD,,False,840
2,2008,4,4,5,1200,AA,1567,JFK,STT,,False,1200
3,2008,4,4,5,805,AA,1635,JFK,SJU,7.0,False,805
4,2008,4,4,5,1735,AA,1639,JFK,SJU,14.0,False,1735
5,2008,4,4,5,1540,AA,1815,JFK,ORD,128.0,True,1540
6,2008,4,4,5,750,AA,1821,JFK,DFW,,False,750
7,2008,4,4,5,1555,AA,1881,JFK,MIA,14.0,False,1555
8,2008,4,4,5,1545,AA,1917,JFK,MCO,43.0,True,1545
9,2008,4,4,5,1455,AA,2033,JFK,DFW,1.0,False,1455


In [5]:
flights.count()

118804

# Préparation du modèle

## Séparation entrainement / test

In [6]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

(trainingData, testData) = [dataset.cache() for dataset in flights.randomSplit([0.7, 0.3])]

## Feature engineering

In [7]:
labelIndexer = StringIndexer(inputCol='Delayed', outputCol='DelayedIndex').fit(trainingData)
categoricalIndexers = [ StringIndexer(inputCol=inputColName, outputCol=inputColName + 'Index', handleInvalid='skip').fit(flights) \
                       for inputColName in ['UniqueCarrier', 'FlightNum', 'Origin', 'Dest'] ]

In [8]:
assembler = VectorAssembler(
    inputCols=['Year', 'Month', 'DayOfMonth', 'DayOfWeek', 'DepTime', 'UniqueCarrierIndex', 'FlightNumIndex', 'OriginIndex', 'DestIndex'],
    outputCol='features')

## Paramétrage du modèle

In [9]:
rf = RandomForestClassifier(labelCol="DelayedIndex", featuresCol="features", maxBins=8000)

pipeline = Pipeline(stages=[labelIndexer, *categoricalIndexers, assembler, rf])

model = pipeline.fit(trainingData)

In [10]:
testPredictions = model.transform(testData)
testPredictions.limit(10).toPandas()


Unnamed: 0,Year,Month,DayOfMonth,DayOfWeek,CRSDepTime,UniqueCarrier,FlightNum,Origin,Dest,ArrDelay,...,DepTime,DelayedIndex,UniqueCarrierIndex,FlightNumIndex,OriginIndex,DestIndex,features,rawPrediction,probability,prediction
0,2008,1,2,3,1000,B6,1083,JFK,CLT,1,...,1000,0.0,0.0,436.0,0.0,23.0,"[2008.0, 1.0, 2.0, 3.0, 1000.0, 0.0, 436.0, 0....","[17.5463375622, 2.45366243778]","[0.877316878111, 0.122683121889]",0.0
1,2008,1,2,3,1010,B6,133,JFK,RSW,6,...,1010,0.0,0.0,3.0,0.0,20.0,"[2008.0, 1.0, 2.0, 3.0, 1010.0, 0.0, 3.0, 0.0,...","[17.2260549481, 2.7739450519]","[0.861302747405, 0.138697252595]",0.0
2,2008,1,2,3,1015,B6,345,JFK,SRQ,-11,...,1015,0.0,0.0,79.0,0.0,50.0,"[2008.0, 1.0, 2.0, 3.0, 1015.0, 0.0, 79.0, 0.0...","[17.0884975533, 2.91150244672]","[0.854424877664, 0.145575122336]",0.0
3,2008,1,2,3,1015,MQ,4620,JFK,BOS,37,...,1015,1.0,4.0,76.0,0.0,1.0,"[2008.0, 1.0, 2.0, 3.0, 1015.0, 4.0, 76.0, 0.0...","[16.6274596338, 3.3725403662]","[0.83137298169, 0.16862701831]",0.0
4,2008,1,2,3,1055,B6,66,JFK,BUF,29,...,1055,1.0,0.0,108.0,0.0,7.0,"[2008.0, 1.0, 2.0, 3.0, 1055.0, 0.0, 108.0, 0....","[17.1479540715, 2.85204592848]","[0.857397703576, 0.142602296424]",0.0
5,2008,1,2,3,1115,UA,11,JFK,SFO,-6,...,1115,0.0,5.0,22.0,0.0,2.0,"[2008.0, 1.0, 2.0, 3.0, 1115.0, 5.0, 22.0, 0.0...","[15.6224645545, 4.37753544547]","[0.781123227726, 0.218876772274]",0.0
6,2008,1,2,3,1120,NW,315,JFK,MSP,15,...,1120,0.0,7.0,337.0,0.0,30.0,"[2008.0, 1.0, 2.0, 3.0, 1120.0, 7.0, 337.0, 0....","[16.650631254, 3.34936874598]","[0.832531562701, 0.167468437299]",0.0
7,2008,1,2,3,1125,B6,1075,JFK,RIC,-1,...,1125,0.0,0.0,84.0,0.0,27.0,"[2008.0, 1.0, 2.0, 3.0, 1125.0, 0.0, 84.0, 0.0...","[17.3545959217, 2.64540407827]","[0.867729796087, 0.132270203913]",0.0
8,2008,1,2,3,1130,AA,423,JFK,MIA,35,...,1130,1.0,3.0,182.0,0.0,12.0,"[2008.0, 1.0, 2.0, 3.0, 1130.0, 3.0, 182.0, 0....","[15.0994635897, 4.90053641034]","[0.754973179483, 0.245026820517]",0.0
9,2008,1,2,3,1130,OH,4941,JFK,DCA,-8,...,1130,0.0,1.0,270.0,0.0,11.0,"[2008.0, 1.0, 2.0, 3.0, 1130.0, 1.0, 270.0, 0....","[16.327057721, 3.67294227897]","[0.816352886051, 0.183647113949]",0.0


## Évaluation du modèle

In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='DelayedIndex')
evaluator.evaluate(testPredictions)


0.6895461729510051

In [12]:
import pandas as pd
rfModel = model.stages[6]
featureImportance = pd.DataFrame({'feature': features, 'importance': rfModel.featureImportances.toArray()})
featureImportance

Unnamed: 0,feature,importance
0,Year,0.0
1,Month,0.238652
2,DayOfMonth,0.033362
3,DayOfWeek,0.060174
4,DepTime,0.362339
5,UniqueCarrierIndex,0.027655
6,FlightNumIndex,0.259092
7,OriginIndex,0.0
8,DestIndex,0.018726


In [13]:
import plotly.plotly as py
import cufflinks as cf
featureImportance.iplot(x='feature', kind='barh')

# Exploitation du modèle

In [14]:
model.save('/data/airplane-model')

# Tuning des hyperparamètres

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
rf.getNumTrees()

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 50, 100]) \
    .addGrid(rf.maxDepth, [3, 5, 7]) \
    .build()

In [None]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol='DelayedIndex'),
                          numFolds=10)

In [None]:
cvModel = crossval.fit(trainingData)

In [None]:
cvModel.bestModel

In [None]:
rfModel = cvModel.bestModel.stages[6]
rfModel.getNumTrees

In [None]:
rfModel = cvModel.bestModel.stages[6]
featureImportance = pd.DataFrame({'feature': features, 'importance': rfModel.featureImportances.toArray()})
featureImportance

In [None]:
featureImportance.iplot(x='feature', kind='barh')

In [None]:
testPredictions = cvModel.transform(testData)

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol='DelayedIndex')
evaluator.evaluate(testPredictions)

In [None]:
print(rf.explainParams())