# Data Pre-processing for Airline Analysis using PySpark

In [1]:
# NOTE(review): pd and np are not referenced in any of the cells visible here — confirm
# whether they are still needed.
import pandas as pd
import numpy as np

import warnings
# Suppress every Python warning for the rest of the session so cell outputs stay uncluttered.
warnings.filterwarnings('ignore') 

In [2]:
from pyspark.sql import SparkSession

# Create the Spark session used by every later cell, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('AirlinesProject')
    .getOrCreate()
)

In [3]:
from pyspark.sql.functions import when, avg, unix_timestamp, hour, minute

In [4]:
# Load the raw flight records: the first row supplies the column names and
# Spark infers each column's type from the file contents.
csv_reader = spark.read.option("header", True).option("inferSchema", True)
data = csv_reader.csv('flights.csv')

In [5]:
# Print a single row to check the column layout of the loaded data.
data.show(1)

+----------+--------------------+--------------------+------------+--------+---------+------+-------------------+----+----------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+------------+--------+--------+-----------------+-----------------+-------------+------------------+-----------------------+
|   FL_DATE|             AIRLINE|         AIRLINE_DOT|AIRLINE_CODE|DOT_CODE|FL_NUMBER|ORIGIN|        ORIGIN_CITY|DEST| DEST_CITY|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|DELAY_DUE_CARRIER|DELAY_DUE_WEATHER|DELAY_DUE_NAS|DELAY_DUE_SECURITY|DELAY_DUE_LATE_AIRCRAFT|
+----------+--------------------+--------------------+------------+--------+---------+------+-------------------+----+----------+------------+--------+---------+--------+----------

In [6]:
# Print each column's name and the type Spark inferred for it while reading the CSV.
data.printSchema()

root
 |-- FL_DATE: date (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_DOT: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)

In [7]:
#Count the NULL values in each column
from pyspark.sql.functions import when, count, col

# when() without otherwise() yields NULL for rows that do not match, and count()
# skips NULLs, so each aggregate counts only the rows where that column is NULL.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in data.columns
]
data.select(null_count_exprs).show()

+-------+-------+-----------+------------+--------+---------+------+-----------+----+---------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+------------+--------+--------+-----------------+-----------------+-------------+------------------+-----------------------+
|FL_DATE|AIRLINE|AIRLINE_DOT|AIRLINE_CODE|DOT_CODE|FL_NUMBER|ORIGIN|ORIGIN_CITY|DEST|DEST_CITY|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|DELAY_DUE_CARRIER|DELAY_DUE_WEATHER|DELAY_DUE_NAS|DELAY_DUE_SECURITY|DELAY_DUE_LATE_AIRCRAFT|
+-------+-------+-----------+------------+--------+---------+------+-----------+----+---------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+--------------

In [8]:
#Replace NULLs in the delay-cause columns with 0, on the assumption that NULL means no delay of that kind
delay_cause_cols = [
    'DELAY_DUE_CARRIER',
    'DELAY_DUE_WEATHER',
    'DELAY_DUE_NAS',
    'DELAY_DUE_SECURITY',
    'DELAY_DUE_LATE_AIRCRAFT',
]
# Only the listed columns are touched; every other column keeps its NULLs.
df = data.fillna(0, subset=delay_cause_cols)

In [9]:
#Converting the scheduled (CRS) departure/arrival times into "h:mm" strings
from pyspark.sql.functions import substring, concat, floor, lit, date_format, to_timestamp, lpad

# The integer is split into value // 100 (hour part) and its last two digits
# (minute part) — assumes the raw values are hhmm-encoded clock times; TODO confirm.
# The hour part is NOT zero-padded, so 905 becomes "9:05"; minutes are padded to two digits.
crs_dep_hour = floor(df.CRS_DEP_TIME/100).cast('string')
crs_dep_minute = lpad(substring(df.CRS_DEP_TIME, -2, 2), 2, '0')

crs_arr_hour = floor(df.CRS_ARR_TIME/100).cast('string')
crs_arr_minute = lpad((df.CRS_ARR_TIME.cast('integer') % 100).cast('string'), 2, '0')

df = df.withColumns({
    'CRS_DEP_TIME': concat(crs_dep_hour, lit(":"), crs_dep_minute),
    'CRS_ARR_TIME': concat(crs_arr_hour, lit(":"), crs_arr_minute),
})



In [10]:
#Converting the actual departure/arrival times into "h:mm" strings
from pyspark.sql.functions import substring, concat, floor, lit, date_format, to_timestamp

# lpad is not imported in this cell; it is expected to be in scope from the cell
# that formats the CRS_* columns.
# Hour part = value // 100 (not zero-padded), minute part = value % 100 padded to two
# digits. concat() returns NULL when any input is NULL, so missing times stay NULL.
dep_hour = floor(df.DEP_TIME/100).cast('string')
dep_minute = lpad((df.DEP_TIME.cast('integer') % 100).cast('string'), 2, '0')

arr_hour = floor(df.ARR_TIME/100).cast('string')
arr_minute = lpad((df.ARR_TIME.cast('integer') % 100).cast('string'), 2, '0')

df = df.withColumns({
    'DEP_TIME': concat(dep_hour, lit(":"), dep_minute),
    'ARR_TIME': concat(arr_hour, lit(":"), arr_minute),
})



In [11]:
#Spell out the cancellation reason instead of the single-letter code
cancellation_code = df.CANCELLATION_CODE

# Branches are tried in order; any value that is not A-D (including NULL)
# falls through to 'Departed'.
cancellation_reason = (
    when(cancellation_code == 'A', 'Airline/Carrier')
    .when(cancellation_code == 'B', 'Weather')
    .when(cancellation_code == 'C', 'National Air System')
    .when(cancellation_code == 'D', 'Security')
    .otherwise('Departed')
)
df = df.withColumn('CANCELLATION_CODE', cancellation_reason)



In [12]:
#Set the delay/duration measures to 0 for cancelled flights
colz = ['DEP_DELAY','TAXI_OUT','TAXI_IN', 'ARR_DELAY', 'CRS_ELAPSED_TIME', 'ELAPSED_TIME','AIR_TIME']

# For rows with CANCELLED == 1 the value is overwritten with 0 whether or not it
# was NULL; all other rows keep their current value.
is_cancelled = df.CANCELLED == 1
df = df.withColumns({
    column_name: when(is_cancelled, 0).otherwise(df[column_name])
    for column_name in colz
})

In [15]:
#Replace missing TAXI_IN with the average taxi-in time at the same destination airport
# Cancelled flights are left out of the average: an earlier cell overwrites their
# TAXI_IN with a 0 placeholder, and including those zeros would drag every airport's
# average down. This matches the CANCELLED == 0 filter used for the elapsed-time average.
avg_taxi_in = (
    df.filter((df.CANCELLED == 0) & df.TAXI_IN.isNotNull())
      .groupby('DEST')
      .avg('TAXI_IN')
)

# Left join keeps every flight; destinations with no usable TAXI_IN get a NULL average,
# so their missing values stay NULL.
df = df.join(avg_taxi_in, on = 'DEST', how = 'left')
df = df.withColumn('TAXI_IN', when(df.TAXI_IN.isNull(), df['avg(TAXI_IN)']).otherwise(df.TAXI_IN))

# The helper column produced by the aggregation is no longer needed.
df = df.drop('avg(TAXI_IN)')

In [16]:
#Fill missing ELAPSED_TIME with the average for the same flight number, route and airline
route_keys = ['FL_NUMBER', 'ORIGIN', 'DEST', "AIRLINE_CODE", "DOT_CODE"]

# Average over non-cancelled flights only.
flown = df.filter(df.CANCELLED == 0)
avg_elapsed_time = flown.groupby(*route_keys).agg(avg("ELAPSED_TIME").alias('avg_elapsed_time'))

df = df.join(avg_elapsed_time, on = route_keys, how = 'left')

# Preference order: recorded value, then the route average, then the scheduled
# elapsed time when no average is available.
elapsed_filled = (
    when(df.ELAPSED_TIME.isNotNull(), df.ELAPSED_TIME)
    .when(df['avg_elapsed_time'].isNotNull(), df['avg_elapsed_time'])
    .otherwise(df['CRS_ELAPSED_TIME'])
)
df = df.withColumn('ELAPSED_TIME', elapsed_filled).drop('avg_elapsed_time')

In [17]:
#Fill missing AIR_TIME as scheduled elapsed time minus taxi-in and taxi-out
# NOTE(review): this uses CRS_ELAPSED_TIME (scheduled), not ELAPSED_TIME — confirm
# that is intended. The result is NULL whenever any of the three inputs is NULL.
estimated_air_time = df['CRS_ELAPSED_TIME'] - df['TAXI_IN'] - df['TAXI_OUT']

df = df.withColumn(
    'AIR_TIME',
    when(df.AIR_TIME.isNotNull(), df.AIR_TIME).otherwise(estimated_air_time),
)

In [18]:
#Missing ARR_TIME values are few (about 800 per the original author), so fall back to CRS_ARR_TIME
arrival_time = df.ARR_TIME
scheduled_arrival = df['CRS_ARR_TIME']

# Keep the recorded arrival time where present; otherwise use the scheduled one.
df = df.withColumn(
    'ARR_TIME',
    when(arrival_time.isNotNull(), arrival_time).otherwise(scheduled_arrival),
)

In [19]:
##Filling in null values for arrival delay with the formulas below
# Only ARR_DELAY is filled in this cell; DEP_DELAY is not modified.
MINUTES_PER_DAY = 24 * 60


def _wrap_across_midnight(diff_minutes):
    """Fold a clock-time difference, in minutes, into the range [-720, 720].

    ARR_TIME and CRS_ARR_TIME carry no date, so a flight scheduled for 23:50
    that lands at 00:10 yields a raw difference of -1420 instead of +20.
    A gap of more than half a day is treated as having crossed midnight and
    is shifted by one day. A NULL difference stays NULL.
    """
    half_day = MINUTES_PER_DAY // 2
    return (
        when(diff_minutes < -half_day, diff_minutes + MINUTES_PER_DAY)
        .when(diff_minutes > half_day, diff_minutes - MINUTES_PER_DAY)
        .otherwise(diff_minutes)
    )


# Step 1: actual minus scheduled arrival time, in minutes, corrected for midnight rollover.
clock_diff = (hour('ARR_TIME') - hour('CRS_ARR_TIME'))*60 + (minute('ARR_TIME') - minute('CRS_ARR_TIME'))
df = df.withColumn('ARR_DELAY', when(df.ARR_DELAY.isNull(),
            _wrap_across_midnight(clock_diff)
             ).otherwise(df.ARR_DELAY))

# Step 2: rows that are still NULL are those where step 1 could not be evaluated
# (presumably an ARR_TIME of "24:00", which is not a valid clock time — confirm).
# For them the delay is taken as the minutes from the scheduled arrival to midnight.
minutes_to_midnight = (24 - hour('CRS_ARR_TIME'))*60 - minute('CRS_ARR_TIME')
df = df.withColumn('ARR_DELAY', when(df.ARR_DELAY.isNull(),
            minutes_to_midnight
             ).otherwise(df.ARR_DELAY))

In [20]:
#Converting into a valid time format so that Power BI can read the columns without errors
# "24:00" is not a valid clock time, so it is rewritten as "00:00". All four clock
# columns are normalised, including CRS_DEP_TIME, so none of them can carry "24:00"
# into the exported file.
clock_time_cols = ['CRS_DEP_TIME', 'DEP_TIME', 'CRS_ARR_TIME', 'ARR_TIME']
df = df.withColumns({
    column_name: when(df[column_name] == "24:00", "00:00").otherwise(df[column_name])
    for column_name in clock_time_cols
})

In [21]:
#Saving the cleaned data
# coalesce(1) collapses the data to a single partition so one part file is written.
# Spark treats the path as an output directory, so "cleaned_airplane_data.csv" is a
# folder containing that part file; "overwrite" replaces any previous export.
single_partition = df.coalesce(1)
csv_writer = single_partition.write.mode("overwrite").option("header", "true")
csv_writer.csv("cleaned_airplane_data.csv")

In [1]:
#Optionally re-run the per-column NULL count on df to confirm that the cleaning steps removed the NULLs