# Prepare flight delay data

To start, let's import the Python libraries and modules we will use in this notebook.

In [3]:
```python
import pprint, datetime
from pyspark.sql.types import *
from pyspark.sql.functions import unix_timestamp
import math
from pyspark.sql import functions as F
```

Next, run the command below to make sure all three tables were created.
You should see output like the following:

| database | tableName | isTemporary |
| --- | --- | --- |
| default | airport_code_loca... | false |
| default | flight_delays_wit... | false |
| default | flight_weather_wi... | false |

In [5]:
```python
spark.sql("show tables").show()
```

Now execute a SQL query using the `%sql` magic to select all columns from flight_delays_with_airport_codes. By default, only the first 1,000 rows will be returned.

In [7]:
```sql
%sql
select * from flight_delays_with_airport_codes
```

| Year | Month | DayofMonth | DayOfWeek | Carrier | CRSDepTime | DepDelay | DepDel15 | CRSArrTime | ArrDelay | ArrDel15 | Cancelled | OriginAirportCode | OriginAirportName | OriginLatitude | OriginLongitude | DestAirportCode | DestAirportName | DestLatitude | DestLongitude |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2013 | 4 | 19 | 5 | DL | 837 | -3.0 | 0.0 | 1138 | 1.0 | 0 | 0 | DTW | Detroit Metro Wayne County | 42.2125 | -83.35333333 | MIA | Miami International | 25.79527778 | -80.29 |
| 2013 | 4 | 19 | 5 | DL | 1705 | 0.0 | 0.0 | 2336 | -8.0 | 0 | 0 | SLC | Salt Lake City International | 40.78833333 | -111.9777778 | JFK | John F. Kennedy International | 40.64 | -73.77861111 |
| 2013 | 4 | 19 | 5 | DL | 600 | -4.0 | 0.0 | 851 | -15.0 | 0 | 0 | PDX | Portland International | 45.58861111 | -122.5969444 | SLC | Salt Lake City International | 40.78833333 | -111.9777778 |
| 2013 | 4 | 19 | 5 | DL | 1630 | 28.0 | 1.0 | 1903 | 24.0 | 1 | 0 | STL | Lambert-St. Louis International | 38.74861111 | -90.37 | DTW | Detroit Metro Wayne County | 42.2125 | -83.35333333 |
| 2013 | 4 | 19 | 5 | DL | 1615 | -6.0 | 0.0 | 1805 | -11.0 | 0 | 0 | CVG | Cincinnati/Northern Kentucky International | 39.04888889 | -84.66777778 | LAX | Los Angeles International | 33.9425 | -118.4080556 |
| 2013 | 4 | 19 | 5 | DL | 1726 | -1.0 | 0.0 | 1818 | -19.0 | 0 | 0 | ATL | Hartsfield-Jackson Atlanta International | 33.63666667 | -84.42777778 | STL | Lambert-St. Louis International | 38.74861111 | -90.37 |
| 2013 | 4 | 19 | 5 | DL | 1900 | 0.0 | 0.0 | 2133 | -1.0 | 0 | 0 | STL | Lambert-St. Louis International | 38.74861111 | -90.37 | ATL | Hartsfield-Jackson Atlanta International | 33.63666667 | -84.42777778 |
| 2013 | 4 | 19 | 5 | DL | 2145 | 15.0 | 1.0 | 2356 | 24.0 | 1 | 0 | ATL | Hartsfield-Jackson Atlanta International | 33.63666667 | -84.42777778 | SLC | Salt Lake City International | 40.78833333 | -111.9777778 |
| 2013 | 4 | 19 | 5 | DL | 2157 | 33.0 | 1.0 | 2333 | 34.0 | 1 | 0 | ATL | Hartsfield-Jackson Atlanta International | 33.63666667 | -84.42777778 | AUS | Austin - Bergstrom International | 30.19444444 | -97.67 |
| 2013 | 4 | 19 | 5 | DL | 1900 | 323.0 | 1.0 | 2055 | 322.0 | 1 | 0 | DCA | Ronald Reagan Washington National | 38.85138889 | -77.03777778 | ATL | Hartsfield-Jackson Atlanta International | 33.63666667 | -84.42777778 |


Now let's see how many rows there are in the dataset.

In [9]:
```sql
%sql
select count(*) from flight_delays_with_airport_codes
```

| count(1) |
| --- |
| 2719418 |


Based on the `count` result, you can see that the dataset has a total of 2,719,418 rows (also referred to as examples in Machine Learning literature). Looking at the table output from the previous query, you can see that the dataset contains 20 columns (also referred to as features).

Because the table has 20 columns, you may need to scroll the grid horizontally. Scroll until you see the **DepDel15** column. This column contains a 1 when the flight's departure was delayed by at least 15 minutes and a 0 otherwise. The model you will build will try to predict the value of this column for future flights.
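
To make the label definition concrete, here is a minimal plain-Python sketch that derives DepDel15 from the departure delay in minutes (the `DepDelay` column). The helper `dep_del15` is hypothetical: the dataset already ships with DepDel15, so this only illustrates the 15-minute threshold. It agrees with the sample rows above, including the ATL-to-SLC flight with a DepDelay of 15.0 and a DepDel15 of 1.0.

```python
from typing import Optional


def dep_del15(dep_delay: Optional[float]) -> Optional[int]:
    """Hypothetical helper: 1 if the departure was delayed 15+ minutes, else 0.

    Returns None when the delay itself is missing.
    """
    if dep_delay is None:
        return None
    # "At least 15 minutes" means a delay of exactly 15.0 counts as delayed
    return 1 if dep_delay >= 15 else 0


# DepDelay values from the sample rows above, plus a missing value
for delay in [-3.0, 28.0, 15.0, 323.0, None]:
    print(delay, "->", dep_del15(delay))
```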

Let's execute another query that shows how many rows have no value in the DepDel15 column.

In [12]:
```sql
%sql
select count(*) from flight_delays_with_airport_codes where DepDel15 is null
```

| count(1) |
| --- |
| 27444 |


Notice that the `count` result is 27,444, meaning that 27,444 rows have no value in this column. Because DepDel15 is the value our model will predict, we need to eliminate every row that is missing it.

Next, scroll over to the **CRSDepTime** column in the table view above. Our model works with departure times at the granularity of the hour, but the departure time is stored as an integer; for example, 8:37 am is stored as 837. We therefore need to round CRSDepTime down to the hour, which takes two steps: first, divide the value by 100 (so that 837 becomes 8.37); second, round the result down (so that 8.37 becomes 8).
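
The two steps amount to taking the floor of CRSDepTime / 100. Here is a minimal plain-Python sketch; the helper name `crs_dep_hour` is hypothetical, and the sketch assumes that times before 1:00 am are stored without leading zeros (for example, 12:05 am as 5).

```python
import math


def crs_dep_hour(crs_dep_time: int) -> int:
    """Hypothetical helper: round an integer departure time down to its hour."""
    # Step 1: divide by 100 (837 -> 8.37); step 2: round down (8.37 -> 8)
    return math.floor(crs_dep_time / 100)


for t in [837, 1705, 2336, 5]:
    print(t, "->", crs_dep_hour(t))
```

For non-negative integers this is equivalent to Python's floor division, `crs_dep_time // 100`.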

Finally, we do not need all 20 columns present in the flight_delays_with_airport_codes dataset, so we will pare down the columns, or features, in the dataset to the 12 we do need.

Using the `%sql` magic allows us to view and visualize the data, but to work with the data in our tables, we want to take advantage of the rich optimizations that DataFrames provide. Let's run the same query through Spark SQL, this time saving the result to a DataFrame.

In [17]:
```python
dfFlightDelays = spark.sql("select * from flight_delays_with_airport_codes")
```

Let's print the schema for the DataFrame.

In [19]:
```python
pprint.pprint(dfFlightDelays.dtypes)
```

Notice that the DepDel15 and CRSDepTime columns are both `string` data types. Both of these features need to be numeric, according to their descriptions above. We will cast these columns to their required data types next.

## Perform data munging

To perform our data munging, we have multiple options, but in this case, we’ve chosen to take advantage of some useful features of R to perform the following tasks:

* Remove rows with missing values
* Generate a new column, named “CRSDepHour,” which contains the rounded-down value from CRSDepTime
* Pare down columns to only those needed for our model
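
Before switching to R, here is a minimal plain-Python sketch of those three tasks applied to a list of row dictionaries, which may help when reading the SparkR code. It is an illustration, not the notebook's implementation: the function `munge` is hypothetical, and the second input row is made up to show a missing DepDel15. Like SparkR's `na.omit()`, step 1 drops a row if *any* selected column is missing, not only DepDel15.

```python
import math
from typing import Any, Dict, List

# The 12 columns kept for the model, in the order used in this notebook
KEEP = ["OriginAirportCode", "OriginLatitude", "OriginLongitude", "Month",
        "DayofMonth", "CRSDepHour", "DayOfWeek", "Carrier", "DestAirportCode",
        "DestLatitude", "DestLongitude", "DepDel15"]

# Columns selected before cleaning: CRSDepTime stands in for CRSDepHour
SELECTED = ["CRSDepTime" if c == "CRSDepHour" else c for c in KEEP]


def munge(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Hypothetical plain-Python version of the three munging tasks."""
    cleaned = []
    for row in rows:
        selected = {c: row.get(c) for c in SELECTED}
        # 1. Remove rows with a missing value in any selected column
        if any(v is None for v in selected.values()):
            continue
        # 2. Derive CRSDepHour by rounding CRSDepTime / 100 down
        selected["CRSDepHour"] = math.floor(selected["CRSDepTime"] / 100)
        # 3. Pare the row down to the 12 model columns, in order
        cleaned.append({c: selected[c] for c in KEEP})
    return cleaned


# First sample flight from the query output above (DepDelay is an extra column)
flight = {"OriginAirportCode": "DTW", "OriginLatitude": 42.2125,
          "OriginLongitude": -83.35333333, "Month": 4, "DayofMonth": 19,
          "CRSDepTime": 837, "DayOfWeek": 5, "Carrier": "DL",
          "DestAirportCode": "MIA", "DestLatitude": 25.79527778,
          "DestLongitude": -80.29, "DepDel15": 0, "DepDelay": -3.0}
# Illustrative copy with no label, which step 1 should drop
unlabeled = {**flight, "DepDel15": None}

result = munge([flight, unlabeled])
print(len(result), result[0]["CRSDepHour"])
```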

SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. To use SparkR we will call `library(SparkR)` within a cell that uses the `%r` magic, which denotes the language to use for the cell. The SparkR session is already configured, and all SparkR functions will talk to your attached cluster using the existing session.

In [24]:
```r
%r
library(SparkR)

# Select only the columns we need, casting CRSDepTime as long and DepDel15 as int, into a new DataFrame
dfflights <- sql("SELECT OriginAirportCode, OriginLatitude, OriginLongitude, Month, DayofMonth, cast(CRSDepTime as long) CRSDepTime, DayOfWeek, Carrier, DestAirportCode, DestLatitude, DestLongitude, cast(DepDel15 as int) DepDel15 from flight_delays_with_airport_codes")

# Delete rows containing a missing value in any of the selected columns
dfflights <- na.omit(dfflights)

# Uncomment to inspect the structure of the DataFrame
# str(dfflights)

# Round departure times down to the nearest hour, and store the result in a new column named "CRSDepHour"
dfflights$CRSDepHour <- floor(dfflights$CRSDepTime / 100)

# Trim the columns to only those we will use for the predictive model
dfflightsClean <- dfflights[, c("OriginAirportCode","OriginLatitude", "OriginLongitude", "Month", "DayofMonth", "CRSDepHour", "DayOfWeek", "Carrier", "DestAirportCode", "DestLatitude", "DestLongitude", "DepDel15")]

# Register the result as a temporary view so that later SQL cells can query it
createOrReplaceTempView(dfflightsClean, "flight_delays_view")
```


Now let's take a look at the resulting data. Take note of the **CRSDepHour** column that we created, as well as the number of columns we now have (12). Verify that the new CRSDepHour column contains the rounded hour values from our CRSDepTime column.

In [26]:
```sql
%sql
select * from flight_delays_view
```

| OriginAirportCode | OriginLatitude | OriginLongitude | Month | DayofMonth | CRSDepHour | DayOfWeek | Carrier | DestAirportCode | DestLatitude | DestLongitude | DepDel15 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| DTW | 42.2125 | -83.35333333 | 4 | 19 | 8 | 5 | DL | MIA | 25.79527778 | -80.29 | 0 |
| SLC | 40.78833333 | -111.9777778 | 4 | 19 | 17 | 5 | DL | JFK | 40.64 | -73.77861111 | 0 |
| PDX | 45.58861111 | -122.5969444 | 4 | 19 | 6 | 5 | DL | SLC | 40.78833333 | -111.9777778 | 0 |
| STL | 38.74861111 | -90.37 | 4 | 19 | 16 | 5 | DL | DTW | 42.2125 | -83.35333333 | 1 |
| CVG | 39.04888889 | -84.66777778 | 4 | 19 | 16 | 5 | DL | LAX | 33.9425 | -118.4080556 | 0 |
| ATL | 33.63666667 | -84.42777778 | 4 | 19 | 17 | 5 | DL | STL | 38.74861111 | -90.37 | 0 |
| STL | 38.74861111 | -90.37 | 4 | 19 | 19 | 5 | DL | ATL | 33.63666667 | -84.42777778 | 0 |
| ATL | 33.63666667 | -84.42777778 | 4 | 19 | 21 | 5 | DL | SLC | 40.78833333 | -111.9777778 | 1 |
| ATL | 33.63666667 | -84.42777778 | 4 | 19 | 21 | 5 | DL | AUS | 30.19444444 | -97.67 | 1 |
| DCA | 38.85138889 | -77.03777778 | 4 | 19 | 19 | 5 | DL | ATL | 33.63666667 | -84.42777778 | 1 |


Now verify that the rows with missing data for the **DepDel15** column have been removed.

In [28]:
```sql
%sql
select count(*) from flight_delays_view
```

| count(1) |
| --- |
| 2691974 |


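You should see a count of **2,691,974**, which equals the original 2,719,418 rows minus the 27,444 rows with missing data in the DepDel15 column. Because `na.omit` drops a row with a missing value in any of the selected columns, this also tells us that no additional rows were removed because of missing values elsewhere.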

Now save the contents of the temporary view into a new DataFrame.

In [30]:
```python
dfFlightDelays_Clean = spark.sql("select * from flight_delays_view")
```

## Export the prepared data to a persistent global table

There are two types of tables in Databricks. 

* Global tables, which are accessible across all clusters
* Local tables, which are available only within one cluster

To create a global table, use the `saveAsTable()` method. To create a local table, use either the `createOrReplaceTempView()` method or the older `registerTempTable()` method, which has been deprecated since Spark 2.0.

The `flight_delays_view` table was created as a local table using `createOrReplaceTempView()`, so it is temporary. Local tables are tied to the Spark session that was used to create their associated DataFrame, and they disappear when that session ends (for example, when the cluster shuts down). To keep our cleansed data permanently, we should create a global table.

Run the following to write the data in the `dfFlightDelays_Clean` DataFrame to a global table named `flight_delays_clean`. The `overwrite` mode replaces the table if it already exists.

In [33]:
```python
dfFlightDelays_Clean.write.mode("overwrite").saveAsTable("flight_delays_clean")
```

# Prepare the weather data

To begin, take a look at the `flight_weather_with_airport_code` data that was imported to get a sense of the data we will be working with.

In [36]:
```sql
%sql
select * from flight_weather_with_airport_code
```

| Year | Month | Day | Time | TimeZone | SkyCondition | Visibility | WeatherType | DryBulbFarenheit | DryBulbCelsius | WetBulbFarenheit | WetBulbCelsius | DewPointFarenheit | DewPointCelsius | RelativeHumidity | WindSpeed | WindDirection | ValueForWindCharacter | StationPressure | PressureTendency | PressureChange | SeaLevelPressure | RecordType | HourlyPrecip | Altimeter | AirportCode | DISPLAY_AIRPORT_NAME | LATITUDE | LONGITUDE |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2013 | 4 | 1 | 56 | -4 | FEW018 SCT044 BKN070 | 10.0 | -RA | 76 | 24.4 | 74 | 23.3 | 73 | 22.8 | 90 | 13 | 080 |  | 30.06 |  |  | 30.06 | AA | T | 30.07 | SJU | Luis Munoz Marin International | 18.43944444 | -66.00222222 |
| 2013 | 4 | 1 | 156 | -4 | FEW037 SCT070 | 10.0 |  | 76 | 24.4 | 73 | 22.5 | 71 | 21.7 | 85 | 10 | 090 |  | 30.05 | 6.0 | 17.0 | 30.05 | AA |  | 30.06 | SJU | Luis Munoz Marin International | 18.43944444 | -66.00222222 |
| 2013 | 4 | 1 | 256 | -4 | FEW037 SCT070 | 10.0 |  | 76 | 24.4 | 73 | 22.5 | 71 | 21.7 | 85 | 9 | 100 |  | 30.03 |  |  | 30.03 | AA |  | 30.04 | SJU | Luis Munoz Marin International | 18.43944444 | -66.00222222 |
| 2013 | 4 | 1 | 356 | -4 | FEW025 SCT070 | 10.0 |  | 76 | 24.4 | 72 | 22.2 | 70 | 21.1 | 82 | 9 | 100 |  | 30.02 |  |  | 30.03 | AA |  | 30.03 | SJU | Luis Munoz Marin International | 18.43944444 | -66.00222222 |
| 2013 | 4 | 1 | 456 | -4 | FEW025 | 10.0 |  | 76 | 24.4 | 72 | 22.2 | 70 | 21.1 | 82 | 7 | 110 |  | 30.03 | 5.0 | 4.0 | 30.04 | AA |  | 30.04 | SJU | Luis Munoz Marin International | 18.43944444 | -66.00222222 |
| 2013 | 4 | 1 | 556 | -4 | FEW025 SCT080 | 10.0 |  | 76 | 24.4 | 71 | 21.8 | 69 | 20.6 | 79 | 7 | 100 |  | 30.04 |  |  | 30.05 | AA |  | 30.05 | SJU | Luis Munoz Marin International | 18.43944444 | -66.00222222 |
| 2013 | 4 | 1 | 656 | -4 | FEW028 BKN080 | 10.0 |  | 77 | 25.0 | 71 | 21.7 | 68 | 20.0 | 74 | 9 | 110 |  | 30.07 |  |  | 30.07 | AA |  | 30.08 | SJU | Luis Munoz Marin International | 18.43944444 | -66.00222222 |
| 2013 | 4 | 1 | 756 | -4 | FEW028 BKN080 | 10.0 |  | 79 | 26.1 | 72 | 22.4 | 69 | 20.6 | 72 | 13 | 100 |  | 30.09 | 3.0 | 20.0 | 30.10 | AA |  | 30.1 | SJU | Luis Munoz Marin International | 18.43944444 | -66.00222222 |
| 2013 | 4 | 1 | 856 | -4 | FEW030 BKN080 | 10.0 |  | 82 | 27.8 | 73 | 22.9 | 69 | 20.6 | 65 | 14 | 100 | 21.0 | 30.11 |  |  | 30.11 | AA |  | 30.12 | SJU | Luis Munoz Marin International | 18.43944444 | -66.00222222 |
| 2013 | 4 | 1 | 956 | -4 | SCT035 BKN090 | 10.0 |  | 83 | 28.3 | 74 | 23.0 | 69 | 20.6 | 63 | 16 | 090 | 23.0 | 30.11 |  |  | 30.12 | AA |  | 30.12 | SJU | Luis Munoz Marin International | 18.43944444 | -66.00222222 |


Next, count the number of records so we know how many rows we are working with.

In [38]:
```sql
%sql
select count(*) from flight_weather_with_airport_code
```

| count(1) |
| --- |
| 406516 |


Observe that this dataset has 406,516 rows and 29 columns. For this model, we are going to predict delays using WindSpeed (in MPH), SeaLevelPressure (in inches of Hg), and HourlyPrecip (in inches), so we will focus on preparing those features.

Let's start out by taking a look at the **WindSpeed** column. You may scroll through the values in the table above, but reviewing just the distinct values will be faster.

In [41]:
```sql
%sql
select distinct WindSpeed from flight_weather_with_airport_code
```

| WindSpeed |
| --- |
| 7 |
| 51 |
| 15 |
| 11 |
| 29 |
| 3 |
| 30 |
| 34 |
| 8 |
| 22 |


Try clicking the **WindSpeed** column header to sort the list in ascending and then descending order. Observe that the values are all numbers except for some `null` values and the string value `M` (for Missing). We will need to replace these missing values and convert WindSpeed to a numeric feature.

Next, let's take a look at the **SeaLevelPressure** column in the same way, by listing its distinct values.

In [44]:
```sql
%sql
select distinct SeaLevelPressure from flight_weather_with_airport_code
```

| SeaLevelPressure |
| --- |
| 29.68 |
| 29.45 |
| 30.43 |
| 29.58 |
| 30.59 |
| 30.13 |
| 30.66 |
| 29.39 |
| 30.17 |
| 29.61 |


As before, click the **SeaLevelPressure** column header to sort the values in ascending and then descending order. Observe that most values are numeric (e.g., 29.96, 30.01), but some contain the string value `M` (for Missing). We will need to replace `M` with a suitable numeric value so that we can convert this column to a numeric feature.

Finally, let's observe the **HourlyPrecip** feature by selecting its distinct values.

In [47]:
```sql
%sql
select distinct HourlyPrecip from flight_weather_with_airport_code
```

| HourlyPrecip |
| --- |
| 0.55 |
| 0.07 |
| 0.75 |
| 1.30 |
| 0.59 |
| 1.53 |
| 2.49 |
| 0.32 |
| 1.38 |
| 0.03 |


Click the column header to sort the list in ascending and then descending order. Observe that this column contains mostly numeric values, but also `null` values and the value `T` (for a trace amount of rain). We need to replace `T` and the `null` values with suitable numeric values and convert this column to a numeric feature.

## Clean up weather data

To perform our data cleanup, we will run Python code that carries out the following tasks:

* WindSpeed: Replace missing values with 0.0, and “M” values with 0.005
* HourlyPrecip: Replace missing values with 0.0, and “T” values with 0.005
* SeaLevelPressure: Replace “M” values with 29.92 (the average pressure)
* Convert WindSpeed, HourlyPrecip, and SeaLevelPressure to numeric columns
* Round the “Time” column down to the nearest hour, and store the result in a new column named “Hour”
* Eliminate unneeded columns from the dataset
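
A minimal plain-Python sketch of these rules for a single weather reading follows; it is not the PySpark code the notebook runs. The names `RULES` and `clean_weather` are hypothetical, and the sketch assumes the raw values arrive as strings, with `None` standing in for `null`. As in the list above, nulls in SeaLevelPressure are not filled, so they stay missing after conversion.

```python
from typing import Dict, Optional

# Replacement rules from the list above: column -> {raw value: replacement}
RULES: Dict[str, Dict[Optional[str], str]] = {
    "WindSpeed": {None: "0.0", "M": "0.005"},
    "HourlyPrecip": {None: "0.0", "T": "0.005"},
    "SeaLevelPressure": {"M": "29.92"},
}


def clean_weather(raw: Dict[str, Optional[str]], time: int) -> Dict[str, Optional[float]]:
    """Hypothetical helper applying the cleanup rules to one weather reading."""
    # Round Time down to the hour (e.g. 956 -> 9) as the new Hour value
    cleaned: Dict[str, Optional[float]] = {"Hour": time // 100}
    for column, rules in RULES.items():
        value = raw.get(column)
        # Swap placeholder or missing values; real readings pass through unchanged
        value = rules.get(value, value)
        # Convert to a number; a null with no replacement rule stays None
        cleaned[column] = float(value) if value is not None else None
    return cleaned


# First reading in the sample output above: Time 56, trace precipitation
print(clean_weather({"WindSpeed": "13", "SeaLevelPressure": "30.06",
                     "HourlyPrecip": "T"}, 56))
```

The first reading maps to Hour 0, WindSpeed 13.0, SeaLevelPressure 30.06, and HourlyPrecip 0.005, which matches the first row of the cleaned weather output shown later in this notebook.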

Let's begin by creating a new DataFrame from the table. While we're at it, we'll pare down the number of columns to just the ones we need (AirportCode, Month, Day, Time, WindSpeed, SeaLevelPressure, HourlyPrecip).

In [52]:
```python
dfWeather = spark.sql("select AirportCode, cast(Month as int) Month, cast(Day as int) Day, cast(Time as int) Time, WindSpeed, SeaLevelPressure, HourlyPrecip from flight_weather_with_airport_code")
```

In [53]:
```python
dfWeather.show()
```

Next, we will round Time down to the hour, since that is the hour we will match against the CRSDepHour value in the flight data. Rather than defining a custom rounding function, we use Spark's built-in `floor` function (`F.floor`) on `Time / 100` and store the result in a new column named "Hour". The same cell then applies the remaining cleanup steps listed above.

First, let's check the current data types of the dfWeather columns.

In [56]:
```python
pprint.pprint(dfWeather.dtypes)
```

In [57]:
```python
# Round Time down to the hour (e.g., 956 -> 9) and store it in a new Hour column
df = dfWeather.withColumn('Hour', F.floor(dfWeather['Time'] / 100))

# Replace any missing HourlyPrecip and WindSpeed values with 0.0
# (fillna applies a string fill value only to string columns)
df = df.fillna('0.0', subset=['HourlyPrecip', 'WindSpeed'])

# Replace any WindSpeed values of "M" with 0.005
df = df.replace('M', '0.005', subset=['WindSpeed'])

# Replace any SeaLevelPressure values of "M" with 29.92 (the average pressure)
df = df.replace('M', '29.92', subset=['SeaLevelPressure'])

# Replace any HourlyPrecip values of "T" (trace) with 0.005
df = df.replace('T', '0.005', subset=['HourlyPrecip'])

# Keep only the columns used by the model, including the new Hour feature,
# and cast WindSpeed, SeaLevelPressure, and HourlyPrecip to float
dfWeather_Clean = df.select(
    'AirportCode', 'Month', 'Day', 'Hour',
    df['WindSpeed'].cast('float'),
    df['SeaLevelPressure'].cast('float'),
    df['HourlyPrecip'].cast('float'),
)
```


Now let's take a look at the new `dfWeather_Clean` DataFrame.

In [59]:
```python
display(dfWeather_Clean)
```

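| AirportCode | Month | Day | Hour | WindSpeed | SeaLevelPressure | HourlyPrecip |
| --- | --- | --- | --- | --- | --- | --- |
| SJU | 4 | 1 | 0 | 13.0 | 30.06 | 0.005 |
| SJU | 4 | 1 | 1 | 10.0 | 30.05 | 0.0 |
| SJU | 4 | 1 | 2 | 9.0 | 30.03 | 0.0 |
| SJU | 4 | 1 | 3 | 9.0 | 30.03 | 0.0 |
| SJU | 4 | 1 | 4 | 7.0 | 30.04 | 0.0 |
| SJU | 4 | 1 | 5 | 7.0 | 30.05 | 0.0 |
| SJU | 4 | 1 | 6 | 9.0 | 30.07 | 0.0 |
| SJU | 4 | 1 | 7 | 13.0 | 30.1 | 0.0 |
| SJU | 4 | 1 | 8 | 14.0 | 30.11 | 0.0 |
| SJU | 4 | 1 | 9 | 16.0 | 30.12 | 0.0 |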


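Observe that the new DataFrame has only 7 columns, and that the WindSpeed, SeaLevelPressure, and HourlyPrecip fields are now numeric, with the `M` and `T` placeholders replaced. Note that the cleanup fills nulls only in WindSpeed and HourlyPrecip, so any nulls in SeaLevelPressure would remain null after the cast. To confirm that the three fields are numeric, we can take a look at the DataFrame's schema.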

In [61]:
```python
pprint.pprint(dfWeather_Clean.dtypes)
```

Now let's save the cleaned weather data to a persistent global table named `flight_weather_clean`.

In [63]:
```python
dfWeather_Clean.write.mode("overwrite").saveAsTable("flight_weather_clean")
```

In [64]:
```python
# Count the rows in the cleaned weather DataFrame
dfWeather_Clean.count()
```

# Join the Flight and Weather datasets

With both datasets ready, we want to join them together so that we can associate historical flight delays with the weather data at departure time.
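
The join condition pairs each flight with the weather reading taken at its origin airport in the same month, on the same day, and in the same hour. Here is a minimal plain-Python sketch of those inner-join semantics using a dictionary keyed on (airport, month, day, hour). The function name `join_flights_weather` and the toy rows are illustrative; the values for the matching pair are borrowed from the first row of the combined output in this notebook. Unlike Spark's inner join, which returns one output row per matching pair, the sketch assumes at most one weather reading per key.

```python
from typing import Dict, List, Tuple

# Join key: (airport code, month, day, hour)
Key = Tuple[str, int, int, int]


def join_flights_weather(flights: List[dict], weather: List[dict]) -> List[dict]:
    """Hypothetical inner join of flights to weather readings at the origin."""
    # Index readings on the same four values the SQL ON clause compares;
    # if two readings shared a key, this dict would keep only the last one
    by_key: Dict[Key, dict] = {
        (w["AirportCode"], w["Month"], w["Day"], w["Hour"]): w for w in weather
    }
    joined = []
    for f in flights:
        key = (f["OriginAirportCode"], f["Month"], f["DayofMonth"], f["CRSDepHour"])
        w = by_key.get(key)
        if w is None:
            continue  # inner join: drop flights with no matching reading
        joined.append({**f,
                       "WindSpeed": w["WindSpeed"],
                       "SeaLevelPressure": w["SeaLevelPressure"],
                       "HourlyPrecip": w["HourlyPrecip"]})
    return joined


weather = [{"AirportCode": "SAT", "Month": 5, "Day": 1, "Hour": 13,
            "WindSpeed": 6.0, "SeaLevelPressure": 29.8, "HourlyPrecip": 0.0}]
flights = [
    {"OriginAirportCode": "SAT", "Month": 5, "DayofMonth": 1, "CRSDepHour": 13, "DepDel15": 0},
    # No reading for hour 14 in this toy data, so this flight is dropped
    {"OriginAirportCode": "SAT", "Month": 5, "DayofMonth": 1, "CRSDepHour": 14, "DepDel15": 1},
]
result = join_flights_weather(flights, weather)
print(result)
```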

In [67]:
```python
dfFlightDelaysWithWeather = spark.sql("""
    SELECT d.OriginAirportCode,
           d.Month, d.DayofMonth, d.CRSDepHour, d.DayOfWeek,
           d.Carrier, d.DestAirportCode, d.DepDel15, w.WindSpeed,
           w.SeaLevelPressure, w.HourlyPrecip
    FROM flight_delays_clean d
    INNER JOIN flight_weather_clean w ON
        d.OriginAirportCode = w.AirportCode AND
        d.Month = w.Month AND
        d.DayofMonth = w.Day AND
        d.CRSDepHour = w.Hour
""")
```

Now let's take a look at the combined data.

In [69]:
```python
display(dfFlightDelaysWithWeather)
```

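| OriginAirportCode | Month | DayofMonth | CRSDepHour | DayOfWeek | Carrier | DestAirportCode | DepDel15 | WindSpeed | SeaLevelPressure | HourlyPrecip |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| SAT | 5 | 1 | 13 | 3 | MQ | ORD | 0 | 6.0 | 29.8 | 0.0 |
| SAT | 5 | 2 | 13 | 4 | MQ | ORD | 0 | 22.0 | 30.18 | 0.0 |
| SAT | 5 | 3 | 13 | 5 | MQ | ORD | 1 | 14.0 | 30.18 | 0.0 |
| SAT | 5 | 4 | 13 | 6 | MQ | ORD | 0 | 6.0 | 29.93 | 0.0 |
| SAT | 5 | 5 | 13 | 7 | MQ | ORD | 0 | 9.0 | 30.03 | 0.0 |
| SAT | 5 | 6 | 13 | 1 | MQ | ORD | 0 | 7.0 | 30.01 | 0.0 |
| SAT | 5 | 7 | 13 | 2 | MQ | ORD | 0 | 10.0 | 29.97 | 0.0 |
| SAT | 5 | 8 | 13 | 3 | MQ | ORD | 1 | 14.0 | 29.91 | 0.0 |
| SAT | 5 | 9 | 13 | 4 | MQ | ORD | 0 | 7.0 | 29.85 | 0.0 |
| SAT | 5 | 10 | 13 | 5 | MQ | ORD | 1 | 11.0 | 29.87 | 0.0 |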


Save the combined dataset to a new persistent global table named `flight_delays_with_weather`.

In [71]:
```python
dfFlightDelaysWithWeather.write.mode("overwrite").saveAsTable("flight_delays_with_weather")
```