In [1]:
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt # plotting
import numpy as np
import pandas as pd
from IPython.display import display
import pyarrow
import pyspark
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.functions import count,isnan,when,col,lit,mean,expr,last,to_date


In [2]:
def distribution(data, continous_feats ,transformed = False):
    """
    Visualization code for displaying skewed distributions of features
    """
    
    # Create figure
    fig = plt.figure(figsize = (11,5))

    # Skewed feature plotting
    for i, feature in enumerate(continous_feats):
        ax = fig.add_subplot(1, 4, i+1)
        ax.hist(data[feature], bins = 25, color = '#00A0A0')
        ax.set_title("'%s' Feature Distribution"%(feature), fontsize = 14)
        ax.set_xlabel("Value")
        ax.set_ylabel("Number of Records")
        ax.set_ylim((0, 2000))
        ax.set_yticks([0, 500, 1000, 1500, 2000])
        ax.set_yticklabels([0, 500, 1000, 1500, ">2000"])

    # Plot aesthetics
    if transformed:
        fig.suptitle("Log-transformed Distributions of Continuous Census Data Features", \
            fontsize = 16, y = 1.03)
    else:
        fig.suptitle("Skewed Distributions of Continuous Census Data Features", \
            fontsize = 16, y = 1.03)

    fig.tight_layout()
    fig.show()

## reading data and general cleaning

In [18]:
#initialize spark session
spark = SparkSession.builder \
    .master('local') \
    .appName('myAppName') \
    .config('spark.executor.memory', '5gb') \
    .config("spark.cores.max", "8") \
    .getOrCreate()

In [19]:
sc = spark.sparkContext #sparkcontext
sqlContext = SQLContext(sc)



In [20]:
dataset_17 = sqlContext.read.option("header", True).option("inferSchema",True).csv('./Dataset/2017.csv')
dataset_18 = sqlContext.read.option("header", True).option("inferSchema",True).csv('./Dataset/2018.csv')
dataset = dataset_17.union(dataset_18)

In [21]:
print("shape :",dataset.count(), len(dataset.columns))

shape : 12888067 28


In [23]:
#show number of partitions -- > it should be equal to number of cores
dataset.rdd.getNumPartitions()


13

In [24]:

dataset.show(5)

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|Unnamed: 27|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|

In [25]:
#drop column that all have NA values
dataset.dtypes


[('FL_DATE', 'string'),
 ('OP_CARRIER', 'string'),
 ('OP_CARRIER_FL_NUM', 'int'),
 ('ORIGIN', 'string'),
 ('DEST', 'string'),
 ('CRS_DEP_TIME', 'int'),
 ('DEP_TIME', 'double'),
 ('DEP_DELAY', 'double'),
 ('TAXI_OUT', 'double'),
 ('WHEELS_OFF', 'double'),
 ('WHEELS_ON', 'double'),
 ('TAXI_IN', 'double'),
 ('CRS_ARR_TIME', 'int'),
 ('ARR_TIME', 'double'),
 ('ARR_DELAY', 'double'),
 ('CANCELLED', 'double'),
 ('CANCELLATION_CODE', 'string'),
 ('DIVERTED', 'double'),
 ('CRS_ELAPSED_TIME', 'double'),
 ('ACTUAL_ELAPSED_TIME', 'double'),
 ('AIR_TIME', 'double'),
 ('DISTANCE', 'double'),
 ('CARRIER_DELAY', 'double'),
 ('WEATHER_DELAY', 'double'),
 ('NAS_DELAY', 'double'),
 ('SECURITY_DELAY', 'double'),
 ('LATE_AIRCRAFT_DELAY', 'double'),
 ('Unnamed: 27', 'string')]

In [26]:
dataset =dataset.drop('Unnamed: 27')

In [43]:
dataset.printSchema()

AttributeError: 'NoneType' object has no attribute 'printSchema'

In [28]:
dataset.show(1)


+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|2017-01-01|        AA|              

In [29]:
t =dataset.count()

In [72]:
#print number of null values for each column
amount_missing_df = dataset.select([count(when(col(c).isNull(), c)).alias(c) for c in dataset.columns]).show()

+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|      0|         0|                0|     0| 

In [71]:
# amount_missing_df.head()

Row(FL_DATE=0, OP_CARRIER=0, OP_CARRIER_FL_NUM=0, ORIGIN=0, DEST=0, CRS_DEP_TIME=0, DEP_TIME=192625, DEP_DELAY=197577, TAXI_OUT=197975, WHEELS_OFF=197970, WHEELS_ON=203920, TAXI_IN=203920, CRS_ARR_TIME=0, ARR_TIME=203919, ARR_DELAY=232251, CANCELLED=0, CANCELLATION_CODE=12688790, DIVERTED=0, CRS_ELAPSED_TIME=17, ACTUAL_ELAPSED_TIME=229653, AIR_TIME=229653, DISTANCE=0, CARRIER_DELAY=10505884, WEATHER_DELAY=10505884, NAS_DELAY=10505884, SECURITY_DELAY=10505884, LATE_AIRCRAFT_DELAY=10505884)

In [30]:
delays = ["CARRIER_DELAY", "WEATHER_DELAY", "NAS_DELAY", "SECURITY_DELAY" ,"LATE_AIRCRAFT_DELAY"]

In [14]:
# ARR_Delay is the total delay on arrival in minutes , only 1.8% is missing 
# on the other hand ,  [CARRIER_DELAY WEATHER_DELAY NAS_DELAY SECURITY_DELAY LATE_AIRCRAFT_DELAY] values have 81.5% NA 
# removing all records with NA values will severely shrink dataset 
# before we remove all these records , we can try to fill NA values of delay features
# we compare number of zeros of a row with total number of records without NA
for delay in delays:
    print("type of delay :",delay)
    print("# of zeros :",dataset.filter(col(delay)==0).count())
    print("# of zeros percent :",dataset.filter(col(delay)==0).count()/((18.5/100)*t))
    print("mean : ",dataset.select(mean(delay)).collect())
    print("--------")

type of delay : CARRIER_DELAY
# of zeros : 1204800
# of zeros percent : 0.5053071521456579
mean :  [Row(avg(CARRIER_DELAY)=19.659985399946184)]
--------
type of delay : WEATHER_DELAY
# of zeros : 2247720
# of zeros percent : 0.942719946896446
mean :  [Row(avg(WEATHER_DELAY)=3.2402355318630014)]
--------
type of delay : NAS_DELAY
# of zeros : 1084689
# of zeros percent : 0.4549311998287861
mean :  [Row(avg(NAS_DELAY)=15.947230754312326)]
--------
type of delay : SECURITY_DELAY
# of zeros : 2374675
# of zeros percent : 0.9959663525244772
mean :  [Row(avg(SECURITY_DELAY)=0.09264653471206873)]
--------
type of delay : LATE_AIRCRAFT_DELAY
# of zeros : 1139721
# of zeros percent : 0.47801226157918436
mean :  [Row(avg(LATE_AIRCRAFT_DELAY)=25.4363867091655)]
--------


In [31]:
# we can deduce that number of zeros is dominant in these features , so filling NAs with mean,ffill,backfill,mode or interpolation are not sensible (outliers will increase mean) 
# best options are zero filling or median(which will probably be 0)
dataset = dataset.fillna(0,subset=delays)

In [32]:
dataset.select([count(when(col(c).isNull(), c)).alias(c) for c in delays]).show()

+-------------+-------------+---------+--------------+-------------------+
|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|
+-------------+-------------+---------+--------------+-------------------+
|            0|            0|        0|             0|                  0|
+-------------+-------------+---------+--------------+-------------------+



In [33]:
dataset.show(1)

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|2017-01-01|        AA|              

In [34]:
# drop cancellation code - 98% missing and irrelevant
dataset =dataset.drop('CANCELLATION_CODE')

In [35]:
#drop NAs
dataset_visualization = dataset.dropna() 

In [36]:
#removed around 1.8% of data only
print("shape :",dataset_visualization.count(), len(dataset_visualization.columns))

shape : 12651227 26


In [37]:
dataset_visualization.select("OP_CARRIER").distinct().show()

+----------+
|OP_CARRIER|
+----------+
|        UA|
|        NK|
|        AA|
|        EV|
|        B6|
|        DL|
|        OO|
|        F9|
|        HA|
|        AS|
|        VX|
|        WN|
|        YV|
|        MQ|
|        OH|
|        G4|
|        YX|
|        9E|
+----------+



In [38]:
#after inspection of op_carriers feature levels , we figured actual carrier names 
carriers_dict = {
    'UA':'United Airlines',
    'AS':'Alaska Airlines',
    '9E':'Endeavor Air',
    'B6':'JetBlue Airways',
    'EV':'ExpressJet',
    'F9':'Frontier Airlines',
    'G4':'Allegiant Air',
    'HA':'Hawaiian Airlines',
    'MQ':'Envoy Air',
    'NK':'Spirit Airlines',
    'OH':'PSA Airlines',
    'OO':'SkyWest Airlines',
    'VX':'Virgin America',
    'WN':'Southwest Airlines',
    'YV':'Mesa Airline',
    'YX':'Republic Airways',
    'AA':'American Airlines',
    'DL':'Delta Airlines'
}
dataset_visualization = dataset_visualization.replace(carriers_dict,1,'OP_CARRIER')




In [39]:
dataset_visualization.select("OP_CARRIER").distinct().show()

+------------------+
|        OP_CARRIER|
+------------------+
|   United Airlines|
|    Virgin America|
| Hawaiian Airlines|
|        ExpressJet|
|  SkyWest Airlines|
| Frontier Airlines|
| American Airlines|
|   JetBlue Airways|
|    Delta Airlines|
|   Alaska Airlines|
|   Spirit Airlines|
|Southwest Airlines|
|      Mesa Airline|
|     Allegiant Air|
|  Republic Airways|
|      Endeavor Air|
|         Envoy Air|
|      PSA Airlines|
+------------------+



In [42]:
dataset = dataset.select(col("FL_DATE"),to_date(col("FL_DATE"),"yyyy-MM-dd").alias("date"))

AttributeError: 'NoneType' object has no attribute 'select'

In [None]:
dataset_visualization['Month'] = pd.DatetimeIndex(dataset_visualization['FL_DATE']).month
dataset_visualization['Day'] = pd.DatetimeIndex(dataset_visualization['FL_DATE']).day
dataset_visualization['Year'] = pd.DatetimeIndex(dataset_visualization['FL_DATE']).year

In [None]:
display(dataset_visualization.tail(n=1))

In [None]:
#write the dataset for visualization using parquet dataformat , fast and low memory usage
dataset_visualization.to_parquet("./Dataset/dataset_visualization.parquet",index=False)

## After preparing a dataset for visualization , prepare the data for ML 

In [None]:
#drop flight number - not needed
dataset_visualization.drop('OP_CARRIER_FL_NUM', inplace=True, axis=1)

#drop flight date - irrelevant
dataset_visualization.drop('FL_DATE', inplace=True, axis=1)

# drop diverted - feature irrelevant to problem 
dataset_visualization.drop('DIVERTED', inplace=True, axis=1)

In [None]:
dataset_visualization.shape

In [None]:
#remove all rows having NA values
dataset_ML = dataset_visualization.dropna()

In [None]:
dataset_ML.shape

In [None]:
display(dataset_ML.head(n=10))

In [None]:
#print insights on number of origin and dest 
categorical = ["ORIGIN","DEST"] 
print("Number of departure locations : ",len(dataset_ML[categorical[0]].unique()))
print("Number of destination locations : ",len(dataset_ML[categorical[1]].unique()))

### Looking at the ORIGIN and DEST features , they have huge number of unique values , this means that :
- using hot encoding for converting to numerical features will yield huge number of features which will lead to the curse of dimensionality 
- using label encoding will yield to only one feature , but the feature values will have great std deviation and labels will have different priority
- so it would be better if we drop both

### for the op_carrier :
- different carriers may differ in plane services but irrelevant to our problem so it will be removed

### FL_Date is only used for visualization so it will be removed , same for month day and year features
- we could have made use of month feature because of its relation to a season in a year , but the data is only based on domestic flights of the US so it will biased to seasonality in the US only , model needs to generalize regardless

In [None]:
dataset_ML=dataset_ML.drop('OP_CARRIER', axis=1)

In [None]:
dataset_ML=dataset_ML.drop('ORIGIN',axis=1)

In [None]:
dataset_ML=dataset_ML.drop('DEST',axis=1)

In [None]:
dataset_ML=dataset_ML.drop('Month',axis=1)

In [None]:
dataset_ML=dataset_ML.drop('Day',axis=1)

In [None]:
dataset_ML=dataset_ML.drop('Year',axis=1)

In [None]:
display(dataset_ML.head(n=5))

In [None]:
#write the dataset for visualization using parquet dataformat , fast and low memory usage
dataset_ML.to_parquet("./Dataset/dataset_ML.parquet",index=False)