# ETL for flights

In [0]:
from pyspark import SparkContext
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType
from pyspark.sql import SQLContext, SparkSession
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.sql import types
import pyspark.sql.functions as F
SEED = 7

import warnings
warnings.filterwarnings('ignore')

from IPython.display import Image
from pyspark.sql.functions import desc

In [0]:
file_path = "dbfs:/mnt/mids-w261/"
project_path = "dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/"

ARR_DELAY_THRESHOLD = 45
dataset = "test"
compute_graph = False




In [0]:
# Enable for pretty viewing of tables
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [0]:
display(dbutils.fs.ls("dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data"))

path,name,size
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/2015.parquet/,2015.parquet/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/2016.parquet/,2016.parquet/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/2017.parquet/,2017.parquet/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/2018.parquet/,2018.parquet/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/2019.parquet/,2019.parquet/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/airlines_size_test.parquet/,airlines_size_test.parquet/,0


In [0]:
display(dbutils.fs.ls(project_path))

path,name,size
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/Chitra_feature_data/,Chitra_feature_data/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airline_singleday.parquet/,airline_singleday.parquet/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airlines_3m_features_ext.parquet/,airlines_3m_features_ext.parquet/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airlines_3m_full_features.parquet/,airlines_3m_full_features.parquet/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airlines_weather_data/,airlines_weather_data/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airport-timezones.csv,airport-timezones.csv,439779
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airport_edges/,airport_edges/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airport_edges_1_year/,airport_edges_1_year/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airport_edges_3_month/,airport_edges_3_month/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airport_edges_4_year/,airport_edges_4_year/,0


# 1. Airlines Data

### Load the Airlines Data

In [0]:
#--- Full data set
airlines = spark.read.option("header", "true").parquet(file_path+"datasets_final_project/parquet_airlines_data/201*.parquet")
airlines.createOrReplaceTempView("airlines")

### Limit date range

In [0]:
# train = 2015 through 2018, minus the dev set (see below)
if dataset == "train":
  airlines = airlines \
    .filter("make_timestamp(year, month, day_of_month,0,0,0) < '2019-1-1'") \
    .filter("year < 2018 or month in (2,3,5,6,8,9,11,12)")
  # train_3m = first three months of 2015
elif dataset == "train_3m":
  airlines = airlines \
    .filter("make_timestamp(year, month, day_of_month,0,0,0) < '2015-4-1'")
  # dev = first month of every quarter in 2018
elif dataset == "dev":
  airlines = airlines \
    .filter("year == 2018") \
    .filter("month in (1, 4, 7, 10)")
  # test set = 2019
elif dataset == "test":
  airlines = airlines \
    .filter("year == 2019")
else:
  print("what? ",dataset)
  print(1/0)
        
airlines.createOrReplaceTempView("airlines")

In [0]:
print(airlines.count())
display(airlines.limit(2))

YEAR,QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,OP_UNIQUE_CARRIER,OP_CARRIER_AIRLINE_ID,OP_CARRIER,TAIL_NUM,OP_CARRIER_FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,ORIGIN_STATE_FIPS,ORIGIN_STATE_NM,ORIGIN_WAC,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,DEST_STATE_FIPS,DEST_STATE_NM,DEST_WAC,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,DEP_TIME_BLK,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,ARR_TIME_BLK,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,FIRST_DEP_TIME,TOTAL_ADD_GTIME,LONGEST_ADD_GTIME,DIV_AIRPORT_LANDINGS,DIV_REACHED_DEST,DIV_ACTUAL_ELAPSED_TIME,DIV_ARR_DELAY,DIV_DISTANCE,DIV1_AIRPORT,DIV1_AIRPORT_ID,DIV1_AIRPORT_SEQ_ID,DIV1_WHEELS_ON,DIV1_TOTAL_GTIME,DIV1_LONGEST_GTIME,DIV1_WHEELS_OFF,DIV1_TAIL_NUM,DIV2_AIRPORT,DIV2_AIRPORT_ID,DIV2_AIRPORT_SEQ_ID,DIV2_WHEELS_ON,DIV2_TOTAL_GTIME,DIV2_LONGEST_GTIME,DIV2_WHEELS_OFF,DIV2_TAIL_NUM,DIV3_AIRPORT,DIV3_AIRPORT_ID,DIV3_AIRPORT_SEQ_ID,DIV3_WHEELS_ON,DIV3_TOTAL_GTIME,DIV3_LONGEST_GTIME,DIV3_WHEELS_OFF,DIV3_TAIL_NUM,DIV4_AIRPORT,DIV4_AIRPORT_ID,DIV4_AIRPORT_SEQ_ID,DIV4_WHEELS_ON,DIV4_TOTAL_GTIME,DIV4_LONGEST_GTIME,DIV4_WHEELS_OFF,DIV4_TAIL_NUM,DIV5_AIRPORT,DIV5_AIRPORT_ID,DIV5_AIRPORT_SEQ_ID,DIV5_WHEELS_ON,DIV5_TOTAL_GTIME,DIV5_LONGEST_GTIME,DIV5_WHEELS_OFF,DIV5_TAIL_NUM
2019,2,6,1,6,2019-06-01,B6,20409,B6,N929JB,620,14679,1467903,33570,SAN,"San Diego, CA",CA,6,California,91,10721,1072102,30721,BOS,"Boston, MA",MA,25,Massachusetts,13,1449,1501,12.0,12.0,0.0,0,1400-1459,18.0,1519,2315,5.0,2324,2320,-4.0,0.0,0.0,-1,2300-2359,0.0,,0.0,335.0,319.0,296.0,1.0,2588.0,11,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2019,2,6,1,6,2019-06-01,B6,20409,B6,N981JT,623,12478,1247805,31703,JFK,"New York, NY",NY,36,New York,22,12892,1289208,32575,LAX,"Los Angeles, CA",CA,6,California,91,1445,1439,-6.0,0.0,0.0,-1,1400-1459,14.0,1453,1736,6.0,1800,1742,-18.0,0.0,0.0,-2,1800-1859,0.0,,0.0,375.0,363.0,343.0,1.0,2475.0,10,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


#2. Feature Engineering

Here are the feature we think will help our airlines data analysis:
  > 1. Throughput related features
  
  > 2. Trip related features
  
  > 3. Pagerank of airports from airport graph
  
  > 4. Edge weights of origin/destination airport graph

##2.1 Airport/Airlines Throughput

#### 2.1.1. Number of Flights per day by origin

In [0]:
mysql = f"""
  select t1.origin, ceiling(avg(t1.count_per_day)) as avg_tp_per_day from
    (select origin, fl_date, count(fl_date) as count_per_day  
    from airlines
    group by origin, fl_date) t1
  group by origin
  order by avg_tp_per_day desc
"""
data = spark.sql(mysql)
data.createOrReplaceTempView("airport_thruput1")
display(data)

origin,avg_tp_per_day
ATL,1083
ORD,931
DFW,834
DEN,691
CLT,646
LAX,603
IAH,493
PHX,481
LGA,471
SFO,469


#### 2.1.2 Number of flights to destination per day from origin

In [0]:
mysql = f"""
  select t1.origin, t1.dest, round(avg(t1.count_per_day),2) as avg_flights_per_day from
    (select origin, dest, fl_date, count(*) as count_per_day  
    from airlines
    group by origin, dest, fl_date) t1
  group by origin, dest
  order by avg_flights_per_day desc, origin, dest
"""
data = spark.sql(mysql)
data.createOrReplaceTempView("airport_thruput2")
display(data)

origin,dest,avg_flights_per_day
ORD,LGA,40.28
LGA,ORD,40.27
LAX,SFO,40.08
SFO,LAX,40.04
LAX,JFK,35.24
JFK,LAX,35.08
LAX,LAS,32.12
LAS,LAX,32.1
OGG,HNL,29.42
HNL,OGG,29.41


#### 2.1.3. Number of flights by origin and airlines

In [0]:
mysql = f"""
  select t1.origin, op_carrier, ceiling(avg(t1.count_per_day)) as avg_car_per_day from
    (select origin, op_carrier, fl_date, count(fl_date) as count_per_day  
    from airlines
    group by origin, op_carrier, fl_date) t1
  group by origin, op_carrier
  order by avg_car_per_day desc, origin, op_carrier
"""
data = spark.sql(mysql)
data.createOrReplaceTempView("airport_thruput3")
display(data)

origin,op_carrier,avg_car_per_day
ATL,DL,669
DFW,AA,422
CLT,AA,274
CLT,OH,261
MDW,WN,216
ORD,UA,213
DEN,WN,197
LAS,WN,195
BWI,WN,193
ORD,MQ,190


#### 2.1.4. Number of flights by origin, dest and airlines

In [0]:
mysql = f"""
  select t1.origin, t1.dest, t1.op_carrier, round(avg(t1.count_per_day),2) as avg_car_flights_per_day from
    (select origin, dest, op_carrier, fl_date, count(*) as count_per_day  
    from airlines
    group by origin, dest, op_carrier, fl_date) t1
  group by origin, dest, op_carrier
  order by avg_car_flights_per_day desc, origin, dest, op_carrier
"""
data = spark.sql(mysql)
data.createOrReplaceTempView("airport_thruput4")
display(data)

origin,dest,op_carrier,avg_car_flights_per_day
OGG,HNL,HA,26.7
HNL,OGG,HA,26.69
HNL,KOA,HA,18.85
KOA,HNL,HA,18.84
HNL,LIH,HA,18.47
LIH,HNL,HA,18.47
HOU,DAL,WN,18.17
DAL,HOU,WN,18.12
ATL,MCO,DL,16.29
MCO,ATL,DL,16.29


### 2.1.5 Create Join of Thruput Features

In [0]:
mysql = f"""
        select t21.*, t22.op_carrier, t22.avg_car_per_day, t22.avg_car_flights_per_day 
        from
          (select t1.*, t2.dest, t2.avg_flights_per_day from
            airport_thruput1 t1 
            inner join airport_thruput2 t2
            on t1.origin == t2.origin ) t21
          inner join
          (select t3.*, t4.dest, t4.avg_car_flights_per_day from
            airport_thruput3 t3
            inner join airport_thruput4 t4
            on t3.origin == t4.origin and t3.op_carrier == t4.op_carrier) t22
          on t21.origin == t22.origin 
          and t21.dest == t22.dest       
"""
thruputfeatures = spark.sql(mysql)
display(thruputfeatures)

origin,avg_tp_per_day,dest,avg_flights_per_day,op_carrier,avg_car_per_day,avg_car_flights_per_day
PHL,327,MCO,16.01,WN,23,3.24
SJC,173,LIH,1.0,AS,21,1.0
MCI,151,IAH,4.35,UA,7,1.35
FSD,20,ATL,1.0,9E,1,1.0
IAD,190,ILM,1.5,EV,10,1.5
SJC,173,ONT,4.44,WN,99,4.44
TPA,210,CVG,1.66,F9,11,1.0
CAE,21,ATL,7.86,9E,4,3.49
JFK,349,SRQ,1.07,B6,114,1.0
ORD,931,FWA,6.08,OO,176,4.35


In [0]:
if dataset=="train":
  thruputfeatures.write.mode("overwrite").parquet(project_path+f"airport_thruput")
else:
  thruputfeatures = spark.read.option("header", "true").parquet(project_path+f"airport_thruput/*.parquet")

## 2.2 Tail Number

In [0]:
mysql = f"""
  select t1.origin, t1.tail_num, round(avg(t1.count_per_day),2) as avg_trips_per_day 
  from
    (select origin, tail_num, fl_date, count(*) as count_per_day  
    from airlines
    where tail_num is not null
    group by origin, tail_num, fl_date) t1
  group by origin, tail_num
  order by avg_trips_per_day desc, tail_num, origin
"""
data = spark.sql(mysql)
display(data)

origin,tail_num,avg_trips_per_day
HNL,N488HA,4.8
HNL,N485HA,4.79
HNL,N480HA,4.78
HNL,N477HA,4.68
HNL,N489HA,4.68
HNL,N494HA,4.68
HNL,N481HA,4.6
HNL,N495HA,4.6
HNL,N491HA,4.59
HNL,N487HA,4.57


In [0]:
if dataset=="train":
  data.write.mode("overwrite").parquet(project_path+f"tailnum_trips")
else:
  data = spark.read.option("header", "true").parquet(project_path+f"tailnum_trips/*.parquet")
data.createOrReplaceTempView("trips")


In [0]:
airlines = airlines.where('CANCELLED != 1 AND DIVERTED != 1')
print(airlines.count())
airlines.createOrReplaceTempView("airlines")

In [0]:
display(airlines.limit(10))

YEAR,QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,OP_UNIQUE_CARRIER,OP_CARRIER_AIRLINE_ID,OP_CARRIER,TAIL_NUM,OP_CARRIER_FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,ORIGIN_STATE_FIPS,ORIGIN_STATE_NM,ORIGIN_WAC,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,DEST_STATE_FIPS,DEST_STATE_NM,DEST_WAC,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,DEP_TIME_BLK,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,ARR_TIME_BLK,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,FIRST_DEP_TIME,TOTAL_ADD_GTIME,LONGEST_ADD_GTIME,DIV_AIRPORT_LANDINGS,DIV_REACHED_DEST,DIV_ACTUAL_ELAPSED_TIME,DIV_ARR_DELAY,DIV_DISTANCE,DIV1_AIRPORT,DIV1_AIRPORT_ID,DIV1_AIRPORT_SEQ_ID,DIV1_WHEELS_ON,DIV1_TOTAL_GTIME,DIV1_LONGEST_GTIME,DIV1_WHEELS_OFF,DIV1_TAIL_NUM,DIV2_AIRPORT,DIV2_AIRPORT_ID,DIV2_AIRPORT_SEQ_ID,DIV2_WHEELS_ON,DIV2_TOTAL_GTIME,DIV2_LONGEST_GTIME,DIV2_WHEELS_OFF,DIV2_TAIL_NUM,DIV3_AIRPORT,DIV3_AIRPORT_ID,DIV3_AIRPORT_SEQ_ID,DIV3_WHEELS_ON,DIV3_TOTAL_GTIME,DIV3_LONGEST_GTIME,DIV3_WHEELS_OFF,DIV3_TAIL_NUM,DIV4_AIRPORT,DIV4_AIRPORT_ID,DIV4_AIRPORT_SEQ_ID,DIV4_WHEELS_ON,DIV4_TOTAL_GTIME,DIV4_LONGEST_GTIME,DIV4_WHEELS_OFF,DIV4_TAIL_NUM,DIV5_AIRPORT,DIV5_AIRPORT_ID,DIV5_AIRPORT_SEQ_ID,DIV5_WHEELS_ON,DIV5_TOTAL_GTIME,DIV5_LONGEST_GTIME,DIV5_WHEELS_OFF,DIV5_TAIL_NUM
2019,2,6,1,6,2019-06-01,B6,20409,B6,N929JB,620,14679,1467903,33570,SAN,"San Diego, CA",CA,6,California,91,10721,1072102,30721,BOS,"Boston, MA",MA,25,Massachusetts,13,1449,1501,12.0,12.0,0.0,0,1400-1459,18.0,1519,2315,5.0,2324,2320,-4.0,0.0,0.0,-1,2300-2359,0.0,,0.0,335.0,319.0,296.0,1.0,2588.0,11,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2019,2,6,1,6,2019-06-01,B6,20409,B6,N981JT,623,12478,1247805,31703,JFK,"New York, NY",NY,36,New York,22,12892,1289208,32575,LAX,"Los Angeles, CA",CA,6,California,91,1445,1439,-6.0,0.0,0.0,-1,1400-1459,14.0,1453,1736,6.0,1800,1742,-18.0,0.0,0.0,-2,1800-1859,0.0,,0.0,375.0,363.0,343.0,1.0,2475.0,10,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2019,2,6,1,6,2019-06-01,B6,20409,B6,N969JT,624,12892,1289208,32575,LAX,"Los Angeles, CA",CA,6,California,91,12478,1247805,31703,JFK,"New York, NY",NY,36,New York,22,1450,1444,-6.0,0.0,0.0,-1,1400-1459,12.0,1456,2256,10.0,2322,2306,-16.0,0.0,0.0,-2,2300-2359,0.0,,0.0,332.0,322.0,300.0,1.0,2475.0,10,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2019,2,6,1,6,2019-06-01,B6,20409,B6,N339JB,626,10821,1082106,30852,BWI,"Baltimore, MD",MD,24,Maryland,35,10721,1072102,30721,BOS,"Boston, MA",MA,25,Massachusetts,13,1105,1055,-10.0,0.0,0.0,-1,1100-1159,16.0,1111,1210,4.0,1228,1214,-14.0,0.0,0.0,-1,1200-1259,0.0,,0.0,83.0,79.0,59.0,1.0,369.0,2,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2019,2,6,1,6,2019-06-01,B6,20409,B6,N997JL,629,13204,1320402,31454,MCO,"Orlando, FL",FL,12,Florida,33,10732,1073203,30732,BQN,"Aguadilla, PR",PR,72,Puerto Rico,3,2316,26,70.0,70.0,1.0,4,2300-2359,13.0,39,246,4.0,157,250,53.0,53.0,1.0,3,0001-0559,0.0,,0.0,161.0,144.0,127.0,1.0,1129.0,5,14.0,0.0,0.0,0.0,39.0,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2019,2,6,25,2,2019-06-25,DL,19790,DL,N986AT,1584,14576,1457606,34576,ROC,"Rochester, NY",NY,36,New York,22,10397,1039707,30397,ATL,"Atlanta, GA",GA,13,Georgia,34,1804,1759,-5.0,0.0,0.0,-1,1800-1859,8.0,1807,1949,5.0,2019,1954,-25.0,0.0,0.0,-2,2000-2059,0.0,,0.0,135.0,115.0,102.0,1.0,749.0,3,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2019,2,6,25,2,2019-06-25,DL,19790,DL,N327DN,1585,12953,1295304,31703,LGA,"New York, NY",NY,36,New York,22,13204,1320402,31454,MCO,"Orlando, FL",FL,12,Florida,33,1830,1928,58.0,58.0,1.0,3,1800-1859,24.0,1952,2204,9.0,2121,2213,52.0,52.0,1.0,3,2100-2159,0.0,,0.0,171.0,165.0,132.0,1.0,950.0,4,0.0,0.0,6.0,0.0,46.0,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2019,2,6,25,2,2019-06-25,DL,19790,DL,N315DN,1586,10397,1039707,30397,ATL,"Atlanta, GA",GA,13,Georgia,34,12953,1295304,31703,LGA,"New York, NY",NY,36,New York,22,1330,1341,11.0,11.0,0.0,0,1300-1359,24.0,1405,1547,12.0,1547,1559,12.0,12.0,0.0,0,1500-1559,0.0,,0.0,137.0,138.0,102.0,1.0,762.0,4,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2019,2,6,25,2,2019-06-25,DL,19790,DL,N920DU,1587,12892,1289208,32575,LAX,"Los Angeles, CA",CA,6,California,91,14747,1474703,30559,SEA,"Seattle, WA",WA,53,Washington,93,600,553,-7.0,0.0,0.0,-1,0600-0659,12.0,605,814,8.0,847,822,-25.0,0.0,0.0,-2,0800-0859,0.0,,0.0,167.0,149.0,129.0,1.0,954.0,4,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2019,2,6,25,2,2019-06-25,DL,19790,DL,N352NB,1588,12953,1295304,31703,LGA,"New York, NY",NY,36,New York,22,14635,1463502,31714,RSW,"Fort Myers, FL",FL,12,Florida,33,806,818,12.0,12.0,0.0,0,0800-0859,44.0,902,1132,2.0,1118,1134,16.0,16.0,1.0,1,1100-1159,0.0,,0.0,192.0,196.0,150.0,1.0,1080.0,5,0.0,12.0,4.0,0.0,0.0,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


##2.3. Create an Airports Graph Network

In [0]:
from graphframes import GraphFrame
from pyspark.sql import SparkSession

if compute_graph:
  nodes = spark.sql("""
    select distinct
        ORIGIN as id,
        ORIGIN_CITY_NAME as name
    from 
        airlines
    union
    select distinct
        DEST as id,
        DEST_CITY_NAME as name
    from 
        airlines
  """)

In [0]:
if compute_graph:
  edges = spark.sql("""
    select distinct
        ORIGIN as src,
        DEST as dst
    from 
        airlines
  """)

In [0]:
if compute_graph:
  nodes.count()

In [0]:
if compute_graph:
  edges.count()

In [0]:
if compute_graph:
  airlines_graph = GraphFrame(nodes, edges)

In [0]:
from pyspark.sql.functions import desc

if compute_graph:
  in_deg = airlines_graph.inDegrees
  display(in_deg.orderBy(desc("inDegree")).limit(10))

In [0]:
if compute_graph:
  display(airlines_graph.outDegrees.orderBy(desc("outDegree")).limit(10))

In [0]:
if compute_graph:
  # Run PageRank algorithm, and show results.
  results = airlines_graph.pageRank(tol=0.01)
  results.vertices.select("id", "pagerank").show()

In [0]:
if compute_graph:
  display(results.vertices.orderBy(desc("pagerank")).limit(10))

In [0]:
if compute_graph:
  display(results.edges.orderBy(desc("weight")).limit(10))

In [0]:
if dataset == "train" and compute_graph:
  # Save vertices and edges as Parquet to some location.
  results.vertices.write.mode("overwrite").parquet(project_path+f"airport_vertices")
  results.edges.write.mode("overwrite").parquet(project_path+f"airport_edges")


In [0]:
ap_vertices = spark.read.option("header", "true").parquet(project_path+f"airport_vertices/*.parquet")
ap_vertices.createOrReplaceTempView("ap_vertices")

In [0]:
display(ap_vertices.orderBy(desc("pagerank")).limit(10))

id,name,pagerank
ORD,"Chicago, IL",10.191810455848527
DFW,"Dallas/Fort Worth, TX",10.026750186176836
DEN,"Denver, CO",9.487754969011524
ATL,"Atlanta, GA",9.031150214803477
MSP,"Minneapolis, MN",7.157564553451184
CLT,"Charlotte, NC",7.042400521922968
IAH,"Houston, TX",6.074249511495245
DTW,"Detroit, MI",5.796729329887283
LAX,"Los Angeles, CA",5.630069510450403
LAS,"Las Vegas, NV",5.478207812481511


In [0]:
#ap_edges = results.edges
ap_edges = spark.read.option("header", "true").parquet(project_path+"airport_edges/*.parquet")
ap_edges.createOrReplaceTempView("ap_edges")

In [0]:
display(ap_edges.orderBy(desc("weight")).limit(10))

src,dst,weight
CYS,DFW,1.0
LBF,DEN,1.0
BTM,SLC,1.0
SJT,DFW,1.0
YUM,PHX,1.0
ADK,ANC,1.0
BJI,MSP,1.0
GTR,ATL,1.0
EKO,SLC,1.0
GST,JNU,1.0


#3. Load time zone data

In [0]:
timezone = spark.read.format("csv").option("header", True).load(project_path + "airport-timezones.csv")
timezone.printSchema()

In [0]:
display(timezone)

iata_code,iana_tz,windows_tz
AAA,Pacific/Tahiti,Hawaiian Standard Time
AAB,Australia/Brisbane,E. Australia Standard Time
AAC,Africa/Cairo,Egypt Standard Time
AAD,Africa/Mogadishu,E. Africa Standard Time
AAE,Africa/Algiers,W. Central Africa Standard Time
AAF,America/New_York,Eastern Standard Time
AAG,America/Sao_Paulo,E. South America Standard Time
AAH,Europe/Berlin,W. Europe Standard Time
AAI,America/Araguaina,Tocantins Standard Time
AAJ,America/Paramaribo,SA Eastern Standard Time


In [0]:
print(timezone.count())
print(timezone.select("iata_code").distinct().count())
timezone.createOrReplaceTempView("timezone")

#4. Flight data joins

##4.1. Join with TimeZone to create UTC time format for departure/arrival time

In [0]:
airlines2 = spark.sql("""
    select
        ORIGIN as origin,
        DEST as destination,
        year,
        month,
        day_of_month,
        day_of_week,
        to_utc_timestamp(concat(FL_DATE, ' ', int(CRS_DEP_TIME/100), ':', CRS_DEP_TIME%100, ':00'), iana_tz) as crs_dep_time_utc,
        to_utc_timestamp(concat(FL_DATE, ' ', int(CRS_ARR_TIME/100), ':', CRS_ARR_TIME%100, ':00'), iana_tz) as naive_crs_arr_time_utc,
        tail_num,
        op_carrier,
        distance,
        cast(element_at(split(dep_time_blk, '-'), 1) / 100 as int) as dep_blk,
        dep_del15,
        nas_delay,
        carrier_delay,
        weather_delay,
        security_delay,
        late_aircraft_delay,
        arr_delay
    from 
        airlines,timezone
    where 
        ORIGIN == iata_code    
""")
airlines2.createOrReplaceTempView("airlines2")

In [0]:
# adjust naive arr time, filter out security delays

airlines2a1 = spark.sql("""
    select
      airlines2.*,
      case when naive_crs_arr_time_utc < crs_dep_time_utc then naive_crs_arr_time_utc + interval 1 day else naive_crs_arr_time_utc end as crs_arr_time_utc
    from 
      airlines2 
    where 
      dep_del15 == 0 
      or security_delay < 15
""")
airlines2a1.createOrReplaceTempView("airlines2a1")

In [0]:
airlines2a2 = spark.sql("""
    select
        airlines2a1.*,
        date_trunc('hour', crs_arr_time_utc) as arr_hour_utc,
        date_trunc('hour', crs_dep_time_utc) as dep_hour_utc
    from 
      airlines2a1
""")

In [0]:
airlines2 = airlines2a2
airlines2.createOrReplaceTempView("airlines2")

In [0]:
#display(airlines2.filter("tail_num == 'N001AA'").orderBy("tail_num","arr_hour_utc").limit(20))

### 4.1.5 Propagate delayed arrivals, join with main

In [0]:
# find equipment delayed significantly on arrival
print("arrival delay threshold:", ARR_DELAY_THRESHOLD)

delayed = spark.sql(f"""
  select
    tail_num, 
    arr_hour_utc as hour
  from
    airlines2
  where
    int(arr_delay) > {ARR_DELAY_THRESHOLD}
""")
delayed.createOrReplaceTempView("delayed")
#display(delayed.orderBy("tail_num", "hour").limit(10))
#print(delayed.count())

In [0]:
# Make a table where that delay information is forward-propagated 8 hours

ripples = []
for i in range(1,5):
  ripple = spark.sql(f"""
    select
      tail_num, 
      hour + interval {i} hour as hour
    from
      delayed
  """)
  ripples.append(ripple)
  
# a loop of unions with shifted selves was the best way I was able to find
delayed2 = ripples[0]
for i in range(1,len(ripples)):
    print(i)
    delayed2 = delayed2.union(ripples[i])
delayed2.createOrReplaceTempView("delayed2")
#display(delayed2.orderBy("tail_num", "hour").limit(20))
    

In [0]:
# Compute intersection of flights and equipment delayed on arrival in the last 8 hours

delayed3 = spark.sql("""
  select
    delayed2.*
  from
    delayed2,
    airlines2
  where
    delayed2.tail_num == airlines2.tail_num
    and delayed2.hour == airlines2.arr_hour_utc
""")
delayed3.createOrReplaceTempView("delayed3")
#display(delayed2.orderBy("tail_num", "hour").limit(10))
#print(delayed2.count())

In [0]:
# Establish a new indicator field in the main table

airlines2a = spark.sql("""
  select
    airlines2.*,
    delayed3.hour,
    delayed3.tail_num as del_tail_num
  from
    airlines2 left outer join delayed3
  on
    airlines2.tail_num == delayed3.tail_num 
    and (airlines2.dep_hour_utc - interval 2 hours) == delayed3.hour
""")



In [0]:
airlines2a.printSchema()

In [0]:
#display(airlines2a.orderBy("tail_num", "hour").limit(1000))

In [0]:
airlines2b = airlines2a.withColumn("equipment_delayed", ~airlines2a.del_tail_num.isNull())
airlines2b.createOrReplaceTempView("airlines2b")
airlines2b.printSchema()
#display(airlines2b.limit(10))

In [0]:
#print(airlines2b.filter("equipment_delayed").count())

##4.2. Join with Pagerank and Edge Weights

In [0]:
# Get Destination Page Rank
airlines3 = spark.sql("""
    select
        airlines2b.*,
        ap_vertices.pagerank as dest_pagerank
    from 
        airlines2b inner join ap_vertices
    where 
        airlines2b.destination == ap_vertices.id 
    
""")
airlines3.createOrReplaceTempView("airlines3")

In [0]:
airlines4 = spark.sql("""
    select
        airlines3.*,
        ap_vertices.pagerank as src_pagerank
    from 
        airlines3 inner join ap_vertices
    where 
        airlines3.origin == ap_vertices.id 
    
""")
airlines4.createOrReplaceTempView("airlines4")

In [0]:
airlines5 = spark.sql("""
    select
        airlines4.*,
        ap_edges.weight as weight
    from 
        airlines4 inner join ap_edges
    where 
        airlines4.origin == ap_edges.src and
        airlines4.destination == ap_edges.dst
    
""")
airlines5.createOrReplaceTempView("airlines5")

In [0]:
#display(airlines5.orderBy(desc("src_pagerank")).limit(10))

In [0]:
#airlines5.count()

##4.3. Join With Trip Features

In [0]:
mysql = f"""
  select t1.*, avg_trips_per_day
  from
    airlines5 t1
    left outer join trips t2
    on
    t1.origin == t2.origin and t1.tail_num == t2.tail_num
"""
airlines6 = spark.sql(mysql)
airlines6.createOrReplaceTempView("airlines6")
#display(airlines6)

In [0]:
#airlines6.count()

##4.4 Join with Throughput Features

In [0]:
thruputfeatures.createOrReplaceTempView("thruputfeatures")
mysql = f"""
  select t1.*, avg_tp_per_day, avg_flights_per_day, avg_car_per_day, avg_car_flights_per_day
  from
    airlines6 t1
    inner join thruputfeatures t2
    on
    t1.origin == t2.origin and 
    t1.destination == t2.dest and
    t1.op_carrier == t2.op_carrier
 """
airlines7 = spark.sql(mysql)
airlines7.createOrReplaceTempView("airlines7")
#display(airlines7)

In [0]:
#airlines7.count()

#5. Save the Airlines Features

In [0]:
airlines7.write.mode("overwrite").format("parquet").save(project_path+f"{dataset}_flights.parquet")
print("Saved",project_path+f"{dataset}_flights.parquet")

##5.1. Verify Saved Data

In [0]:
flights = spark.read.option("header", "true").parquet(project_path+f"{dataset}_flights.parquet/*.parquet")
flights.createOrReplaceTempView("flights")
#display(flights)

In [0]:
flights.count()

In [0]:
display(dbutils.fs.ls(project_path))

path,name,size
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/Chitra_feature_data/,Chitra_feature_data/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airline_singleday.parquet/,airline_singleday.parquet/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airlines_3m_features_ext.parquet/,airlines_3m_features_ext.parquet/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airlines_3m_full_features.parquet/,airlines_3m_full_features.parquet/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airlines_weather_data/,airlines_weather_data/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airport-timezones.csv,airport-timezones.csv,439779
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airport_edges/,airport_edges/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airport_edges_1_year/,airport_edges_1_year/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airport_edges_3_month/,airport_edges_3_month/,0
dbfs:/user/chitra.agastya@ischool.berkeley.edu/FinalProject/airport_edges_4_year/,airport_edges_4_year/,0


In [0]:
print("done with", dataset)