In [2]:

import os
os.environ['PYSPARK_PYTHON'] = '/opt/homebrew/bin/python3.10'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/homebrew/bin/python3.10'

# Initialize Spark session
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Configure Spark with more memory
spark = SparkSession.builder \
    .appName("American-Airlines") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.shuffle.partitions", "10") \
    .getOrCreate()


# Path to your CSV file
csv_file_path = "/Users/mika/ML-Big-Data/Project/Dataset/2008.csv"

# Read the CSV file into a DataFrame
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Show the first few rows of the DataFrame
df.show(5)

25/01/27 13:38:12 WARN Utils: Your hostname, Mikas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.253 instead (on interface en0)
25/01/27 13:38:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/27 13:38:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   2003|      1955|   2211|      2225|           WN|      335

25/01/27 13:38:19 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [3]:
df.count()

                                                                                

7009728

In [4]:
df = df.drop("Year", "TailNum", "CancellationCode", "TailNum", "DayOfMonth", "FlightNum", "DepTime", "CRSDepTime", "CRSArrTime" )

In [5]:
df.show(5)

+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+
|Month|DayOfWeek|ArrTime|UniqueCarrier|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+
|    1|        4|   2211|           WN|              128|           150|    116|     -14|       8|   IAD| TPA|     810|     4|      8|        0|       0|          NA|          NA|      NA|           NA|               NA|
|    1|        4|   1002|           WN|              128|           145|    113|       2|      19|   IAD| TPA|     8

In [6]:
cancellation_count = df.filter(df.Cancelled == 1).count()

# Print the result
print(f"Number of rows with cancellation = 1: {cancellation_count}")

[Stage 7:>                                                          (0 + 8) / 8]

Number of rows with cancellation = 1: 137434


                                                                                

In [7]:
cancellation_count = df.filter(df.Diverted == 1).count()
print(f"Number of rows with diverted = 1: {cancellation_count}")

[Stage 10:>                                                         (0 + 8) / 8]

Number of rows with diverted = 1: 17265


                                                                                

In [8]:

# Remove all cancelled flights
df = df.filter(df.Cancelled != 1)


In [9]:
df.count()

                                                                                

6872294

In [10]:
df = df.drop("Cancelled")

In [11]:
from pyspark.sql.functions import col, isnan, when, count

# Count NULL or NaN values for each column
null_counts = df.select(
    [count(when(col(c).isNull() | isnan(col(c)) | (col(c) == "NA"), c)).alias(c) for c in df.columns]
)

# Show the result
null_counts.show()

25/01/27 13:38:29 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors

+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+--------+------------+------------+--------+-------------+-----------------+
|Month|DayOfWeek|ArrTime|UniqueCarrier|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+--------+------------+------------+--------+-------------+-----------------+
|    0|        0|  14215|            0|            17265|           598|  17265|   17265|       0|     0|   0|       0| 14215|      0|       0|     5347559|     5347559| 5347559|      5347559|          5347559|
+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+--------+------------+

                                                                                

In [12]:
df = df.filter(
    (df.ArrTime != "NA") & 
    (df.ActualElapsedTime != "NA") & 
    (df.CRSElapsedTime != "NA") & 
    (df.ArrDelay != "NA") & 
    (df.TaxiIn != "NA")
)

# Show the count of rows after filtering
print(f"Number of rows after filtering: {df.count()}")

[Stage 19:>                                                         (0 + 8) / 8]

Number of rows after filtering: 6855029


                                                                                

In [13]:
df.show()

+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+--------+------------+------------+--------+-------------+-----------------+
|Month|DayOfWeek|ArrTime|UniqueCarrier|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+--------+------------+------------+--------+-------------+-----------------+
|    1|        4|   2211|           WN|              128|           150|    116|     -14|       8|   IAD| TPA|     810|     4|      8|       0|          NA|          NA|      NA|           NA|               NA|
|    1|        4|   1002|           WN|              128|           145|    113|       2|      19|   IAD| TPA|     810|     5|     10|       0|          NA|

In [14]:
# List of columns to check
columns_to_check = ["WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay", "CarrierDelay"]

# Loop through columns and replace "NA" with 0
for column in columns_to_check:
    df = df.withColumn(
        column,
        when(col(column) == "NA", 0).otherwise(col(column))
    )

# Show the updated DataFrame
df.show()

+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+--------+------------+------------+--------+-------------+-----------------+
|Month|DayOfWeek|ArrTime|UniqueCarrier|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+--------+------------+------------+--------+-------------+-----------------+
|    1|        4|   2211|           WN|              128|           150|    116|     -14|       8|   IAD| TPA|     810|     4|      8|       0|           0|           0|       0|            0|                0|
|    1|        4|   1002|           WN|              128|           145|    113|       2|      19|   IAD| TPA|     810|     5|     10|       0|           0|

In [15]:
df.describe()

DataFrame[summary: string, Month: string, DayOfWeek: string, ArrTime: string, UniqueCarrier: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Diverted: string, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]

In [16]:
from pyspark.sql import functions as F

# List of columns to exclude (non-numeric columns)
exclude_columns = ["UniqueCarrier", "Origin", "Dest"]

# Get the list of all columns in the DataFrame
all_columns = df.columns

# Identify columns that need to be cast to float (i.e., all columns except the ones to exclude)
columns_to_cast = [col for col in all_columns if col not in exclude_columns]

# Loop through the columns that need to be cast and apply the transformation
for column in columns_to_cast:
    df = df.withColumn(column, F.col(column).cast("float"))

# Show the schema to verify the changes
df.printSchema()


root
 |-- Month: float (nullable = true)
 |-- DayOfWeek: float (nullable = true)
 |-- ArrTime: float (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- ActualElapsedTime: float (nullable = true)
 |-- CRSElapsedTime: float (nullable = true)
 |-- AirTime: float (nullable = true)
 |-- ArrDelay: float (nullable = true)
 |-- DepDelay: float (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: float (nullable = true)
 |-- TaxiIn: float (nullable = true)
 |-- TaxiOut: float (nullable = true)
 |-- Diverted: float (nullable = true)
 |-- CarrierDelay: float (nullable = true)
 |-- WeatherDelay: float (nullable = true)
 |-- NASDelay: float (nullable = true)
 |-- SecurityDelay: float (nullable = true)
 |-- LateAircraftDelay: float (nullable = true)



In [17]:
from pyspark.sql.functions import hour, when, col

# Create a new column for TotalDelay by summing delay-related columns
df = df.withColumn(
    "TotalDelay",
    F.coalesce(F.col("ArrDelay"), F.lit(0)) +
    F.coalesce(F.col("DepDelay"), F.lit(0)) +
    F.coalesce(F.col("CarrierDelay"), F.lit(0)) +
    F.coalesce(F.col("WeatherDelay"), F.lit(0)) +
    F.coalesce(F.col("NASDelay"), F.lit(0)) +
    F.coalesce(F.col("SecurityDelay"), F.lit(0)) +
    F.coalesce(F.col("LateAircraftDelay"), F.lit(0))
)

df.show(5)

+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+--------+------------+------------+--------+-------------+-----------------+----------+
|Month|DayOfWeek|ArrTime|UniqueCarrier|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|TotalDelay|
+-----+---------+-------+-------------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+--------+------------+------------+--------+-------------+-----------------+----------+
|  1.0|      4.0| 2211.0|           WN|            128.0|         150.0|  116.0|   -14.0|     8.0|   IAD| TPA|   810.0|   4.0|    8.0|     0.0|         0.0|         0.0|     0.0|          0.0|              0.0|      -6.0|
|  1.0|      4.0| 1002.0|           WN|            128.0|         145.0|  113.0|     2.0|    19.0|   IAD| TPA|  

In [18]:
df = df.drop("ArrDelay", "DepDelay", "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay", "Origin", "Dest", "UniqueCarrier", "Month", "DayOfWeek", "Diverted" )

Ingen diverted^

In [19]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

numeric_cols = ["ArrTime", "ActualElapsedTime", "CRSElapsedTime", "AirTime", "Distance", "TaxiIn", "TaxiOut", "TotalDelay"]


assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")

scaler = StandardScaler(
   inputCol="features",
   outputCol="scaled_features",
   withStd=True,
   withMean=True
)

pipeline = Pipeline(stages=[assembler, scaler])
model = pipeline.fit(df)
normalized_df = model.transform(df)

from pyspark.sql.types import DoubleType

# Extract normalized values back to individual columns
for i, col in enumerate(numeric_cols):
   normalized_df = normalized_df.withColumn(
       f"{col}_normalized",
       F.udf(lambda v: float(v[i]), DoubleType())("scaled_features")
   )

# Drop intermediate columns
normalized_df = normalized_df.drop("features", "scaled_features", "ArrTime", "ActualElapsedTime", "CRSElapsedTime", "AirTime", "Distance", "TaxiIn", "TaxiOut", "TotalDelay")



                                                                                

In [20]:
normalized_df.show(5)

[Stage 28:>                                                         (0 + 1) / 1]

+-------------------+----------------------------+-------------------------+--------------------+--------------------+--------------------+-------------------+---------------------+
| ArrTime_normalized|ActualElapsedTime_normalized|CRSElapsedTime_normalized|  AirTime_normalized| Distance_normalized|   TaxiIn_normalized| TaxiOut_normalized|TotalDelay_normalized|
+-------------------+----------------------------+-------------------------+--------------------+--------------------+--------------------+-------------------+---------------------+
| 1.4446406057361534|        0.009653829068129543|       0.3008970272696851|   0.177660841130685|  0.1442644055700898| -0.5800082185828016|-0.7465270652882132|  -0.3411327654407681|
|-0.9487922092808968|        0.009653829068129543|      0.22898842634807962| 0.13317672077029769|  0.1442644055700898|-0.37721879664093644|-0.5697021070225374| -0.09037429034533025|
|-1.3407687993333168|         -0.4462690574973425|      -0.5620061837895808|-0.41546076367

                                                                                

In [21]:
train_df, test_df = normalized_df.randomSplit([0.8, 0.2], seed=1234)

In [22]:
from pyspark.ml.regression import LinearRegression

# Select feature columns and target
feature_cols = [col for col in normalized_df.columns if col.endswith('_normalized') and col != 'TotalDelay_normalized']
train_data = train_df.select(feature_cols + ['TotalDelay_normalized'])
test_data = test_df.select(feature_cols + ['TotalDelay_normalized'])

# Prepare for linear regression
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
lr = LinearRegression(featuresCol="features", labelCol="TotalDelay_normalized")
pipeline = Pipeline(stages=[assembler, lr])

# Train model
model = pipeline.fit(train_data)

25/01/27 13:38:51 WARN Instrumentation: [e5444b6e] regParam is zero, which might cause numerical instability and overfitting.
25/01/27 13:39:17 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/01/27 13:39:17 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/01/27 13:39:20 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [23]:
from pyspark.ml.evaluation import RegressionEvaluator

# Make predictions
train_predictions = model.transform(train_data)
test_predictions = model.transform(test_data)

# View predictions and evaluate
evaluator = RegressionEvaluator(labelCol="TotalDelay_normalized", predictionCol="prediction")
rmse = evaluator.evaluate(test_predictions, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(test_predictions, {evaluator.metricName: "r2"})

print(f"RMSE: {rmse:.4f}")
print(f"R²: {r2:.4f}")



RMSE: 0.9593
R²: 0.0852


                                                                                

In [24]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=1234)

In [26]:
# Select feature columns and target
feature_cols = [col for col in df.columns if col != 'TotalDelay']
train_data = train_df.select(feature_cols + ['TotalDelay'])
test_data = test_df.select(feature_cols + ['TotalDelay'])

# Prepare for linear regression
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
lr = LinearRegression(featuresCol="features", labelCol="TotalDelay")
pipeline = Pipeline(stages=[assembler, lr])

# Train model
model = pipeline.fit(train_data)

25/01/27 13:44:44 WARN Instrumentation: [459853f6] regParam is zero, which might cause numerical instability and overfitting.
25/01/27 13:44:51 WARN Instrumentation: [459853f6] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

In [28]:
# Make predictions
train_predictions = model.transform(train_data)
test_predictions = model.transform(test_data)

# View predictions and evaluate
evaluator = RegressionEvaluator(labelCol="TotalDelay", predictionCol="prediction")
rmse = evaluator.evaluate(test_predictions, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(test_predictions, {evaluator.metricName: "r2"})

print(f"RMSE: {rmse:.4f}")
print(f"R²: {r2:.4f}")

[Stage 44:>                                                         (0 + 8) / 8]

RMSE: 103.2884
R²: 0.0852


                                                                                