In [1]:
import pprint, datetime
from pyspark.sql.types import *
from pyspark.sql.functions import unix_timestamp
import math
from pyspark.sql import functions as F

In [2]:
%sql
-- Drop the tables created by the load cells below so they can be recreated.
-- IF EXISTS keeps this cell from failing on a first run (or after a partial run),
-- when some of these tables have not been created yet.
drop table if exists airport_code_location_lookup_clean;
drop table if exists flight_delays_with_airport_codes;
drop table if exists flight_weather_with_airport_code;

In [3]:
%scala
// Configure access to the ADLS (Gen2) storage account.
// The account key is read from a Databricks secret scope instead of being pasted into the
// notebook, so it never ends up in source control or in exported copies of this notebook.
// NOTE(review): the scope and key names below are placeholders - create them (or rename
// them to match an existing scope) before running this cell.
val storageAccountKey = dbutils.secrets.get(scope = "flight-delays", key = "adls-account-key")
spark.conf.set("fs.azure.account.key.accountsamplestorage.dfs.core.windows.net", storageAccountKey)

// List the container root as a quick check that the credentials work.
dbutils.fs.ls("abfss://sample@accountsamplestorage.dfs.core.windows.net/")

In [4]:
%scala
// Read the airport lookup CSV from ADLS (first row is the header) and store it as a table.
// No schema or inferSchema option is set, so Spark loads every column as a string.
val df = spark.read.option("header", "true").csv("abfss://sample@accountsamplestorage.dfs.core.windows.net/data/AirportCodeLocationLookupClean.csv")

// Overwrite so that re-running this cell on its own does not fail because the table already exists.
df.write.mode("overwrite").saveAsTable("airport_code_location_lookup_clean")

In [5]:
%scala
// Read the flight-delay CSV from ADLS (first row is the header) and store it as a table.
// No schema or inferSchema option is set, so Spark loads every column as a string;
// later cells cast the columns they need.
val df1 = spark.read.option("header", "true").csv("abfss://sample@accountsamplestorage.dfs.core.windows.net/data/FlightDelaysWithAirportCodes.csv")

// Overwrite so that re-running this cell on its own does not fail because the table already exists.
df1.write.mode("overwrite").saveAsTable("flight_delays_with_airport_codes")

In [6]:
%scala
// Read the hourly weather CSV from ADLS (first row is the header) and store it as a table.
// No schema or inferSchema option is set, so Spark loads every column as a string;
// later cells cast the columns they need.
val df2 = spark.read.option("header", "true").csv("abfss://sample@accountsamplestorage.dfs.core.windows.net/data/FlightWeatherWithAirportCode.csv")

// Overwrite so that re-running this cell on its own does not fail because the table already exists.
df2.write.mode("overwrite").saveAsTable("flight_weather_with_airport_code")

In [7]:
%sql
-- Preview the airport lookup table.
SELECT * FROM airport_code_location_lookup_clean;

AIRPORT_ID,AIRPORT,DISPLAY_AIRPORT_NAME,LATITUDE,LONGITUDE
10001,01A,Afognak Lake Airport,58.10944444,-152.9066667
10003,03A,Bear Creek Mining Strip,65.54805556,-161.0716667
10004,04A,Lik Mining Camp,68.08333333,-163.1666667
10005,05A,Little Squaw Airport,67.57,-148.1838889
10006,06A,Kizhuyak Bay,57.74527778,-152.8827778
10007,07A,Klawock Seaplane Base,55.55472222,-133.1016667
10008,08A,Elizabeth Island Airport,59.15694444,-151.8291667
10009,09A,Augustin Island,59.36277778,-153.4305556
10010,1B1,Columbia County,42.29138889,-73.71027778
10011,1G4,Grand Canyon West,35.98611111,-113.8169444


In [8]:
%sql
-- Preview the raw flight-delay table.
SELECT * FROM flight_delays_with_airport_codes

Year,Month,DayofMonth,DayOfWeek,Carrier,CRSDepTime,DepDelay,DepDel15,CRSArrTime,ArrDelay,ArrDel15,Cancelled,OriginAirportCode,OriginAirportName,OriginLatitude,OriginLongitude,DestAirportCode,DestAirportName,DestLatitude,DestLongitude
2013,5,1,3,MQ,1350,-8.0,0.0,1625,-12.0,0,0,SAT,San Antonio International,29.53388889,-98.46916667,ORD,Chicago O'Hare International,41.97944444,-87.9075
2013,5,2,4,MQ,1350,-4.0,0.0,1625,-16.0,0,0,SAT,San Antonio International,29.53388889,-98.46916667,ORD,Chicago O'Hare International,41.97944444,-87.9075
2013,5,3,5,MQ,1350,214.0,1.0,1625,227.0,1,0,SAT,San Antonio International,29.53388889,-98.46916667,ORD,Chicago O'Hare International,41.97944444,-87.9075
2013,5,4,6,MQ,1350,0.0,0.0,1625,11.0,0,0,SAT,San Antonio International,29.53388889,-98.46916667,ORD,Chicago O'Hare International,41.97944444,-87.9075
2013,5,5,7,MQ,1350,-8.0,0.0,1625,-7.0,0,0,SAT,San Antonio International,29.53388889,-98.46916667,ORD,Chicago O'Hare International,41.97944444,-87.9075
2013,5,6,1,MQ,1350,-5.0,0.0,1625,-4.0,0,0,SAT,San Antonio International,29.53388889,-98.46916667,ORD,Chicago O'Hare International,41.97944444,-87.9075
2013,5,7,2,MQ,1350,-6.0,0.0,1625,-8.0,0,0,SAT,San Antonio International,29.53388889,-98.46916667,ORD,Chicago O'Hare International,41.97944444,-87.9075
2013,5,8,3,MQ,1350,27.0,1.0,1625,21.0,1,0,SAT,San Antonio International,29.53388889,-98.46916667,ORD,Chicago O'Hare International,41.97944444,-87.9075
2013,5,9,4,MQ,1350,6.0,0.0,1625,12.0,0,0,SAT,San Antonio International,29.53388889,-98.46916667,ORD,Chicago O'Hare International,41.97944444,-87.9075
2013,5,10,5,MQ,1350,90.0,1.0,1625,75.0,1,0,SAT,San Antonio International,29.53388889,-98.46916667,ORD,Chicago O'Hare International,41.97944444,-87.9075


In [9]:
%sql
-- Total number of rows in the raw flight-delay table.
SELECT COUNT(*) FROM flight_delays_with_airport_codes

count(1)
2719418


In [10]:
%sql
-- DepDel15 is 1 when the flight was delayed at least 15 minutes and 0 when it was not;
-- it is the column the model will be trained to predict.
-- Count the rows where that value is missing.
SELECT COUNT(*) FROM flight_delays_with_airport_codes WHERE DepDel15 IS NULL

count(1)
27444


In [11]:
# Bind the raw flight-delay table (all columns, all rows) to a DataFrame.
dfFlightDelays = spark.table("flight_delays_with_airport_codes")

In [12]:
# Pretty-print the (column name, type) pairs to see how the flight columns were typed on load.
pprint.pprint(dfFlightDelays.dtypes)

Perform Transformation on the data
*	Remove rows with missing values
*	Generate a new column, named “CRSDepHour”, which contains the scheduled departure hour: CRSDepTime (an HHMM value) divided by 100 and rounded down
*	Pare down columns to only those needed for our model

In [14]:
%r
library(SparkR)

# Pull only the columns needed from the raw table, casting CRSDepTime to long and DepDel15 to int on the way in
dfflights <- sql("SELECT OriginAirportCode, OriginLatitude, OriginLongitude, Month, DayofMonth, cast(CRSDepTime as long) CRSDepTime, DayOfWeek, Carrier, DestAirportCode, DestLatitude, DestLongitude, cast(DepDel15 as int) DepDel15 from flight_delays_with_airport_codes")

# Discard every row that has a missing value in any of the selected columns
dfflights <- dropna(dfflights)

# Add CRSDepHour: CRSDepTime divided by 100 and rounded down (e.g. 1350 -> 13)
dfflights <- withColumn(dfflights, "CRSDepHour", floor(dfflights$CRSDepTime / 100))

# Keep only the columns used by the predictive model (CRSDepHour takes the place of CRSDepTime)
dfflightsClean <- select(dfflights, c("OriginAirportCode","OriginLatitude", "OriginLongitude", "Month", "DayofMonth", "CRSDepHour", "DayOfWeek", "Carrier", "DestAirportCode", "DestLatitude", "DestLongitude", "DepDel15"))

# Register the result as a temporary view so the following cells can query it by name
createOrReplaceTempView(dfflightsClean, "flight_delays_view")


In [15]:
%sql
-- Preview the cleaned flight-delay view.
SELECT * FROM flight_delays_view

OriginAirportCode,OriginLatitude,OriginLongitude,Month,DayofMonth,CRSDepHour,DayOfWeek,Carrier,DestAirportCode,DestLatitude,DestLongitude,DepDel15
SAT,29.53388889,-98.46916667,5,1,13,3,MQ,ORD,41.97944444,-87.9075,0
SAT,29.53388889,-98.46916667,5,2,13,4,MQ,ORD,41.97944444,-87.9075,0
SAT,29.53388889,-98.46916667,5,3,13,5,MQ,ORD,41.97944444,-87.9075,1
SAT,29.53388889,-98.46916667,5,4,13,6,MQ,ORD,41.97944444,-87.9075,0
SAT,29.53388889,-98.46916667,5,5,13,7,MQ,ORD,41.97944444,-87.9075,0
SAT,29.53388889,-98.46916667,5,6,13,1,MQ,ORD,41.97944444,-87.9075,0
SAT,29.53388889,-98.46916667,5,7,13,2,MQ,ORD,41.97944444,-87.9075,0
SAT,29.53388889,-98.46916667,5,8,13,3,MQ,ORD,41.97944444,-87.9075,1
SAT,29.53388889,-98.46916667,5,9,13,4,MQ,ORD,41.97944444,-87.9075,0
SAT,29.53388889,-98.46916667,5,10,13,5,MQ,ORD,41.97944444,-87.9075,1


In [16]:
%sql
-- Number of flight rows left after rows with missing values were removed.
SELECT COUNT(*) FROM flight_delays_view

count(1)
2691974


In [17]:
# Bind the cleaned flight-delay view (all columns, all rows) to a DataFrame.
dfFlightDelays_Clean = spark.table("flight_delays_view")

In [18]:
# Persist the cleaned flights as the table "flight_delays_clean", replacing any previous copy.
(
    dfFlightDelays_Clean.write
    .mode("overwrite")
    .saveAsTable("flight_delays_clean")
)

Transform the weather data

In [20]:
%sql
-- Preview the raw weather table.
SELECT * FROM flight_weather_with_airport_code

Year,Month,Day,Time,TimeZone,SkyCondition,Visibility,WeatherType,DryBulbFarenheit,DryBulbCelsius,WetBulbFarenheit,WetBulbCelsius,DewPointFarenheit,DewPointCelsius,RelativeHumidity,WindSpeed,WindDirection,ValueForWindCharacter,StationPressure,PressureTendency,PressureChange,SeaLevelPressure,RecordType,HourlyPrecip,Altimeter,AirportCode,DISPLAY_AIRPORT_NAME,LATITUDE,LONGITUDE
2013,6,20,153,-7,FEW040,10.0,,46,7.8,42,5.6,38,3.3,74,5,130,,27.06,,,30.04,AA,,30.04,BOI,Boise Air Terminal,43.56444444,-116.2227778
2013,6,20,253,-7,BKN039 OVC065,10.0,,48,8.9,44,6.7,40,4.4,74,5,120,,27.06,,,30.03,AA,,30.04,BOI,Boise Air Terminal,43.56444444,-116.2227778
2013,6,20,353,-7,SCT055,10.0,,47,8.3,43,6.2,39,3.9,74,7,140,,27.05,,,30.02,AA,,30.03,BOI,Boise Air Terminal,43.56444444,-116.2227778
2013,6,20,453,-7,SCT038,10.0,,47,8.3,43,6.2,39,3.9,74,5,150,,27.06,,,30.02,AA,,30.04,BOI,Boise Air Terminal,43.56444444,-116.2227778
2013,6,20,553,-7,BKN080,10.0,,47,8.3,43,5.9,38,3.3,71,5,190,,27.06,,,30.03,AA,,30.05,BOI,Boise Air Terminal,43.56444444,-116.2227778
2013,6,20,653,-7,BKN035 OVC050,10.0,,50,10.0,44,6.7,38,3.3,64,3,150,,27.06,,,30.03,AA,,30.05,BOI,Boise Air Terminal,43.56444444,-116.2227778
2013,6,20,753,-7,FEW037 OVC050,10.0,,53,11.7,46,7.6,38,3.3,57,0,000,,27.07,,,30.04,AA,,30.06,BOI,Boise Air Terminal,43.56444444,-116.2227778
2013,6,20,853,-7,OVC070,10.0,,55,12.8,47,8.1,38,3.3,53,5,270,,27.07,,,30.04,AA,,30.06,BOI,Boise Air Terminal,43.56444444,-116.2227778
2013,6,20,953,-7,FEW050 OVC070,10.0,,56,13.3,46,7.7,35,1.7,45,0,000,,27.07,,,30.04,AA,,30.06,BOI,Boise Air Terminal,43.56444444,-116.2227778
2013,6,20,1053,-7,BKN070,10.0,,60,15.6,48,8.9,36,2.2,41,0,000,,27.07,,,30.04,AA,,30.06,BOI,Boise Air Terminal,43.56444444,-116.2227778


In [21]:
%sql
-- Total number of rows in the raw weather table.
SELECT COUNT(*) FROM flight_weather_with_airport_code

count(1)
406516


In [22]:
%sql
-- List the distinct WindSpeed values to spot non-numeric entries that need cleaning.
SELECT DISTINCT WindSpeed FROM flight_weather_with_airport_code

WindSpeed
7
51
15
11
29
3
30
34
8
22


In [23]:
%sql
-- List the distinct SeaLevelPressure values to spot non-numeric entries that need cleaning.
SELECT DISTINCT SeaLevelPressure FROM flight_weather_with_airport_code

SeaLevelPressure
29.68
29.45
30.43
30.59
29.58
30.13
30.66
29.39
30.41
30.17


In [24]:
%sql
-- List the distinct HourlyPrecip values to spot non-numeric entries that need cleaning.
SELECT DISTINCT HourlyPrecip FROM flight_weather_with_airport_code

HourlyPrecip
0.55
0.07
0.75
1.30
0.59
2.49
1.53
0.32
1.38
0.03


Clean up weather data
* WindSpeed: Replace missing values with 0.0, and “M” values with 0.005
* HourlyPrecip: Replace missing values with 0.0, and “T” values with 0.005
* SeaLevelPressure: Replace “M” values with 29.92 (the average pressure)
* Convert WindSpeed, HourlyPrecip, and SeaLevelPressure to numeric columns
* Round the “Time” column (an HHMM value) down to the hour, and store the result in a new column named “Hour”
* Eliminate unneeded columns from the dataset

In [26]:
# Project the weather columns needed downstream. Month, Day and Time are cast to int here;
# WindSpeed, SeaLevelPressure and HourlyPrecip are selected without a cast.
dfWeather = spark.sql(
    "select AirportCode, "
    "cast(Month as int) Month, cast(Day as int) Day, cast(Time as int) Time, "
    "WindSpeed, SeaLevelPressure, HourlyPrecip "
    "from flight_weather_with_airport_code"
)

In [27]:
# Print the first rows of the trimmed weather DataFrame as plain text.
dfWeather.show()

In [28]:
# Pretty-print the (column name, type) pairs: the three cast columns should be int,
# the remaining columns keep whatever type they were loaded with.
pprint.pprint(dfWeather.dtypes)

In [29]:
# Clean the weather readings in one chained pass over dfWeather:
#   1. Hour             - Time divided by 100 and rounded down, i.e. the hour of the observation
#   2. missing values   - null HourlyPrecip / WindSpeed become '0.0'
#   3. WindSpeed        - 'M' becomes '0.005'
#   4. SeaLevelPressure - 'M' becomes '29.92' (the average pressure)
#   5. HourlyPrecip     - 'T' (trace) becomes '0.005'
# The replacement values are written as strings; the numeric conversion happens in the select below.
df = (
    dfWeather
    .withColumn('Hour', F.floor(dfWeather['Time'] / 100))
    .na.fill('0.0', subset=['HourlyPrecip', 'WindSpeed'])
    .na.replace('M', '0.005', 'WindSpeed')
    .na.replace('M', '29.92', 'SeaLevelPressure')
    .na.replace('T', '0.005', 'HourlyPrecip')
)

# Build the DataFrame used by the model: the join keys (including the new Hour feature)
# plus WindSpeed, SeaLevelPressure and HourlyPrecip converted to float.
dfWeather_Clean = df.select(
    'AirportCode',
    'Month',
    'Day',
    'Hour',
    *[df[name].cast('float') for name in ('WindSpeed', 'SeaLevelPressure', 'HourlyPrecip')]
)


Now let's take a look at the new `dfWeather_Clean` DataFrame.

In [31]:
# Render the cleaned weather DataFrame with the notebook's display() helper.
display(dfWeather_Clean)

AirportCode,Month,Day,Hour,WindSpeed,SeaLevelPressure,HourlyPrecip
BOI,6,20,1,5.0,30.04,0.0
BOI,6,20,2,5.0,30.03,0.0
BOI,6,20,3,7.0,30.02,0.0
BOI,6,20,4,5.0,30.02,0.0
BOI,6,20,5,5.0,30.03,0.0
BOI,6,20,6,3.0,30.03,0.0
BOI,6,20,7,0.0,30.04,0.0
BOI,6,20,8,5.0,30.04,0.0
BOI,6,20,9,0.0,30.04,0.0
BOI,6,20,10,0.0,30.04,0.0


In [32]:
# Pretty-print the (column name, type) pairs to confirm the three weather measurements are now float.
pprint.pprint(dfWeather_Clean.dtypes)

In [33]:
# Persist the cleaned weather readings as the table "flight_weather_clean", replacing any previous copy.
(
    dfWeather_Clean.write
    .mode("overwrite")
    .saveAsTable("flight_weather_clean")
)

In [34]:
# Number of rows in the cleaned weather DataFrame.
dfWeather_Clean.count()

Join the Flight and Weather datasets

In [36]:
# Attach weather readings to each flight: inner join of flight_delays_clean (d) and
# flight_weather_clean (w) on origin airport, month, day of month and departure hour.
# The result keeps the flight features, the DepDel15 label and the three weather measurements.
# NOTE(review): because this is an inner join, flights with no matching weather row are
# dropped, and a flight is repeated once for every weather row recorded for its
# airport/month/day/hour - confirm both effects are intended.
dfFlightDelaysWithWeather = spark.sql("SELECT d.OriginAirportCode, \
                 d.Month, d.DayofMonth, d.CRSDepHour, d.DayOfWeek, \
                 d.Carrier, d.DestAirportCode, d.DepDel15, w.WindSpeed, \
                 w.SeaLevelPressure, w.HourlyPrecip \
                 FROM flight_delays_clean d \
                 INNER JOIN flight_weather_clean w ON \
                 d.OriginAirportCode = w.AirportCode AND \
                 d.Month = w.Month AND \
                 d.DayofMonth = w.Day AND \
                 d.CRSDepHour = w.Hour")

In [37]:
# Render the joined flight + weather DataFrame with the notebook's display() helper.
display(dfFlightDelaysWithWeather)

OriginAirportCode,Month,DayofMonth,CRSDepHour,DayOfWeek,Carrier,DestAirportCode,DepDel15,WindSpeed,SeaLevelPressure,HourlyPrecip
SAT,5,1,13,3,MQ,ORD,0,6.0,29.8,0.0
SAT,5,2,13,4,MQ,ORD,0,22.0,30.18,0.0
SAT,5,3,13,5,MQ,ORD,1,14.0,30.18,0.0
SAT,5,4,13,6,MQ,ORD,0,6.0,29.93,0.0
SAT,5,5,13,7,MQ,ORD,0,9.0,30.03,0.0
SAT,5,6,13,1,MQ,ORD,0,7.0,30.01,0.0
SAT,5,7,13,2,MQ,ORD,0,10.0,29.97,0.0
SAT,5,8,13,3,MQ,ORD,1,14.0,29.91,0.0
SAT,5,9,13,4,MQ,ORD,0,7.0,29.85,0.0
SAT,5,10,13,5,MQ,ORD,1,11.0,29.87,0.0


In [38]:
# Persist the joined data as the table "flight_delays_with_weather", replacing any previous copy.
(
    dfFlightDelaysWithWeather.write
    .mode("overwrite")
    .saveAsTable("flight_delays_with_weather")
)