# Data import
- Set SPARK_HOME correctly before running the notebook (or comment it out)
- These files should be in the same directory as this notebook:
  - `full_data_flightdelay.csv`
  - `airport_weather_2019.csv`
  - `airports.txt` - Official airport names with their display names

In [3]:
import os
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DoubleType, TimestampType
from pyspark.sql.functions import col, to_date, concat, lit
os.environ["SPARK_HOME"] = "/home/hel/.local/lib/python3.10/site-packages/pyspark/"
os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"


In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("Data import and cleanup") \
    .config("spark.executor.memory", "6g") \
    .getOrCreate()


24/05/09 11:10:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
from pyspark.sql.functions import col, dayofweek,to_date, month, count, avg
from pyspark.sql import Window
from pyspark.sql.functions import row_number,   sum, when


# Load the CSV file into a DataFrame
csv_file_path_flightdelay = "./full_data_flightdelay.csv"  # Replace with the path to your CSV file


df_flightdelay = spark.read.option("delimiter", ",").option("header", "true").csv(csv_file_path_flightdelay)


# Read the CSV file using the manually defined schema
csv_file_path_weather = "./airport_weather_2019.csv"  # Replace with your file path
df_weather = spark.read.option("delimiter", ",").option("header", "true").csv(csv_file_path_weather)




# Data cleanup and preparation

In [6]:
from pyspark.sql.functions import coalesce

# create new column for month and day_of_week values derived from date
# put date into same format
df_day_column = df_weather.withColumn("DATE_NEW", to_date(col("DATE"), "M/d/yyyy"))
df_day_column = df_day_column.withColumn("DATE_NEW", coalesce(df_day_column["DATE_NEW"], to_date(df_day_column["DATE"], 'yyyy-MM-dd')))

# add day of week and month column to weather
df_day_column = df_day_column.withColumn("DAY_OF_WEEK", dayofweek(col("DATE_NEW").alias("DAY_OF_WEEK")))
df_day_column = df_day_column.withColumn("MONTH", month(col("DATE_NEW").alias("MONTH")))

df_day_column.createOrReplaceTempView("table1")
df_select = spark.sql("SELECT STATION, NAME,DAY_OF_WEEK,DATE, MONTH, AWND, PRCP, SNOW, SNWD, TAVG, TMAX, TMIN, WDF2 from table1")


grouped_df = df_select.groupBy("MONTH", "NAME").agg(
    avg("AWND").alias("AWND"),
    avg("PRCP").alias("PRCP"),
    avg("SNOW").alias("SNOW"),
    avg("SNWD").alias("SNWD"),
    avg("TAVG").alias("TAVG"),
    avg("TMAX").alias("TMAX"),
    avg("TMIN").alias("TMIN"),
    avg("WDF2").alias("WDF2")
).orderBy("NAME","MONTH")



24/05/09 11:10:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [7]:
from pyspark.sql.functions import lower, split, col, lit, monotonically_increasing_id


# Normalize joining columns
grouped_df = grouped_df.withColumn("normalized_name", lower(col("name")))
df_flightdelay = df_flightdelay.withColumn("normalized_name", lower(split(col("departing_airport"), " ").getItem(0)))

# Group by to investigate
grouped_df_nn = grouped_df.groupBy("normalized_name").agg(
    count('*').alias('count')
)

grouped_df_name = grouped_df.groupBy("NAME").agg(
    count('*').alias('count')
)

In [8]:
# For 'grouped_df', transforming 'NAME' to lowercase and dropping duplicates based on the 'name' column
grouped_df_lower = grouped_df.select(lower(col("NAME")).alias("name")).dropDuplicates(['name'])

# For 'df_flightdelay', transforming 'DEPARTING_AIRPORT' to lowercase, casting it to string, and dropping duplicates based on the 'departing_airport' column
df_flightdelay_lower = df_flightdelay.select(lower(col("DEPARTING_AIRPORT")).alias("departing_airport")).dropDuplicates(['departing_airport'])



#join providing table that contain in the name column all distinct airports 
#from weather dataset and under departing_flight all distinc airports from delay dataset

result_df = df_flightdelay_lower.alias("flight").join(
    grouped_df_lower.alias("grouped"),
    (col("grouped.name").contains(col("flight.departing_airport"))),
    "inner"
).select(
    col("flight.departing_airport").alias("departing_airport"),
    col("grouped.name").alias("name")
)

### Modify dataframe such that `df_result` will contain the airports matched on join and enhanced results will contain the `df` of unmatched airports for each dataset

In [9]:


# Identifying non-matched entries
non_matched_flight = df_flightdelay_lower.alias("flight").join(
    result_df.alias("result"),
    result_df.departing_airport == df_flightdelay_lower.departing_airport,
    "left_anti"
)

non_matched_grouped = grouped_df_lower.alias("grouped").join(
    result_df.alias("result"),
    result_df.name == grouped_df_lower.name,
    "left_anti"
)


# Add a unique ID to each DataFrame to facilitate the outer join
result_df = result_df.withColumn("id", monotonically_increasing_id())
non_matched_flight = non_matched_flight.withColumn("id", monotonically_increasing_id())
non_matched_grouped = non_matched_grouped.withColumn("id", monotonically_increasing_id())


# Perform the outer joins using the unique IDs, result_df is now composed of matched airports
enhanced_result_df = result_df.join(non_matched_flight, "id", "outer" ).join(non_matched_grouped, "id", "outer" )
enhanced_result_df = enhanced_result_df.drop("id")


# Select columns, get rid of duplicates
selected_columns = [col for col in enhanced_result_df.columns if col != 'name' and col != 'departing_airport'] + ['grouped.name'] + ['flight.departing_airport']

#will contain unmatched airports for each dataset
enhanced_result_df = enhanced_result_df.select(selected_columns)
enhanced_result_df.drop('name','departing_airport')



DataFrame[]

In [10]:
# create dataframe that contains airports matched and unmatched result from the join
# Rename columns in result_df
result_df = result_df.withColumnRenamed("name", "weather_matched") \
                     .withColumnRenamed("departing_airport", "delay_matched")

# Rename columns in enhanced_result_df
enhanced_result_df = enhanced_result_df.withColumnRenamed("name", "weather_unmatched") \
                                       .withColumnRenamed("departing_airport", "delay_unmatched")

# Optional: If you need to ensure the rows are matched by order, add an index column to each DataFrame
result_df = result_df.withColumn("index", monotonically_increasing_id())
enhanced_result_df = enhanced_result_df.withColumn("index", monotonically_increasing_id())

# Join DataFrames on the index column
matched_and_unmatched_airports = result_df.join(
    enhanced_result_df,
    on="index",
    how="outer"  # Use "outer" to include all rows from both DataFrames
)

# Drop the index column as it's no longer needed after joining
matched_and_unmatched_airports = matched_and_unmatched_airports.drop("index", 'name', 'departing_airport')


In [11]:
import pandas as pd

# Initialize a list to store the parsed data
data = []

# Open the text file and parse it line by line
with open('./airports.txt', 'r') as file:
    for line in file:
        # Split the line by comma to extract the needed parts
        parts = line.split(',')
        
        # Check if the line has enough parts to avoid index errors
        if len(parts) >= 4:
            # Extract and clean the desired parts
            # Remove quotation marks and extra spaces if present
            name = parts[1].strip('"').strip()
            city = parts[2].strip('"').strip()
            country = parts[3].strip('"').strip()
            
            # Combine the first two parts into one column, and keep the country as the second column
            combined = f"{name}, {city}, {country}"
            data.append(combined)

# Create a DataFrame from the list
df_airports = pd.DataFrame(data, columns=['Airport and City'])


In [12]:
!pip install fuzzywuzzy
!pip install python-Levenshtein

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [13]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Get all airport names into one table
df_delay_unique_airport = df_flightdelay.select('DEPARTING_AIRPORT').distinct().withColumnRenamed("DEPARTING_AIRPORT", "airport")
df_weather_unique_airport = df_weather.select('NAME').distinct().withColumnRenamed("NAME", "airport")

df_union = df_delay_unique_airport.union(df_weather_unique_airport)

def filter_out_useless_parts_of_string(airport_name):
    useless_words = ["international", "airport", "regional"]

    modified = airport_name.lower()
    for word in useless_words:
        modified = modified.replace(word,"")
        
    return modified
    
filter_string_udf = udf(filter_out_useless_parts_of_string, StringType())

df_union = df_union.select(filter_string_udf(col('airport'))).withColumnRenamed("filter_out_useless_parts_of_string(airport)", "airport")

In [14]:
from fuzzywuzzy import process, fuzz

def get_matches(df1, col1, df2, col2, threshold=40):
    # Convert each column to a list for processing, ensuring to drop NA values
    list1 = df1[col1].dropna().tolist()
    list2 = df2[col2].dropna().tolist()

    # Find best matches with a score above the threshold
    matches = []
    for item in list1:
        # Use process.extractOne to find the best match for each item from list1 in list2
        best_match = process.extractOne(item, list2, scorer=fuzz.token_set_ratio)
        if best_match and best_match[1] >= threshold:
            matches.append((item, best_match[0], best_match[1]))

    # Return matches as a DataFrame for better visualization
    return pd.DataFrame(matches, columns=[col1, col2 + '_match', 'Score'])

In [15]:
# Add official airports using fuzzy matching
# Assuming 'matched_and_unmatched_airports' is your PySpark DataFrame
pandas_df = df_union.toPandas()  # Convert to Pandas DataFrame

# Example usage (ensure df1 and df2 are already defined and loaded with your data)
df_matches_airports = get_matches(pandas_df, 'airport', df_airports, 'Airport and City')

spark_df_airports = spark.createDataFrame(df_matches_airports)

                                                                                

In [16]:
joining_table = spark_df_airports.select("airport", "Airport and City_match")
delay_table = df_flightdelay.withColumn("DEPARTING_AIRPORT", filter_string_udf('DEPARTING_AIRPORT'))
weather_table = df_day_column.withColumn("NAME", filter_string_udf('NAME'))

delay_joined = delay_table.join(joining_table, joining_table.airport == delay_table.DEPARTING_AIRPORT, 'inner')
weather_joined = weather_table.join(joining_table, joining_table.airport == weather_table.NAME, 'inner')

In [17]:
# Convert integer columns in df1 to strings
weather_joined = weather_joined.withColumn("MONTH", col("MONTH").cast("string")) \
         .withColumn("DAY_OF_WEEK", col("DAY_OF_WEEK").cast("string"))

result_joined = delay_joined.join(weather_joined, ["MONTH","DAY_OF_WEEK","Airport and City_match"], 'inner')

### Drop unnecessary columns


In [18]:
# LATITUDE, LONGITUDE, STATION, MONTH, airport, normalized_name, NAME, _c0
result_joined = result_joined.drop("LATITUDE", "LONGITUDE", "STATION", "MONTH", \
                                   "airport", "normalized_name", "NAME", "_c0", \
                                   "DATE", "AIRLINE_FLIGHTS_MONTH", "AVG_MONTHLY_PASS_AIRLINE" \
                                   "DEPARTING_AIRPORT", "PGTM", "WDF5", "WDF2", "WSF2", "WSF5", \
                                   "SN32", "SX32", "TOBS","WESD", "PSUN","TSUN")

df = result_joined

In [19]:
df = df.withColumn("TMIN", df["TMIN"].cast("float"))
df = df.withColumn("PRCP", df["PRCP"].cast("float"))


df = df.withColumn("SNOW", when(col("TMIN") > 3, 0).otherwise(col("SNOW")))
df = df.withColumn("SNOW", when( \
                    (col("SNOW").isNull() | (col("SNOW") == '')) & (col("PRCP") > 0), col("PRCP") \
                               ).otherwise(lit(0)))


In [20]:
# Simplify temperatures, create new flag column EXTREME_WEATHER based on TMIN and TMAX and drop all others


# Since TMAX and TMIN are strings, you need to convert them to integers before comparison
df = df.withColumn("TMAX", df["TMAX"].cast("integer"))
df = df.withColumn("TMIN", df["TMIN"].cast("integer"))

# Creating the EXTREME_WEATHER column based on the conditions provided
df = df.withColumn("EXTREME_WEATHER", 
                   when((col("TMAX") > 40) | (col("TMIN") < 0), 1)
                   .otherwise(0))

df = df.drop("TMIN", "TMAX", "TAVG")

In [21]:
from pyspark.sql.functions import expr

# Replace all WT** with column which adds these extreme weather conditions into one value
values_as_strings = [f"WT{i:02}" for i in range(1, 12)]

for column_name in values_as_strings:
    df = df.withColumn(column_name, df[column_name].cast("integer"))

In [22]:
from functools import reduce

total_wt_column = reduce(lambda a, b: a + b, [coalesce(col(c), lit(0)) for c in values_as_strings])

df = df.withColumn('EXTREME_WEATHER_WT', total_wt_column)

df = df.drop('WT01', 'WT02', 'WT03', 'WT04', 'WT05', 'WT06', 'WT07', 'WT08', 'WT09', 'WT10', 'WT11')



In [23]:
df.printSchema()

root
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- Airport and City_match: string (nullable = true)
 |-- DEP_DEL15: string (nullable = true)
 |-- DEP_TIME_BLK: string (nullable = true)
 |-- DISTANCE_GROUP: string (nullable = true)
 |-- SEGMENT_NUMBER: string (nullable = true)
 |-- CONCURRENT_FLIGHTS: string (nullable = true)
 |-- NUMBER_OF_SEATS: string (nullable = true)
 |-- CARRIER_NAME: string (nullable = true)
 |-- AIRPORT_FLIGHTS_MONTH: string (nullable = true)
 |-- AIRLINE_AIRPORT_FLIGHTS_MONTH: string (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRPORT: string (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRLINE: string (nullable = true)
 |-- FLT_ATTENDANTS_PER_PASS: string (nullable = true)
 |-- GROUND_SERV_PER_PASS: string (nullable = true)
 |-- PLANE_AGE: string (nullable = true)
 |-- DEPARTING_AIRPORT: string (nullable = true)
 |-- PREVIOUS_AIRPORT: string (nullable = true)
 |-- AWND: string (nullable = true)
 |-- PRCP: float (nullable = true)
 |-- SNOW: float (nullable = true)
 |-

### Count null values

In [24]:
from pyspark.sql.functions import col, when, count

# Define a function to count nulls and empty strings
def count_nulls_and_empties(df):
    # Use aggregation to sum up each condition of being null or empty across all columns
    exprs = [count(when(df[c].isNull() | (df[c] == ""), c)).alias(c) for c in df.columns]
    return df.agg(*exprs)


## Fill in missing values, for example AWND and PRCP


In [25]:
# Fill in missing values, for example AWND and PRCP

from pyspark.sql.functions import avg, col, coalesce, month, median
from pyspark.sql.window import Window

# Define a window spec partitioned by month
window_spec = Window.partitionBy(month("DATE_NEW"))

# Assuming 'column_name' is the column with null values you want to fill
avg_column = avg(col("AWND")).over(window_spec)
avg_prcp = avg(col("PRCP")).over(window_spec)
med_column = median(col("FLT_ATTENDANTS_PER_PASS")).over(window_spec)


# Replace nulls with the average of that month
df = df.withColumn("AWND_filled", coalesce(col("AWND"), avg_column))
df = df.withColumn("PRCP_filled", coalesce(col("PRCP"), avg_prcp))
df = df.withColumn("DISTANCE_GROUP_filled", coalesce(col("DISTANCE_GROUP"), lit("1")))
df = df.withColumn("FLT_ATTENDANTS_PER_PASS_filled", coalesce(col("FLT_ATTENDANTS_PER_PASS"), med_column))


df = df.drop("AWND").withColumnRenamed("AWND_filled", "AWND")
df = df.drop("PRCP").withColumnRenamed("PRCP_filled", "PRCP")
df = df.drop("DISTANCE_GROUP").withColumnRenamed("DISTANCE_GROUP_filled", "DISTANCE_GROUP")
df = df.drop("FLT_ATTENDANTS_PER_PASS").withColumnRenamed("FLT_ATTENDANTS_PER_PASS_filled", "FLT_ATTENDANTS_PER_PASS")

missing_values_df = df


In [26]:
df.printSchema()

root
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- Airport and City_match: string (nullable = true)
 |-- DEP_DEL15: string (nullable = true)
 |-- DEP_TIME_BLK: string (nullable = true)
 |-- SEGMENT_NUMBER: string (nullable = true)
 |-- CONCURRENT_FLIGHTS: string (nullable = true)
 |-- NUMBER_OF_SEATS: string (nullable = true)
 |-- CARRIER_NAME: string (nullable = true)
 |-- AIRPORT_FLIGHTS_MONTH: string (nullable = true)
 |-- AIRLINE_AIRPORT_FLIGHTS_MONTH: string (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRPORT: string (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRLINE: string (nullable = true)
 |-- GROUND_SERV_PER_PASS: string (nullable = true)
 |-- PLANE_AGE: string (nullable = true)
 |-- DEPARTING_AIRPORT: string (nullable = true)
 |-- PREVIOUS_AIRPORT: string (nullable = true)
 |-- SNOW: float (nullable = true)
 |-- SNWD: string (nullable = true)
 |-- DATE_NEW: date (nullable = true)
 |-- EXTREME_WEATHER: integer (nullable = false)
 |-- EXTREME_WEATHER_WT: integer (nullable = false)


In [27]:
from pyspark.sql.functions import when, col

# Replace null values in the 'PREVIOUS_AIRPORT' column with empty strings
df = df.withColumn('PREVIOUS_AIRPORT', when(col('PREVIOUS_AIRPORT').isNull(), '').otherwise(col('PREVIOUS_AIRPORT')))

In [28]:
from pyspark.sql.functions import round

calculate_flights = df.select("CARRIER_NAME", "Airport and City_match", "DATE_NEW")

calculate_flights = calculate_flights.withColumn("MONTH", month(col("DATE_NEW").alias("MONTH")))

count = calculate_flights.groupBy("CARRIER_NAME", "Airport and City_match", "MONTH").count()

count = count.groupBy("CARRIER_NAME", "Airport and City_match") \
                   .agg(round(avg("count")).alias("monthly_avg_count"))

## Join into table combining delays with weather

In [29]:
rsdf = df.join(
    count,
    ["CARRIER_NAME", "Airport and City_match"],
    "inner"
)

rsdf = rsdf.withColumn("AIRLINE_AIRPORT_FLIGHTS_MONTH",\
                       when(\
                           (col("AIRLINE_AIRPORT_FLIGHTS_MONTH").isNull() | (col("AIRLINE_AIRPORT_FLIGHTS_MONTH") == ''))
                           , col("monthly_avg_count")).otherwise(col("AIRLINE_AIRPORT_FLIGHTS_MONTH")))

rsdf = rsdf.drop("monthly_avg_count")

#rsdf.show()


In [30]:
df = rsdf

In [31]:
# ------------------------------------
# Converting from numerical to nominal
# ------------------------------------

In [32]:
# Convert precipitation from numerical to nominal

# Calculate the quantile thresholds
#thresholds = result_df.approxQuantile("PRCP", [0.33, 0.67], 0.01)  # 0.01 is the relative error

# Categorize based on quantile thresholds
#result_df = result_df.withColumn(
#    "precip_category",
#    when(col("PRCP") <= thresholds[0], "low")
#    .when(col("PRCP") <= thresholds[1], "medium")
#    .otherwise("high")
#)

# Show the resulting DataFrame
#result_df.select("PRCP", "precip_category").show()

## Transform numerical to nominal

In [33]:
# Transform weekday from numerical to nominal

# Weekday mapping dictionary
month_dict = {
    '1': 'Monday', '2': 'Tuesday', '3': 'Wednesday', '4': 'Thursday', 
    '5': 'Friday', '6': 'Saturday', '7': 'Sunday'}

# Define the UDF to convert numerical months to names
def convert_weekday_to_name(weekday):
    return month_dict.get(str(weekday), "Unknown")

convert_weekday_udf = udf(convert_weekday_to_name, StringType())

In [34]:
# NUMBER_OF_SEATS into nominal

# Categorize based on research
df = df.withColumn(
    "NUMBER_OF_SEATS_NOM",
    when(col("NUMBER_OF_SEATS") <= 100, "Small")
    .when(col("NUMBER_OF_SEATS") <= 200, "Medium")
    .when(col("NUMBER_OF_SEATS") <= 400, "Large")
    .otherwise("Jumbo")
)
# Replace NUMBER_OF_SEATS column with the nominal one
df = df.drop("NUMBER_OF_SEATS").withColumnRenamed("NUMBER_OF_SEATS_NOM", "NUMBER_OF_SEATS")

In [35]:
# Plane age into nominal

# Categorize based on research
df = df.withColumn(
    "PLANE_AGE_NOM",
    when(col("PLANE_AGE") <= 10, "New")
    .when(col("PLANE_AGE") <= 20, "Standard")
    .otherwise("Old")
)

# Replace PLANE_AGE column with the nominal one
df = df.drop("PLANE_AGE").withColumnRenamed("PLANE_AGE_NOM", "PLANE_AGE")


this_df = df

In [36]:

df.printSchema()

root
 |-- CARRIER_NAME: string (nullable = true)
 |-- Airport and City_match: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- DEP_DEL15: string (nullable = true)
 |-- DEP_TIME_BLK: string (nullable = true)
 |-- SEGMENT_NUMBER: string (nullable = true)
 |-- CONCURRENT_FLIGHTS: string (nullable = true)
 |-- AIRPORT_FLIGHTS_MONTH: string (nullable = true)
 |-- AIRLINE_AIRPORT_FLIGHTS_MONTH: string (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRPORT: string (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRLINE: string (nullable = true)
 |-- GROUND_SERV_PER_PASS: string (nullable = true)
 |-- DEPARTING_AIRPORT: string (nullable = true)
 |-- PREVIOUS_AIRPORT: string (nullable = true)
 |-- SNOW: float (nullable = true)
 |-- SNWD: string (nullable = true)
 |-- DATE_NEW: date (nullable = true)
 |-- EXTREME_WEATHER: integer (nullable = false)
 |-- EXTREME_WEATHER_WT: integer (nullable = false)
 |-- AWND: string (nullable = true)
 |-- PRCP: double (nullable = true)
 |-- DISTANCE_GR

## Saving cleaned and prepared data to a CSV

In [37]:
# Save cleaned and prepared data file to a csv
df.write.csv('cleaned_flight_data.csv', header=True)

                                                                                

# Data analysis

## Load cleaned data from csv
- Data from the cleanup and preparation was saved into a csv to avoid repeated computations

In [38]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("Data analysis") \
    .config("spark.executor.memory", "6g") \
    .getOrCreate()

loaded_df = spark.read.csv("cleaned_flight_data.csv", header=True, inferSchema=True)

loaded_df.printSchema()

24/05/09 11:12:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

root
 |-- CARRIER_NAME: string (nullable = true)
 |-- Airport and City_match: string (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- DEP_DEL15: integer (nullable = true)
 |-- DEP_TIME_BLK: string (nullable = true)
 |-- SEGMENT_NUMBER: integer (nullable = true)
 |-- CONCURRENT_FLIGHTS: integer (nullable = true)
 |-- AIRPORT_FLIGHTS_MONTH: integer (nullable = true)
 |-- AIRLINE_AIRPORT_FLIGHTS_MONTH: double (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRPORT: integer (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRLINE: integer (nullable = true)
 |-- GROUND_SERV_PER_PASS: double (nullable = true)
 |-- DEPARTING_AIRPORT: string (nullable = true)
 |-- PREVIOUS_AIRPORT: string (nullable = true)
 |-- SNOW: double (nullable = true)
 |-- SNWD: double (nullable = true)
 |-- DATE_NEW: date (nullable = true)
 |-- EXTREME_WEATHER: integer (nullable = true)
 |-- EXTREME_WEATHER_WT: integer (nullable = true)
 |-- AWND: double (nullable = true)
 |-- PRCP: double (nullable = true)
 |-- DISTA

                                                                                

In [39]:
# Df checkpoint
df = loaded_df

## Create a 10% sample
- Computations on the entire data set would take too much time to work with sensibly

In [40]:
# Calculate the fraction for each unique value in 'DISTR' to maintain distribution
fractions = df.select("DEP_DEL15").distinct().rdd.flatMap(lambda x: x).collect()
fractions = {x: 0.01 for x in fractions}  # 1% sample

# Perform stratified sampling
sampled_df = df.stat.sampleBy("DEP_DEL15", fractions, seed=1)

# Show the result or perform further analysis
#sampled_df.show()
#sampled_df.count() ~ 150 251

                                                                                

In [41]:

# Df checkpoint
df = sampled_df

### Plotting of nominal columns

In [None]:
!pip install seaborn
import matplotlib.pyplot as plt
import seaborn as sns  # For better visual aesthetics

# Setting a style
sns.set(style="whitegrid")

# List of nominal columns
nominal_columns = [
    "CARRIER_NAME",
    "Airport and City_match",
    "DEP_TIME_BLK",
    "DEPARTING_AIRPORT",
    "PREVIOUS_AIRPORT",
    "DATE_NEW",
    "PLANE_AGE"
]

# Create a figure to hold the subplots
fig, axes = plt.subplots(nrows=len(nominal_columns), figsize=(12, 6 * len(nominal_columns)))  # Adjusted size for better fit

# Loop through each nominal column and create a histogram
for i, column in enumerate(nominal_columns):
    # Count the occurrences of each category in the column
    category_counts = df.groupBy(column).count().toPandas().sort_values(by='count', ascending=False)

    # If there are many categories, you might want to limit the number of categories displayed
    if len(category_counts) > 20:  # Adjust the threshold as needed
        category_counts = category_counts.head(20)  # Only show top 20 categories
    
    # Plotting the histogram for the column
    sns.barplot(x=column, y='count', data=category_counts, ax=axes[i], palette='viridis')  # Using seaborn for a better-looking plot
    axes[i].set_title(f'Histogram of {column}')
    axes[i].set_xlabel('')
    axes[i].set_ylabel('Counts')
    axes[i].tick_params(axis='x', rotation=45)  # Rotate labels to prevent overlap

# Adjust layout to prevent overlap of subplots
plt.tight_layout()
plt.show()
#
#CARRIER_NAME: 7.594988367110728e-05
#Information Gain for Airport and City_match: 8.950570449908421e-05
#Information Gain for DEP_TIME_BLK: 0.00044215567622180735
#Information Gain for DEPARTING_AIRPORT: 8.933648488036738e-05
#Information Gain for PREVIOUS_AIRPORT: 0.00036900671974516436
#Information Gain for NUMBER_OF_SEATS: 0.00012451283453875778            
#Information Gain for PLANE_AGE: -0.00014759181821908524

### Cast numeric columns to float

In [42]:
numeric_columns = [
     "DEP_DEL15", "SEGMENT_NUMBER", "CONCURRENT_FLIGHTS",
    "AIRPORT_FLIGHTS_MONTH",
    "AIRLINE_AIRPORT_FLIGHTS_MONTH", "AVG_MONTHLY_PASS_AIRPORT",
    "GROUND_SERV_PER_PASS", "SNOW", "EXTREME_WEATHER", "EXTREME_WEATHER_WT",
    "AWND", "PRCP", "DISTANCE_GROUP", "FLT_ATTENDANTS_PER_PASS","SNWD"
]

for column in numeric_columns:
    df_stat = df.withColumn(column, col(column).cast('float'))

df = df_stat

### Information gain calculations

In [None]:
from pyspark.sql.functions import col, count, udf
from pyspark.sql.types import DoubleType
from math import log2

def entropy(probability):
    """Calculate the entropy based on probability."""
    return -probability * log2(probability)

def calculate_entropy(df, column):
    """Calculate the entropy of a given DataFrame column."""
    total_count = df.count()
    probabilities = df.groupBy(column).agg((count('*') / total_count).alias('probability'))
    entropy_udf = udf(entropy, DoubleType())
    entropy_df = probabilities.withColumn('entropy_part', entropy_udf('probability'))
    total_entropy = entropy_df.agg(sqlsum('entropy_part')).collect()[0][0]
    return total_entropy if total_entropy is not None else 0

def information_gain(df, attribute, target):
    """Calculate the information gain of a column with respect to the target."""
    total_entropy = calculate_entropy(df, target)
    total_count = df.count()

    attribute_values = df.select(attribute).distinct().collect()

    conditional_entropy_sum = 0
    for value_row in attribute_values:
        value = value_row[0]
        subset_df = df.filter(col(attribute) == value)
        subset_count = subset_df.count()
        
        # Ensure subset has sufficient data to compute entropy
        if subset_count > 0:
            subset_entropy = calculate_entropy(subset_df, target)
            conditional_entropy_sum += (subset_count / total_count) * subset_entropy

    return total_entropy - conditional_entropy_sum


# Example usage
nominal_columns = ["NUMBER_OF_SEATS", "PLANE_AGE"]

information_gains = {}
for column in nominal_columns:
    ig = information_gain(df, column, 'DEP_DEL15')  # Assuming DEP_DEL15 is your target column
    information_gains[column] = ig
    print(f'Information Gain for {column}: {ig}')


# Preparation of data for model training

In [43]:
# df checkpoint
checkpoint_df = df

In [44]:
df = df.drop("DEPARTING_AIRPORT")

### Transforming nominal data into numerical
- Models generally prefer working with numbers

In [45]:
# df checkpoint
checkpoint_df_nom_to_num = df

In [46]:
df = checkpoint_df_nom_to_num
# Plane age into numerical

# Categorize based on research
df = df.withColumn(
    "PLANE_AGE_NOM",
    when(col("PLANE_AGE") == "New", 1)
    .when(col("PLANE_AGE") == "Standard", 2)
    .otherwise(3)
)

# Replace PLANE_AGE column with the nominal one
df = df.drop("PLANE_AGE").withColumnRenamed("PLANE_AGE_NOM", "PLANE_AGE")

In [47]:
# NUMBER_OF_SEATS into numerical category

# Categorize based on research
df = df.withColumn(
    "NUMBER_OF_SEATS_NOM",
    when(col("NUMBER_OF_SEATS") == "Small", 1)
    .when(col("NUMBER_OF_SEATS") == "Medium", 2)
    .when(col("NUMBER_OF_SEATS") == "Large", 3)
    .otherwise(4)
)

# Replace NUMBER_OF_SEATS column with the nominal one
df = df.drop("NUMBER_OF_SEATS").withColumnRenamed("NUMBER_OF_SEATS_NOM", "NUMBER_OF_SEATS")

In [48]:
# DAY_OF_WEEK into numerical form
from pyspark.sql.functions import col, radians, cos, sin

df = df.withColumn("day_cos", cos(radians(col("DAY_OF_WEEK") * (360/7)))+1)
df = df.withColumn("day_sin", sin(radians(col("DAY_OF_WEEK") * (360/7)))+1)

## Indexing & encoding nominal values

In [49]:
# df checkpoint
checkpoint_df_indx_encd = df

In [50]:
# Small preprocessing optimizations
df.drop("SNWD")
#cast to float what can be casted
float_columns = ["DEP_DEL15", "SEGMENT_NUMBER", "CONCURRENT_FLIGHTS", "AIRPORT_FLIGHTS_MONTH",
                 "AIRLINE_AIRPORT_FLIGHTS_MONTH", "AVG_MONTHLY_PASS_AIRPORT", "GROUND_SERV_PER_PASS",
                 "SNOW", "EXTREME_WEATHER_WT", "AWND", "PRCP", "DISTANCE_GROUP", "FLT_ATTENDANTS_PER_PASS", "day_cos","day_sin","NUMBER_OF_SEATS","AVG_MONTHLY_PASS_AIRLINE"]

for col_name in float_columns:
    df = df.withColumn(col_name, df[col_name].cast('float'))

# Check the schema of the new DataFrame
df.printSchema()

root
 |-- CARRIER_NAME: string (nullable = true)
 |-- Airport and City_match: string (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- DEP_DEL15: float (nullable = true)
 |-- DEP_TIME_BLK: string (nullable = true)
 |-- SEGMENT_NUMBER: float (nullable = true)
 |-- CONCURRENT_FLIGHTS: float (nullable = true)
 |-- AIRPORT_FLIGHTS_MONTH: float (nullable = true)
 |-- AIRLINE_AIRPORT_FLIGHTS_MONTH: float (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRPORT: float (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRLINE: float (nullable = true)
 |-- GROUND_SERV_PER_PASS: float (nullable = true)
 |-- PREVIOUS_AIRPORT: string (nullable = true)
 |-- SNOW: float (nullable = true)
 |-- SNWD: float (nullable = true)
 |-- DATE_NEW: date (nullable = true)
 |-- EXTREME_WEATHER: integer (nullable = true)
 |-- EXTREME_WEATHER_WT: float (nullable = true)
 |-- AWND: float (nullable = true)
 |-- PRCP: float (nullable = true)
 |-- DISTANCE_GROUP: float (nullable = true)
 |-- FLT_ATTENDANTS_PER_PASS: floa

In [51]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Indexing and Encoding for 'CARRIER_NAME'
carrier_name_indexer = StringIndexer(inputCol="CARRIER_NAME", outputCol="CARRIER_NAME_Index",handleInvalid="skip")
carrier_name_encoder = OneHotEncoder(inputCol="CARRIER_NAME_Index", outputCol="CARRIER_NAME_Encoded")

# Indexing and Encoding for 'PREVIOUS_AIRPORT'
#prev_airport_indexer = StringIndexer(inputCol="PREVIOUS_AIRPORT", outputCol="PREVIOUS_AIRPORT_Index")
#prev_airport_encoder = OneHotEncoder(inputCol="PREVIOUS_AIRPORT_Index", outputCol="PREVIOUS_AIRPORT_Encoded")

# Indexing and Encoding for 'Airport and City_match'
city_match_indexer = StringIndexer(inputCol="Airport and City_match", outputCol="Airport_and_City_match_Index",handleInvalid="skip")
city_match_encoder = OneHotEncoder(inputCol="Airport_and_City_match_Index", outputCol="Airport_and_City_match_Encoded")


#removed ground per pass,"AIRPORT_FLIGHTS_MONTH"
# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=[ "Airport_and_City_match_Encoded", "CARRIER_NAME_Encoded"] + ["SNOW","SEGMENT_NUMBER", "CONCURRENT_FLIGHTS", "NUMBER_OF_SEATS",
               "AIRLINE_AIRPORT_FLIGHTS_MONTH", "AVG_MONTHLY_PASS_AIRPORT",
               "AWND", "PRCP", "DISTANCE_GROUP", "FLT_ATTENDANTS_PER_PASS", "day_cos","day_sin", "EXTREME_WEATHER_WT"],
    outputCol="features",
    handleInvalid="keep"
)

# Pipeline all preprocessing steps
pipeline = Pipeline(stages=[
      city_match_indexer, city_match_encoder, carrier_name_indexer, carrier_name_encoder, assembler
])

# Fit and Transform
model = pipeline.fit(df)


                                                                                

## Feature column creation

In [52]:
df_checkpoint_ftr_clmn = df

In [53]:
transformed_df = model.transform(df)
transformed_df.drop("PREVIOUS_AIRPORT","CARRIER_NAME","Airport and City_match","DAY_OF_WEEK")

from pyspark.sql.functions import to_date

# Convert DATE_NEW to DateType
transformed_df = transformed_df.withColumn("DATE_NEW", to_date("DATE_NEW"))

# Convert EXTREME_WEATHER to FloatType (assuming it's a binary indicator)
transformed_df = transformed_df.withColumn("EXTREME_WEATHER", col("EXTREME_WEATHER").cast("float"))
k_means_df = transformed_df.select("features")

from pyspark.ml.linalg import SparseVector

def check_features_format(df):

    # Check if the 'features' column exists in the DataFrame
    if 'features' not in df.columns:
        print("Error: 'features' column not found in the DataFrame.")
        return False
    
    # Check if all values in the 'features' column are SparseVectors
    all_sparse = df.select('features').rdd.map(lambda row: isinstance(row.features, SparseVector)).reduce(lambda x, y: x and y)
    
    return all_sparse

import numpy as np
import pandas as dpd

df = k_means_df

In [167]:
df.count()

                                                                                

150251

# Model training

## Splitting data into training and testing set

In [54]:
# Data with feature column
data_with_feature_column_df = transformed_df
feature_column_only = transformed_df.select("features")


In [None]:
data_with_feature_column_df.printSchema()

In [None]:
# Making sure data does not contains values incompatible with models computations
def count_negative_vals(df):
    # Use aggregation to sum up each condition of being null or empty across all columns
    exprs = [count(when((df[c] < 0), c)).alias(c) for c in df.columns]
    return df.agg(*exprs)

from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT, Vectors

# Define a UDF to check minimum values in vectors
min_element = udf(lambda v: float(min(v)), FloatType())
data_with_feature_column_df.withColumn("min_feature", min_element("features")).show()


In [56]:
# Splits the DataFrame into training (60%) and test (40%) sets
def split_data(df):
    train_data, test_data = df.randomSplit([0.6, 0.4], seed=1234)
    return train_data, test_data

# Show the size of each set
#print("Training Dataset Count: " + str(train_data.count()))
#print("Testing Dataset Count: " + str(test_data.count()))


## Decision tree model

### Prepare data

In [57]:
train_data, test_data = split_data(data_with_feature_column_df.select("features", "DEP_DEL15"))


### Fit, train, predict

In [58]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col


# Decision Tree setup
dt = DecisionTreeClassifier(featuresCol="features", labelCol="DEP_DEL15")
dt_model = dt.fit(train_data)
predictions = dt_model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "precisionByLabel"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "recallByLabel"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

# Compute confusion matrix
rdd = predictions.select(col("prediction"), col("DEP_DEL15").cast("double")).rdd
metrics = MulticlassMetrics(rdd)
conf_matrix = metrics.confusionMatrix().toArray()

print("Test Accuracy = %g" % accuracy)
print("Confusion Matrix:")
print(conf_matrix)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)



Test Accuracy = 0.808369
Confusion Matrix:
[[7749.   30.]
 [1811.   17.]]
Precision: 0.8105648535564853
Recall: 0.9961434631700733
F1 Score: 0.7271987298433047


                                                                                

## K-means model

### Prepare data

In [213]:
df = feature_column_only

### Fit, train, predict

In [216]:
from pyspark.ml.clustering import KMeans
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, FloatType, ArrayType
from pyspark.ml.linalg import VectorUDT, DenseVector
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import sqrt, pow, col


from pyspark.ml.clustering import KMeans

from pyspark.ml.linalg import SparseVector, DenseVector


kmeans = KMeans(k=4,seed=17399)
# Get predictions and cluster centers
centers = model.clusterCenters()
predictions = model.transform(df)

# UDF to calculate the Euclidean distance from the nearest cluster center
def distance_to_center(features, centers):
    best_center = centers[min(range(len(centers)), key=lambda i: features.squared_distance(centers[i]))]
    return float(features.squared_distance(best_center))

distance_udf = udf(lambda features: distance_to_center(features, centers), FloatType())
predictions = predictions.withColumn("distance", distance_udf(col("features")))

# Show potential anomalies (e.g., distances greater than a threshold)
distance_threshold = predictions.approxQuantile("distance", [0.95], 0.05)  # 95th percentile with 5% relative error
anomalies = predictions.filter(col("distance") >= distance_threshold[0])

predictions.show()
anomalies.show()

                                                                                

+--------------------+----------+-------------+
|            features|prediction|     distance|
+--------------------+----------+-------------+
|(80,[17,54,68,69,...|         1| 4.5613986E10|
|(80,[19,52,68,69,...|         1|1.52854518E10|
|(80,[25,52,68,69,...|         1|  7.8787425E9|
|(80,[0,54,68,69,7...|         0| 8.8941068E11|
|(80,[45,54,68,69,...|         1|2.00553923E11|
|(80,[19,51,68,69,...|         1|1.52851005E10|
|(80,[2,51,68,69,7...|         0| 1.10492576E8|
|(80,[3,55,68,69,7...|         0|  5.0915384E9|
|(80,[17,54,68,69,...|         1| 4.5613982E10|
|(80,[2,59,68,69,7...|         0| 1.08633472E8|
|(80,[17,54,68,69,...|         1| 4.5613986E10|
|(80,[25,51,68,69,...|         1|  7.8783437E9|
|(80,[45,58,68,69,...|         1|2.00553857E11|
|(80,[45,64,68,69,...|         1|2.00553972E11|
|(80,[45,51,68,69,...|         1|2.00553955E11|
|(80,[3,55,68,69,7...|         0|  5.0915384E9|
|(80,[25,55,68,69,...|         1|  7.8797036E9|
|(80,[8,52,68,69,7...|         0|1.51877



+--------------------+----------+------------+
|            features|prediction|    distance|
+--------------------+----------+------------+
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68,69,7...|         0|8.8955355E11|
|(80,[0,53,68

                                                                                

## Naive Bayes model

### Prepare data

In [211]:
train_data, test_data = split_data(data_with_feature_column_df.select("features", "DEP_DEL15").withColumnRenamed("DEP_DEL15", "label"))

### Fit, train, predict

In [212]:
# Naive Bayes model
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler


from pyspark.ml.classification import NaiveBayes

# Initialize the Naive Bayes model
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# Train the model
model = nb.fit(train_data)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))



Test set accuracy = 0.5577150865017267


                                                                                

## Support Vector Machine

### Prepare data

In [188]:
train_data, test_data = split_data(data_with_feature_column_df.select("features", "DEP_DEL15"))

### Fit, train and predict

In [190]:
# Support Vector Machine
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize the LinearSVC model

svm = LinearSVC(featuresCol="features", labelCol="DEP_DEL15", maxIter=100, regParam=0.1)

# Fit the model on training data
svm_model = svm.fit(train_data)

# Predictions
predictions = svm_model.transform(test_data)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="DEP_DEL15", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC: {auc:.2f}")

24/05/08 13:46:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/08 13:46:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

Area under ROC: 0.53


## Random forest 

### Prepare data 

In [185]:
train_data, test_data = split_data(data_with_feature_column_df.select("features", "DEP_DEL15"))

### Fit, train and predict 

In [187]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col

# RandomForest setup (assuming train_data and test_data are already prepared)
rf = RandomForestClassifier(featuresCol="features", labelCol="DEP_DEL15", numTrees=10)
rf_model = rf.fit(train_data)
predictions = rf_model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "precisionByLabel"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "recallByLabel"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

# Compute confusion matrix
rdd = predictions.select(col("prediction"), col("DEP_DEL15").cast("double")).rdd
metrics = MulticlassMetrics(rdd)
conf_matrix = metrics.confusionMatrix().toArray()

print("Test Accuracy = %g" % accuracy)
print("Confusion Matrix:")
print(conf_matrix)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)




Test Accuracy = 0.816019
Confusion Matrix:
[[48913.     0.]
 [11028.     0.]]
Precision: 0.8160190854340101
Recall: 1.0
F1 Score: 0.7333481824431576


                                                                                