In [1]:
import datetime, warnings, scipy 
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
import glob
from itertools import chain

from pyspark.sql import functions as fn
from pyspark.sql.functions import col as col
from pyspark.sql.types import StringType
from pyspark.sql import Window
from pyspark.sql import Row 
from pyspark.sql.types import IntegerType, TimestampType, DateType
from pyspark.sql.functions  import to_timestamp
from pyspark.sql.functions import create_map, lit, avg, count
from pyspark.sql.functions import year, month, dayofweek, hour, minute, second
from pyspark.ml.feature import StringIndexer

In [2]:
def clean_dataset(df):
    """Drop unused flight columns, rename the kept ones, and remove rows containing nulls.

    Args:
        df: raw flights DataFrame as read from the yearly CSV files.

    Returns:
        A new DataFrame. Columns listed in the rename table get their short name
        (e.g. FL_DATE -> DATE); every other remaining column keeps its name. Any row
        with a null in any remaining column is dropped.
    """
    unused_columns = (
        'OP_CARRIER_FL_NUM', 'DEP_TIME', 'CRS_ARR_TIME', 'ARR_TIME', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN',
        'CANCELLED', 'CANCELLATION_CODE', 'DIVERTED', 'AIR_TIME', 'CARRIER_DELAY', 'WEATHER_DELAY',
        'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME',
        'Unnamed: 27',
    )
    new_names = {
        'FL_DATE': 'DATE',
        'OP_CARRIER': 'AIRLINE',
        'DEST': 'DESTIN',
        'CRS_DEP_TIME': 'SCHED_DEPARTURE',
        'DEP_DELAY': 'DEPARTURE_DELAY',
        'ARR_DELAY': 'ARRIVAL_DELAY',
    }

    trimmed = df.drop(*unused_columns)
    renamed = trimmed.select([col(name).alias(new_names.get(name, name)) for name in trimmed.columns])
    # Null in any column disqualifies the row.
    return renamed.dropna(subset=renamed.columns)

def time_format(df):
    """Replace DATE with a DATE_TIME timestamp combining DATE and SCHED_DEPARTURE.

    SCHED_DEPARTURE is zero-padded to four digits and read as hhmm (e.g. 530 -> "0530"
    -> "05:30:00"). That clock string is appended to DATE (cast to a date) and parsed
    with the pattern "yyyy-MM-ddHH:mm:ss". The DATE column is dropped afterwards.
    """
    hhmm = fn.format_string("%04d", col("SCHED_DEPARTURE"))
    clock = fn.concat_ws(":", fn.substring(hhmm, 1, 2), fn.substring(hhmm, 3, 2), fn.lit("00"))
    stamp = to_timestamp(fn.concat(col('DATE').cast('date'), clock), "yyyy-MM-ddHH:mm:ss")
    return df.withColumn('DATE_TIME', stamp).drop('DATE')
    

def airline_name(df, airlines_df=None):
    """Add an AIRLINE_NAME column by looking up each row's AIRLINE code.

    Args:
        df: flights DataFrame with an AIRLINE column holding carrier codes.
        airlines_df: lookup DataFrame with CODE and AIRLINE columns. Defaults to the
            notebook-level ``airlines`` table, which was the only behavior before.

    Returns:
        ``df`` with an extra AIRLINE_NAME column taken from a literal Spark map built
        from the lookup table.
    """
    if airlines_df is None:
        airlines_df = airlines
    # Collect (code, name) pairs in ONE job so every name stays paired with its own code.
    # Two separate collect() calls are not guaranteed to return rows in the same order.
    # Null codes are skipped because Spark does not allow null map keys.
    mapping = {
        row['CODE']: row['AIRLINE']
        for row in airlines_df.select('CODE', 'AIRLINE').collect()
        if row['CODE'] is not None
    }
    mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

    return df.withColumn('AIRLINE_NAME', mapping_expr[col("AIRLINE")])

def geo(df, airports_df=None):
    """Add origin/destination coordinates looked up by airport code.

    Adds OR_LATITUDE/OR_LONGITUDE (from ORIGIN) and DEST_LATITUDE/DEST_LONGITUDE
    (from DESTIN).

    Args:
        df: flights DataFrame with ORIGIN and DESTIN airport-code columns.
        airports_df: lookup DataFrame with IATA_CODE, LATITUDE and LONGITUDE columns.
            Defaults to the notebook-level ``airports`` table, which was the only
            behavior before.

    Returns:
        ``df`` with the four coordinate columns appended.
    """
    if airports_df is None:
        airports_df = airports
    # One collect keeps code, latitude and longitude from the same row together. The
    # old version ran four separate collects and zipped them, which relied on identical
    # row order across jobs. Null codes are skipped because Spark rejects null map keys.
    rows = [
        row for row in airports_df.select('IATA_CODE', 'LATITUDE', 'LONGITUDE').collect()
        if row['IATA_CODE'] is not None
    ]
    latitud_mapping = {row['IATA_CODE']: row['LATITUDE'] for row in rows}
    longitude_mapping = {row['IATA_CODE']: row['LONGITUDE'] for row in rows}
    latitud_mapping_expr = create_map([lit(x) for x in chain(*latitud_mapping.items())])
    longitude_mapping_expr = create_map([lit(x) for x in chain(*longitude_mapping.items())])

    return (df
            .withColumn('OR_LATITUDE', latitud_mapping_expr[col("ORIGIN")])
            .withColumn('OR_LONGITUDE', longitude_mapping_expr[col("ORIGIN")])
            .withColumn('DEST_LATITUDE', latitud_mapping_expr[col("DESTIN")])
            .withColumn('DEST_LONGITUDE', longitude_mapping_expr[col("DESTIN")]))


def outlier(df, min_arrival_delay=-30):
    """Drop early-arrival outliers: keep rows whose ARRIVAL_DELAY > ``min_arrival_delay``.

    The filter is one-sided. Only very early arrivals are removed; arbitrarily large
    positive delays are kept. A null ARRIVAL_DELAY makes the comparison null, so such
    rows are dropped as well.

    Args:
        df: DataFrame with an ARRIVAL_DELAY column (presumably minutes; not shown here).
        min_arrival_delay: exclusive lower bound. Defaults to -30, the previous
            hard-coded value.
    """
    return df.filter(col("ARRIVAL_DELAY") > min_arrival_delay)


def get_stats(group):
    """Summarise ``group`` as a dict with keys min, max, count and mean, in that order.

    Works with any object exposing ``min()``, ``max()``, ``count()`` and ``mean()``
    methods, e.g. a pandas Series or a column of a groupby group.
    """
    return {stat: getattr(group, stat)() for stat in ('min', 'max', 'count', 'mean')}

In [3]:
# Lookup tables: airports (comma-separated; provides IATA_CODE, LATITUDE, LONGITUDE for geo())
# and airlines (semicolon-separated; provides CODE, AIRLINE for airline_name()).
airports = (spark.read.format('csv')
            .options(header='true', inferSchema='true', delimiter=',')
            .load('/FileStore/tables/dict/airports.csv'))
airlines = (spark.read.format('csv')
            .options(header='true', inferSchema='true', delimiter=';')
            .load('/FileStore/tables/dict/allairlines.csv'))

In [4]:
%fs rm -r /my.csv

In [5]:
# Cleaning and preprocessing of the 2014-2017 flight data
paths = ["/FileStore/tables/data/{}.csv".format(yr) for yr in range(2014, 2018)]
data = (spark.read.format('csv')
        .options(header='true', inferSchema='true', delimiter=',')
        .load(paths))
data.cache()
# clean -> build DATE_TIME -> drop early-arrival outliers -> airline names -> airport coordinates
data = (data
        .transform(clean_dataset)
        .transform(time_format)
        .transform(outlier)
        .transform(airline_name)
        .transform(geo))
data.cache()
# Optional export of the cleaned data:
# data.coalesce(1).write.format('com.databricks.spark.csv').save('clean_data.csv',header = 'true')

In [6]:
## American Airlines (AA): keep only AA flights and drop columns not used as features
df_AA = (data
         .filter(col('AIRLINE') == 'AA')
         .drop('AIRLINE', 'AIRLINE_NAME', 'OR_LATITUDE', 'OR_LONGITUDE', 'DEST_LATITUDE',
               'DEST_LONGITUDE', 'DEPARTURE_DELAY', 'SCHED_DEPARTURE'))

# Calendar/clock features from the scheduled-departure timestamp;
# SCHEDULED_DEPARTURE is the time of day in seconds since midnight.
df_AA = (df_AA
         .withColumn('MONTH', month(col('DATE_TIME')))
         .withColumn('HOUR', hour(col('DATE_TIME')))
         .withColumn('YEAR', year(col('DATE_TIME')))
         .withColumn('DAYOFWEEK', dayofweek(col('DATE_TIME')))
         .withColumn('SCHEDULED_DEPARTURE',
                     hour(col('DATE_TIME')) * 3600
                     + minute(col('DATE_TIME')) * 60
                     + second(col('DATE_TIME'))))

# Integer-encode the origin/destination airport codes (StringIndexer default: most frequent -> 0)
for indexer in (StringIndexer(inputCol='ORIGIN', outputCol="ORIGIN_LABEL"),
                StringIndexer(inputCol='DESTIN', outputCol="DESTIN_LABEL")):
    df_AA = indexer.fit(df_AA).transform(df_AA)

# Target with 3 classes from ARRIVAL_DELAY: 0 -> < 15, 1 -> 15 to < 50, 2 -> everything else
df_AA = df_AA.withColumn(
    'DELAY_LEVEL',
    fn.when(col("ARRIVAL_DELAY") < 15, 0)
      .when(col("ARRIVAL_DELAY") < 50, 1)
      .otherwise(2))

df_AA.cache()

In [7]:
# NOTE(review): the "year avg" chart (grouping and aggregation) is configured in the
# Databricks display() UI, not in code — presumably mean ARRIVAL_DELAY per YEAR; confirm.
# An explicit groupBy(...).agg(...) would make this plot reproducible from code alone.
display(df_AA) #year avg

In [8]:
# NOTE(review): the "year count" chart is configured in the Databricks display() UI —
# presumably number of AA flights per YEAR; confirm. Not reproducible from code alone.
display(df_AA) # year count

In [9]:
# NOTE(review): the "month avg" chart is configured in the Databricks display() UI —
# presumably mean ARRIVAL_DELAY per MONTH; confirm. Not reproducible from code alone.
display(df_AA) #month avg

In [10]:
# NOTE(review): the "day of week avg" chart is configured in the Databricks display() UI —
# presumably mean ARRIVAL_DELAY per DAYOFWEEK; confirm. Not reproducible from code alone.
display(df_AA) #dayof week avg

In [11]:
%fs rm -r /FileStore/tables/clean/clean_data_2018/

In [12]:
# Test set preparation - 2018 data, run through the same cleaning pipeline as training
paths = ["/FileStore/tables/data/2018.csv"]
data_2018 = (spark.read.format('csv')
             .options(header='true', inferSchema='true', delimiter=',')
             .load(paths))
data_2018.cache()
# Cleaning
data_2018 = clean_dataset(data_2018)
# Datetime transform
data_2018 = time_format(data_2018)
# Outliers
data_2018 = outlier(data_2018)
# Airline names
data_2018 = airline_name(data_2018)
# Geo
data_2018 = geo(data_2018)
# Saving
# data_2018.write.format("com.databricks.spark.csv").save("/FileStore/tables/clean/clean_data_2018")

## American Airlines (AA)
df_AA_2018 = data_2018.filter(col('AIRLINE') == 'AA')
df_AA_2018 = df_AA_2018.drop('AIRLINE', 'AIRLINE_NAME', 'OR_LATITUDE', 'OR_LONGITUDE', 'DEST_LATITUDE',
                             'DEST_LONGITUDE', 'DEPARTURE_DELAY', 'SCHED_DEPARTURE')
# Timestamp -> calendar/clock feature columns (SCHEDULED_DEPARTURE = seconds since midnight)
df_AA_2018 = df_AA_2018.withColumn('MONTH', month(df_AA_2018.DATE_TIME))
df_AA_2018 = df_AA_2018.withColumn('HOUR', hour(df_AA_2018.DATE_TIME))
df_AA_2018 = df_AA_2018.withColumn('YEAR', year(df_AA_2018.DATE_TIME))
df_AA_2018 = df_AA_2018.withColumn('DAYOFWEEK', dayofweek(df_AA_2018.DATE_TIME))
df_AA_2018 = df_AA_2018.withColumn('SCHEDULED_DEPARTURE', hour(df_AA_2018.DATE_TIME) * 3600
                                   + minute(df_AA_2018.DATE_TIME) * 60 + second(df_AA_2018.DATE_TIME))
# Labeling: learn the airport-code -> index mapping from the TRAINING rows (df_AA), not 2018.
# StringIndexer orders indices by label frequency in the data it is fit on, so an indexer
# fit on 2018 could give the same airport a different ORIGIN_LABEL/DESTIN_LABEL than the
# model saw during training. handleInvalid='keep' maps airports absent from the training
# data to one extra index instead of failing. Only the input column is selected because
# StringIndexer.fit rejects data that already contains the output column.
indexer = StringIndexer(inputCol='ORIGIN', outputCol="ORIGIN_LABEL", handleInvalid='keep')
df_AA_2018 = indexer.fit(df_AA.select('ORIGIN')).transform(df_AA_2018)
indexer = StringIndexer(inputCol='DESTIN', outputCol="DESTIN_LABEL", handleInvalid='keep')
df_AA_2018 = indexer.fit(df_AA.select('DESTIN')).transform(df_AA_2018)
# Target with 3 classes from ARRIVAL_DELAY: 0 -> < 15, 1 -> 15 to < 50, 2 -> everything else
df_AA_2018 = df_AA_2018.withColumn(
    'DELAY_LEVEL',
    fn.when(col("ARRIVAL_DELAY") < 15, 0)
      .when(col("ARRIVAL_DELAY") < 50, 1)
      .otherwise(2))

In [13]:
# NOTE(review): chart configured in the Databricks display() UI — presumably mean
# ARRIVAL_DELAY per MONTH for 2018; confirm. Not reproducible from code alone.
display(df_AA_2018) #month avg 2018

In [14]:
# NOTE(review): chart configured in the Databricks display() UI — presumably mean
# ARRIVAL_DELAY per DAYOFWEEK for 2018; confirm. Not reproducible from code alone.
display(df_AA_2018) #dayofweek avg 2018

In [15]:
# (row count, column count) for the training frame, then the 2018 test frame
for frame in (df_AA, df_AA_2018):
    print((frame.count(), len(frame.columns)))

In [16]:
# Class distribution of DELAY_LEVEL in the training (2014-2017) and test (2018) sets
for title, frame in (('Conjunto de Train', df_AA), ('Conjunto de Test', df_AA_2018)):
    print(title)
    frame.groupBy(frame['DELAY_LEVEL'].cast(IntegerType()).alias('DELAY_LEVEL')).count().show(10)

In [17]:
# Undersampling: shrink the majority class 0 (on time) to roughly the size of class 1
# (medium delay). Class 2 (high delay) is kept in full.
# DELAY_LEVEL is an integer column, so compare against integer literals, not strings like '2.0'.
highdelay = df_AA.filter(fn.col('DELAY_LEVEL') == 2)
mediumdelay = df_AA.filter(fn.col('DELAY_LEVEL') == 1)
ontime = df_AA.filter(fn.col('DELAY_LEVEL') == 0)
# sample() without replacement needs a fraction in [0, 1]: cap it in case class 1 is
# already the larger one, and avoid dividing by zero when class 0 is empty.
ontime_count = ontime.count()
sampleRatio = min(1.0, float(mediumdelay.count()) / ontime_count) if ontime_count else 0.0
# Fixed seed so the undersampled training set is reproducible across runs.
SampleDf = ontime.sample(withReplacement=False, fraction=sampleRatio, seed=42)
df_AA2 = mediumdelay.union(SampleDf)
df_AA_under = df_AA2.union(highdelay)

In [18]:
# Same undersampling for the 2018 set: shrink class 0 to roughly the size of class 1,
# and keep class 2 in full. DELAY_LEVEL is an integer column, so compare with integers.
highdelay = df_AA_2018.filter(fn.col('DELAY_LEVEL') == 2)
mediumdelay = df_AA_2018.filter(fn.col('DELAY_LEVEL') == 1)
ontime = df_AA_2018.filter(fn.col('DELAY_LEVEL') == 0)
# Divide by the on-time count, not the whole frame's count, so the sampled class 0 ends
# up about the size of class 1. Cap at 1.0 (sample() without replacement requires it)
# and guard against an empty class 0.
ontime_count = ontime.count()
sampleRatio = min(1.0, float(mediumdelay.count()) / ontime_count) if ontime_count else 0.0
SampleDf = ontime.sample(withReplacement=False, fraction=sampleRatio, seed=42)
df_AA_2018_2 = mediumdelay.union(SampleDf)
df_AA_2018_under = df_AA_2018_2.union(highdelay)

In [19]:
# Class distribution after undersampling. Each frame is grouped by its OWN DELAY_LEVEL
# column; referencing a column that belongs to a different DataFrame (df_AA / df_AA_2018)
# depends on Spark resolving it through lineage and is fragile.
for title, frame in (('Conjunto de Train', df_AA_under), ('Conjunto de Test', df_AA_2018_under)):
    print(title)
    frame.groupBy(frame['DELAY_LEVEL'].cast(IntegerType()).alias('DELAY_LEVEL')).count().show(10)

In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Train on the undersampled 2014-2017 data; evaluate on the full 2018 data.
# Drop raw identifiers, the timestamp and the continuous target before assembling features.
df = df_AA_under.drop('ORIGIN', 'DESTIN', 'DATE_TIME', 'ARRIVAL_DELAY', 'DISTANCE', 'YEAR')
df2018 = df_AA_2018.drop('ORIGIN', 'DESTIN', 'DATE_TIME', 'ARRIVAL_DELAY', 'DISTANCE', 'YEAR')

# Combine all predictor columns into a single "features" vector
vecAssembler = VectorAssembler(
    inputCols=['MONTH', 'HOUR', 'SCHEDULED_DEPARTURE', 'DAYOFWEEK', 'ORIGIN_LABEL', 'DESTIN_LABEL'],
    outputCol="features",
)
DfAssembled, DfAssembled2018 = (vecAssembler.transform(frame) for frame in (df, df2018))
DfAssembled.show(3)

# Training and test DataFrames (a random split of the training data is not used)
dfTraining, dfTest = DfAssembled, DfAssembled2018

# Random forest classifier
rf = RandomForestClassifier(labelCol="DELAY_LEVEL", numTrees=900, maxDepth=4, seed=42, maxBins=111)
rfModel = rf.fit(dfTraining)

# Predictions on the test set
prediccionDfTest = rfModel.transform(dfTest)
prediccionDfTest.select('features', 'probability', 'DELAY_LEVEL', 'prediction').show(5, truncate=False)

# Multiclass evaluation (weighted precision, weighted recall, F1)
evaluator = MulticlassClassificationEvaluator(labelCol="DELAY_LEVEL")
print('Precision: {0}, Recall: {1}, F1: {2}'.format(*(
    evaluator.evaluate(prediccionDfTest, {evaluator.metricName: metric})
    for metric in ("weightedPrecision", "weightedRecall", "f1"))))

In [21]:
# Feature importances labelled with the assembler's input column names; the raw vector only
# shows positional indices. All inputs are scalar columns, so VectorAssembler puts them in
# inputCols order, one slot each.
pd.Series(rfModel.featureImportances.toArray(), index=vecAssembler.getInputCols()).sort_values(ascending=False)

In [22]:
from sklearn.metrics import confusion_matrix

# Collect label and prediction in ONE job so each prediction stays paired with its own row.
# Two separate toPandas() calls are not guaranteed to return rows in the same order.
labels_and_preds = prediccionDfTest.select('DELAY_LEVEL', 'prediction').toPandas()
y_true = labels_and_preds['DELAY_LEVEL'].astype(int)
y_pred = labels_and_preds['prediction'].astype(int)

# Fixed class order so the matrix is always 3x3 (rows = true class, columns = predicted),
# even if some class never appears in the predictions.
cnf_matrix = confusion_matrix(y_true, y_pred, labels=[0, 1, 2])
print(cnf_matrix)