## Import libraries

In [43]:
import pandas as pd
import numpy as np
import os
import boto3
from io import BytesIO, StringIO
from botocore.exceptions import NoCredentialsError, ClientError
import seaborn as sns
import time
import matplotlib.pyplot as plt
import warnings
from pyspark.sql import SparkSession
from pyspark.ml.classification import (
    LogisticRegression,
    RandomForestClassifier,
    GBTClassifier,
    DecisionTreeClassifier
)

# For Pipeline and Feature Transformation
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import (
    VectorAssembler,
    StringIndexer,
    OneHotEncoder,
    StandardScaler,
    MinMaxScaler,
    RobustScaler
)

# For Model Evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# For SQL and DataFrame Operations
from pyspark.sql.functions import col, monotonically_increasing_id, lit
from pyspark.sql.types import DoubleType

In [2]:
pd.set_option("display.max_columns", None)

warnings.filterwarnings("ignore")

sns.set(style="whitegrid")

## Create SparkSession

In [3]:
spark = SparkSession.builder \
    .appName("Classification") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.task.maxFailures", "4") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/01/09 10:47:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark.sparkContext.setLogLevel("ERROR")

## Extract Historical Data from S3 Bucket

Defines a function `read_csv_from_s3_as_df` to fetch a CSV file from the specified S3 bucket and key, reads its contents into a pandas DataFrame, and prints the DataFrame or an error message if retrieval or parsing fails.

In [5]:
def read_csv_from_s3_as_df(bucket, key):
    try:
        # Create an S3 client
        s3 = boto3.client('s3')

        # Get the object from S3
        obj = s3.get_object(Bucket=bucket, Key=key)

        # Read the contents of the file into a pandas DataFrame
        df_pre_clean = pd.read_csv(BytesIO(obj['Body'].read()), header=0)

        return df_pre_clean
    except NoCredentialsError:
        print("Credentials not available")
    except ClientError as e:
        print(f"An error occurred: {e}")
    except Exception as e:
        print(f"An error occurred during DataFrame conversion: {e}")


bucket = 'big-data-team1-bucket'
key = 'cleaned-data/historical_data.csv'
historical_df = read_csv_from_s3_as_df(bucket, key)
if historical_df is not None:
    print(historical_df)
else:
    print("No data returned or error occurred")

         flightdate  day_of_week          airline tail_number dep_airport  \
0        2023-01-02            1     Endeavor Air      N605LR         BDL   
1        2023-01-03            2     Endeavor Air      N605LR         BDL   
2        2023-01-04            3     Endeavor Air      N331PQ         BDL   
3        2023-01-05            4     Endeavor Air      N906XJ         BDL   
4        2023-01-06            5     Endeavor Air      N337PQ         BDL   
...             ...          ...              ...         ...         ...   
6743368  2023-12-31            7  JetBlue Airways      N903JB         SJU   
6743369  2023-12-31            7  JetBlue Airways      N535JB         MCO   
6743370  2023-12-31            7  JetBlue Airways      N354JB         PHL   
6743371  2023-12-31            7  JetBlue Airways      N768JB         PBI   
6743372  2023-12-31            7  JetBlue Airways      N547JB         BDL   

                           dep_cityname deptime_label  dep_delay  \
0      

Defines a function `sample_flights` that samples a fixed number (`n_flights`) of flights per departure airport and date from the provided DataFrame, grouping by specified columns, and handles groups with fewer rows by enabling replacement during sampling.

In [6]:
def sample_flights(dataframe, dep_airport_col='dep_airport', date_col='flightdate', n_flights=10):
    dataframe[date_col] = pd.to_datetime(dataframe[date_col])

    # Group by dep_airport and date
    grouped = dataframe.groupby([dep_airport_col, date_col])

    # Sample 10 flights from each group, allowing replacement if a group has fewer rows
    sampled_data = grouped.apply(lambda x: x.sample(n=n_flights, replace=len(x) < n_flights)).reset_index(drop=True)

    return sampled_data

Samples 10 flights per departure airport and flight date from the `historical_df` DataFrame and outputs the shape of the resulting sampled DataFrame to verify the operation.

In [7]:
historical_df = sample_flights(historical_df, dep_airport_col='dep_airport', date_col='flightdate', n_flights=10)

# Verify the sampling
historical_df.shape

(1192190, 32)

Converts the Pandas DataFrame `historical_df` into a PySpark DataFrame for distributed processing and scalability.

In [8]:
historical_df = spark.createDataFrame(historical_df)

## Train Model (Classification)

Defines a function `encode_data` to perform label or one-hot encoding on categorical columns in PySpark DataFrames using a pipeline, returning encoded training and test datasets.

In [9]:
def encode_data(X_train, X_test, encoder_type='label', columns=None):
    if columns is None:
        # Default to all string (categorical) columns if no columns are specified
        columns = [coll for coll, dtype in X_train.dtypes if dtype == 'string']

    stages = []  # List to store transformation stages

    for coll in columns:
        if encoder_type == 'label':
            # Label Encoding
            indexer = StringIndexer(inputCol=coll, outputCol=f"{coll}_indexed", handleInvalid="keep")
            stages.append(indexer)
        elif encoder_type == 'onehot':
            # One-Hot Encoding
            indexer = StringIndexer(inputCol=coll, outputCol=f"{coll}_indexed", handleInvalid="keep")
            onehot_encoder = OneHotEncoder(inputCol=f"{coll}_indexed", outputCol=f"{coll}_encoded")
            stages.append(indexer)
            stages.append(onehot_encoder)

    # Create a Pipeline with all the stages
    pipeline = Pipeline(stages=stages)

    # Fit the pipeline on the training data
    pipeline_model = pipeline.fit(X_train)

    # Transform both train and test data
    X_train_encoded = pipeline_model.transform(X_train)
    X_test_encoded = pipeline_model.transform(X_test)

    return X_train_encoded, X_test_encoded

Defines a function `encode_target` to perform label encoding on target columns (`y_train` and `y_test`) in PySpark DataFrames using `StringIndexer`, ensuring consistent encoding across training and testing datasets.

In [10]:
def encode_target(y_train, y_test, encoder_type='label'):
    if encoder_type == 'label':
        # Combine y_train and y_test for consistent encoding
        combined_df = y_train.union(y_test)

        # StringIndexer for label encoding
        indexer = StringIndexer(inputCol="label", outputCol="label_indexed")
        indexer_model = indexer.fit(combined_df)  # Fit on combined data to ensure consistency

        # Transform training and testing data
        y_train_encoded = indexer_model.transform(y_train)
        y_test_encoded = indexer_model.transform(y_test)
    else:
        raise ValueError("Invalid encoder_type. Currently supported: 'label'.")

    return y_train_encoded, y_test_encoded

Defines the `scale_data` function to scale features in training and testing PySpark DataFrames using specified scalers (`StandardScaler`, `MinMaxScaler`, or `RobustScaler`), returning the transformed datasets with scaled features.

In [11]:
def scale_data(X_train, X_test, scaler_type='standard'):
    # Step 1: Choose the appropriate scaler
    if scaler_type == 'standard':
        scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    elif scaler_type == 'minmax':
        scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
    elif scaler_type == 'robust':
        scaler = RobustScaler(inputCol="features", outputCol="scaled_features")
    else:
        raise ValueError("Invalid scaler_type. Choose from 'standard', 'minmax', 'robust'.")

    # Step 2: Create a Pipeline
    pipeline = Pipeline(stages=[scaler])

    # Step 3: Fit the pipeline on the training data
    pipeline_model = pipeline.fit(X_train)

    # Step 4: Transform both train and test data
    X_train_scaled = pipeline_model.transform(X_train).drop("features").withColumnRenamed("scaled_features", "features")
    X_test_scaled = pipeline_model.transform(X_test).drop("features").withColumnRenamed("scaled_features", "features")

    return X_train_scaled, X_test_scaled

Defines the `evaluate_classification_models` function to train multiple classification models, evaluate their performance using metrics like accuracy, F1 score, and ROC AUC, log results, save the best-performing model, and return a summary DataFrame along with trained models.

In [12]:
def evaluate_classification_models(X_train, y_train, X_test, y_test, models, save_path="best_model"):
    # Combine features and labels into single DataFrames
    train_data = X_train.join(y_train, "id")
    test_data = X_test.join(y_test, "id")

    model_results = []
    trained_models = {}
    best_model = None
    best_accuracy = 0

    for model in models:
        start_time = time.time()

        # Train the model using a pipeline
        pipeline = Pipeline(stages=[model])
        trained_model = pipeline.fit(train_data)
        trained_models[model.__class__.__name__] = trained_model

        # Make predictions on the test dataset
        predictions = trained_model.transform(test_data)

        # Define evaluators for metrics
        evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
        evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
        evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
        evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
        evaluator_roc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
        evaluator_pr = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

        # Calculate metrics for test data
        accuracy = evaluator_accuracy.evaluate(predictions)
        f1_score = evaluator_f1.evaluate(predictions)
        precision = evaluator_precision.evaluate(predictions)
        recall = evaluator_recall.evaluate(predictions)
        roc_auc = evaluator_roc.evaluate(predictions)
        pr_auc = evaluator_pr.evaluate(predictions)

        inference_time = time.time() - start_time  # Inference time in seconds

        # Log results
        print(f"{model.__class__.__name__} is ready")

        model_results.append({
            "Model-Name": model.__class__.__name__,
            "Test_Accuracy": accuracy,
            "F1_Score": f1_score,
            "Precision": precision,
            "Recall": recall,
            "ROC_AUC": roc_auc,
            "PR_AUC": pr_auc,
            "Inference Time (ms)": inference_time * 1000
        })

        # Update the best model
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_model = trained_model

    # Save the best model
    if best_model:
        if not os.path.exists(save_path):
            best_model.save(save_path)
        print(f"Best model saved at {save_path}")

    # Convert results to a pandas DataFrame
    models_df = pd.DataFrame(model_results)
    models_df = models_df.set_index("Model-Name")

    return models_df.sort_values("Test_Accuracy", ascending=False), trained_models

Defines a list of classification models, including Logistic Regression, Random Forest, Gradient Boosted Trees (GBT), and Decision Tree classifiers, each configured with specific hyperparameters for training and evaluation.

In [13]:
classification_models = [
    LogisticRegression(featuresCol="features", labelCol="label", maxIter=10, regParam=0.01),

    RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50, seed=42),

    GBTClassifier(featuresCol="features", labelCol="label", maxIter=10, seed=42),

    DecisionTreeClassifier(featuresCol="features", labelCol="label", maxDepth=5, seed=42),
]

Filters the `historical_df` DataFrame to retain only relevant columns, sets `dep_delay_tag` as the target variable labeled as `label`, and drops the original `dep_delay_tag` column to prevent duplication.

In [14]:
# Combine X and Y into a single DataFrame
data = historical_df.select(
    *[col(c) for c in historical_df.columns if c not in [
        'dep_delay', "flightdate", "tail_number", "deptime_label",
        "dep_airport", "dep_cityname", "tmin", "tmax", "day_of_week",
        "delay_carrier", "delay_nas", "delay_security", "delay_lastaircraft", "delay_weather"
    ]],  # Keep only desired columns
).withColumn("label", col("dep_delay_tag").cast(DoubleType()))  # Set 'label' column as target variable

# Drop the original target column to avoid duplication
data = data.drop("dep_delay_tag")

Adds a unique `id` column to the `data` DataFrame using the `monotonically_increasing_id` function to uniquely identify each row.

In [15]:
data = data.withColumn("id", monotonically_increasing_id())

Identifies numerical columns (`double` and `bigint` types) excluding the `label` column, creates a feature vector using `VectorAssembler`, and selects the `id`, `features`, and `label` columns for further processing.

In [None]:
feature_columns = [col for col, dtype in data.dtypes if dtype in ['double', 'bigint'] and col != 'label']
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = vector_assembler.transform(data).select("id", "features", "label")

Splits the data into training (80%) and testing (20%) sets using a random seed for reproducibility, then separates features (`X_train`, `X_test`) and labels (`y_train`, `y_test`) into distinct DataFrames.

In [18]:
# Split the data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

X_train = train_data.select("id", "features")
y_train = train_data.select("id", "label")
X_test = test_data.select("id", "features")
y_test = test_data.select("id", "label")

Encodes categorical features in the training and testing datasets (`X_train` and `X_test`) using label encoding, producing `X_train_encoded` and `X_test_encoded` as the transformed datasets.

In [19]:
X_train_encoded, X_test_encoded = encode_data(X_train, X_test, encoder_type='label')

Encodes categorical features in the training and testing datasets (`X_train` and `X_test`) using one-hot encoding, producing `X_train_encoded` and `X_test_encoded` as the transformed datasets.

In [None]:
X_train_encoded, X_test_encoded = encode_data(X_train, X_test, encoder_type='onehot')

Encodes the target labels (`y_train` and `y_test`) using label encoding, resulting in `y_train_encoded` and `y_test_encoded` for consistent numerical representation of the target variable.

In [20]:
y_train_encoded, y_test_encoded = encode_target(y_train, y_test)

                                                                                

In [21]:
models_class_no_s, trained_no_s = evaluate_classification_models(X_train_encoded, y_train_encoded, X_test_encoded,
                                                                 y_test_encoded, classification_models)

                                                                                

LogisticRegression is ready


                                                                                

RandomForestClassifier is ready


                                                                                

GBTClassifier is ready


                                                                                

DecisionTreeClassifier is ready
Best model saved at best_model


In [22]:
models_class_no_s.iloc[:, :-1]

Unnamed: 0_level_0,Test_Accuracy,F1_Score,Precision,Recall,ROC_AUC,PR_AUC
Model-Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
GBTClassifier,0.859641,0.855101,0.856474,0.859641,0.910755,0.847216
DecisionTreeClassifier,0.857715,0.854236,0.854299,0.857715,0.607771,0.620572
RandomForestClassifier,0.854164,0.851071,0.850685,0.854164,0.886917,0.815444
LogisticRegression,0.82639,0.804113,0.841714,0.82639,0.901416,0.824106


In [29]:
X_train_ss, X_test_ss = scale_data(X_train_encoded, X_test_encoded, scaler_type="standard")

                                                                                

In [30]:
models_class_ss, trained_ss = evaluate_classification_models(X_train_ss, y_train_encoded, X_test_ss,
                                                             y_test_encoded, classification_models)

models_class_ss.iloc[:, :-1]

                                                                                

LogisticRegression is ready


                                                                                

RandomForestClassifier is ready


                                                                                

GBTClassifier is ready


                                                                                

DecisionTreeClassifier is ready


Unnamed: 0_level_0,Test_Accuracy,F1_Score,Precision,Recall,ROC_AUC,PR_AUC
Model-Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
GBTClassifier,0.859909,0.854178,0.857332,0.859909,0.911145,0.846724
DecisionTreeClassifier,0.858209,0.851631,0.856113,0.858209,0.597719,0.612607
RandomForestClassifier,0.854524,0.851999,0.851313,0.854524,0.885414,0.811675
LogisticRegression,0.826281,0.803664,0.841696,0.826281,0.901766,0.823107


In [31]:
X_train_sm, X_test_sm = scale_data(X_train_encoded, X_test_encoded, scaler_type="minmax")

                                                                                

In [32]:
models_class_mm, trained_mm = evaluate_classification_models(X_train_sm, y_train_encoded, X_test_sm,
                                                             y_test_encoded, classification_models)

models_class_mm.iloc[:, :-1]

                                                                                

LogisticRegression is ready


                                                                                

RandomForestClassifier is ready


                                                                                

GBTClassifier is ready


                                                                                

DecisionTreeClassifier is ready


Unnamed: 0_level_0,Test_Accuracy,F1_Score,Precision,Recall,ROC_AUC,PR_AUC
Model-Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
GBTClassifier,0.859897,0.854165,0.857318,0.859897,0.911157,0.846711
DecisionTreeClassifier,0.858209,0.851631,0.856113,0.858209,0.597719,0.612607
RandomForestClassifier,0.854536,0.852079,0.85137,0.854536,0.88541,0.811091
LogisticRegression,0.826281,0.803664,0.841696,0.826281,0.901767,0.823107


In [33]:
X_train_sr, X_test_sr = scale_data(X_train_encoded, X_test_encoded, scaler_type="robust")

                                                                                

In [34]:
models_class_rs, trained_rs = evaluate_classification_models(X_train_sr, y_train_encoded, X_test_sr,
                                                             y_test_encoded, classification_models)

models_class_rs.iloc[:, :-1]

                                                                                

LogisticRegression is ready


                                                                                

RandomForestClassifier is ready


                                                                                

GBTClassifier is ready


                                                                                

DecisionTreeClassifier is ready


Unnamed: 0_level_0,Test_Accuracy,F1_Score,Precision,Recall,ROC_AUC,PR_AUC
Model-Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
GBTClassifier,0.859959,0.854269,0.857359,0.859959,0.911118,0.846706
DecisionTreeClassifier,0.858222,0.851694,0.85608,0.858222,0.586694,0.606325
RandomForestClassifier,0.854076,0.851684,0.850943,0.854076,0.885681,0.81344
LogisticRegression,0.826294,0.803685,0.841696,0.826294,0.901769,0.823111


In [35]:
models_class_no_s["Scaler"] = "No Scaling"
models_class_ss["Scaler"] = "Standard Scaler"
models_class_mm["Scaler"] = "MinMax Scaler"
models_class_rs["Scaler"] = "Robust Scaler"


all_models = pd.concat([models_class_no_s, models_class_ss, models_class_mm, models_class_rs], axis=0)
all_models = all_models.sort_values(by="Test_Accuracy", ascending=False)
all_models

Unnamed: 0_level_0,Test_Accuracy,F1_Score,Precision,Recall,ROC_AUC,PR_AUC,Inference Time (ms),Scaler
Model-Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
GBTClassifier,0.859959,0.854269,0.857359,0.859959,0.91112,0.846705,84990.954161,No Scaling
GBTClassifier,0.859959,0.854269,0.857359,0.859959,0.911118,0.846706,84324.897051,Robust Scaler
GBTClassifier,0.859909,0.854178,0.857332,0.859909,0.911145,0.846724,83307.399511,Standard Scaler
GBTClassifier,0.859897,0.854165,0.857318,0.859897,0.911157,0.846711,85228.909969,MinMax Scaler
DecisionTreeClassifier,0.858222,0.851694,0.85608,0.858222,0.586694,0.606325,77758.510113,No Scaling
DecisionTreeClassifier,0.858222,0.851694,0.85608,0.858222,0.586694,0.606325,77047.998905,Robust Scaler
DecisionTreeClassifier,0.858209,0.851631,0.856113,0.858209,0.597719,0.612607,77111.476183,MinMax Scaler
DecisionTreeClassifier,0.858209,0.851631,0.856113,0.858209,0.597719,0.612607,78164.190292,Standard Scaler
RandomForestClassifier,0.854536,0.852079,0.85137,0.854536,0.88541,0.811091,121518.332243,MinMax Scaler
RandomForestClassifier,0.854524,0.851999,0.851313,0.854524,0.885414,0.811675,120347.884893,Standard Scaler


## Import Streaming Data

In [23]:
bucket = 'big-data-team1-bucket'
key = 'cleaned-data/realtime_data.csv'
streaming_df = read_csv_from_s3_as_df(bucket, key)

In [24]:
streaming_df.dtypes

flightdate             object
day_of_week             int64
airline                object
tail_number            object
dep_airport            object
dep_cityname           object
deptime_label          object
dep_delay               int64
dep_delay_tag           int64
dep_delay_type         object
arr_airport            object
arr_cityname           object
arr_delay               int64
arr_delay_type         object
flight_duration         int64
distance_type          object
delay_carrier           int64
delay_weather           int64
delay_nas               int64
delay_security          int64
delay_lastaircraft      int64
manufacturer           object
model                  object
aicraft_age             int64
tavg                  float64
tmin                  float64
tmax                  float64
prcp                  float64
snow                  float64
wdir                  float64
wspd                  float64
pres                  float64
dtype: object

In [25]:
streaming_df = spark.createDataFrame(streaming_df)

In [26]:
streaming_df.dtypes

[('flightdate', 'string'),
 ('day_of_week', 'bigint'),
 ('airline', 'string'),
 ('tail_number', 'string'),
 ('dep_airport', 'string'),
 ('dep_cityname', 'string'),
 ('deptime_label', 'string'),
 ('dep_delay', 'bigint'),
 ('dep_delay_tag', 'bigint'),
 ('dep_delay_type', 'string'),
 ('arr_airport', 'string'),
 ('arr_cityname', 'string'),
 ('arr_delay', 'bigint'),
 ('arr_delay_type', 'string'),
 ('flight_duration', 'bigint'),
 ('distance_type', 'string'),
 ('delay_carrier', 'bigint'),
 ('delay_weather', 'bigint'),
 ('delay_nas', 'bigint'),
 ('delay_security', 'bigint'),
 ('delay_lastaircraft', 'bigint'),
 ('manufacturer', 'string'),
 ('model', 'string'),
 ('aicraft_age', 'bigint'),
 ('tavg', 'double'),
 ('tmin', 'double'),
 ('tmax', 'double'),
 ('prcp', 'double'),
 ('snow', 'double'),
 ('wdir', 'double'),
 ('wspd', 'double'),
 ('pres', 'double')]

In [28]:
# Combine X and y into a single DataFrame
realtime_data = streaming_df.select(
    *[col(c) for c in streaming_df.columns if c not in [
        'dep_delay', "flightdate", "tail_number", "deptime_label",
        "dep_airport", "dep_cityname", "tmin", "tmax", "day_of_week",
        "delay_carrier", "delay_nas", "delay_security", "delay_lastaircraft", "delay_weather", "dep_delay_tag"
    ]],  # Keep only desired columns
)

In [30]:
realtime_data = realtime_data.withColumn("id", monotonically_increasing_id())

In [29]:
realtime_data.dtypes

[('airline', 'string'),
 ('dep_delay_type', 'string'),
 ('arr_airport', 'string'),
 ('arr_cityname', 'string'),
 ('arr_delay', 'bigint'),
 ('arr_delay_type', 'string'),
 ('flight_duration', 'bigint'),
 ('distance_type', 'string'),
 ('manufacturer', 'string'),
 ('model', 'string'),
 ('aicraft_age', 'bigint'),
 ('tavg', 'double'),
 ('prcp', 'double'),
 ('snow', 'double'),
 ('wdir', 'double'),
 ('wspd', 'double'),
 ('pres', 'double')]

In [31]:
real_feature_columns = [col for col, dtype in realtime_data.dtypes if dtype in ['double', 'bigint']]
vector_assembler = VectorAssembler(inputCols=real_feature_columns, outputCol="features")
realtime_data = vector_assembler.transform(realtime_data).select("id", "features")

In [32]:
# Load the saved model
loaded_model = PipelineModel.load("best_model")



In [33]:
realtime_data.dtypes

[('id', 'bigint'), ('features', 'vector')]

In [34]:
predictions = loaded_model.transform(realtime_data)

In [35]:
predictions.printSchema()

root
 |-- id: long (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [47]:
predictions.select("features", "prediction", "probability").show(10, truncate=False)

+-----------------------------------------------------+----------+----------------------------------------+
|features                                             |prediction|probability                             |
+-----------------------------------------------------+----------+----------------------------------------+
|[0.0,87.0,21.0,5.88,0.0,0.0,310.0,31.5,1022.0,0.0]   |0.0       |[0.7590338038950871,0.24096619610491288]|
|[-16.0,43.0,21.0,5.88,0.0,0.0,310.0,31.5,1022.0,1.0] |0.0       |[0.9160584954639378,0.0839415045360622] |
|[-15.0,68.0,20.0,5.88,0.0,0.0,310.0,31.5,1022.0,2.0] |0.0       |[0.9160584954639378,0.0839415045360622] |
|[-14.0,62.0,11.0,5.88,0.0,0.0,310.0,31.5,1022.0,3.0] |0.0       |[0.9118810672939638,0.08811893270603621]|
|[906.0,55.0,20.0,5.88,0.0,0.0,310.0,31.5,1022.0,4.0] |1.0       |[0.06552106523960509,0.9344789347603949]|
|[-19.0,109.0,16.0,5.88,0.0,0.0,310.0,31.5,1022.0,5.0]|0.0       |[0.9193055768343861,0.08069442316561393]|
|[2.0,64.0,17.0,5.88,0.0,0.0

In [42]:
streaming_df = streaming_df.withColumn("id", monotonically_increasing_id())
predictions = predictions.withColumn("id", monotonically_increasing_id())

streaming_with_predictions = streaming_df.join(predictions.select("id", "prediction"), on="id", how="inner")

streaming_with_predictions = streaming_with_predictions.drop("id")

# Convert PySpark DataFrame to Pandas DataFrame
prediction_df = streaming_with_predictions.toPandas()

# Display the Pandas DataFrame
prediction_df.head(10)

Unnamed: 0,flightdate,day_of_week,airline,tail_number,dep_airport,dep_cityname,deptime_label,dep_delay,dep_delay_tag,dep_delay_type,arr_airport,arr_cityname,arr_delay,arr_delay_type,flight_duration,distance_type,delay_carrier,delay_weather,delay_nas,delay_security,delay_lastaircraft,manufacturer,model,aicraft_age,tavg,tmin,tmax,prcp,snow,wdir,wspd,pres,prediction
0,2025-01-01,7,Endeavor Air,N601LR,ATL,"Atlanta, GA",Morning,-2,0,Low <5min,GNV,"Gainesville, FL",-6,Low <5min,81,Short Haul >1500Mi,0,0,0,0,0,CANADAIR REGIONAL JET,CRJ,17,5.88,2.1,9.66,0.0,0.0,310.0,31.5,1022.0,0.0
1,2025-01-01,7,Endeavor Air,N153PQ,ATL,"Atlanta, GA",Evening,-5,0,Low <5min,ABE,"Allentown/Bethlehem/Easton, PA",-1,Low <5min,117,Short Haul >1500Mi,0,0,0,0,0,CANADAIR REGIONAL JET,CRJ,17,5.88,2.1,9.66,0.0,0.0,310.0,31.5,1022.0,0.0
2,2025-01-01,7,Delta Air Lines Inc,N807DN,ATL,"Atlanta, GA",Evening,-2,0,Low <5min,CVG,"Cincinnati, OH",0,Low <5min,86,Short Haul >1500Mi,0,0,0,0,0,BOEING,737 NG,11,5.88,2.1,9.66,0.0,0.0,310.0,31.5,1022.0,0.0
3,2025-01-01,7,Southwest Airlines Co.,N413WN,LAX,"Los Angeles, CA",Night,-1,0,Low <5min,PHX,"Phoenix, AZ",-26,Low <5min,70,Short Haul >1500Mi,0,0,0,0,0,BOEING,737 NG,23,14.77,10.24,19.29,0.0,0.0,280.0,16.67,1018.0,0.0
4,2025-01-01,7,United Air Lines Inc.,N26232,LGA,"New York, NY",Afternoon,-10,0,Low <5min,DEN,"Denver, CO",-11,Low <5min,274,Medium Haul <3000Mi,0,0,0,0,0,BOEING,737 NG,25,7.11,4.39,9.83,0.0,0.0,300.0,51.84,1000.0,0.0
5,2025-01-01,7,Frontier Airlines Inc.,N388FR,MDW,"Chicago, IL",Afternoon,2,1,Low <5min,PHL,"Philadelphia, PA",-9,Low <5min,102,Short Haul >1500Mi,0,0,0,0,0,AIRBUS,A320,2,-1.71,-4.54,1.12,0.19,0.0,284.0,41.83,1020.0,0.0
6,2025-01-01,7,Southwest Airlines Co.,N8803L,MDW,"Chicago, IL",Evening,37,1,Medium >15min,LAX,"Los Angeles, CA",40,Medium >15min,273,Medium Haul <3000Mi,0,0,3,0,37,BOEING,737 NG,5,-1.71,-4.54,1.12,0.19,0.0,284.0,41.83,1020.0,1.0
7,2025-01-01,7,American Airlines Inc.,N180US,ORD,"Chicago, IL",Morning,7,1,Low <5min,DFW,"Dallas/Fort Worth, TX",-3,Low <5min,143,Short Haul >1500Mi,0,0,0,0,0,AIRBUS,A321,23,-1.71,-4.54,1.12,0.19,0.0,284.0,41.83,1020.0,0.0
8,2025-01-01,7,Alaska Airlines Inc.,N971AK,ORD,"Chicago, IL",Evening,-4,0,Low <5min,SEA,"Seattle, WA",-25,Low <5min,263,Medium Haul <3000Mi,0,0,0,0,0,BOEING,737 NG,6,-1.71,-4.54,1.12,0.19,0.0,284.0,41.83,1020.0,0.0
9,2025-01-01,7,Southwest Airlines Co.,N240WN,ORD,"Chicago, IL",Evening,10,1,Low <5min,DEN,"Denver, CO",15,Low <5min,180,Short Haul >1500Mi,0,0,5,0,10,BOEING,737 NG,18,-1.71,-4.54,1.12,0.19,0.0,284.0,41.83,1020.0,0.0


In [44]:
s3_resource = boto3.Session().resource('s3')

s3_bucket = 'big-data-team1-bucket'
prediction_data_file = "prediction_data.csv"


def upload_s3_csv(filename, folder, dataframe):
    csv_buffer = StringIO()
    dataframe.to_csv(csv_buffer, header=True, index=False)
    s3_resource.Bucket(s3_bucket).Object(os.path.join(folder, filename)).put(Body=csv_buffer.getvalue())

In [45]:
upload_s3_csv(prediction_data_file, "prediction-data", prediction_df)