# Lab 5 - Spark SQL

Please review Lab 4 before proceeding. For this lab we will analyze a very large dataset from the Department of Transportation. **For this lab, my advice is to use the server instead of Databricks.** I only say this because the data is already available on the server for you. I have already converted the data to the Parquet format discussed in class, but if you don't believe me that it has benefits, let's check out some stats:

Here is the original data as I downloaded it, without any modification other than extracting the archive.

In [1]:
!du -sh /disk/airline-data

44G	/disk/airline-data


Let's take a look at the data after I processed it.

In [2]:
!du -sh /disk/airline-data-processed

3.6G	/disk/airline-data-processed


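Well, I don't know about you, but that seems amazing :) That is roughly a 12x reduction (44 GB down to 3.6 GB). Here is the size of the original compressed archive. It is important to realize that while the .tar.gz file is "small" at 4.7 GB, we can't query it with Spark or any other program without decompressing it first. The Parquet files, on the other hand, are both smaller and directly readable, with no decompression step on our part!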

In [3]:
!du -sh /disk/airline-data.2003-2018.tar.gz

4.7G	/disk/airline-data.2003-2018.tar.gz


If you are curious how I did this, please check out Setup_Lab5.ipynb. No need to run this or edit it or even look at it, but it's there.

I've noticed during interactions that some folks are skipping the cell below. It is my fault for not explaining it. In Python, once a module has been imported, running `import` again does not reload it, even if the file has changed on disk. If you run the cell below before your imports, the `autoreload` extension will reload changed modules automatically before each cell executes.

In [4]:
%load_ext autoreload
%autoreload 2

In [5]:
# make sure you run the cell above before running this
import Lab5_helper
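To see the behaviour that `%autoreload` works around, here is a small stdlib-only sketch. It uses `importlib.reload`, which is the manual way to reload a single module. The module name `demo_helper` and its contents are made up for the illustration, and the sketch turns off `.pyc` writing so the reload always recompiles from source.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Don't write .pyc files, so reload() always recompiles from the edited source.
sys.dont_write_bytecode = True

# Create a throwaway module (hypothetical name `demo_helper`) in a temp directory.
tmp_dir = tempfile.mkdtemp()
sys.path.insert(0, tmp_dir)
module_file = Path(tmp_dir) / "demo_helper.py"
module_file.write_text("VALUE = 1\n")
importlib.invalidate_caches()

demo_helper = importlib.import_module("demo_helper")
first = demo_helper.VALUE

# Edit the file on disk, then "import" it again: Python hands back the cached module.
module_file.write_text("VALUE = 200\n")
demo_helper = importlib.import_module("demo_helper")
second = demo_helper.VALUE  # still the old value

# An explicit reload re-executes the module's (new) source.
demo_helper = importlib.reload(demo_helper)
third = demo_helper.VALUE

print(first, second, third)  # 1 1 200
```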

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .getOrCreate()

sc = spark.sparkContext

## Exploratory Data Analysis

In [7]:
# Our main data source
on_time_df = spark.read.parquet('file:///disk/airline-data-processed/airline-data.parquet')
on_time_df.show()

+-------+----------+---------+----------+-------------+---------+-------+-------+---------+---------------+------------------+------------------+------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+------------------+---------+-------------+-------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+--------------+-----------------+-------+-------+--------+-------------+------------+------------+--------+-------------+-----------------+----+-----+
|Quarter|DayofMonth|DayOfWeek|FlightDate|UniqueCarrier|AirlineID|Carrier|TailNum|FlightNum|OriginAirportID|OriginAirportSeqID|OriginCityMarketID|Origin|      OriginCityName|OriginState|OriginStateFips|OriginStateName|OriginWac|DestAirportID|DestAirportSeqID|DestCity

That is a bit brutal to look at... Consider listing the column names instead:

In [8]:
on_time_df.columns

['Quarter',
 'DayofMonth',
 'DayOfWeek',
 'FlightDate',
 'UniqueCarrier',
 'AirlineID',
 'Carrier',
 'TailNum',
 'FlightNum',
 'OriginAirportID',
 'OriginAirportSeqID',
 'OriginCityMarketID',
 'Origin',
 'OriginCityName',
 'OriginState',
 'OriginStateFips',
 'OriginStateName',
 'OriginWac',
 'DestAirportID',
 'DestAirportSeqID',
 'DestCityMarketID',
 'Dest',
 'DestCityName',
 'DestState',
 'DestStateFips',
 'DestStateName',
 'DestWac',
 'CRSDepTime',
 'DepTime',
 'DepDelay',
 'DepDelayMinutes',
 'DepDel15',
 'DepartureDelayGroups',
 'DepTimeBlk',
 'TaxiOut',
 'WheelsOff',
 'WheelsOn',
 'TaxiIn',
 'CRSArrTime',
 'ArrTime',
 'ArrDelay',
 'ArrDelayMinutes',
 'ArrDel15',
 'ArrivalDelayGroups',
 'ArrTimeBlk',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CRSElapsedTime',
 'ActualElapsedTime',
 'AirTime',
 'Flights',
 'Distance',
 'DistanceGroup',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'Year',
 'Month']

In [9]:
# The first row
on_time_df.first()

Row(Quarter=3, DayofMonth=7, DayOfWeek=5, FlightDate=datetime.date(2018, 9, 7), UniqueCarrier='UA', AirlineID=19977, Carrier='UA', TailNum='N37502', FlightNum=2094, OriginAirportID=14679, OriginAirportSeqID=1467903, OriginCityMarketID=33570, Origin='SAN', OriginCityName='San Diego, CA', OriginState='CA', OriginStateFips=6, OriginStateName='California', OriginWac=91, DestAirportID=12266, DestAirportSeqID=1226603, DestCityMarketID=31453, Dest='IAH', DestCityName='Houston, TX', DestState='TX', DestStateFips=48, DestStateName='Texas', DestWac=74, CRSDepTime='0750', DepTime='0747', DepDelay=-3.0, DepDelayMinutes=0.0, DepDel15=0.0, DepartureDelayGroups=-1, DepTimeBlk='0700-0759', TaxiOut=18.0, WheelsOff='0805', WheelsOn='1248', TaxiIn=8.0, CRSArrTime='1256', ArrTime='1256', ArrDelay=0.0, ArrDelayMinutes=0.0, ArrDel15=0.0, ArrivalDelayGroups=0, ArrTimeBlk='1200-1259', Cancelled=0.0, CancellationCode=None, Diverted=0.0, CRSElapsedTime=186.0, ActualElapsedTime=189.0, AirTime=163.0, Flights=1.0,

What if you want to average AirTime?

In [10]:
from pyspark.sql.functions import avg, col

on_time_df.select('AirTime').agg(
    avg(col('AirTime'))
).show()

+----------------+
|    avg(AirTime)|
+----------------+
|106.840895516509|
+----------------+



So we need to navigate a fine line where I don't throw the entire Spark SQL API at you, but there are some functions above that should be discussed. The first is select, which you can use to get a subset of the columns. This is important for memory usage, and because Parquet stores data column by column, Spark only has to read the columns you ask for. Load only what you need :). Next is agg, which is short for aggregate. Inside it, col refers to a column by name, and avg, of course, computes the average. If you know SQL, you can also rely on SQL to work the magic.

In [11]:
on_time_df.select('AirTime').createOrReplaceTempView("AirTimeView") # create a temporary view so we can query our data

# keep the DataFrame in sqlDF; show() only prints and returns None
sqlDF = spark.sql("SELECT avg(AirTime) FROM AirTimeView")
sqlDF.show()

+----------------+
|    avg(AirTime)|
+----------------+
|106.840895516509|
+----------------+



I don't know about you, but since I already know SQL or at least some SQL, I'm very excited that I can use that. For this lab in general, please use what makes sense to you to accomplish the job.

What if I wanted average air time per month?

In [12]:
on_time_df.select('AirTime','Month').createOrReplaceTempView("AirTimeView") # create a temporary view so we can query our data

# keep the DataFrame in sqlDF; show() only prints and returns None
sqlDF = spark.sql("SELECT Month, avg(AirTime) FROM AirTimeView group by Month")
sqlDF.show()

+-----+------------------+
|Month|      avg(AirTime)|
+-----+------------------+
|   12| 108.9605027894764|
|    1|106.82890250362068|
|    6|106.96005748333293|
|    3|108.13478826356425|
|    5|106.01523155420969|
|    9|105.11722965450537|
|    4|106.98955608533186|
|    8|106.53259036679218|
|    7|107.72642114700079|
|   10|105.03766685659184|
|   11|106.60017004471732|
|    2|107.08287579839748|
+-----+------------------+



In [13]:
on_time_df.select('AirTime','Month').groupBy(
    'Month'
).agg(
    avg(col('AirTime'))
).show()

+-----+------------------+
|Month|      avg(AirTime)|
+-----+------------------+
|   12| 108.9605027894764|
|    1|106.82890250362068|
|    6|106.96005748333293|
|    3|108.13478826356425|
|    5|106.01523155420969|
|    9|105.11722965450537|
|    4|106.98955608533186|
|    8|106.53259036679218|
|    7|107.72642114700079|
|   10|105.03766685659184|
|   11|106.60017004471732|
|    2|107.08287579839748|
+-----+------------------+



Pretty nice right? You can see why companies might really value engineers who can bring data processing skills with them.

Let's now read in some data that maps each carrier code to its airline name.

In [14]:
airlines = spark.read.parquet('file:///disk/airline-data/DOT_airline_codes_table')

In [15]:
airlines.show()

+---------+--------------------+-------+
|AirlineID|         AirlineName|Carrier|
+---------+--------------------+-------+
|    19031|Mackey Internatio...|    MAC|
|    19032|Munz Northern Air...|     XY|
|    19033|Cochise Airlines ...|    COC|
|    19034|Golden Gate Airli...|    GSA|
|    19035|       Aeromech Inc.|    RZZ|
|    19036|Golden West Airli...|    GLW|
|    19037|Puerto Rico Intl ...|    PRN|
|    19038|    Air America Inc.|    STZ|
|    19039|Swift Aire Lines ...|    SWT|
|    19040|American Central ...|    TSF|
|    19041|     Valdez Airlines|    VEZ|
|    19042|Southeast Alaska ...|    WEB|
|    19043|Altair Airlines Inc.|    AAR|
|    19044| Chitina Air Service|    CHI|
|    19045|Marco Island Airw...|    MRC|
|    19046|Caribbean Air Ser...|    OHZ|
|    19047|   Sundance Airlines|    PRO|
|    19048|Seair Alaska Airl...|    SAI|
|    19049|Southeast Airline...|    SLZ|
|    19050|Alaska Aeronautic...|    AAZ|
+---------+--------------------+-------+
only showing top 20 rows

What if we want to apply a user-defined function (UDF)? Here is an example where a new function is defined that combines Year and Month into a string. I also use the ``sample`` function to illustrate how to get a random subset of the data. Next, I show an important function called ``cache``. It is important because we may want to reuse a result: ``cache`` tells Spark to keep the DataFrame around (in memory, spilling to disk if needed) the first time an action such as ``show`` computes it, so later actions don't recompute it from scratch. Finally, I show how you can use ``orderBy`` to sort the data.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def getYearMonthStr(year, month):
    return '%d-%02d'%(year,month)

udfGetYearMonthStr = udf(getYearMonthStr, StringType())

example1 = on_time_df.select('Year','Month').withColumn(
    'YearMonth', udfGetYearMonthStr('Year','Month')).sample(0.000001).cache()

example1.show()

In [None]:
example1.orderBy('YearMonth').show()

Finally, there are a number of small utilities that make the world go round, such as renaming a column (here `df` stands for any DataFrame with a `dob` column):
```python
df.withColumnRenamed("dob","DateOfBirth").printSchema()
```

**Exercise 1:** Create a dataframe that contains the average delay for each airline in each month:
* Columns: Carrier, average_delay, YearMonth
* Carrier must be one of the following: 'AA','WN','DL','UA','MQ','EV','AS','VX'
* Must be ordered by YearMonth, then Carrier

In [None]:
airline_delay = Lab5_helper.exercise_1(on_time_df)
## BEGIN ANSWER
answers['exercise_1'] = airline_delay.head(10)
## END ANSWER
airline_delay.show()

**Exercise 2:** Now add a column with the airline name (i.e., use a join). Here is an example from the Spark documentation.

```python
# To create DataFrame using SparkSession
people = spark.read.parquet("...")
department = spark.read.parquet("...")

people.filter(people.age > 30).join(department, people.deptId == department.id) \
  .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
```

In [None]:
airline_delay2 = Lab5_helper.exercise_2(airline_delay,airlines)
## BEGIN ANSWER
answers['exercise_2'] = airline_delay2.head(10)
## END ANSWER
airline_delay2.show()

If you did everything correctly, you are now rewarded with a nice graph :)

In [None]:
import altair as alt

airline_delay_pd = airline_delay2.toPandas()

alt.Chart(airline_delay_pd).mark_line().encode(
    x='YearMonth',
    y='average_delay',
    color='AirlineName'
)

**Exercise 3:** Let's assume you believe that the delays experienced by some airlines are correlated. The cause is a different story, as we all know correlation does not equal causation. But correlation is often what we can easily calculate, so let's compute it on a month-by-month basis. The first step is of course to get the data in the correct format. We would like each airline to have its own column, because we can easily compute the correlation between columns. Each row in this new dataframe should be a YearMonth.

In [None]:
data_for_corr = Lab5_helper.exercise_3(airline_delay2)
## BEGIN ANSWER
answers['exercise_3'] = data_for_corr.toPandas()
## END ANSWER

import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer

# The data is now small enough to handle in pandas. Fill in missing values
# with the mean of each column so we can calculate the correlations next.
df = data_for_corr.toPandas().set_index('YearMonth')

imp_mean = SimpleImputer(missing_values=np.nan, strategy='mean')
imp_mean.fit(df)

df_imputed_nan = pd.DataFrame(imp_mean.transform(df), columns=df.columns, index=df.index)
df_imputed_nan
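If you know pandas better than scikit-learn: mean imputation with `SimpleImputer(strategy='mean')` gives the same numbers as `DataFrame.fillna(df.mean())` for every column that has at least one observed value. (By default `SimpleImputer` drops columns that are entirely missing, whereas `fillna` leaves them as NaN.) A pandas-only sketch on made-up monthly delays:

```python
import numpy as np
import pandas as pd

# Hypothetical average delays per airline and month, with gaps (NaN) where data is missing.
wide = pd.DataFrame(
    {"AA": [5.0, np.nan, 7.0], "UA": [2.0, 4.0, np.nan]},
    index=["2018-01", "2018-02", "2018-03"],
)

# Each column's mean skips NaN; fillna then fills that column's gaps with it.
filled = wide.fillna(wide.mean())
print(filled)
```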

Now let's take a look at the correlations

In [None]:
df_imputed_nan.corr()

Anything stand out to you? Let me clean it up and sort it for you.

In [None]:
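corrs = df_imputed_nan.corr()
# Keep only the upper triangle (excluding the diagonal) so each airline pair appears once.
# np.NaN no longer exists in NumPy 2, and writing through corrs.values can fail on read-only arrays.
mask = np.triu(np.ones(corrs.shape, dtype=bool), k=1)
corrs.where(mask).stack().dropna().sort_values(ascending=False)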

In [None]:
# Good job!
# Don't forget to push with ./submit.sh