# I. Import libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
import datetime
import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

# II. Connect to the data

In [2]:
spark = SparkSession \
.builder \
.appName("Wrangling covid19 data") \
.getOrCreate()

In [3]:
path = 'data/jantojun2020.csv'
data = spark.read.csv(path, header=True)

# III. Assess the data

In [4]:
data.count()

2745847

In [5]:
data = data.na.drop(how='all')
data.count()

2745847

In [6]:
data.columns

['YEAR',
 'QUARTER',
 'MONTH',
 'DAY_OF_MONTH',
 'DAY_OF_WEEK',
 'FL_DATE',
 'MKT_UNIQUE_CARRIER',
 'MKT_CARRIER_FL_NUM',
 'TAIL_NUM',
 'ORIGIN',
 'ORIGIN_CITY_NAME',
 'ORIGIN_STATE_ABR',
 'ORIGIN_STATE_NM',
 'DEST',
 'DEST_CITY_NAME',
 'DEST_STATE_ABR',
 'DEST_STATE_NM',
 'CRS_DEP_TIME',
 'DEP_TIME',
 'DEP_DELAY',
 'DEP_DELAY_NEW',
 'DEP_DEL15',
 'DEP_DELAY_GROUP',
 'DEP_TIME_BLK',
 'TAXI_OUT',
 'WHEELS_OFF',
 'WHEELS_ON',
 'TAXI_IN',
 'CRS_ARR_TIME',
 'ARR_TIME',
 'ARR_DELAY',
 'ARR_DELAY_NEW',
 'ARR_DEL15',
 'ARR_DELAY_GROUP',
 'ARR_TIME_BLK',
 'CANCELLED',
 'CANCELLATION_CODE',
 'CRS_ELAPSED_TIME',
 'ACTUAL_ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'DISTANCE_GROUP',
 'CARRIER_DELAY',
 'WEATHER_DELAY',
 'NAS_DELAY',
 'SECURITY_DELAY',
 'LATE_AIRCRAFT_DELAY']

# IV. Wrangling data
## 1. Split data to dim tables

### 1.1 Create port_loc dataframe

In [7]:
def create_port_loc_df(path):
    df = spark.read.csv(path, header=True)
    for column in df.columns:
        df = df.withColumnRenamed(column, column.lower())
    port_loc_df = df.select('origin', 'origin_city_name', 'origin_state_abr').dropDuplicates()
    port_loc_df = port_loc_df.withColumn('origin_city_name', split(port_loc_df['origin_city_name'], ',').getItem(0))
    port_loc_df.toPandas().to_csv('data/port_loc.csv', index=False)

In [8]:
create_port_loc_df(path)

In [9]:
# test
port_loc_df = spark.read.csv('data/port_loc.csv', header=True)
port_loc_df.show(10)

+------+----------------+----------------+
|origin|origin_city_name|origin_state_abr|
+------+----------------+----------------+
|   SAF|        Santa Fe|              NM|
|   MSP|     Minneapolis|              MN|
|   TUL|           Tulsa|              OK|
|   DBQ|         Dubuque|              IA|
|   LFT|       Lafayette|              LA|
|   ROW|         Roswell|              NM|
|   PIT|      Pittsburgh|              PA|
|   SLN|          Salina|              KS|
|   EAU|      Eau Claire|              WI|
|   DCA|      Washington|              VA|
+------+----------------+----------------+
only showing top 10 rows



### 1.2 Create states dataframe

In [10]:
def create_states_df(path):
    df = spark.read.csv(path, header=True)
    for column in df.columns:
        df = df.withColumnRenamed(column, column.lower())
    state_df = df.select('origin_state_abr', 'origin_state_nm').dropDuplicates()
    state_df.toPandas().to_csv('data/states.csv', index=False)

In [11]:
create_states_df(path)

In [12]:
# test
states_df = spark.read.csv('data/states.csv', header=True)
states_df.show(10)

+----------------+-------------------+
|origin_state_abr|    origin_state_nm|
+----------------+-------------------+
|              VI|U.S. Virgin Islands|
|              MT|            Montana|
|              NC|     North Carolina|
|              MD|           Maryland|
|              CO|           Colorado|
|              CT|        Connecticut|
|              IL|           Illinois|
|              WY|            Wyoming|
|              NJ|         New Jersey|
|              LA|          Louisiana|
+----------------+-------------------+
only showing top 10 rows



In [13]:
states_df.count()

52

### 1.3 Create airline dataframe

In [14]:
def create_airline_df(path1,path2):
        """
        Function: Generate and create airlines table code
        param:
            - path1: txt file
            - path2: dataset file
        output: airline.csv file stored in data folder
        """
        df = spark.read.csv(path2, header=True)
        df2 = df.select("MKT_UNIQUE_CARRIER","MKT_CARRIER_FL_NUM","ORIGIN","DEST","TAIL_NUM").dropDuplicates()
        with open(path1) as f:
            content = f.readlines()
            content = [x.strip() for x in content]
            airline = content[10:20]
            splitted_airline = [c.split(":") for c in airline]
            c_airline = [x[0].replace("'","").strip() for x in splitted_airline]
            airline_name = [x[1].replace("'","").strip() for x in splitted_airline]
            airline_df = spark.createDataFrame(zip(c_airline, airline_name), schema=['c_airline', 'airline_name'])
            airline_df = airline_df.join(df2,airline_df.c_airline == df2.MKT_UNIQUE_CARRIER,"inner")\
                        .drop("MKT_UNIQUE_CARRIER","ORIGIN","DEST")
            return airline_df.toPandas().to_csv("data/airline.csv", index=False)

In [15]:
path_txt = 'data/ColumnDescriptions.txt'
path_csv = 'data/jantojun2020.csv'
create_airline_df(path_txt, path_csv)

In [16]:
# test
airline_df = spark.read.csv('data/airline.csv', header=True)
airline_df.show(10)

+---------+---------------+------------------+--------+
|c_airline|   airline_name|MKT_CARRIER_FL_NUM|TAIL_NUM|
+---------+---------------+------------------+--------+
|       NK|Spirit Airlines|              1009|  N624NK|
|       NK|Spirit Airlines|               103|  N654NK|
|       NK|Spirit Airlines|              1069|  N913NK|
|       NK|Spirit Airlines|               109|  N507NK|
|       NK|Spirit Airlines|              1247|  N648NK|
|       NK|Spirit Airlines|              1360|  N629NK|
|       NK|Spirit Airlines|              1400|  N602NK|
|       NK|Spirit Airlines|              1440|  N915NK|
|       NK|Spirit Airlines|              1520|  N672NK|
|       NK|Spirit Airlines|              1782|  N627NK|
+---------+---------------+------------------+--------+
only showing top 10 rows



### 1.4 Create distance_group dataframe

In [17]:
data.select('distance', 'distance_group').show(10)

+--------+--------------+
|distance|distance_group|
+--------+--------------+
|     363|             2|
|     363|             2|
|     333|             2|
|     333|             2|
|     333|             2|
|     333|             2|
|     333|             2|
|     390|             2|
|     390|             2|
|     390|             2|
+--------+--------------+
only showing top 10 rows



By observing the table above, we can clearly figure out there are some wrong values in this dataframe: it should be 1 for those distances (Based on the explanation in ColumnDescriptions.txt). Therefore, we will fix it first.
##### Fix the wrong distance_group values

In [18]:
# use floor() to get the integer division
data = data.withColumn('distance_group', floor(data['distance'].cast('int')/250))
# test
data.select('distance', 'distance_group').show(10)

+--------+--------------+
|distance|distance_group|
+--------+--------------+
|     363|             1|
|     363|             1|
|     333|             1|
|     333|             1|
|     333|             1|
|     333|             1|
|     333|             1|
|     390|             1|
|     390|             1|
|     390|             1|
+--------+--------------+
only showing top 10 rows



##### Create the wanted dataframe

In [19]:
data.select('distance_group').describe().show()

+-------+------------------+
|summary|    distance_group|
+-------+------------------+
|  count|           2745847|
|   mean|  2.50195659117205|
| stddev|2.2411625220911175|
|    min|                 0|
|    max|                20|
+-------+------------------+



Because the min and the max values of the distance_group are 0 and 20, respectively,  we choose the range from 0 to 22 for this distance_group data frame.

In [20]:
def create_distance_group():
    data = []
    for i in range(23):
        data.append([i, "{} <= distance < {}".format(i * 250, (i + 1) * 250)])
        
    df = pd.DataFrame(data=data, columns=['distance_group', 'distance_range(miles)'])
    df.to_csv('data/distance_group.csv', index=False)    

In [21]:
create_distance_group()

In [22]:
# test
distance_group_df = spark.read.csv('data/distance_group.csv', header=True)
distance_group_df.show(10, truncate=False)

+--------------+-----------------------+
|distance_group|distance_range(miles)  |
+--------------+-----------------------+
|0             |0 <= distance < 250    |
|1             |250 <= distance < 500  |
|2             |500 <= distance < 750  |
|3             |750 <= distance < 1000 |
|4             |1000 <= distance < 1250|
|5             |1250 <= distance < 1500|
|6             |1500 <= distance < 1750|
|7             |1750 <= distance < 2000|
|8             |2000 <= distance < 2250|
|9             |2250 <= distance < 2500|
+--------------+-----------------------+
only showing top 10 rows



### 1.5 Create cancellation dataframe

In [23]:
def create_cancellation_df(path):
        """
        Function: Generate and create Cancelation_code table:
        param: Path of datafile
        input: .txt file
        output: cancel.csv file stored in data folder
        """
        import re
        with open(path) as f:
            content = f.readlines()
            content = [x.strip() for x in content]
            cancel = [re.search('\(([^)]+)', content[49]).group(1)][0].split(",")
            splitted_cancel = [c.split(":") for c in cancel]
            c_cancel = [x[0].replace("'","").strip() for x in splitted_cancel]
            cancel_des= [x[1].replace("'","").strip() for x in splitted_cancel]
            c_cancel.append('O')
            cancel_des.append('Non-cancel')
            cancel_df = pd.DataFrame({"c_cancel" : c_cancel, "cancel_des": cancel_des})
            return cancel_df.to_csv("data/cancellation.csv", index=False)

In [24]:
create_cancellation_df(path_txt)

In [25]:
# test
cancel_df = spark.read.csv('data/cancellation.csv', header=True)
cancel_df.show(truncate=False)

+--------+------------------------+
|c_cancel|cancel_des              |
+--------+------------------------+
|A       |Carrier                 |
|B       |Weather                 |
|C       |National Aviation System|
|D       |Security                |
|O       |Non-cancel              |
+--------+------------------------+



### 1.6 Create delay_group dataframe

In [26]:
def create_delay_group():
        """
        function
        """
        data = []
        for i in range(-1,188):
            if i == -1:
                data.append([-1,"Early"])
            elif i == 0:
                data.append([0,"On Time"])
            else:
                data.append([i, "{} <= delay time < {}".format(i * 15, (i + 1) * 15)])

        df = pd.DataFrame(data=data, columns=['delay_group', 'delay_time_range(minutes)'])
        df.to_csv('data/delay_group.csv', index=False)

In [27]:
create_delay_group()

In [28]:
# test
delay_group_df = spark.read.csv('data/delay_group.csv', header=True)
delay_group_df.show(10, truncate=False)

+-----------+-------------------------+
|delay_group|delay_time_range(minutes)|
+-----------+-------------------------+
|-1         |Early                    |
|0          |On Time                  |
|1          |15 <= delay time < 30    |
|2          |30 <= delay time < 45    |
|3          |45 <= delay time < 60    |
|4          |60 <= delay time < 75    |
|5          |75 <= delay time < 90    |
|6          |90 <= delay time < 105   |
|7          |105 <= delay time < 120  |
|8          |120 <= delay time < 135  |
+-----------+-------------------------+
only showing top 10 rows



## 2. Check null values
### 2.1 airline dataframe

In [29]:
airline_df.show(10)

+---------+---------------+------------------+--------+
|c_airline|   airline_name|MKT_CARRIER_FL_NUM|TAIL_NUM|
+---------+---------------+------------------+--------+
|       NK|Spirit Airlines|              1009|  N624NK|
|       NK|Spirit Airlines|               103|  N654NK|
|       NK|Spirit Airlines|              1069|  N913NK|
|       NK|Spirit Airlines|               109|  N507NK|
|       NK|Spirit Airlines|              1247|  N648NK|
|       NK|Spirit Airlines|              1360|  N629NK|
|       NK|Spirit Airlines|              1400|  N602NK|
|       NK|Spirit Airlines|              1440|  N915NK|
|       NK|Spirit Airlines|              1520|  N672NK|
|       NK|Spirit Airlines|              1782|  N627NK|
+---------+---------------+------------------+--------+
only showing top 10 rows



In [30]:
airline_df.filter(col('c_airline').isNull()).show()

+---------+------------+------------------+--------+
|c_airline|airline_name|MKT_CARRIER_FL_NUM|TAIL_NUM|
+---------+------------+------------------+--------+
+---------+------------+------------------+--------+



There is no null value for column c_airline in airline dataframe
### 2.2 distance_group dataframe

In [31]:
distance_group_df.show(10, truncate=False)

+--------------+-----------------------+
|distance_group|distance_range(miles)  |
+--------------+-----------------------+
|0             |0 <= distance < 250    |
|1             |250 <= distance < 500  |
|2             |500 <= distance < 750  |
|3             |750 <= distance < 1000 |
|4             |1000 <= distance < 1250|
|5             |1250 <= distance < 1500|
|6             |1500 <= distance < 1750|
|7             |1750 <= distance < 2000|
|8             |2000 <= distance < 2250|
|9             |2250 <= distance < 2500|
+--------------+-----------------------+
only showing top 10 rows



In [32]:
distance_group_df.filter(col('distance_group').isNull()).show()

+--------------+---------------------+
|distance_group|distance_range(miles)|
+--------------+---------------------+
+--------------+---------------------+



There is no null value for column distance_group in distance_group dataframe
### 2.3 States dataframe

In [33]:
states_df.show(10)

+----------------+-------------------+
|origin_state_abr|    origin_state_nm|
+----------------+-------------------+
|              VI|U.S. Virgin Islands|
|              MT|            Montana|
|              NC|     North Carolina|
|              MD|           Maryland|
|              CO|           Colorado|
|              CT|        Connecticut|
|              IL|           Illinois|
|              WY|            Wyoming|
|              NJ|         New Jersey|
|              LA|          Louisiana|
+----------------+-------------------+
only showing top 10 rows



In [34]:
states_df.filter(col('origin_state_abr').isNull()).show()

+----------------+---------------+
|origin_state_abr|origin_state_nm|
+----------------+---------------+
+----------------+---------------+



There is no null value for column origin_state_abr in states dataframe
### 2.4 port_loc dataframe

In [35]:
port_loc_df.show(10)

+------+----------------+----------------+
|origin|origin_city_name|origin_state_abr|
+------+----------------+----------------+
|   SAF|        Santa Fe|              NM|
|   MSP|     Minneapolis|              MN|
|   TUL|           Tulsa|              OK|
|   DBQ|         Dubuque|              IA|
|   LFT|       Lafayette|              LA|
|   ROW|         Roswell|              NM|
|   PIT|      Pittsburgh|              PA|
|   SLN|          Salina|              KS|
|   EAU|      Eau Claire|              WI|
|   DCA|      Washington|              VA|
+------+----------------+----------------+
only showing top 10 rows



In [36]:
port_loc_df.filter(col('origin').isNull()).show()

+------+----------------+----------------+
|origin|origin_city_name|origin_state_abr|
+------+----------------+----------------+
+------+----------------+----------------+



There is no null value for column origin in port_loc dataframe

## 3. Create fact table

In [37]:
data.select('year', 'quarter', 'month', 'day_of_month','day_of_week', 'fl_date').show(10)

+----+-------+-----+------------+-----------+--------+
|year|quarter|month|day_of_month|day_of_week| fl_date|
+----+-------+-----+------------+-----------+--------+
|2020|      1|    1|           1|          3|1/1/2020|
|2020|      1|    1|           1|          3|1/1/2020|
|2020|      1|    1|           1|          3|1/1/2020|
|2020|      1|    1|           1|          3|1/1/2020|
|2020|      1|    1|           1|          3|1/1/2020|
|2020|      1|    1|           1|          3|1/1/2020|
|2020|      1|    1|           1|          3|1/1/2020|
|2020|      1|    1|           1|          3|1/1/2020|
|2020|      1|    1|           1|          3|1/1/2020|
|2020|      1|    1|           1|          3|1/1/2020|
+----+-------+-----+------------+-----------+--------+
only showing top 10 rows



In [38]:
fact_df = data.withColumn("flight_date",concat_ws("-",col("year"),col("month"),col("day_of_month")).cast("date"))
# test
fact_df.select('fl_date', 'flight_date').show(10)

+--------+-----------+
| fl_date|flight_date|
+--------+-----------+
|1/1/2020| 2020-01-01|
|1/1/2020| 2020-01-01|
|1/1/2020| 2020-01-01|
|1/1/2020| 2020-01-01|
|1/1/2020| 2020-01-01|
|1/1/2020| 2020-01-01|
|1/1/2020| 2020-01-01|
|1/1/2020| 2020-01-01|
|1/1/2020| 2020-01-01|
|1/1/2020| 2020-01-01|
+--------+-----------+
only showing top 10 rows



#### Drop column year, quarter, month, day_of_month, day_of_week, fl_date

In [39]:
fact_df = fact_df.drop('year', 'quarter', 'month', 'day_of_month', 'day_of_week', 'fl_date')
fact_df.printSchema()

root
 |-- MKT_UNIQUE_CARRIER: string (nullable = true)
 |-- MKT_CARRIER_FL_NUM: string (nullable = true)
 |-- TAIL_NUM: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- ORIGIN_STATE_NM: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- DEST_STATE_ABR: string (nullable = true)
 |-- DEST_STATE_NM: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_TIME: string (nullable = true)
 |-- DEP_DELAY: string (nullable = true)
 |-- DEP_DELAY_NEW: string (nullable = true)
 |-- DEP_DEL15: string (nullable = true)
 |-- DEP_DELAY_GROUP: string (nullable = true)
 |-- DEP_TIME_BLK: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable =

#### Drop columns related to origin and destination

In [40]:
fact_df = fact_df.drop('origin_city_name', 'origin_state_abr', 'origin_state_nm',\
                       'dest_city_name', 'dest_state_abr', 'dest_state_nm')
fact_df.printSchema()

root
 |-- MKT_UNIQUE_CARRIER: string (nullable = true)
 |-- MKT_CARRIER_FL_NUM: string (nullable = true)
 |-- TAIL_NUM: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_TIME: string (nullable = true)
 |-- DEP_DELAY: string (nullable = true)
 |-- DEP_DELAY_NEW: string (nullable = true)
 |-- DEP_DEL15: string (nullable = true)
 |-- DEP_DELAY_GROUP: string (nullable = true)
 |-- DEP_TIME_BLK: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_TIME: string (nullable = true)
 |-- ARR_DELAY: string (nullable = true)
 |-- ARR_DELAY_NEW: string (nullable = true)
 |-- ARR_DEL15: string (nullable = true)
 |-- ARR_DELAY_GROUP: string (nullable = true)
 |-- ARR_TIME_BLK: string (nullable = true)
 |-- CANCELLED:

#### Drop column cancelled

In [41]:
fact_df = fact_df.drop('cancelled')

Then we substitute all null values of column cancellation code with 'O' (means ' Non-cancel')

In [42]:
fact_df.select('cancellation_code').dropDuplicates().show()

+-----------------+
|cancellation_code|
+-----------------+
|             null|
|                B|
|                D|
|                C|
|                A|
+-----------------+



In [43]:
fact_df = fact_df.na.fill(value='O',subset=["cancellation_code"])
# test
fact_df.select('cancellation_code').dropDuplicates().show()

+-----------------+
|cancellation_code|
+-----------------+
|                B|
|                O|
|                D|
|                C|
|                A|
+-----------------+



#### Modify the departure time and the arrival time

In [44]:
fact_df.select('crs_arr_time', 'arr_time', 'crs_dep_time', 'dep_time').show(10)

+------------+--------+------------+--------+
|crs_arr_time|arr_time|crs_dep_time|dep_time|
+------------+--------+------------+--------+
|        1945|  2053.0|        1810|  1851.0|
|        1320|  1318.0|        1150|  1146.0|
|        2130|  2124.0|        2020|  2016.0|
|        1455|  1505.0|        1340|  1350.0|
|        1035|  1023.0|         915|   916.0|
|         715|   722.0|         600|   602.0|
|        1740|  1736.0|        1620|  1624.0|
|        1630|  1717.0|        1505|  1604.0|
|        1355|  1405.0|        1230|  1225.0|
|         900|   904.0|         740|   740.0|
+------------+--------+------------+--------+
only showing top 10 rows



In [45]:
fact_df.filter(col('arr_time').isNull() | col('dep_time').isNull()).count()

283554

There are null values in the departure time and the arrival time. Because we do not have any appropriate substituted values and because we would like to consider the delay and cancelation, it is non-sense to keep those null values, so we will drop all those rows.

In [46]:
fact_df = fact_df.dropna(subset=['arr_time', 'dep_time'])
# test
fact_df.filter(col('arr_time').isNull() | col('dep_time').isNull()).count()

0

In [47]:
# use lpad() to add '0' leading
fact_df = (fact_df.withColumn('crs_arr_time', lpad(col('crs_arr_time'), 4, '0'))
                  .withColumn('arr_time', lpad(col('arr_time').cast('int'), 4, '0'))
                  .withColumn('crs_dep_time', lpad(col('crs_dep_time'), 4, '0'))
                  .withColumn('dep_time', lpad(col('dep_time').cast('int'), 4, '0')))
# test
fact_df.select('crs_arr_time', 'arr_time', 'crs_dep_time', 'dep_time').show(10)

+------------+--------+------------+--------+
|crs_arr_time|arr_time|crs_dep_time|dep_time|
+------------+--------+------------+--------+
|        1945|    2053|        1810|    1851|
|        1320|    1318|        1150|    1146|
|        2130|    2124|        2020|    2016|
|        1455|    1505|        1340|    1350|
|        1035|    1023|        0915|    0916|
|        0715|    0722|        0600|    0602|
|        1740|    1736|        1620|    1624|
|        1630|    1717|        1505|    1604|
|        1355|    1405|        1230|    1225|
|        0900|    0904|        0740|    0740|
+------------+--------+------------+--------+
only showing top 10 rows



In [48]:
# convert time from str to timestamp
fact_df = (fact_df.withColumn('crs_arr_time', date_format(to_timestamp(col('crs_arr_time'), 'Hmm'),'HH:mm'))
                  .withColumn('arr_time', date_format(to_timestamp(col('arr_time'), 'Hmm'),'HH:mm'))
                  .withColumn('crs_dep_time', date_format(to_timestamp(col('crs_dep_time'), 'Hmm'),'HH:mm'))
                  .withColumn('dep_time', date_format(to_timestamp(col('dep_time'), 'Hmm'),'HH:mm')))
# test
fact_df.select('crs_arr_time', 'arr_time', 'crs_dep_time', 'dep_time').show(10)

+------------+--------+------------+--------+
|crs_arr_time|arr_time|crs_dep_time|dep_time|
+------------+--------+------------+--------+
|       19:45|   20:53|       18:10|   18:51|
|       13:20|   13:18|       11:50|   11:46|
|       21:30|   21:24|       20:20|   20:16|
|       14:55|   15:05|       13:40|   13:50|
|       10:35|   10:23|       09:15|   09:16|
|       07:15|   07:22|       06:00|   06:02|
|       17:40|   17:36|       16:20|   16:24|
|       16:30|   17:17|       15:05|   16:04|
|       13:55|   14:05|       12:30|   12:25|
|       09:00|   09:04|       07:40|   07:40|
+------------+--------+------------+--------+
only showing top 10 rows



Then, when we created a delay_group dataframe, we realized that there are some wrong numbers in the column dep_delay_group. Thus, we will fix them right now.

In [49]:
# calculate the dep_delay and arr_delay
(fact_df.withColumn('arr_delay', col('arr_time') - col('crs_arr_time'))
                  .withColumn('dep_delay', col('dep_time') - col('crs_dep_time'))).select('crs_arr_time', 'arr_time', 'arr_delay', 'crs_dep_time', 'dep_time', 'dep_delay').show(10)
# test
#fact_df.select('crs_arr_time', 'arr_time', 'arr_delay', 'crs_dep_time', 'dep_time', 'dep_delay').show(10)

+------------+--------+---------+------------+--------+---------+
|crs_arr_time|arr_time|arr_delay|crs_dep_time|dep_time|dep_delay|
+------------+--------+---------+------------+--------+---------+
|       19:45|   20:53|     null|       18:10|   18:51|     null|
|       13:20|   13:18|     null|       11:50|   11:46|     null|
|       21:30|   21:24|     null|       20:20|   20:16|     null|
|       14:55|   15:05|     null|       13:40|   13:50|     null|
|       10:35|   10:23|     null|       09:15|   09:16|     null|
|       07:15|   07:22|     null|       06:00|   06:02|     null|
|       17:40|   17:36|     null|       16:20|   16:24|     null|
|       16:30|   17:17|     null|       15:05|   16:04|     null|
|       13:55|   14:05|     null|       12:30|   12:25|     null|
|       09:00|   09:04|     null|       07:40|   07:40|     null|
+------------+--------+---------+------------+--------+---------+
only showing top 10 rows



In [50]:
fact_df.dtypes

[('MKT_UNIQUE_CARRIER', 'string'),
 ('MKT_CARRIER_FL_NUM', 'string'),
 ('TAIL_NUM', 'string'),
 ('ORIGIN', 'string'),
 ('DEST', 'string'),
 ('crs_dep_time', 'string'),
 ('dep_time', 'string'),
 ('DEP_DELAY', 'string'),
 ('DEP_DELAY_NEW', 'string'),
 ('DEP_DEL15', 'string'),
 ('DEP_DELAY_GROUP', 'string'),
 ('DEP_TIME_BLK', 'string'),
 ('TAXI_OUT', 'string'),
 ('WHEELS_OFF', 'string'),
 ('WHEELS_ON', 'string'),
 ('TAXI_IN', 'string'),
 ('crs_arr_time', 'string'),
 ('arr_time', 'string'),
 ('ARR_DELAY', 'string'),
 ('ARR_DELAY_NEW', 'string'),
 ('ARR_DEL15', 'string'),
 ('ARR_DELAY_GROUP', 'string'),
 ('ARR_TIME_BLK', 'string'),
 ('CANCELLATION_CODE', 'string'),
 ('CRS_ELAPSED_TIME', 'string'),
 ('ACTUAL_ELAPSED_TIME', 'string'),
 ('AIR_TIME', 'string'),
 ('DISTANCE', 'string'),
 ('distance_group', 'bigint'),
 ('CARRIER_DELAY', 'string'),
 ('WEATHER_DELAY', 'string'),
 ('NAS_DELAY', 'string'),
 ('SECURITY_DELAY', 'string'),
 ('LATE_AIRCRAFT_DELAY', 'string'),
 ('flight_date', 'date')]

In [51]:
a =

SyntaxError: invalid syntax (905710932.py, line 1)

Then, when we created a delay_group dataframe, we realized that there are some wrong numbers in the column dep_delay_group. Thus, we will fix them right now.

In [None]:
fact_df = fact_df.withColumn('dep_delay_group', when(col('dep_delay').cast('int') < 0, -1)
                                     .when(col('dep_delay').cast('int') < 15, 0)
                                     .otherwise(floor(col('dep_delay').cast('int')/15)))
# test
fact_df.select('dep_delay_group').dropDuplicates().show(10)

In [None]:
# drop columns dep_delay, dep_delay_new, dep_del15, dep_time_blk
fact_df = fact_df.drop('dep_delay', 'dep_delay_new', 'dep_del15', 'dep_time_blk')

In [None]:
# test
fact_df.printSchema()