## 1. Imports and Initialization

In [1]:
!java -version

java version "22" 2024-03-19
Java(TM) SE Runtime Environment (build 22+36-2370)
Java HotSpot(TM) 64-Bit Server VM (build 22+36-2370, mixed mode, sharing)


In [2]:

import altair as alt

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T 
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

import warnings
warnings.filterwarnings('ignore')

In [3]:
# initialize sparkSession
spark = SparkSession.builder.config("spark.executor.memory","2g").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [4]:
spark.catalog.clearCache()

## 2. Loading and Cleaning the Data

In [5]:
file_names_range = list(range(2009, 2016))
file_paths = [f'{file}.csv' for file in file_names_range]

In [6]:
schema = T.StructType([
    T.StructField("FL_DATE", T.TimestampType(), nullable=True),
    T.StructField("OP_CARRIER", T.StringType(), nullable=True),
    T.StructField("OP_CARRIER_FL_NUM", T.IntegerType(), nullable=True),
    T.StructField("ORIGIN", T.StringType(), nullable=True),
    T.StructField("DEST", T.StringType(), nullable=True),
    T.StructField("CRS_DEP_TIME", T.DoubleType(), nullable=True),
    T.StructField("DEP_TIME", T.DoubleType(), nullable=True),
    T.StructField("DEP_DELAY", T.DoubleType(), nullable=True),
    T.StructField("TAXI_OUT", T.DoubleType(), nullable=True),
    T.StructField("WHEELS_OFF", T.DoubleType(), nullable=True),
    T.StructField("WHEELS_ON", T.DoubleType(), nullable=True),
    T.StructField("TAXI_IN", T.DoubleType(), nullable=True),
    T.StructField("CRS_ARR_TIME", T.DoubleType(), nullable=True),
    T.StructField("ARR_TIME",T.DoubleType(), nullable=True),
    T.StructField("ARR_DELAY", T.DoubleType(), nullable=True),
    T.StructField("CANCELLED", T.DoubleType(), nullable=True),
    T.StructField("CANCELLATION_CODE", T.StringType(), nullable=True),
    T.StructField("DIVERTED", T.DoubleType(), nullable=True),
    T.StructField("CRS_ELAPSED_TIME", T.DoubleType(), nullable=True),
    T.StructField("ACTUAL_ELAPSED_TIME", T.DoubleType(), nullable=True),
    T.StructField("AIR_TIME", T.DoubleType(), nullable=True),
    T.StructField("DISTANCE", T.DoubleType(), nullable=True),
    T.StructField("CARRIER_DELAY", T.DoubleType(), nullable=True),
    T.StructField("WEATHER_DELAY", T.DoubleType(), nullable=True),
    T.StructField("NAS_DELAY", T.DoubleType(), nullable=True),
    T.StructField("SECURITY_DELAY", T.DoubleType(), nullable=True),
    T.StructField("LATE_AIRCRAFT_DELAY", T.DoubleType(), nullable=True),
    T.StructField("Unnamed: 27", T.StringType(), nullable=True)
])

In [7]:
df = spark.read.schema(schema).format("csv").option("header", "true").load(file_paths)

In [8]:
from pyspark.sql.functions import col, count, when

# Calculate the count of nulls for each column
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

# Show the result
null_counts.show(vertical=True)

-RECORD 0-----------------------
 FL_DATE             | 0        
 OP_CARRIER          | 0        
 OP_CARRIER_FL_NUM   | 0        
 ORIGIN              | 0        
 DEST                | 0        
 CRS_DEP_TIME        | 1        
 DEP_TIME            | 679642   
 DEP_DELAY           | 679642   
 TAXI_OUT            | 700508   
 WHEELS_OFF          | 700508   
 WHEELS_ON           | 725252   
 TAXI_IN             | 725251   
 CRS_ARR_TIME        | 2        
 ARR_TIME            | 725252   
 ARR_DELAY           | 809587   
 CANCELLED           | 0        
 CANCELLATION_CODE   | 42343168 
 DIVERTED            | 0        
 CRS_ELAPSED_TIME    | 37       
 ACTUAL_ELAPSED_TIME | 809588   
 AIR_TIME            | 809587   
 DISTANCE            | 0        
 CARRIER_DELAY       | 35006921 
 WEATHER_DELAY       | 35006921 
 NAS_DELAY           | 35006921 
 SECURITY_DELAY      | 35006921 
 LATE_AIRCRAFT_DELAY | 35006921 
 Unnamed: 27         | 43051239 



In [9]:
# remove null values from the cols used for classification:
df = df.dropna(subset= [
    'FL_DATE',
 'OP_CARRIER',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'DEST',
 'CRS_DEP_TIME',
 'CRS_ARR_TIME',
 'CANCELLED',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'DISTANCE'])

# save df for analysis
analysis_df = df

In [10]:
# drop the cols who indirectly indicate if a flight is canelled or not (apart from the column CANCELLED)
# most of those cols contain null values, if the flight is cancelled

classify_df = df.drop("Unnamed: 27", 
                        "CARRIER_DELAY", 
                        "WEATHER_DELAY",
                        "NAS_DELAY",
                        "SECURITY_DELAY",
                        "LATE_AIRCRAFT_DELAY",
                        "CANCELLATION_CODE",
                        "DEP_TIME",
                        "DEP_DELAY",
                        "TAXI_OUT",
                        "WHEELS_OFF",
                        "WHEELS_ON",
                        "TAXI_IN",
                        "ARR_TIME",
                        "ARR_DELAY",
                        "ACTUAL_ELAPSED_TIME", 
                        "AIR_TIME")

In [11]:
# numerical timestamp column
classify_df = classify_df.withColumn("FL_DATE", F.unix_timestamp("FL_DATE"))

In [12]:
classify_df.columns

['FL_DATE',
 'OP_CARRIER',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'DEST',
 'CRS_DEP_TIME',
 'CRS_ARR_TIME',
 'CANCELLED',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'DISTANCE']

In [13]:
# Take a subset: either balanced (with subsampling) or unbalanced
# we take a subset, because of memory limitations

# select subsample of positive samples
pos_df = classify_df.filter(F.col('CANCELLED').isin(1)).sample(fraction=0.1)
# select an equal amount of negative samples (number of neg samples == number of pos samples)
neg_df = classify_df.filter(F.col('CANCELLED').isin(0)).orderBy(F.rand()).limit(pos_df.count())


# balanced df - a subset - around 141k
classify_df = pos_df.union(neg_df).sample(fraction=1.0).cache()

# unbalanced df - but a subset - around 215k
#classify_df = classify_df.sample(fraction=0.005).cache() 

In [14]:
#classify_df.rdd.countApprox(timeout = 1000,confidence = 0.90)

## 3. Analysis (on the analysis_df)

In [15]:
# get the most present flight carriers
carriers_flight_count_df = analysis_df.groupBy(F.col('OP_CARRIER')).count().orderBy(F.col('count').desc())
top_10 = carriers_flight_count_df.limit(10).toPandas()
top_10 = top_10.rename(columns={'OP_CARRIER':'Carrier'})
top_10

Unnamed: 0,Carrier,count
0,WN,8115099
1,DL,5046291
2,OO,4176455
3,AA,3954500
4,EV,3693024
5,UA,3075545
6,MQ,2920259
7,US,2657286
8,B6,1597829
9,FL,1218388


In [16]:
# visualisation
chart = alt.Chart(top_10).mark_arc(outerRadius=260, innerRadius=75).encode(
    theta = alt.Theta(field="count", type="quantitative", stack=True),
    color = alt.Color('Carrier:N', scale=alt.Scale(scheme='category20'), legend=None),
).properties(
    title='Top 10 Carriers by amount of flights',
    width=600,
    height=300
)

pie = chart.mark_arc(outerRadius=350)
value_text = pie.mark_text(radius=300, size=15).encode(text=alt.Text('count:Q'))

pie2 = chart.mark_arc(outerRadius=250)
text = pie2.mark_text(radius=200, size=15).encode(
    text=alt.Text('Carrier:N'), 
    color=alt.value("#000000")
)

(chart + text + value_text).configure_view(
    strokeWidth=0
).configure_title(
    fontSize=18
)

In [17]:
# count number of cancellations per code/reason
carriers_flight_count_df = analysis_df.filter(F.col('CANCELLATION_CODE').isNotNull()).groupBy(F.col('CANCELLATION_CODE')).count()
cancellation_reasons = carriers_flight_count_df.toPandas()
cancellation_reasons

Unnamed: 0,CANCELLATION_CODE,count
0,B,331529
1,C,129128
2,A,247074
3,D,319


In [18]:
# rename col values
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'A'] = 'By carrier'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'B'] = 'Due to weather'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'C'] = 'By national air system'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'D'] = 'For security'
cancellation_reasons = cancellation_reasons.rename(columns={'CANCELLATION_CODE':'Reason'})

In [19]:
cancellation_reasons

Unnamed: 0,Reason,count
0,Due to weather,331529
1,By national air system,129128
2,By carrier,247074
3,For security,319


In [20]:
# visualisation of calcellation reasons
chart = alt.Chart(cancellation_reasons).mark_arc(outerRadius=180, innerRadius=50).encode(
    theta = alt.Theta(field="count", type="quantitative", stack=True),
    color = alt.Color('Reason:N', scale=alt.Scale(scheme='category20'), legend=None),
).properties(
    title='Reasons for flight cancellations',
    width=600,
    height=300
)

pie = chart.mark_arc(outerRadius=250)
value_text = pie.mark_text(radius=220, size=15).encode(text=alt.Text('count:Q'))

pie2 = chart.mark_arc(outerRadius=150)
text = pie2.mark_text(radius=120, size=12).encode(
    text=alt.Text('Reason:N'), 
    color=alt.value("#000000")
)

(chart + text + value_text).configure_view(
    strokeWidth=0
).configure_title(
    fontSize=18
)