In [None]:
from pyspark.sql import SparkSession

# Start the notebook's Spark session; getOrCreate() hands back the session that is
# already running in this kernel instead of building a second one.
spark = (
    SparkSession.builder
    .appName("Flight Delay Prediction")
    .getOrCreate()
)


In [None]:
from pyspark.sql import SparkSession

# Reuse the session started in the first cell. getOrCreate() returns the already-running
# session, so a different appName passed here would be silently ignored.
spark = SparkSession.builder.getOrCreate()

# Directory holding the yearly CSV files, and how many rows to keep per year
# (set N_ROWS = None to load every row).
DATA_DIR = "/content"
N_ROWS = 500


def load_year(year, n_rows):
    """Read one year of flight records from ``DATA_DIR/<year>.csv``.

    The header row supplies column names and ``inferSchema=True`` lets Spark choose the
    column types (this costs one extra pass over the file). When ``n_rows`` is not None,
    at most ``n_rows`` rows are kept.
    """
    flights = spark.read.csv(f"{DATA_DIR}/{year}.csv", header=True, inferSchema=True)
    return flights if n_rows is None else flights.limit(n_rows)


# Load the files
data_2009 = load_year(2009, N_ROWS)
data_2017 = load_year(2017, N_ROWS)

# Show the first few rows of each dataset to verify
data_2009.show(5)
data_2017.show(5)


+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|Unnamed: 27|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|

In [None]:
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col

# A flight is labelled delayed when it arrives at least this many minutes late
# (the BTS on-time-performance threshold).
DELAY_THRESHOLD_MIN = 15
SEED = 42

# Step 1: Choose numerical features that do not encode the label.
# ACTUAL_ELAPSED_TIME is left out on purpose. By the field definitions,
# ARR_DELAY = DEP_DELAY + ACTUAL_ELAPSED_TIME - CRS_ELAPSED_TIME, so with it in the feature
# set the model can reconstruct the label exactly. The earlier run's AUC of 1.00 and its
# [1.0, 0.0] probabilities are symptoms of that leak.
numeric_cols = ["DEP_DELAY", "TAXI_OUT", "DISTANCE", "CRS_ELAPSED_TIME"]
imputed_cols = [f"{name}_imputed" for name in numeric_cols]


def to_labeled(flights):
    """Return ``flights`` reduced to ``numeric_cols`` plus an integer ``is_delayed`` label.

    The raw yearly CSVs have no ``is_delayed`` column, so it is derived from ARR_DELAY
    (1 when ARR_DELAY >= DELAY_THRESHOLD_MIN, else 0). Rows whose ARR_DELAY is null
    (e.g. cancelled or diverted flights) have no arrival outcome. They are dropped so
    that the label is never null.
    """
    return (
        flights
        .filter(col("ARR_DELAY").isNotNull())
        .withColumn("is_delayed", (col("ARR_DELAY") >= DELAY_THRESHOLD_MIN).cast("int"))
        .select(numeric_cols + ["is_delayed"])
    )


# Step 2: Combine the datasets, matching columns by name.
# Fresh names are used rather than overwriting data_2009 / data_2017, so re-running this
# cell keeps working. The cache keeps the rows stable across randomSplit and every later
# action, and avoids re-reading the CSVs.
data = to_labeled(data_2009).unionByName(to_labeled(data_2017)).cache()

# Step 3: Split the data BEFORE fitting any preprocessing, so test rows cannot influence it.
train_raw, test_raw = data.randomSplit([0.8, 0.2], seed=SEED)

# Step 4: Mean-impute missing values; the means are learned from the training split only.
imputer = Imputer(inputCols=numeric_cols, outputCols=imputed_cols).setStrategy("mean")
imputer_model = imputer.fit(train_raw)

# Step 5: Assemble the imputed features into a single feature vector.
assembler = VectorAssembler(inputCols=imputed_cols, outputCol="features")
train_data = assembler.transform(imputer_model.transform(train_raw))
test_data = assembler.transform(imputer_model.transform(test_raw))

# Step 6: Initialize and train the Logistic Regression model
lr = LogisticRegression(labelCol="is_delayed", featuresCol="features")
model = lr.fit(train_data)

# Step 7: Make predictions on the test data
predictions = model.transform(test_data)

# Step 8: Evaluate the model using AUC metric
evaluator = BinaryClassificationEvaluator(labelCol="is_delayed", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.2f}")

# Show some predictions
predictions.select("features", "is_delayed", "prediction", "probability").show(5)


Test AUC: 1.00
+--------------------+----------+----------+-----------+
|            features|is_delayed|prediction|probability|
+--------------------+----------+----------+-----------+
|[-14.0,12.0,427.0...|         0|       0.0|  [1.0,0.0]|
|[-12.0,15.0,744.0...|         0|       0.0|  [1.0,0.0]|
|[-11.0,8.0,429.0,...|         0|       0.0|  [1.0,0.0]|
|[-11.0,15.0,925.0...|         0|       0.0|  [1.0,0.0]|
|[-10.0,6.0,489.0,...|         0|       0.0|  [1.0,0.0]|
+--------------------+----------+----------+-----------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F

# Get predictions from the model on the test dataset
# (`model` and `test_data` are created in the training cell).
predictions = model.transform(test_data)

# Count every (actual, predicted) pair in ONE Spark job, instead of four separate
# filter().count() scans. Keys keep Spark's raw values. In Python 1 == 1.0 and
# hash(1) == hash(1.0), so the integer lookups below also match the double-typed
# `prediction`. Null labels or predictions never match, just like equality filters.
pair_counts = {
    (row["is_delayed"], row["prediction"]): row["count"]
    for row in predictions.groupBy("is_delayed", "prediction").count().collect()
}
tp = pair_counts.get((1, 1), 0)
tn = pair_counts.get((0, 0), 0)
fp = pair_counts.get((0, 1), 0)
fn = pair_counts.get((1, 0), 0)

# Display the confusion matrix
print("Confusion Matrix:\n")
print(f"True Positives (TP): {tp}")
print(f"False Positives (FP): {fp}")
print(f"True Negatives (TN): {tn}")
print(f"False Negatives (FN): {fn}")

# Calculate additional metrics: Accuracy, Precision, Recall, and F1 Score.
# Each ratio falls back to 0 when its denominator is zero (e.g. an empty test split).
total = tp + tn + fp + fn
accuracy = (tp + tn) / total if total > 0 else 0
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print("\nMetrics:")
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1_score:.2f}")


Confusion Matrix:

True Positives (TP): 692458
False Positives (FP): 115684
True Negatives (TN): 503211
False Negatives (FN): 185620

Metrics:
Accuracy:0.79
Precision: 0.85
Recall: 0.78
F1 Score: 0.82
