# ENVIRONMENT SETUP

In [1]:
# print('You can run the script automatically or iteratively. The automatic mode will run in the spark cluster, execute data profiling, executing ML training and testing, without data visualization') 
autoExec = input('Automatic Execution (Y-yes , N-no)?: ').lower()=='y'
# autoExec = True

In [2]:
#TODO: remove unnecessary imports
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
#from pyspark.sql import SQLContext
#from pyspark.sql.functions import col
#from pyspark.sql.types import StringType,BooleanType,DateType, NumericType, DoubleType, StructType, IntegerType

if autoExec:
    localExec = False
else: 
    localExec = input('Local Execution (Y-yes , N-no)?: ').lower()=='y'

if localExec:
    spark = SparkSession\
    .builder\
    .master("local[8]")\
    .config("spark.ui.port", "4041")\
    .getOrCreate()
else:
    spark = SparkSession \
        .builder \
        .config("spark.executor.memory", "4g") \
        .config("spark.executor.instances", "4") \
        .config("spark.executor.cores", "4") \
        .config("spark.eventLog.enabled", "true") \
        .config("spark.eventLog.dir", "hdfs:///spark-history") \
        .config("spark.default.parallelism", "4") \
        .master("yarn")\
        .appName("egd group1 final_4-4-4-4") \
        .getOrCreate()

sc = spark.sparkContext
app_id = spark._sc.applicationId
print(app_id)
print("http://sparkhistoryserver.hdp:18080/history/" + app_id)

application_1652382646560_0297
http://sparkhistoryserver.hdp:18080/history/application_1652382646560_0297


In [3]:
#TODO: remove in final version
#sqlContext = SQLContext(sc)

# INGESTION

## Data Input

In [4]:
if localExec:
    # data input local FS
    path = "./data/2018_10k.txt"
    rdd0 = spark.sparkContext.textFile(path)
else:
    # data input HDFS
    path = "hdfs:///user/group1/2018_nh.csv"
    rdd0 = sc.textFile(path)

## Data Profiling

In [5]:
# initial profiling
dpTotalNrRecords = rdd0.count()
print(f'Total number of records: {dpTotalNrRecords}')

dpTotalNrColumns = len(rdd0.take(1)[0].split(","))
print(f'Total number of columns: {dpTotalNrColumns}')

print('Data sample:')
rdd0.take(10)

Total number of records: 7213446
Total number of columns: 28
Data sample:


['2018-01-01,UA,2429,EWR,DEN,1517,1512.0,-5.0,15.0,1527.0,1712.0,10.0,1745,1722.0,-23.0,0.0,,0.0,268.0,250.0,225.0,1605.0,,,,,,',
 '2018-01-01,UA,2427,LAS,SFO,1115,1107.0,-8.0,11.0,1118.0,1223.0,7.0,1254,1230.0,-24.0,0.0,,0.0,99.0,83.0,65.0,414.0,,,,,,',
 '2018-01-01,UA,2426,SNA,DEN,1335,1330.0,-5.0,15.0,1345.0,1631.0,5.0,1649,1636.0,-13.0,0.0,,0.0,134.0,126.0,106.0,846.0,,,,,,',
 '2018-01-01,UA,2425,RSW,ORD,1546,1552.0,6.0,19.0,1611.0,1748.0,6.0,1756,1754.0,-2.0,0.0,,0.0,190.0,182.0,157.0,1120.0,,,,,,',
 '2018-01-01,UA,2424,ORD,ALB,630,650.0,20.0,13.0,703.0,926.0,10.0,922,936.0,14.0,0.0,,0.0,112.0,106.0,83.0,723.0,,,,,,',
 '2018-01-01,UA,2422,ORD,OMA,2241,2244.0,3.0,15.0,2259.0,1.0,2.0,14,3.0,-11.0,0.0,,0.0,93.0,79.0,62.0,416.0,,,,,,',
 '2018-01-01,UA,2421,IAH,LAS,750,747.0,-3.0,14.0,801.0,854.0,6.0,916,900.0,-16.0,0.0,,0.0,206.0,193.0,173.0,1222.0,,,,,,',
 '2018-01-01,UA,2420,DEN,CID,1324,1318.0,-6.0,11.0,1329.0,1554.0,6.0,1619,1600.0,-19.0,0.0,,0.0,115.0,102.0,85.0,692.0,,,,,,',
 '2

In [6]:
# data type conversion aux functions
from datetime import datetime

def txt2float(t):
    s = t.strip()
    fvalue = 0
    if s == '':
        return None
    try:
        fvalue = float(s)
    except:
        fvalue = None
    
    return fvalue

def txt2str(t):
    s = t.strip()
    if s == '':
        return None
    
    return str(t)

def txt2date(t):
    s = t.strip()
    dvalue = None
    
    if s == '':
        return None
    try:
        dvalue = datetime.strptime(s, '%Y-%m-%d').date()
    except:
        dvalue = None

    return dvalue

def filterByIndexT(li, idx):
    return [(i,li[i]) for i in idx]

def filterByIndexV(li, idx):
    return [li[i] for i in idx]

# data type conversion
inSchema = {
    0:txt2date,   #FL_DATE
    1:txt2str,    #OP_CARRIER
    2:txt2str,    #OP_CARRIER_FL_NUM
    3:txt2str,    #ORIGIN
    4:txt2str,    #DEST
    5:txt2float,  #CRS_DEP_TIME
    6:txt2float,  #DEP_TIME
    7:txt2float,  #DEP_DELAY
    8:txt2float,  #TAXI_OUT
    9:txt2float,  #WHEELS_OFF
    10:txt2float, #WHEELS_ON
    11:txt2float, #TAXI_IN
    12:txt2float, #CRS_ARR_TIME
    13:txt2float, #ARR_TIME
    14:txt2float, #ARR_DELAY
    15:txt2float, #CANCELLED
    16:txt2str,   #CANCELLED REASON
    17:txt2float, #DIVERTED
    18:txt2float, #CRS_ELAPSED_TIME
    19:txt2float, #ACTUAL_ELAPSED_TIME
    20:txt2float, #AIR_TIME
    21:txt2float  #DISTANCE
    # 22: REL_INIT_DELAY
    # 23: T_LATE
    # 24: T_REL_ELAP_TIME
    # 25: FL_DATE_DAY_IN_WEEK
    # 26: FL_DATE_DAY_IN_MONTH
    # 27: FL_DATE_MONTH
    # 28: DEP_HOUR_IN_DAY
    # 29: ARR_HOUR_IN_DAY
}

In [7]:
# parse file
rdd2 = rdd0.map(lambda x: x.split(','))

# data type conversion based on data schema
rdd3 = rdd2.map(lambda x: [inSchema[idx](val) for (idx,val) in filterByIndexT(x, list(inSchema.keys()))])
rdd3.cache()

PythonRDD[5] at RDD at PythonRDD.scala:53

In [8]:
# data profiling aux functions

# nr of records, excl missing values
def dpCount(rdd, idx):
    return rdd.filter(lambda x: x[idx] is not None).count()

# mean value
def dpMean(rdd, idx):
    val = rdd\
        .filter(lambda x: x[idx] is not None)\
        .map(lambda x: (1, x[idx]))\
        .reduce(lambda x,y: (x[0]+y[0],x[1]+y[1]))
    
    return round(val[1]/val[0],3)

# max value
def dpMax(rdd, idx):
    val = rdd\
        .filter(lambda x: x[idx] is not None)\
        .map(lambda x: x[idx])\
        .reduce(lambda x,y: x if x > y else y)
    
    return round(val, 3)

# min value
def dpMin(rdd, idx):
    val = rdd\
        .filter(lambda x: x[idx] is not None)\
        .map(lambda x: x[idx])\
        .reduce(lambda x,y: x if x < y else y)
    
    return round(val, 3)

# proportions (categ attributes)
def dpProp(rdd, idx):
    val = rdd\
        .filter(lambda x: x[idx] is not None)\
        .map(lambda x: (x[idx],1))\
        .reduceByKey(lambda x, y: x+y)\
        .collect()
    
    return {key:count for (key, count) in val}

# variance
def dpVar(rdd, idx):
    
    avg = dpMean(rdd, idx)
    
    val = rdd\
        .filter(lambda x: x[idx] is not None)\
        .map(lambda x: (1, pow(x[idx]-avg,2)))\
        .reduce(lambda x,y: (x[0]+y[0],x[1]+y[1]))
    
    return round(val[1]/val[0],3)

# nr of distinct values (categ attributes)
def dpDist(rdd, idx):
    return rdd.map(lambda x: x[idx]).distinct().count()

# covariance
def dpCovar(rdd, idx1, idx2):
    
    avg1 = dpMean(rdd, idx1)
    avg2 = dpMean(rdd, idx2)
    
    val = rdd\
        .filter(lambda x: x[idx1] is not None and x[idx2] is not None)\
        .map(lambda x: (1, (x[idx1]-avg1)*(x[idx2]-avg2)))\
        .reduce(lambda x,y: (x[0]+y[0],x[1]+y[1]))
    
    return round(val[1]/val[0],3)

# correlation
def dpCorr(rdd, idx1, idx2):
    return round(dpCovar(rdd, idx1, idx2) / (math.sqrt(dpVar(rdd, idx1) * dpVar(rdd, idx2))),3)

In [9]:
import math

# data profilling
# NOTE: run only when profilling data

profillingOps = {
    0: ('FL_DATE',            [('count',dpCount), ('distinct',dpDist)]),
    1: ('OP_CARRIER',         [('count',dpCount), ('distinct',dpDist)]),
    2: ('OP_CARRIER_FL_NUM',  [('count',dpCount), ('distinct',dpDist)]),
    3: ('ORIGIN',             [('count',dpCount), ('distinct',dpDist)]), 
    4: ('DEST',               [('count',dpCount), ('distinct',dpDist)]), 
    5: ('CRS_DEP_TIME',       [('count',dpCount)]),
    6: ('DEP_TIME',           [('count',dpCount),('min',dpMin),('max',dpMax)]),
    7: ('DEP_DELAY',          [('count',dpCount),('mean',dpMean),('min',dpMin),('max',dpMax),('stdev',lambda x,y: round(math.sqrt(dpVar(x,y)),3))]),
    8: ('TAXI_OUT',           [('count',dpCount),('mean',dpMean),('min',dpMin),('max',dpMax),('stdev',lambda x,y: round(math.sqrt(dpVar(x,y)),3))]),
    9: ('WHEELS_OFF',         [('count',dpCount)]),
    10:('WHEELS_ON',          [('count',dpCount)]),
    11:('TAXI_IN',            [('count',dpCount),('mean',dpMean),('min',dpMin),('max',dpMax),('stdev',lambda x,y: round(math.sqrt(dpVar(x,y)),3))]),
    12:('CRS_ARR_TIME',       [('count',dpCount),('min',dpMin),('max',dpMax)]),
    13:('ARR_TIME',           [('count',dpCount),('min',dpMin),('max',dpMax)]),
    14:('ARR_DELAY',          [('count',dpCount),('mean',dpMean),('min',dpMin),('max',dpMax),('stdev',lambda x,y: round(math.sqrt(dpVar(x,y)),3))]),
    15:('CANCELLED',          [('count',dpCount),('prop',dpProp)]),
    16:('CANCELLED REASON',   [('count',dpCount),('prop',dpProp)]),
    17:('DIVERTED',           [('count',dpCount),('prop',dpProp)]),
    18:('CRS_ELAP_TIME',      [('count',dpCount),('mean',dpMean),('min',dpMin),('max',dpMax),('stdev',lambda x,y: round(math.sqrt(dpVar(x,y)),3))]),
    19:('ACT_ELAP_TIME',      [('count',dpCount),('mean',dpMean),('min',dpMin),('max',dpMax),('stdev',lambda x,y: round(math.sqrt(dpVar(x,y)),3))]),
    20:('AIR_TIME',           [('count',dpCount),('min',dpMin),('max',dpMax),('stdev',lambda x,y: round(math.sqrt(dpVar(x,y)),3))]),
    21:('DISTANCE',           [('count',dpCount),('mean',dpMean),('min',dpMin),('max',dpMax),('stdev',lambda x,y: round(math.sqrt(dpVar(x,y)),3))])
}

def dpPrettyPrint(dpStatistics):
    for attrib, statsList in dpStatistics.items():
        print(f'{attrib}:\n')
        for metric, value in statsList.items():
            print(f'\t| {metric}: {value}\n')

if autoExec:
    dpExec = True
else: 
    dpExec = input('Data Profiling (Y-yes , N-no)?: ').lower()=='y'
    
if dpExec:
    stats = {opers[0]:{ operName:operFunc(rdd3, idx) for operName, operFunc in opers[1]} for (idx,opers) in profillingOps.items()}
    print('Data statistics (smaller sample / original dataset):')
    dpPrettyPrint(stats)

Data statistics (smaller sample / original dataset):
FL_DATE:

	| count: 7213446

	| distinct: 365

OP_CARRIER:

	| count: 7213446

	| distinct: 18

OP_CARRIER_FL_NUM:

	| count: 7213446

	| distinct: 7113

ORIGIN:

	| count: 7213446

	| distinct: 358

DEST:

	| count: 7213446

	| distinct: 358

CRS_DEP_TIME:

	| count: 7213446

DEP_TIME:

	| count: 7101129

	| min: 1.0

	| max: 2400.0

DEP_DELAY:

	| count: 7096212

	| mean: 9.97

	| min: -122.0

	| max: 2710.0

	| stdev: 44.83

TAXI_OUT:

	| count: 7097616

	| mean: 17.411

	| min: 1.0

	| max: 196.0

	| stdev: 9.92

WHEELS_OFF:

	| count: 7097617

WHEELS_ON:

	| count: 7094200

TAXI_IN:

	| count: 7094200

	| mean: 7.601

	| min: 1.0

	| max: 259.0

	| stdev: 6.065

CRS_ARR_TIME:

	| count: 7213446

	| min: 1.0

	| max: 2400.0

ARR_TIME:

	| count: 7094201

	| min: 1.0

	| max: 2400.0

ARR_DELAY:

	| count: 7076406

	| mean: 5.049

	| min: -120.0

	| max: 2692.0

	| stdev: 46.927

CANCELLED:

	| count: 7213446

	| prop: {0.0: 709686

In [10]:
# correlation calculation

if dpExec:
    dpCovrAttribs = filterByIndexV([(idx,val[0]) for idx, val in profillingOps.items()],[7, 13, 18, 19, 20, 21])

    dpCorrRes = {}
    for i,iName in dpCovrAttribs:
        for j,jName in dpCovrAttribs:
            key, revKey = f'{iName} x {jName}', f'{jName} x {iName}'
            if revKey not in dpCorrRes.keys():
                if i!=j:
                    dpCorrRes[key] = dpCorr(rdd3, i,j)

    print('Data Statistics - Correlations:')
    for key, value in dpCorrRes.items():
        print(f'\t{key}: {value}')


Data Statistics - Correlations:
	DEP_DELAY x ARR_TIME: 0.03
	DEP_DELAY x CRS_ELAP_TIME: 0.013
	DEP_DELAY x ACT_ELAP_TIME: 0.017
	DEP_DELAY x AIR_TIME: 0.009
	DEP_DELAY x DISTANCE: 0.009
	ARR_TIME x CRS_ELAP_TIME: 0.013
	ARR_TIME x ACT_ELAP_TIME: 0.015
	ARR_TIME x AIR_TIME: 0.011
	ARR_TIME x DISTANCE: 0.006
	CRS_ELAP_TIME x ACT_ELAP_TIME: 0.985
	CRS_ELAP_TIME x AIR_TIME: 0.991
	CRS_ELAP_TIME x DISTANCE: 0.983
	ACT_ELAP_TIME x AIR_TIME: 0.987
	ACT_ELAP_TIME x DISTANCE: 0.971
	AIR_TIME x DISTANCE: 0.987


## Data Quality

In [11]:
# instance filtering: instances with relevant missing attributes
idxRelProc = [0,1,3,4,6,7,8,11,12,14,15,17,18,19,21] #attribs relevant for missing value proc
rdd4 = rdd3.filter(lambda x: None not in filterByIndexV(x, idxRelProc))

dpNrRecordsActual = dpTotalNrRecords
dpNrOfRecordsAfterRemoval = rdd4.count()
print(f'Total number of removed records with missing values: {dpNrRecordsActual-dpNrOfRecordsAfterRemoval}')
dpNrRecordsActual = dpNrOfRecordsAfterRemoval

# instance filtering: cancelled, diverted and on-time departure fligths
rdd5 = rdd4\
    .filter(lambda x: x[15]!=1)\
    .filter(lambda x: x[17]!=1)\
    .filter(lambda x: x[7]>0)

dpNrOfRecordsAfterRemoval = rdd5.count()
print(f'Total number of removed records based on inclusion criteria: {dpNrRecordsActual-dpNrOfRecordsAfterRemoval}')
dpNrRecordsActual = dpNrOfRecordsAfterRemoval

# exclude outliers in ARR_DELAY
rdd6 = rdd5.sortBy(lambda x: x[14]).cache()
cnt = rdd6.count()
idx25 = int(cnt*0.25) - 1
idx75 = int(cnt*0.75) - 1
iqr = rdd6.zipWithIndex().filter(lambda x: x[1] in [idx25, idx75]).reduce(lambda x, y: abs(x[0][14] - y[0][14]))
rdd7 = rdd6.filter(lambda x: x[14] < iqr * 1.5 and x[14] > iqr * -1.5)

dpNrOfRecordsAfterRemoval = rdd7.count()
print(f'Total number of removed records with outliers: {dpNrRecordsActual-dpNrOfRecordsAfterRemoval}')
dpNrRecordsActual = dpNrOfRecordsAfterRemoval

print(f'Total number of resulting records: {dpNrRecordsActual} ({round((dpNrRecordsActual/dpTotalNrRecords)*100,2)}%)')

Total number of removed records with missing values: 141629
Total number of removed records based on inclusion criteria: 4634767
Total number of removed records with outliers: 417698
Total number of resulting records: 2019352 (27.99%)


## Data Transformation

In [12]:
# calculated attributes: 
# 1. REL_INIT_DELAY = (DEP_DELAY + TAXI_OUT) / CRS_ELAPSED_TIME
# 2. T_LATE = 1 if ACTUAL_ELAPSED_TIME - CRS_ELAPSED_TIME > 15, 0 otherwise
# 3. T_REL_ELAP_TIME = ACTUAL_ELAPSED_TIME / frdd.CRS_ELAPSED_TIME
rdd8 = rdd7\
    .map(lambda x: x + [(x[7]+x[8])/x[18]])\
    .map(lambda x: x + [1 if x[19]-x[18] > 15 else 0])\
    .map(lambda x: x + [x[19]/x[18]])

# derived attributes:
# 1. FL_DATE_DAY_IN_WEEK
# 2. FL_DATE_DAY_IN_MONTH
# 3. FL_DATE_MONTH
rdd9 = rdd8\
    .map(lambda x: x + [x[0].weekday()])\
    .map(lambda x: x + [x[0].day])\
    .map(lambda x: x + [x[0].month])

# calculated attributes:
# 1. DEP_HOUR_IN_DAY from DEP_TIME
# 2. ARR_HOUR_IN_DAY from CRS_ARR_TIME
rdd10 = rdd9\
    .map(lambda x: x + [x[6]//100])\
    .map(lambda x: x + [x[12]//100])

## Data Output

In [13]:
# exclude the attributes not needed for analysis
idxRelAna = [1,3,4,14,18,21,22,23,24,25,26,27,28,29] # attribs relevant for analysis
rdd11 = rdd10.map(lambda x: filterByIndexV(x, idxRelAna))

# convert RDD to DF
outSchemaL = ["OP_CARRIER",
              "ORIGIN",
              "DEST",
              "ARR_DELAY",
              "CRS_ELAPSED_TIME",
              "DISTANCE",
              "REL_INIT_DELAY",
              "T_LATE",
              "T_REL_ELAP_TIME",
              "FL_DATE_DAY_IN_WEEK",
              "FL_DATE_DAY_IN_MONTH",
              "FL_DATE_MONTH",
              "DEP_HOUR_IN_DAY",
              "ARR_HOUR_IN_DAY"]

df = rdd11.toDF(outSchemaL)

# QUERIES

In [14]:
# Top 10 flight routes
df.groupBy('ORIGIN','DEST').count()\
    .orderBy('count',ascending=False)\
    .show(10)

+------+----+-----+
|ORIGIN|DEST|count|
+------+----+-----+
|   LAX| SFO| 4154|
|   SFO| LAX| 4048|
|   ORD| LGA| 3969|
|   LAS| LAX| 3657|
|   LAX| JFK| 3605|
|   LAX| LAS| 3552|
|   LGA| ORD| 3496|
|   ATL| LGA| 3448|
|   ATL| MCO| 3410|
|   ATL| FLL| 3363|
+------+----+-----+
only showing top 10 rows



In [15]:
df.createOrReplaceTempView("df")

In [16]:
# Classification of delays
# TODO: Review intervals, review the ordering, order by qty

spark.sql("""SELECT COUNT(1) AS QTD, Arrival_Flight_Delays FROM (
SELECT ARR_DELAY, ORIGIN, DEST, 
              CASE
                  WHEN ARR_DELAY > 360 THEN 'Very Long Delays'
                  WHEN ARR_DELAY >= 120 AND ARR_DELAY <= 360 THEN 'Long Delays'
                  WHEN ARR_DELAY >= 60 AND ARR_DELAY < 120 THEN 'Short Delays'
                  WHEN ARR_DELAY > 0 and ARR_DELAY < 60 THEN 'Tolerable Delays'
                  WHEN ARR_DELAY = 0 THEN 'No Delays'
                  ELSE 'Early'
               END AS Arrival_Flight_Delays
               FROM df
               ORDER BY ARR_DELAY DESC) AS A 
            GROUP BY Arrival_Flight_Delays
               """).show(10) ## RDD = 9 s + 47 ms + 75 ms + 0.3 s + 0.2 s = 9.62s
                             ## Cache = 0.3 s + 51 ms + 0.1 s + 0.2 s + 0.2 s = 0.85s

+-------+---------------------+
|    QTD|Arrival_Flight_Delays|
+-------+---------------------+
|1340757|     Tolerable Delays|
|  50152|            No Delays|
| 605771|                Early|
|  22672|         Short Delays|
+-------+---------------------+



In [17]:
#proportion of flights with delays
spark.sql("""select T_LATE, count(1) FLIGHTS_COUNT 
            from DF group by T_LATE""").show(10)
## RDD = 12 s + 37 ms + 0.1 s + 0.7 s + 0.6 s = 13,43s
## Cache = 0.2 s + 27 ms + 49 ms + 0.2 s + 0.1 s = 0.57s                                       

+------+-------------+
|T_LATE|FLIGHTS_COUNT|
+------+-------------+
|     0|      1921186|
|     1|        98166|
+------+-------------+



In [18]:
##considering the delayed flights average relative delay per D/W
spark.sql("""select 
                FL_DATE_DAY_IN_WEEK, 
                avg(T_REL_ELAP_TIME) AVG_REL_DELAY
            from DF
            where T_LATE = 1
            group by FL_DATE_DAY_IN_WEEK 
            order by FL_DATE_DAY_IN_WEEK asc""").show(10) ## RDD = 10 s, cache = 0.8 s

+-------------------+------------------+
|FL_DATE_DAY_IN_WEEK|     AVG_REL_DELAY|
+-------------------+------------------+
|                  0|1.2058198579991954|
|                  1|1.2052453830417045|
|                  2|1.2029307448047917|
|                  3|1.1999864207081972|
|                  4| 1.201655428258325|
|                  5|1.1948468573064042|
|                  6|1.2025400101303374|
+-------------------+------------------+



In [19]:
# Top 10 relative delayed flights per day in the week
spark.sql("""select 
                FL_DATE_DAY_IN_MONTH, 
                avg(T_REL_ELAP_TIME) AVG_REL_DELAY
            from DF
            where T_LATE = 1
            group by FL_DATE_DAY_IN_MONTH 
            order by FL_DATE_DAY_IN_MONTH asc""").show(10)

+--------------------+------------------+
|FL_DATE_DAY_IN_MONTH|     AVG_REL_DELAY|
+--------------------+------------------+
|                   1|1.1965535847918214|
|                   2|1.2040730760590717|
|                   3| 1.206615735203679|
|                   4|1.2005635261365217|
|                   5| 1.201093686449931|
|                   6|1.2052568391294207|
|                   7|1.2068821445295985|
|                   8|1.1989050285957803|
|                   9| 1.208585095817865|
|                  10|  1.19708517081429|
+--------------------+------------------+
only showing top 10 rows



In [20]:
##average relative delay per Month
spark.sql("""select 
                FL_DATE_MONTH, 
                avg(T_REL_ELAP_TIME
                ) AVG_REL_DELAY
            from DF
            where T_LATE = 1
            group by FL_DATE_MONTH 
            order by FL_DATE_MONTH asc""").show(12)

+-------------+------------------+
|FL_DATE_MONTH|     AVG_REL_DELAY|
+-------------+------------------+
|            1|1.2143748592950934|
|            2|1.2052373094845752|
|            3|1.1988932333568176|
|            4|1.2027828186801572|
|            5|1.2029547211539404|
|            6|1.2010854200850174|
|            7|1.1978532703795364|
|            8|1.2018947448085386|
|            9|1.2071519172314917|
|           10|1.1969102796743984|
|           11|1.2057191087917536|
|           12|1.1947725148710568|
+-------------+------------------+



In [21]:
#TOP 10 Carriers on avg delays
df.filter("T_LATE = 1").groupBy("OP_CARRIER").agg({'OP_CARRIER':'count', 'T_REL_ELAP_TIME': 'avg'}).orderBy('avg(T_REL_ELAP_TIME)',ascending=False).show(10)

+----------+--------------------+-----------------+
|OP_CARRIER|avg(T_REL_ELAP_TIME)|count(OP_CARRIER)|
+----------+--------------------+-----------------+
|        OH|  1.2796839370205886|             3495|
|        MQ|  1.2766248194050211|             4861|
|        OO|  1.2627619459473463|             9929|
|        EV|  1.2457965593937206|             2767|
|        9E|  1.2456329577493501|             2878|
|        YV|  1.2440199282693694|             2619|
|        YX|   1.218283853416388|             4304|
|        WN|  1.2119350247324765|            15183|
|        DL|  1.1882825925393505|            10262|
|        G4|  1.1854409455457788|              987|
+----------+--------------------+-----------------+
only showing top 10 rows



In [22]:
#TOP 10 origin airports on avg delays
df.filter("T_LATE = 1").groupBy("ORIGIN")\
  .agg({'ORIGIN':'count', 'T_REL_ELAP_TIME': 'avg'})\
  .orderBy('avg(T_REL_ELAP_TIME)',ascending=False)\
  .show(10)

+------+-------------+--------------------+
|ORIGIN|count(ORIGIN)|avg(T_REL_ELAP_TIME)|
+------+-------------+--------------------+
|   WRG|            4|  2.0115384615384615|
|   PIB|            2|  1.9390243902439024|
|   GST|            1|  1.8518518518518519|
|   RHI|            1|  1.6764705882352942|
|   AKN|            1|  1.6379310344827587|
|   SIT|            8|  1.5736194691938201|
|   SCC|            1|  1.5510204081632653|
|   SHD|            4|  1.5334670231729055|
|   OTZ|            8|  1.5026870443095257|
|   CDV|            3|  1.4983530961791833|
+------+-------------+--------------------+
only showing top 10 rows



In [23]:
df.select('ORIGIN','DEST').show(10) ## RDD = 45 ms, cache = 32 ms

+------+----+
|ORIGIN|DEST|
+------+----+
|   EWR| HNL|
|   BWI| LAS|
|   PHL| LAX|
|   PDX| KOA|
|   SEA| HNL|
|   SEA| LIH|
|   PDX| KOA|
|   JFK| SEA|
|   LGA| DAL|
|   SEA| KOA|
+------+----+
only showing top 10 rows



In [24]:
df.count() ## RDD = 9 s, cache = 9s

2019352

In [25]:
##caching my view
df.repartition(1).createOrReplaceTempView('df')
spark.catalog.cacheTable('df')

In [26]:
df.rdd.getNumPartitions()

4

# MACHINE LEARNING

In [27]:
import numpy as np

# hold out a small set of data for data predition
# can also be used for reducing the nr of instances used for learning and testing
# df, dfpred = df.randomSplit([0.1, 0.9], seed = 13) # Uncomment this line if you need a smaller dataset
dftrain, dfpred = df.randomSplit([0.9, 0.1], seed = 13)

# bluid ml pipeline for classif and reg
#------------------------------
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator , RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.regression import DecisionTreeRegressor


# convert string attribs to categorical attribs
string_attribs = ["OP_CARRIER",
                  "ORIGIN",
                  "DEST"]

indexers = [StringIndexer(inputCol=attrib_name, outputCol = attrib_name+"_IDX") for attrib_name in string_attribs]

# group features
feature_attribs = ["FL_DATE_DAY_IN_WEEK",
                   "FL_DATE_DAY_IN_MONTH",
                   "FL_DATE_MONTH",
                   "DEP_HOUR_IN_DAY",
                   "REL_INIT_DELAY",
                   "ARR_HOUR_IN_DAY",
                   "OP_CARRIER_IDX",
                   "ORIGIN_IDX",
                   "DEST_IDX",
                   "DISTANCE",
                   "CRS_ELAPSED_TIME"]


label_attrib_clf = "T_LATE"
label_attrib_reg = "T_REL_ELAP_TIME"

assembler = VectorAssembler(inputCols=feature_attribs, outputCol="features")

#estimators for classification and regression
estimator_clf = DecisionTreeClassifier(featuresCol = "features", labelCol = label_attrib_clf)
estimator_reg = DecisionTreeRegressor(labelCol = label_attrib_reg, featuresCol = "features")

# parameters for estimators
#TODO: apply final list of hyperparameters
paramGrid_clf = ParamGridBuilder()\
                .addGrid(estimator_clf.maxBins, [360])\
                .addGrid(estimator_clf.seed, [13])\
                .addGrid(estimator_clf.impurity, ["gini", "entropy"])\
                .addGrid(estimator_clf.maxDepth, [*range(0, 31, 10)])\
                .addGrid(estimator_clf.minInstancesPerNode, [*range(1, 101, 25)])\
                .addGrid(estimator_clf.minInfoGain, [*np.arange(0, 1.1, 0.25)])\
                .addGrid(estimator_clf.minWeightFractionPerNode, [*np.arange(0, 0.49, 0.2)])\
                .build()

paramGrid_reg = ParamGridBuilder()\
                .addGrid(estimator_reg.maxBins, [360])\
                .addGrid(estimator_reg.impurity, ["variance"])\
                .addGrid(estimator_reg.seed, [13])\
                .addGrid(estimator_reg.maxDepth, [0, 15])\
                .addGrid(estimator_reg.minInstancesPerNode, [1, 25, 50])\
                .addGrid(estimator_reg.minInfoGain, [0, 0.25, 0.50])\
                .addGrid(estimator_reg.minWeightFractionPerNode, [0, 0.2])\
                .build()

# evaluators
evaluator_clf = BinaryClassificationEvaluator(labelCol = label_attrib_clf,
                                              metricName = 'areaUnderROC')
evaluator_reg = RegressionEvaluator(labelCol = label_attrib_reg, 
                                    predictionCol = "prediction", 
                                    metricName = "rmse")

# prep training
tvs_clf = TrainValidationSplit(estimator = estimator_clf,
                               estimatorParamMaps = paramGrid_clf,
                               evaluator = evaluator_clf,
                               trainRatio = 0.8,
                               parallelism = 8,
                               seed = 13)

tvs_reg = TrainValidationSplit(estimator = estimator_reg,
                               estimatorParamMaps = paramGrid_reg,
                               evaluator = evaluator_reg,
                               trainRatio = 0.8,
                               parallelism = 8,
                               seed = 13)

# pipeline for transforming and training the data
common_transf = Pipeline(stages = indexers + [assembler])
stages_clf = [common_transf, tvs_clf]
stages_reg = [common_transf, tvs_reg]

In [28]:
# the result of the TVS goes here, i.e. the last element in stages (TrainValidationSplitModel)

if autoExec:
    mlExecReg = True
    mlExecClf = True
else: 
    mlExecReg = input('Train regressor (Y-yes, N-no)?: ').lower()=='y'
    mlExecClf = input('Train classifier (Y-yes, N-no)?: ').lower()=='y'  

if mlExecClf:
    tvs_model_clf = Pipeline(stages=stages_clf).fit(dftrain).stages[-1]
    print('Classifier training completed')

if mlExecReg:
    tvs_model_reg = Pipeline(stages=stages_reg).fit(dftrain).stages[-1]
    print('Regressor training completed')

Classifier training completed
Regressor training completed


In [29]:
from dtreeviz.models.spark_decision_tree import ShadowSparkTree
from dtreeviz import trees

# the best decision tree model goes here as a plot
# https://explained.ai/decision-tree-viz/index.html
# https://github.com/parrt/dtreeviz
# https://github.com/parrt/dtreeviz/blob/master/notebooks/dtreeviz_spark_visualisations.ipynb


if mlExecClf:
    
    if autoExec:
        vizClf = False
    else: 
        vizClf = input('Visualise Classifier (Y-yes, N-no)?').lower()=='y'   
  
    if vizClf:
        dt_model_clf = tvs_model_clf.bestModel

        viz_dataset = Pipeline(stages=[common_transf])\
             .fit(dftrain)\
             .transform(dftrain)\
             .toPandas()[feature_attribs + [label_attrib_clf,label_attrib_reg]]

        dt_clf = ShadowSparkTree(dt_model_clf, 
                                  viz_dataset[feature_attribs], 
                                  viz_dataset[label_attrib_clf], 
                                  feature_names = feature_attribs, 
                                  target_name = label_attrib_clf, 
                                  class_names=[0, 1])

        display(trees.dtreeviz(dt_clf, fancy=True, fontname = "DejaVu Sans"))
        
if mlExecReg:
    if autoExec:
        vizReg = False
    else: 
        vizReg = input('Visualise Regresspr (Y-yes, N-no)?').lower()=='y'    
    
    if vizReg:
        dt_model_reg = tvs_model_reg.bestModel
        
        dt_reg = ShadowSparkTree(dt_model_reg, 
                                  viz_dataset[feature_attribs], 
                                  viz_dataset[label_attrib_reg], 
                                  feature_names = feature_attribs, 
                                  target_name = label_attrib_reg)

        display(trees.dtreeviz(dt_reg, fancy=True))

In [30]:
#the best decision tree model go here
#tvs_model_clf.bestModel.toDebugString
#tvs_model_reg.bestModel.toDebugString

#best models performance
print("Performance metrics:")
if mlExecClf:
    print(f"Classifier metrics ({evaluator_clf.getMetricName()}) : ", round(max(tvs_model_clf.validationMetrics), 3))
if mlExecReg:
    print(f"Regressor metrics ({evaluator_reg.getMetricName()}) :", round(min(tvs_model_reg.validationMetrics), 3))

#model hyperparameters and metrics report
#tvs_model_clf.write().overwrite().save("./model_clf")
#tvs_model_reg.write().overwrite().save("./model_reg")

#predictions on unseen data
#predictions = Pipeline(stages=stages_clf).fit(dftrain).transform(dfpred)
#predictions.select("prediction", "features").show(5)

Performance metrics:
Classifier metrics (areaUnderROC) :  0.598
Regressor metrics (rmse) : 0.093


In [31]:
# Stops the process
spark.stop()