# Spark practical work
### Melen Laclais, Carlos Manzano Izquierdo

## Loading the training data

In [1]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Flight_Delay_EDA_Analysis") \
    .config("spark.driver.memory", "8g") \
    .master("local[*]") \
    .getOrCreate()

# Load Training Data (2006 + 2007)
print("Loading raw flight data (2006-2007)... This may take a moment due to schema inference...")

# We use inferSchema=True to see the actual data types Spark detects
# We use nullValue="NA" because the documentation states "NA" is used for nulls
flights_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("nullValue", "NA") \
    .csv(["../training_data/flight_data/2006.csv.bz2", "../training_data/flight_data/2007.csv.bz2"])

# # Load auxiliary tables for inspection
# planes_df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "NA").csv("../training_data/flight_data/plane-data.csv")
# airports_df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "NA").csv("../training_data/flight_data/airports.csv")

print(f"Total flights loaded: {flights_raw.count():,}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/28 11:31:57 WARN Utils: Your hostname, MSICarlos, resolves to a loopback address: 127.0.1.1; using 192.168.1.182 instead (on interface wlo1)
25/12/28 11:31:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/28 11:31:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Loading raw flight data (2006-2007)... This may take a moment due to schema inference...




Total flights loaded: 14,595,137
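Schema inference makes Spark scan the files an extra time before loading them. A minimal sketch of an alternative is shown below: passing an explicit schema as a DDL string. The column names follow the inferred schema and the sample header printed in this notebook. The helper name `read_flights` is hypothetical, and the types of `Diverted` and the five `*Delay` columns are an assumption, since the printed schema is cut off before them.

```python
# Ordered (name, type) pairs; CSV schemas are applied by position.
# ASSUMPTION: the last six columns are integers (their types are
# truncated in the printed schema above).
FLIGHT_COLUMNS = [
    ("Year", "INT"), ("Month", "INT"), ("DayofMonth", "INT"),
    ("DayOfWeek", "INT"), ("DepTime", "INT"), ("CRSDepTime", "INT"),
    ("ArrTime", "INT"), ("CRSArrTime", "INT"), ("UniqueCarrier", "STRING"),
    ("FlightNum", "INT"), ("TailNum", "STRING"),
    ("ActualElapsedTime", "INT"), ("CRSElapsedTime", "INT"),
    ("AirTime", "INT"), ("ArrDelay", "INT"), ("DepDelay", "INT"),
    ("Origin", "STRING"), ("Dest", "STRING"), ("Distance", "INT"),
    ("TaxiIn", "INT"), ("TaxiOut", "INT"), ("Cancelled", "INT"),
    ("CancellationCode", "STRING"),
    ("Diverted", "INT"), ("CarrierDelay", "INT"), ("WeatherDelay", "INT"),
    ("NASDelay", "INT"), ("SecurityDelay", "INT"),
    ("LateAircraftDelay", "INT"),
]

# Backticks quote each identifier so no column name can clash with a keyword.
FLIGHTS_DDL = ", ".join(f"`{name}` {dtype}" for name, dtype in FLIGHT_COLUMNS)


def read_flights(spark, paths):
    """Hypothetical helper: read the flight CSVs without schema inference."""
    return (
        spark.read
        .option("header", "true")
        .option("nullValue", "NA")
        .schema(FLIGHTS_DDL)
        .csv(paths)
    )
```

With an explicit schema, a value that cannot be parsed as the declared type is read as null in Spark's default permissive mode, so it is worth comparing the null counts against the inferred-schema run before relying on this.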


                                                                                

## Dataset exploration, analysis and processing

First, let's analyze the schema of the dataset.

In [2]:

print("--- Inferred Schema ---")
flights_raw.printSchema()

# Show a sample (using pandas for better formatting in Jupyter)
print("--- Data Sample (First 5 rows) ---")
flights_raw.limit(5).toPandas()

--- Inferred Schema ---
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: i

25/12/28 11:33:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,2006,1,11,3,743,745,1024,1018,US,343,...,45,13,0,,0,0,0,0,0,0
1,2006,1,11,3,1053,1053,1313,1318,US,613,...,27,19,0,,0,0,0,0,0,0
2,2006,1,11,3,1915,1915,2110,2133,US,617,...,4,11,0,,0,0,0,0,0,0
3,2006,1,11,3,1753,1755,1925,1933,US,300,...,16,10,0,,0,0,0,0,0,0
4,2006,1,11,3,824,832,1015,1015,US,765,...,27,12,0,,0,0,0,0,0,0



### Delete Forbidden variables


The first step is to delete every variable whose information would not be available at inference time, the "Forbidden" variables:

* ArrTime
* ActualElapsedTime
* AirTime
* TaxiIn
* Diverted
* CarrierDelay
* WeatherDelay
* NASDelay
* SecurityDelay
* LateAircraftDelay

In [4]:

# These variables contain future information known only after landing.
forbidden_vars = [
    "ArrTime",
    "ActualElapsedTime",
    "AirTime",
    "TaxiIn",
    "Diverted",
    "CarrierDelay",
    "WeatherDelay",
    "NASDelay",
    "SecurityDelay",
    "LateAircraftDelay"
]

# Drop the forbidden columns
flights_clean = flights_raw.drop(*forbidden_vars)

print("--- Structure after removing forbidden variables ---")
flights_clean.printSchema()

print("Ensure these variables are not anymore present:\n")

error_found = False
for var in forbidden_vars:
    if var in flights_clean.columns:
        print(f"ERROR: {var} is still present!")
        error_found = True
        
if error_found:
    print("Some forbidden variables are still present. Please check the code.")
else:
    print("All forbidden variables successfully removed.")


--- Structure after removing forbidden variables ---
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)

Ensure these variables are not anymore present:

All forbidden variables successfully removed.



### Delete cancelled flights and associated columns

The first cleaning step is to remove all rows for cancelled flights and then drop the columns that describe the cancellation, since they no longer carry useful information.

In [5]:
flights_clean = flights_clean.filter("Cancelled == 0") \
                            .drop("Cancelled", "CancellationCode")
                            
flights_clean.printSchema()                            
print(f"Total valid flights for training: {flights_clean.count():,}")


root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)





Total valid flights for training: 14,312,455


                                                                                

### Data quality analysis, cleaning, and application of transformations
(*Smart handling of special format in some input variables, performing relevant processing
and/or transformations.*)

We now continue assessing the quality of the available features to decide which imputation and deletion techniques to apply. We will also evaluate whether some variables need to be converted to categorical, and how consistent their values are.


In [8]:
print("--- Missing Values Analysis ---")
# Calculate count of nulls for each column
null_counts = flights_clean.select([count(when(col(c).isNull(), c)).alias(c) for c in flights_clean.columns])
null_counts.show(truncate=False)

--- Missing Values Analysis ---




+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiOut|
+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+
|0   |0    |0         |0        |0      |0         |0         |0            |0        |0      |727           |33365   |0       |0     |0   |0       |0      |
+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+



                                                                                

The only columns that contain null values are **CRSElapsedTime** and **ArrDelay**, with **727** and **33,365** missing values respectively.
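To put these counts in proportion, the short sketch below relates them to the row totals printed earlier in this notebook. It is plain arithmetic on the reported figures, not a new Spark computation.

```python
# Figures copied from the notebook outputs above.
total_loaded = 14_595_137    # rows loaded for 2006 + 2007
not_cancelled = 14_312_455   # rows left after filtering Cancelled == 0
null_counts = {"CRSElapsedTime": 727, "ArrDelay": 33_365}

# Rows removed by the cancellation filter.
removed_by_filter = total_loaded - not_cancelled
print(f"Rows removed by the cancellation filter: {removed_by_filter:,}")

# Share of the remaining rows affected by nulls in each column.
null_shares = {name: n / not_cancelled for name, n in null_counts.items()}
for name, share in null_shares.items():
    print(f"{name}: {null_counts[name]:,} nulls ({share:.4%} of rows)")
```

Both shares are well below one percent, so neither dropping nor imputing these rows changes the size of the training set materially.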

The large number of null values in ArrDelay may be a consequence of not having filtered out diverted flights. It would make sense for these rows to have no value in the ArrDelay column, since the flight never arrived at its original destination. We therefore need to check whether this hypothesis holds.


In [9]:
print("---  Hypothesis Verification (ArrDelay Nulls vs Diverted) ---")

# We check the raw data to see the relationship between Null ArrDelay and Diverted status
flights_raw.filter("ArrDelay IS NULL AND Cancelled == 0") \
           .select("ArrDelay", "Diverted", "Cancelled") \
           .groupBy("Diverted") \
           .count() \
           .show()

---  Hypothesis Verification (ArrDelay Nulls vs Diverted) ---




+--------+-----+
|Diverted|count|
+--------+-----+
|       1|33365|
+--------+-----+



                                                                                

This confirms our hypothesis: every non-cancelled row with a null ArrDelay is a diverted flight, and their count (33,365) equals the number of nulls observed earlier in the target variable. We therefore rebuild the clean dataset from the raw flight data, this time also filtering out diverted flights, so that ArrDelay no longer contains nulls.

In [15]:
# We must filter out 'Diverted' flights BEFORE dropping the 'Diverted' column.
print("--- Applying Fix (Filtering Diverted flights ---")

# Re-creating flights_clean with the correct logic:
# Filter Cancelled AND Diverted -> Then drop forbidden variables
flights_clean = flights_raw.filter("Cancelled == 0 AND Diverted == 0") \
                           .drop(*forbidden_vars) \
                           .drop("Cancelled", "CancellationCode")
                           
print("--- Schema check after applying fix: ---")
flights_clean.printSchema()

# Checking if 'ArrDelay' still has nulls after the fix.
print("--- Final Verification (Nulls in ArrDelay) ---")

flights_clean.select(
    count(when(col("ArrDelay").isNull(), "ArrDelay")).alias("ArrDelay_Null_Count")
).show()

# Verify if the number of columns in flights_clean matches the expected count
# (Raw columns - Forbidden variables - 2 cancellation columns)
expected_count = len(flights_raw.columns) - len(forbidden_vars) - 2
actual_count = len(flights_clean.columns)

print(f"Number of columns in flights_clean: {actual_count}")
print(f"Expected number of columns: {expected_count}")

if actual_count == expected_count:
    print("Verification successful: The column count matches the expected reduction.")
else:
    print("Verification failed: The column count does not match.")



--- Applying Fix (Filtering Diverted flights ---
--- Schema check after applying fix: ---
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)

--- Final Verification (Nulls in ArrDelay) ---


                                                                                

+-------------------+
|ArrDelay_Null_Count|
+-------------------+
|                  0|
+-------------------+

Number of columns in flights_clean: 17
Expected number of columns: 17
Verification successful: The column count matches the expected reduction.


Now that the nulls in the target variable are fixed, we have to handle the **nulls in CRSElapsedTime**. How should we impute them? We considered computing the difference between **CRSArrTime and CRSDepTime**, but these are local clock times, so flights that cross time zones would make this calculation far from straightforward.

Instead, we impute these values with the average CRSElapsedTime of the other flights that share the same origin and destination.

In [17]:
from pyspark.sql.window import Window
# Alias Spark's round so it does not shadow Python's built-in round()
from pyspark.sql.functions import avg, col, coalesce, round as spark_round

# ==========================================
# 6. Smart Imputation for CRSElapsedTime
# Strategy: Use the average duration of the specific Route (Origin -> Dest)
# ==========================================

print("--- Step 1: Imputing CRSElapsedTime using Route Average ---")

# 1. Define the Window: "Look at all flights sharing the same Origin and Dest"
route_window = Window.partitionBy("Origin", "Dest")

# 2. Calculate the average scheduled duration for that specific route
#    Rounding gives whole minutes, but the result is still a double (hence 369.0 in the output)
avg_route_duration = spark_round(avg("CRSElapsedTime").over(route_window))

# 3. Apply Imputation
#    If CRSElapsedTime is null, use the Route Average.
#    If it's still null (e.g., a unique route with only 1 flight that is null), we drop it later.
flights_clean = flights_clean.withColumn(
    "CRSElapsedTime",
    coalesce(col("CRSElapsedTime"), avg_route_duration)
)

# 4. Final Cleanup
#    Drop any remaining nulls (routes that had NO valid data at all to learn from)
flights_clean = flights_clean.dropna(subset=["CRSElapsedTime"])

# Verification
print("--- Final Verification ---")
flights_clean.select(
    count(when(col("CRSElapsedTime").isNull(), 1)).alias("Nulls_After_Smart_Imputation")
).show()

# Show a sample of how it worked (e.g. for a specific route)
flights_clean.filter("Origin == 'JFK' AND Dest == 'LAX'") \
             .select("Origin", "Dest", "CRSDepTime", "CRSArrTime", "CRSElapsedTime") \
             .limit(5).show()

--- Step 1: Imputing CRSElapsedTime using Route Average ---
--- Final Verification ---


                                                                                

+----------------------------+
|Nulls_After_Smart_Imputation|
+----------------------------+
|                           0|
+----------------------------+





+------+----+----------+----------+--------------+
|Origin|Dest|CRSDepTime|CRSArrTime|CRSElapsedTime|
+------+----+----------+----------+--------------+
|   JFK| LAX|      1400|      1709|         369.0|
|   JFK| LAX|      1400|      1709|         369.0|
|   JFK| LAX|      1400|      1709|         369.0|
|   JFK| LAX|      1400|      1709|         369.0|
|   JFK| LAX|      1400|      1709|         369.0|
+------+----+----------+----------+--------------+



                                                                                

Finally, let's check the null counts across the dataset before moving on to further analysis and transformations.

In [18]:
# Calculate count of nulls for each column
null_counts = flights_clean.select([count(when(col(c).isNull(), c)).alias(c) for c in flights_clean.columns])
null_counts.show(truncate=False)



+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiOut|
+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+
|0   |0    |0         |0        |0      |0         |0         |0            |0        |0      |0             |0       |0       |0     |0   |0       |0      |
+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+



                                                                                

In [19]:
# Save the cleaned dataframe to a parquet file for future use
flights_clean.write.mode("overwrite").parquet("../check_point")

print("Dataframe successfully saved to ../check_point")



Dataframe successfully saved to ../check_point


                                                                                

### Join dataset with other files to obtain useful information
(*exploring additional datasets to try to find additional relevant
information*)

### Aggregation and combination of columns to create potentially useful new columns

### EDA and statistical analysis over these features
*(Proper exploratory data analysis (possibly including univariate and/or multivariate
analysis) to better understand the input data and provide robust criteria for variable
selection.)*

### Feature Engineering

(*Feature engineering*) 

### PCA ???

## Code used to train, test and save the model.

### Scaling of values ? 

### Definition of models to employ and metrics to use 
* *Select more than one valid machine learning algorithm for building the model.*
* *Consider more than one possible model performance metric and explain the criteria for
selecting the most appropriate*
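
As a starting point for comparing metrics, the sketch below computes two common regression metrics, RMSE and MAE, on a handful of made-up delay values. The choice of these two metrics and the toy numbers are assumptions for illustration only, not results from this project. In Spark MLlib, both are available through `RegressionEvaluator` with `metricName` set to `"rmse"` or `"mae"`.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error: penalizes large errors more heavily."""
    squared = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared) / len(squared))


def mae(y_true, y_pred):
    """Mean absolute error: every minute of error counts the same."""
    absolute = [abs(t - p) for t, p in zip(y_true, y_pred)]
    return sum(absolute) / len(absolute)


# Toy arrival delays in minutes (illustrative values, not project data).
# The last flight is a badly underestimated long delay.
actual = [10, -5, 0, 120]
predicted = [12, -3, 1, 30]

toy_rmse = rmse(actual, predicted)
toy_mae = mae(actual, predicted)
print(f"RMSE: {toy_rmse:.2f} min, MAE: {toy_mae:.2f} min")
```

A single large miss pulls RMSE well above MAE, so the choice between them depends on whether badly mispredicted long delays should dominate the evaluation.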

### Cross-Validation loop to validate models with 80/20
* *Use cross-validation techniques to select the best model*
* *Perform model hyper-parameter tuning*
* *Use the full capacities of Spark’s MLlib*


### Selection and saving the best model 