In [1]:
import os

os.environ['SPARK_HOME'] = r"E:\app\spark\spark-3.5.5-bin-hadoop3"
os.environ['PYSPARK_DRIVER_PYHTON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS']='lab'
os.environ['PYSPARK_PYTHON']='python'


In [4]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("yuanyuwendymu/airline-delay-and-cancellation-data-2009-2018")

print("Path to dataset files:", path)


  from .autonotebook import tqdm as notebook_tqdm


Downloading from https://www.kaggle.com/api/v1/datasets/download/yuanyuwendymu/airline-delay-and-cancellation-data-2009-2018?dataset_version_number=1...


100%|██████████| 1.95G/1.95G [17:03<00:00, 2.04MB/s]

Extracting files...





Path to dataset files: C:\Users\Farhan\.cache\kagglehub\datasets\yuanyuwendymu\airline-delay-and-cancellation-data-2009-2018\versions\1


In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T 
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

import warnings
warnings.filterwarnings('ignore')

In [3]:
spark = SparkSession.builder.config("spark.executor.memory","2g").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [4]:
file_names_range = list(range(2009, 2018))
file_paths = [fr'C:\Users\Farhan\.cache\kagglehub\datasets\yuanyuwendymu\airline-delay-and-cancellation-data-2009-2018\versions\1\{file}.csv' for file in file_names_range]

In [5]:
schema = T.StructType([
    T.StructField("FL_DATE", T.TimestampType(), nullable=True),
    T.StructField("OP_CARRIER", T.StringType(), nullable=True),
    T.StructField("OP_CARRIER_FL_NUM", T.IntegerType(), nullable=True),
    T.StructField("ORIGIN", T.StringType(), nullable=True),
    T.StructField("DEST", T.StringType(), nullable=True),
    T.StructField("CRS_DEP_TIME", T.DoubleType(), nullable=True),
    T.StructField("DEP_TIME", T.DoubleType(), nullable=True),
    T.StructField("DEP_DELAY", T.DoubleType(), nullable=True),
    T.StructField("TAXI_OUT", T.DoubleType(), nullable=True),
    T.StructField("WHEELS_OFF", T.DoubleType(), nullable=True),
    T.StructField("WHEELS_ON", T.DoubleType(), nullable=True),
    T.StructField("TAXI_IN", T.DoubleType(), nullable=True),
    T.StructField("CRS_ARR_TIME", T.DoubleType(), nullable=True),
    T.StructField("ARR_TIME",T.DoubleType(), nullable=True),
    T.StructField("ARR_DELAY", T.DoubleType(), nullable=True),
    T.StructField("CANCELLED", T.DoubleType(), nullable=True),
    T.StructField("CANCELLATION_CODE", T.StringType(), nullable=True),
    T.StructField("DIVERTED", T.DoubleType(), nullable=True),
    T.StructField("CRS_ELAPSED_TIME", T.DoubleType(), nullable=True),
    T.StructField("ACTUAL_ELAPSED_TIME", T.DoubleType(), nullable=True),
    T.StructField("AIR_TIME", T.DoubleType(), nullable=True),
    T.StructField("DISTANCE", T.DoubleType(), nullable=True),
    T.StructField("CARRIER_DELAY", T.DoubleType(), nullable=True),
    T.StructField("WEATHER_DELAY", T.DoubleType(), nullable=True),
    T.StructField("NAS_DELAY", T.DoubleType(), nullable=True),
    T.StructField("SECURITY_DELAY", T.DoubleType(), nullable=True),
    T.StructField("LATE_AIRCRAFT_DELAY", T.DoubleType(), nullable=True),
    T.StructField("Unnamed: 27", T.StringType(), nullable=True)
])

In [6]:
df = spark.read.schema(schema).format("csv").option("header", "true").load(file_paths)

In [7]:
# remove null values from the cols used for classification:
df = df.dropna(subset= [
    'FL_DATE',
 'OP_CARRIER',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'DEST',
 'CRS_DEP_TIME',
 'CRS_ARR_TIME',
 'CANCELLED',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'DISTANCE'])

# save df for analysis
analysis_df = df

In [8]:

classify_df = df.drop("Unnamed: 27", 
                        "CARRIER_DELAY", 
                        "WEATHER_DELAY",
                        "NAS_DELAY",
                        "SECURITY_DELAY",
                        "LATE_AIRCRAFT_DELAY",
                        "CANCELLATION_CODE",
                        "DEP_TIME",
                        "DEP_DELAY",
                        "TAXI_OUT",
                        "WHEELS_OFF",
                        "WHEELS_ON",
                        "TAXI_IN",
                        "ARR_TIME",
                        "ARR_DELAY",
                        "ACTUAL_ELAPSED_TIME", 
                        "AIR_TIME")

In [9]:
classify_df = classify_df.withColumn("FL_DATE", F.unix_timestamp("FL_DATE"))

In [10]:
classify_df.columns

['FL_DATE',
 'OP_CARRIER',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'DEST',
 'CRS_DEP_TIME',
 'CRS_ARR_TIME',
 'CANCELLED',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'DISTANCE']

In [11]:
pos_df = classify_df.filter(F.col('CANCELLED').isin(1)).sample(fraction=0.1)
# select an equal amount of negative samples (number of neg samples == number of pos samples)
neg_df = classify_df.filter(F.col('CANCELLED').isin(0)).orderBy(F.rand()).limit(pos_df.count())


# balanced df - a subset - around 141k
classify_df = pos_df.union(neg_df).sample(fraction=1.0).cache()


In [12]:
carriers_flight_count_df = analysis_df.groupBy(F.col('OP_CARRIER')).count().orderBy(F.col('count').desc())
top_10 = carriers_flight_count_df.limit(10).toPandas()
top_10 = top_10.rename(columns={'OP_CARRIER':'Carrier'})
top_10

Unnamed: 0,Carrier,count
0,WN,10743987
1,DL,6892597
2,AA,5765343
3,OO,5488915
4,EV,4523555
5,UA,4205093
6,MQ,2920259
7,US,2657286
8,B6,2178956
9,AS,1414778


In [14]:

import altair as alt
chart = alt.Chart(top_10).mark_arc(outerRadius=260, innerRadius=75).encode(
    theta = alt.Theta(field="count", type="quantitative", stack=True),
    color = alt.Color('Carrier:N', scale=alt.Scale(scheme='category20'), legend=None),
).properties(
    title='Top 10 Carriers by amount of flights',
    width=600,
    height=300
)

pie = chart.mark_arc(outerRadius=350)
value_text = pie.mark_text(radius=300, size=15).encode(text=alt.Text('count:Q'))

pie2 = chart.mark_arc(outerRadius=250)
text = pie2.mark_text(radius=200, size=15).encode(
    text=alt.Text('Carrier:N'), 
    color=alt.value("#000000")
)

(chart + text + value_text).configure_view(
    strokeWidth=0
).configure_title(
    fontSize=18
)

In [15]:



carriers_flight_count_df = analysis_df.filter(F.col('CANCELLATION_CODE').isNotNull()).groupBy(F.col('CANCELLATION_CODE')).count()
cancellation_reasons = carriers_flight_count_df.toPandas()
cancellation_reasons

Unnamed: 0,CANCELLATION_CODE,count
0,B,414449
1,C,155532
2,A,285946
3,D,664


In [16]:
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'A'] = 'By carrier'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'B'] = 'Due to weather'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'C'] = 'By national air system'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'D'] = 'For security'
cancellation_reasons = cancellation_reasons.rename(columns={'CANCELLATION_CODE':'Reason'})

In [17]:
cancellation_reasons

Unnamed: 0,Reason,count
0,Due to weather,414449
1,By national air system,155532
2,By carrier,285946
3,For security,664


In [19]:
chart = alt.Chart(cancellation_reasons).mark_arc(outerRadius=180, innerRadius=50).encode(
    theta = alt.Theta(field="count", type="quantitative", stack=True),
    color = alt.Color('Reason:N', scale=alt.Scale(scheme='category20'), legend=None),
).properties(
    title='Reasons for flight cancellations',
    width=600,
    height=300
)

pie = chart.mark_arc(outerRadius=250)
value_text = pie.mark_text(radius=220, size=15).encode(text=alt.Text('count:Q'))

pie2 = chart.mark_arc(outerRadius=150)
text = pie2.mark_text(radius=120, size=12).encode(
    text=alt.Text('Reason:N'), 
    color=alt.value("#000000")
)

(chart + text + value_text).configure_view(
    strokeWidth=0
).configure_title(
    fontSize=18
)