In [31]:
import numpy as np
import pandas as pd
import sqlite3
from sqlite3 import Error
import matplotlib.pyplot as plt
import seaborn as sns

import imblearn

from imblearn.over_sampling import SMOTE


from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

from sklearn.metrics import confusion_matrix, classification_report
import pyspark
from pyspark.sql import SparkSession
import xgboost as xgb

In [2]:
### download the db from link in readme
db_path = "../../dbs/delays.db"

In [3]:
def evaluate_model(model, X_test, y_test):
    
    model_acc = model.score(X_test, y_test)
    print("Test Accuracy: {:.2f}%".format(model_acc * 100))
    
    y_true = np.array(y_test)
    y_pred = model.predict(X_test)
    
    cm = confusion_matrix(y_true, y_pred)
    clr = classification_report(y_true, y_pred, target_names=["Not Delayed", "Delayed"])
    
    plt.figure(figsize=(8, 8))
    sns.heatmap(cm, annot=True, vmin=0, fmt='g', cmap='Blues', cbar=False)
    plt.xticks(np.arange(2) + 0.5, ["Not Delayed", "Delayed"])
    plt.yticks(np.arange(2) + 0.5, ["Not Delayed", "Delayed"])
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix")
    plt.show()
    
    print("Classification Report:\n----------------------\n", clr)

In [4]:
def create_connection(path):
    connection = None
    try:
        connection = sqlite3.connect(path)
        connection.text_factory = str
    except Error as e:
        print("Error occurred: " + str(e))
    return connection


def execute_query(connection, query):
    cursor = connection.cursor()
    try:
        if query == "":
            return "Query Blank"
        else:
            df = pd.read_sql_query(query, connection)
            return df
    except Error as e:
        return "Error occurred: " + str(e)

In [5]:
sample_query = \
    """
    select 
     *
    from 
      delay
    where Year = 2015
    and Month = 1
    """

connection = create_connection(db_path)
my =  execute_query(connection, sample_query)

In [6]:
my

Unnamed: 0,Year,Quarter,Month,DayofMonth,DayOfWeek,FlightDate,DOT_ID_Reporting_Airline,IATA_CODE_Reporting_Airline,Tail_Number,Flight_Number_Reporting_Airline,...,CRSElapsedTime,ActualElapsedTime,AirTime,Flights,Distance,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,2015,1,1,1,4,2015-01-01,19805,AA,N787AA,1,...,390,402,378,1,2475,0,0,0,0,0
1,2015,1,1,2,5,2015-01-02,19805,AA,N795AA,1,...,390,381,357,1,2475,0,0,0,0,0
2,2015,1,1,3,6,2015-01-03,19805,AA,N788AA,1,...,390,358,330,1,2475,0,0,0,0,0
3,2015,1,1,4,7,2015-01-04,19805,AA,N791AA,1,...,390,385,352,1,2475,0,0,0,0,0
4,2015,1,1,5,1,2015-01-05,19805,AA,N783AA,1,...,390,389,338,1,2475,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
469963,2015,1,1,28,3,2015-01-28,20416,NK,N524NK,188,...,83,82,64,1,407,0,0,0,0,0
469964,2015,1,1,29,4,2015-01-29,20416,NK,N502NK,188,...,83,82,64,1,407,0,0,0,0,0
469965,2015,1,1,30,5,2015-01-30,20416,NK,N519NK,188,...,83,89,66,1,407,0,0,0,0,0
469966,2015,1,1,31,6,2015-01-31,20416,NK,N502NK,188,...,83,95,69,1,407,0,0,0,0,0


In [7]:
Flights = pd.read_csv('../../datasets/flights.csv')

  exec(code_obj, self.user_global_ns, self.user_ns)


In [8]:
Flights = Flights.iloc[:,:23]
Flights.dropna(inplace=True)
Flights = Flights[Flights["MONTH"]==1]

In [9]:
Flights

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,...,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY
0,2015,1,1,4,AS,98,N407AS,ANC,SEA,5,...,15.0,205.0,194.0,169.0,1448,404.0,4.0,430,408.0,-22.0
1,2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10,...,14.0,280.0,279.0,263.0,2330,737.0,4.0,750,741.0,-9.0
2,2015,1,1,4,US,840,N171US,SFO,CLT,20,...,34.0,286.0,293.0,266.0,2296,800.0,11.0,806,811.0,5.0
3,2015,1,1,4,AA,258,N3HYAA,LAX,MIA,20,...,30.0,285.0,281.0,258.0,2342,748.0,8.0,805,756.0,-9.0
4,2015,1,1,4,AS,135,N527AS,SEA,ANC,25,...,35.0,235.0,215.0,199.0,1448,254.0,5.0,320,259.0,-21.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
469963,2015,1,31,6,B6,839,N658JB,JFK,BQN,2359,...,17.0,221.0,200.0,179.0,1576,416.0,3.0,440,419.0,-21.0
469964,2015,1,31,6,DL,1887,N855NW,SEA,DTW,2359,...,15.0,252.0,239.0,209.0,1927,644.0,14.0,711,658.0,-13.0
469965,2015,1,31,6,F9,300,N218FR,DEN,TPA,2359,...,37.0,192.0,212.0,168.0,1506,525.0,9.0,511,534.0,23.0
469966,2015,1,31,6,F9,422,N954FR,DEN,ATL,2359,...,39.0,162.0,191.0,141.0,1199,500.0,10.0,441,510.0,29.0


In [10]:
his = Flights.copy()

In [11]:
cols = ['Month',
 'DayofMonth',
 'DayOfWeek',
 'IATA_CODE_Reporting_Airline',
 'OriginAirportID',
 'DestAirportID',
 'AirTime',
 'ArrDelay',
 'DepDelay']

In [12]:
his.rename({
     'YEAR': 'Year',
     'MONTH': 'Month',
     'AIRLINE' : 'IATA_CODE_Reporting_Airline',
     'DAY': 'DayofMonth',
     'DAY_OF_WEEK': 'DayOfWeek',
     'ORIGIN_AIRPORT': 'OriginAirportID',
     'DESTINATION_AIRPORT': 'DestAirportID',
     'DEPARTURE_DELAY' : 'DepDelay',
     'AIR_TIME': 'AirTime',
     'ARRIVAL_DELAY' : 'ArrDelay'
}, axis=1
, inplace=True)

In [13]:
his

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,IATA_CODE_Reporting_Airline,FLIGHT_NUMBER,TAIL_NUMBER,OriginAirportID,DestAirportID,SCHEDULED_DEPARTURE,...,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AirTime,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ArrDelay
0,2015,1,1,4,AS,98,N407AS,ANC,SEA,5,...,15.0,205.0,194.0,169.0,1448,404.0,4.0,430,408.0,-22.0
1,2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10,...,14.0,280.0,279.0,263.0,2330,737.0,4.0,750,741.0,-9.0
2,2015,1,1,4,US,840,N171US,SFO,CLT,20,...,34.0,286.0,293.0,266.0,2296,800.0,11.0,806,811.0,5.0
3,2015,1,1,4,AA,258,N3HYAA,LAX,MIA,20,...,30.0,285.0,281.0,258.0,2342,748.0,8.0,805,756.0,-9.0
4,2015,1,1,4,AS,135,N527AS,SEA,ANC,25,...,35.0,235.0,215.0,199.0,1448,254.0,5.0,320,259.0,-21.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
469963,2015,1,31,6,B6,839,N658JB,JFK,BQN,2359,...,17.0,221.0,200.0,179.0,1576,416.0,3.0,440,419.0,-21.0
469964,2015,1,31,6,DL,1887,N855NW,SEA,DTW,2359,...,15.0,252.0,239.0,209.0,1927,644.0,14.0,711,658.0,-13.0
469965,2015,1,31,6,F9,300,N218FR,DEN,TPA,2359,...,37.0,192.0,212.0,168.0,1506,525.0,9.0,511,534.0,23.0
469966,2015,1,31,6,F9,422,N954FR,DEN,ATL,2359,...,39.0,162.0,191.0,141.0,1199,500.0,10.0,441,510.0,29.0


In [17]:
my = my[cols]
his = his[cols]

In [29]:
his.sort_values(["Month", 'DayofMonth', 'DayOfWeek', 'IATA_CODE_Reporting_Airline', 'AirTime'], ascending=[True, True, True, True, False]).head(10)

Unnamed: 0,Month,DayofMonth,DayOfWeek,IATA_CODE_Reporting_Airline,OriginAirportID,DestAirportID,AirTime,ArrDelay,DepDelay
6205,1,1,4,AA,DFW,HNL,499.0,300.0,289.0
4130,1,1,4,AA,DFW,HNL,490.0,174.0,164.0
5440,1,1,4,AA,DFW,OGG,490.0,195.0,178.0
3791,1,1,4,AA,DFW,OGG,485.0,7.0,5.0
10603,1,1,4,AA,HNL,DFW,420.0,109.0,108.0
12447,1,1,4,AA,OGG,DFW,418.0,24.0,21.0
11964,1,1,4,AA,HNL,DFW,417.0,276.0,276.0
871,1,1,4,AA,BOS,LAX,390.0,10.0,-7.0
10858,1,1,4,AA,JFK,SFO,389.0,1.0,-6.0
1593,1,1,4,AA,JFK,SFO,388.0,48.0,-5.0


In [28]:
my.sort_values(["Month", 'DayofMonth', 'DayOfWeek', 'IATA_CODE_Reporting_Airline', 'AirTime'], ascending=[True, True, True, True, False]).head(10)

Unnamed: 0,Month,DayofMonth,DayOfWeek,IATA_CODE_Reporting_Airline,OriginAirportID,DestAirportID,AirTime,ArrDelay,DepDelay
124,1,1,4,AA,11298,12173,499,300,289
186,1,1,4,AA,11298,13830,490,195,178
1152,1,1,4,AA,11298,12173,490,174,164
1081,1,1,4,AA,11298,13830,485,7,5
217,1,1,4,AA,12173,11298,420,109,108
1010,1,1,4,AA,13830,11298,418,24,21
3993,1,1,4,AA,12173,11298,417,276,276
4807,1,1,4,AA,10721,12892,390,10,7
6648,1,1,4,AA,12478,14771,389,1,6
365,1,1,4,AA,12478,14771,388,48,5


In [32]:
spark = SparkSession.builder \
    .appName('airlines_delay')\
    .config('spark.jars', '../../jars/snowflake-jdbc-3.13.6.jar, ../../jars/spark-snowflake_2.12-2.9.0-spark_3.1.jar') \
    .getOrCreate()

22/11/13 08:46:28 WARN Utils: Your hostname, SPMBP136.local resolves to a loopback address: 127.0.0.1; using 192.168.0.101 instead (on interface en6)
22/11/13 08:46:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/11/13 08:46:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [34]:
spark_cols = ['Year',
 'Quarter',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'FlightDate',
 'DOT_ID_Reporting_Airline',
 'IATA_CODE_Reporting_Airline',
 'Tail_Number',
 'Flight_Number_Reporting_Airline',
 'OriginAirportID',
 'DestAirportID',
 'CRSDepTime',
 'DepTime',
 'DepDelay',
 'DepDelayMinutes',
 'CRSArrTime',
 'ArrTime',
 'ArrDelay',
 'ArrDelayMinutes',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CRSElapsedTime',
 'ActualElapsedTime',
 'AirTime',
 'Flights',
 'Distance',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay']

In [36]:
file_path = "../../datasets/delays.csv"

delay = spark.read.csv(file_path, header=False)
delay = delay.toDF(*spark_cols)

In [37]:
delay

DataFrame[Year: string, Quarter: string, Month: string, DayofMonth: string, DayOfWeek: string, FlightDate: string, DOT_ID_Reporting_Airline: string, IATA_CODE_Reporting_Airline: string, Tail_Number: string, Flight_Number_Reporting_Airline: string, OriginAirportID: string, DestAirportID: string, CRSDepTime: string, DepTime: string, DepDelay: string, DepDelayMinutes: string, CRSArrTime: string, ArrTime: string, ArrDelay: string, ArrDelayMinutes: string, Cancelled: string, CancellationCode: string, Diverted: string, CRSElapsedTime: string, ActualElapsedTime: string, AirTime: string, Flights: string, Distance: string, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]

In [38]:
delay = delay.withColumn('ArrDelay', delay.ArrDelay.cast('double'))

In [39]:
negs = delay.filter(delay.ArrDelay < 0)

In [40]:
negs.head(5)

22/11/13 09:01:36 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

[]

In [41]:
# fuck

In [44]:
cols = ['Year',
    'Quarter',
    'Month',
    'DayofMonth',
    'DayOfWeek',
    'FlightDate',
    'Reporting_Airline',
    'DOT_ID_Reporting_Airline',
    'IATA_CODE_Reporting_Airline',
    'Tail_Number',
    'Flight_Number_Reporting_Airline',
    'OriginAirportID',
    'OriginAirportSeqID',
    'OriginCityMarketID',
    'Origin',
    'OriginCityName',
    'OriginState',
    'OriginStateFips',
    'OriginStateName',
    'OriginWac',
    'DestAirportID',
    'DestAirportSeqID',
    'DestCityMarketID',
    'Dest',
    'DestCityName',
    'DestState',
    'DestStateFips',
    'DestStateName',
    'DestWac',
    'CRSDepTime',
    'DepTime',
    'DepDelay',
    'DepDelayMinutes',
    'DepDel15',
    'DepartureDelayGroups',
    'DepTimeBlk',
    'TaxiOut',
    'WheelsOff',
    'WheelsOn',
    'TaxiIn',
    'CRSArrTime',
    'ArrTime',
    'ArrDelay',
    'ArrDelayMinutes',
    'ArrDel15',
    'ArrivalDelayGroups',
    'ArrTimeBlk',
    'Cancelled',
    'CancellationCode',
    'Diverted',
    'CRSElapsedTime',
    'ActualElapsedTime',
    'AirTime',
    'Flights',
    'Distance',
    'DistanceGroup',
    'CarrierDelay',
    'WeatherDelay',
    'NASDelay',
    'SecurityDelay',
    'LateAircraftDelay',
    'FirstDepTime',
    'TotalAddGTime',
    'LongestAddGTime',
    'DivAirportLandings',
    'DivReachedDest',
    'DivActualElapsedTime',
    'DivArrDelay',
    'DivDistance',
    'Div1Airport',
    'Div1AirportID',
    'Div1AirportSeqID',
    'Div1WheelsOn',
    'Div1TotalGTime',
    'Div1LongestGTime',
    'Div1WheelsOff',
    'Div1TailNum',
    'Div2Airport',
    'Div2AirportID',
    'Div2AirportSeqID',
    'Div2WheelsOn',
    'Div2TotalGTime',
    'Div2LongestGTime',
    'Div2WheelsOff',
    'Div2TailNum',
    'Div3Airport',
    'Div3AirportID',
    'Div3AirportSeqID',
    'Div3WheelsOn',
    'Div3TotalGTime',
    'Div3LongestGTime',
    'Div3WheelsOff',
    'Div3TailNum',
    'Div4Airport',
    'Div4AirportID',
    'Div4AirportSeqID',
    'Div4WheelsOn',
    'Div4TotalGTime',
    'Div4LongestGTime',
    'Div4WheelsOff',
    'Div4TailNum',
    'Div5Airport',
    'Div5AirportID',
    'Div5AirportSeqID',
    'Div5WheelsOn',
    'Div5TotalGTime',
    'Div5LongestGTime',
    'Div5WheelsOff',
    'Div5TailNum'
]

In [45]:
file_path = "../../datasets/2014.csv"

delay_2014 = spark.read.csv(file_path)
delay_2014 = delay_2014.toDF(*cols)

In [46]:
delay_2014.show(2)

+----+-------+-----+----------+---------+----------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+-----+------------+---------+-------------+-------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+--------------+-----------------+-------+-------+--------+-------------+------------+------------+--------+-------------+-----------------+------------+-------------+---------------+------------------+--------------+--------------------+-----------+-----------+-----------+-------------+----------------+------------+--------------+----------------+---

In [49]:
delay_2014 = delay_2014.select(*['Year',
                 'Month',
                 'DayofMonth',
                 'DayOfWeek',
                 'IATA_CODE_Reporting_Airline',
                 'OriginAirportID',
                 'DestAirportID',
                 'AirTime',
                 'ArrDelay',
                 'DepDelay'
])  

In [50]:
delay_2014 = delay_2014.withColumn('ArrDelay', delay_2014.ArrDelay.cast('double'))

In [52]:
negs = delay_2014.filter(delay_2014.ArrDelay < 0)

In [53]:
negs.head(5)

[Row(Year='2014', Month='5', DayofMonth='16', DayOfWeek='5', IATA_CODE_Reporting_Airline='AS', OriginAirportID='12889', DestAirportID='10666', AirTime='136', ArrDelay=-4.0, DepDelay='-10'),
 Row(Year='2014', Month='5', DayofMonth='16', DayOfWeek='5', IATA_CODE_Reporting_Airline='AS', OriginAirportID='14057', DestAirportID='14107', AirTime='127', ArrDelay=-11.0, DepDelay='-5'),
 Row(Year='2014', Month='5', DayofMonth='16', DayOfWeek='5', IATA_CODE_Reporting_Airline='AS', OriginAirportID='15376', DestAirportID='14747', AirTime='166', ArrDelay=-8.0, DepDelay='-14'),
 Row(Year='2014', Month='5', DayofMonth='16', DayOfWeek='5', IATA_CODE_Reporting_Airline='AS', OriginAirportID='10666', DestAirportID='12889', AirTime='125', ArrDelay=-11.0, DepDelay='-6'),
 Row(Year='2014', Month='5', DayofMonth='16', DayOfWeek='5', IATA_CODE_Reporting_Airline='AS', OriginAirportID='12889', DestAirportID='10666', AirTime='134', ArrDelay=-10.0, DepDelay='-11')]