In [1]:
import json
import time
import warnings

import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

In [2]:
# Spark session used for reading the CSV, preprocessing and scoring.
# Driver memory and the max result size are set explicitly; the producer below
# collects the CSV to the driver with toPandas(), so these presumably need to
# accommodate that.
spark = (
    SparkSession.builder
    .appName("FlightDelayPrediction")
    .config("spark.driver.memory", "8g")
    .config("spark.driver.maxResultSize", "4g")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/04 21:02:58 WARN Utils: Your hostname, Aakashs-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 10.250.205.135 instead (on interface en0)
25/12/04 21:02:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/04 21:02:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/04 21:02:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
import os

# Load the trained random-forest model from an explicit local file:// URI,
# resolved relative to the notebook's current working directory.
model_path = "file://" + os.path.abspath("flight_delay_rf_model")
rf_model = RandomForestClassificationModel.load(model_path)

                                                                                

In [4]:
# Feature columns fed to the model. They are concatenated (numerical first,
# then the indexed categoricals) into the VectorAssembler input in
# preprocess_batch, so names AND order must match what was used at training time.
numerical_features = [
    "MONTH",
    "DAY_OF_MONTH",
    "DAY_OF_WEEK",
    "CRS_DEP_TIME",
    "CRS_ARR_TIME",
    "DISTANCE",
    "CRS_ELAPSED_TIME",
]

categorical_features = [
    "OP_UNIQUE_CARRIER",  # airline
    "ORIGIN",             # origin airport
    "DEST",               # destination airport
]

In [5]:
@udf(returnType=DoubleType())
def get_delay_prob(probability):
    """
    Spark UDF: return element 1 of the model's ``probability`` vector as a float.

    Element 1 is presumably the probability of the "delayed" class (label 1);
    confirm against the label encoding used at training time.
    Returns 0.0 when ``probability`` is falsy (e.g. None).
    """
    if not probability:
        return 0.0
    return float(probability[1])

In [6]:
def simulate_kafka_producer(csv_file, batch_size=100):
    """
    Simulate a Kafka producer: read flight records from a CSV and yield them in batches.

    In a real deployment each batch would be published to a Kafka topic; here the
    batches are yielded to the caller instead.

    Args:
        csv_file: Path or URI of the CSV file. It is read with Spark using the header
            row and without schema inference.
        batch_size: Number of records per batch; must be a positive integer.

    Yields:
        dict with keys 'batch_num' (1-based), 'batch_size' (records in this batch;
        the last batch may be smaller) and 'messages' (list of per-record dicts
        mapping column name to value).

    Raises:
        ValueError: if ``batch_size`` < 1. Because this is a generator, the error
            surfaces on the first ``next()``, not at call time.

    Note:
        The whole file is collected to the driver with ``toPandas()`` before the
        first batch is yielded, even if the caller consumes only a few batches.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    print(f"\nPRODUCER: Reading data from {csv_file}")

    df = spark.read.csv(csv_file, header=True, inferSchema=False)

    # FL_DATE is not among the model features; drop it when present.
    if "FL_DATE" in df.columns:
        df = df.drop("FL_DATE")

    # Collect once and count locally: a separate df.count() would run another
    # full Spark job over the CSV just to get the same number.
    pdf = df.toPandas()
    total_rows = len(pdf)
    print(f"PRODUCER: Total records to process: {total_rows:,}")

    num_batches = (total_rows + batch_size - 1) // batch_size
    print(f"PRODUCER: Will send {num_batches} batches of {batch_size} records each\n")

    for batch_num, start in enumerate(range(0, total_rows, batch_size), start=1):
        # Each record becomes a dict, standing in for one Kafka message payload.
        messages = pdf.iloc[start:start + batch_size].to_dict("records")
        yield {
            "batch_num": batch_num,
            "batch_size": len(messages),
            "messages": messages,
        }

In [7]:
def preprocess_batch(batch_df, indexer_model=None):
    """
    Preprocess an incoming batch: type conversion, categorical encoding, feature assembly.

    Args:
        batch_df: Spark DataFrame holding the raw batch (the consumer builds it with
            string-typed columns). It must contain every column in
            ``numerical_features`` and ``categorical_features``.
        indexer_model: Optional already-fitted transformer that adds a
            ``<col>_indexed`` column for each column in ``categorical_features``.
            An example is the fitted StringIndexer pipeline from training, if it
            was saved. Supplying it keeps category indices consistent with the
            encoding the model was trained on.
            If omitted, StringIndexers are fitted on this batch alone (the previous
            behaviour) and a warning is issued. Indices fitted on a single batch
            depend on that batch's contents and generally differ from the
            training-time encoding, so the model receives mis-encoded categorical
            features.

    Returns:
        DataFrame with the feature columns, the ``<col>_indexed`` columns and an
        assembled ``features`` vector column. Rows with a null or unparseable
        feature value are dropped.
    """
    # YEAR is cast too, although it is not a model feature; select() drops it below.
    int_cols = ["YEAR", "MONTH", "DAY_OF_MONTH", "DAY_OF_WEEK",
                "CRS_DEP_TIME", "CRS_ARR_TIME"]
    float_cols = ["DISTANCE", "CRS_ELAPSED_TIME"]

    # try_cast yields NULL for unparseable strings instead of failing the job;
    # such rows are removed by dropna() below.
    for col in int_cols:
        if col in batch_df.columns:
            batch_df = batch_df.withColumn(col, F.expr(f"try_cast({col} as int)"))
    for col in float_cols:
        if col in batch_df.columns:
            batch_df = batch_df.withColumn(col, F.expr(f"try_cast({col} as double)"))

    # Keep only model inputs and drop incomplete rows.
    batch_df = batch_df.select(numerical_features + categorical_features).dropna()

    if indexer_model is None:
        warnings.warn(
            "preprocess_batch: fitting StringIndexers on the incoming batch; category "
            "indices will not match the encoding used to train the model. Pass the "
            "training-time fitted indexer as indexer_model instead.",
            stacklevel=2,
        )
        indexers = [
            # handleInvalid="keep" maps unseen labels to an extra index instead of failing.
            StringIndexer(inputCol=col, outputCol=f"{col}_indexed", handleInvalid="keep")
            for col in categorical_features
        ]
        indexer_model = Pipeline(stages=indexers).fit(batch_df)
    batch_df = indexer_model.transform(batch_df)

    # Column order here defines the feature-vector layout the model expects.
    indexed_categorical = [f"{col}_indexed" for col in categorical_features]
    assembler = VectorAssembler(
        inputCols=numerical_features + indexed_categorical,
        outputCol="features",
        handleInvalid="skip",
    )
    return assembler.transform(batch_df)

In [8]:
def predict_batch(batch_df):
    """
    Score a preprocessed batch with the loaded random-forest model.

    Args:
        batch_df: Output of ``preprocess_batch`` (must carry the ``features`` column).

    Returns:
        DataFrame with the flight identifiers, scheduled departure, the model's
        ``prediction`` and a ``delay_probability`` column taken from element 1 of
        the model's ``probability`` vector via ``get_delay_prob``.
    """
    output_columns = [
        "OP_UNIQUE_CARRIER", "ORIGIN", "DEST",
        "CRS_DEP_TIME", "prediction", "delay_probability",
    ]
    scored = rf_model.transform(batch_df).withColumn(
        "delay_probability", get_delay_prob(F.col("probability"))
    )
    return scored.select(*output_columns)

In [24]:
def simulate_kafka_consumer(producer_generator):
    """
    Simulates a Kafka consumer that outputs one high-risk flight at a time.
    Each batch outputs only the single highest-risk flight in JSON format.
    """
    print("ðŸ“¥ CONSUMER: Starting to consume batches...\n")
    
    import json
    
    for batch_data in producer_generator:
        batch_num = batch_data['batch_num']
        batch_size = batch_data['batch_size']
        messages = batch_data['messages']
        
        try:
            # Convert messages to Pandas DataFrame first
            batch_pdf = pd.DataFrame(messages)
            batch_pdf = batch_pdf.where(pd.notnull(batch_pdf), None)
            batch_df = spark.createDataFrame(batch_pdf.astype(str))
            
            # Preprocess and predict
            batch_df = preprocess_batch(batch_df)
            
            if batch_df.count() == 0:
                continue
            
            predictions = predict_batch(batch_df)
            
            # Get only the TOP 1 highest-risk flight
            highest_risk = predictions.orderBy(F.desc("delay_probability")).limit(1).collect()
            
            if highest_risk:
                row = highest_risk[0]
                flight_data = {
                    "carrier": row["OP_UNIQUE_CARRIER"],
                    "origin": row["ORIGIN"],
                    "destination": row["DEST"],
                    "scheduled_departure": int(row["CRS_DEP_TIME"]) if row["CRS_DEP_TIME"] else None,
                    "prediction": int(row["prediction"]),
                    "delay_probability": round(row["delay_probability"], 4)
                }
                
                # Print single JSON record
                print(json.dumps(flight_data, indent=2))
                print()  # Empty line between records
            
        except Exception as e:
            continue

In [25]:
import itertools
import os

# Simulation settings.
CSV_FILE = f"file://{os.path.abspath('may_2025.csv')}"
BATCH_SIZE = 1000  # records per simulated Kafka batch
NUM_BATCHES = 20   # consume only the first NUM_BATCHES batches

print(f"\n🚀 Starting Kafka simulation ({NUM_BATCHES} batches)...\n")

producer = simulate_kafka_producer(CSV_FILE, batch_size=BATCH_SIZE)
try:
    simulate_kafka_consumer(itertools.islice(producer, NUM_BATCHES))
finally:
    # islice stops early and leaves the producer suspended. A suspended generator
    # keeps its locals alive, including the rows it collected. Close it so that
    # memory can be released.
    producer.close()

print("\n✓ Simulation complete!")


🚀 Starting Kafka simulation (20 batches)...

📥 CONSUMER: Starting to consume batches...


PRODUCER: Reading data from file:///Users/aakashvardhan/Documents/big-data-project-prediction/Big Data ML Model/may_2025.csv
PRODUCER: Total records to process: 667,586


                                                                                

PRODUCER: Will send 668 batches of 1000 records each



25/12/04 21:28:58 WARN DAGScheduler: Broadcasting large task binary with size 20.8 MiB
                                                                                

{
  "carrier": "9E",
  "origin": "SYR",
  "destination": "LGA",
  "scheduled_departure": 1701,
  "prediction": 0,
  "delay_probability": 0.4067
}



25/12/04 21:29:01 WARN DAGScheduler: Broadcasting large task binary with size 20.7 MiB
                                                                                

{
  "carrier": "AA",
  "origin": "CLT",
  "destination": "RDU",
  "scheduled_departure": 1432,
  "prediction": 0,
  "delay_probability": 0.4095
}



25/12/04 21:29:03 WARN DAGScheduler: Broadcasting large task binary with size 20.7 MiB
                                                                                

{
  "carrier": "AA",
  "origin": "MCO",
  "destination": "MIA",
  "scheduled_departure": 1329,
  "prediction": 0,
  "delay_probability": 0.4085
}



25/12/04 21:29:06 WARN DAGScheduler: Broadcasting large task binary with size 20.8 MiB
                                                                                

{
  "carrier": "AA",
  "origin": "RDU",
  "destination": "CLT",
  "scheduled_departure": 1249,
  "prediction": 0,
  "delay_probability": 0.4011
}



25/12/04 21:29:08 WARN DAGScheduler: Broadcasting large task binary with size 20.8 MiB
                                                                                

{
  "carrier": "B6",
  "origin": "BOS",
  "destination": "JFK",
  "scheduled_departure": 1445,
  "prediction": 0,
  "delay_probability": 0.4349
}



25/12/04 21:29:10 WARN DAGScheduler: Broadcasting large task binary with size 20.7 MiB
                                                                                

{
  "carrier": "DL",
  "origin": "ATL",
  "destination": "TYS",
  "scheduled_departure": 1256,
  "prediction": 0,
  "delay_probability": 0.4079
}



25/12/04 21:29:13 WARN DAGScheduler: Broadcasting large task binary with size 20.7 MiB
                                                                                

{
  "carrier": "DL",
  "origin": "JFK",
  "destination": "SEA",
  "scheduled_departure": 1910,
  "prediction": 0,
  "delay_probability": 0.3664
}



25/12/04 21:29:16 WARN DAGScheduler: Broadcasting large task binary with size 20.7 MiB
                                                                                

{
  "carrier": "DL",
  "origin": "SFO",
  "destination": "LAX",
  "scheduled_departure": 1340,
  "prediction": 0,
  "delay_probability": 0.3826
}



25/12/04 21:29:19 WARN DAGScheduler: Broadcasting large task binary with size 20.8 MiB
                                                                                

{
  "carrier": "F9",
  "origin": "SNA",
  "destination": "LAS",
  "scheduled_departure": 1810,
  "prediction": 0,
  "delay_probability": 0.3762
}



25/12/04 21:29:22 WARN DAGScheduler: Broadcasting large task binary with size 20.8 MiB
                                                                                

{
  "carrier": "MQ",
  "origin": "DFW",
  "destination": "MSY",
  "scheduled_departure": 1716,
  "prediction": 0,
  "delay_probability": 0.4095
}



25/12/04 21:29:25 WARN DAGScheduler: Broadcasting large task binary with size 20.8 MiB
                                                                                

{
  "carrier": "NK",
  "origin": "MIA",
  "destination": "PHL",
  "scheduled_departure": 2126,
  "prediction": 0,
  "delay_probability": 0.3753
}



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/aakashvardhan/Documents/big-data-project-prediction/.venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aakashvardhan/Documents/big-data-project-prediction/.venv/lib/python3.11/site-packages/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aakashvardhan/.local/share/uv/python/cpython-3.11.13-macos-aarch64-none/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
25/12/04 21:29:26 ERROR Executor: Exception in task 5.0 in stage 1304.0 (TID 4186)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/aakashva

KeyboardInterrupt: 