## Stage 3: Predictive Data Analytics

### Imports and Spark session

In [None]:
from pyspark.sql import SparkSession

team = "team15"

# Location of the project's Hive warehouse in HDFS (relative path).
warehouse = "project/hive/warehouse"

# Create (or reuse) a Spark session on YARN that talks to the cluster's Hive
# metastore and uses Snappy as the Avro compression codec.
# The app name is built with an f-string: the former
# `"{team} - spark ML".format(team)` passed `team` positionally to a *named*
# placeholder and raised KeyError('team') before the session was created.
spark = (
    SparkSession.builder
    .appName(f"{team} - spark ML")
    .master("yarn")
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")
    .config("spark.sql.warehouse.dir", warehouse)
    .config("spark.sql.avro.compression.codec", "snappy")
    .enableHiveSupport()
    .getOrCreate()
)

print("Spark Session Created.")
print(f"Spark version: {spark.version}")

# enableHiveSupport() already selects the Hive catalog; it is equivalent to
# adding .config("spark.sql.catalogImplementation", "hive").
# Setting that option to "in-memory" instead switches to Spark's built-in catalog.

### Load data from Hive

In [None]:
# Quick look at the databases available in the session catalog.
databases_df = spark.sql("SHOW DATABASES")
databases_df.show()
# Further exploration, if needed:
#   spark.sql("USE team15_projectdb").show()
#   spark.sql("SHOW TABLES").show()
#   spark.sql("SELECT * FROM <db_name>.<table_name>").show()

In [None]:
# List all databases through the Catalog API
# (SQL equivalent: spark.sql("SHOW DATABASES").show()).
all_databases = spark.catalog.listDatabases()
print(all_databases)

In [None]:
# List all tables of the project database through the Catalog API.
project_tables = spark.catalog.listTables("team15_projectdb")
print(project_tables)
# SQL equivalent:
#   spark.sql("USE team15_projectdb")
#   spark.sql("SHOW TABLES").show()

In [None]:
# Read the Hive tables as DataFrames and register them as temporary views, so
# that SQL in this session can refer to them as `employees` and `departments`.
# (The view registration used to be commented out, which made the follow-up
# SQL queries fail with an AnalysisException.)
#
# spark.table() reads a catalog table using the storage format recorded in the
# metastore; the previous spark.read.format("avro").table(...) had the same
# effect, since format() does not apply to catalog-table reads.
emps = spark.table('team15_projectdb.employees_part')
emps.createOrReplaceTempView('employees')

depts = spark.table('team15_projectdb.departments')
depts.createOrReplaceTempView('departments')

In [None]:
# Run some queries.
# Requires temporary views named `employees` and `departments` to be
# registered in this Spark session.
for frame in (emps, depts):
    frame.printSchema()

for query in (
    "SELECT * FROM employees WHERE deptno=10",
    "SELECT * FROM departments",
    "SELECT AVG(SAL) FROM employees;",
    "SELECT * from employees where comm is NULL;",
):
    spark.sql(query).show()

### Data prep for ML modeling

In [None]:
import math
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, sin, cos, pi, hour, minute, dayofmonth, dayofweek, year as f_year
from pyspark.sql.types import IntegerType, StringType, DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
print("--- Loading Data ---")
# Hive database / table holding the flight records. These names were referenced
# here but never defined anywhere in the notebook, so this cell raised NameError.
# TODO(review): confirm the flights table name created in the earlier stages.
hive_db_name = "team15_projectdb"
hive_table_name = "flights"
df_flights = spark.table(f"{hive_db_name}.{hive_table_name}")
df_flights.printSchema()

In [None]:
print("--- Preprocessing Data ---")
# 1. Keep only the modelling columns and make the target numeric.
# Only features known BEFORE the departure/cancellation decision are selected,
# so the model cannot peek at the outcome.
selected_features_raw = [
    "Year", "Month", "DayofMonth", "DayOfWeek",
    "CRSDepTime", "CRSElapsedTime",
    "Origin", "Dest", "Distance", "Cancelled",
]
df_processed = (
    df_flights
    .select(selected_features_raw)
    .withColumn("Cancelled", col("Cancelled").cast(DoubleType()))
)

In [None]:
# 2. Handle Missing Values: drop every row with a missing value in any
# selected column (simple approach; imputation would retain more rows).
df_processed = df_processed.dropna()
rows_after_na_drop = df_processed.count()
print(f"Data count after NA drop: {rows_after_na_drop}")

In [None]:
# 3. Time/Date Feature Engineering
def _parse_time_component(time_str, position):
    """Return the integer at ``position`` of a colon-separated time string.

    Returns None when the input is None, has no component at ``position``,
    the component is not an integer, or the value is not a string-like object
    with ``split``. Any other exception propagates instead of being hidden.

    NOTE(review): if CRSDepTime is stored as an integer such as 1430 (hhmm),
    every value maps to None here — confirm the column's type/format.
    """
    if time_str is None:
        return None
    try:
        return int(time_str.split(':')[position])
    except (AttributeError, IndexError, TypeError, ValueError):
        # Only "not a parseable H:M value" errors become None; the previous bare
        # ``except:`` also swallowed genuine bugs (and KeyboardInterrupt).
        return None


def parse_time_hour_udf(time_str):
    """Return the hour of an 'HH:MM' string as int, or None if unparseable."""
    return _parse_time_component(time_str, 0)


def parse_time_minute_udf(time_str):
    """Return the minute of an 'HH:MM' string as int, or None if unparseable."""
    return _parse_time_component(time_str, 1)

# Wrap the plain-Python parsers as integer-returning Spark UDFs.
udf_parse_hour = udf(parse_time_hour_udf, IntegerType())
udf_parse_minute = udf(parse_time_minute_udf, IntegerType())

# Extract the scheduled departure hour/minute, then discard rows where either
# component could not be parsed.
df_processed = (
    df_processed
    .withColumn("ScheduledDepHour", udf_parse_hour(col("CRSDepTime")))
    .withColumn("ScheduledDepMinute", udf_parse_minute(col("CRSDepTime")))
    .na.drop(subset=["ScheduledDepHour", "ScheduledDepMinute"])
)

# Encode periodic quantities as (sin, cos) pairs so the end of a cycle sits
# next to its start (e.g. hour 23 is close to hour 0).
# Each entry: (sin column, cos column, source column, period length).
_cyclical_encodings = [
    ("DepHour_sin", "DepHour_cos", "ScheduledDepHour", 24.0),
    ("Month_sin", "Month_cos", "Month", 12.0),
    ("DayOfMonth_sin", "DayOfMonth_cos", "DayofMonth", 31.0),  # Approx.: months have 28-31 days
    ("DayOfWeek_sin", "DayOfWeek_cos", "DayOfWeek", 7.0),
]
for _sin_col, _cos_col, _source_col, _period in _cyclical_encodings:
    _angle = 2 * pi() * col(_source_col) / _period
    df_processed = (
        df_processed
        .withColumn(_sin_col, sin(_angle))
        .withColumn(_cos_col, cos(_angle))
    )

In [None]:
# 4. Categorical Feature Encoding
# Each categorical column is string-indexed (unseen labels kept in an extra
# bucket at transform time) and then one-hot encoded.
categorical_cols = ["Origin", "Dest"]
indexers, encoders = [], []
for cat_col in categorical_cols:
    index_col = f"{cat_col}_index"
    indexers.append(StringIndexer(inputCol=cat_col, outputCol=index_col, handleInvalid="keep"))
    encoders.append(OneHotEncoder(inputCol=index_col, outputCol=f"{index_col}_ohe"))

In [None]:
# 5. Numerical Feature Scaling
# Raw numeric columns (Year is scaled rather than cyclically encoded) ...
numerical_cols_raw = ["Distance", "CRSElapsedTime", "Year"]
# ... followed by the engineered sin/cos columns.
cyclical_cols_engineered = [
    "DepHour_sin", "DepHour_cos", "Month_sin", "Month_cos",
    "DayOfMonth_sin", "DayOfMonth_cos", "DayOfWeek_sin", "DayOfWeek_cos",
]

# Pack all numeric inputs into one vector, then divide each component by its
# standard deviation (withMean=False: values are not centred).
temp_numerical_assembler_inputs = numerical_cols_raw + cyclical_cols_engineered
temp_numerical_assembler = VectorAssembler(
    inputCols=temp_numerical_assembler_inputs,
    outputCol="temp_numerical_features",
    handleInvalid="keep",
)
scaler = StandardScaler(
    inputCol=temp_numerical_assembler.getOutputCol(),
    outputCol="scaled_numerical_features",
    withStd=True,
    withMean=False,
)

In [None]:
# 6. Assemble Final Features Vector
# One-hot encoded categoricals first, then the scaled numeric block.
final_assembler_input_cols = [enc.getOutputCol() for enc in encoders]
final_assembler_input_cols.append("scaled_numerical_features")
vector_assembler = VectorAssembler(
    inputCols=final_assembler_input_cols,
    outputCol="features",
    handleInvalid="keep",
)

In [None]:
# --- Split Data ---
# Seeded 80/20 split; both parts are cached because cross-validation scans the
# training data repeatedly.
train_data, test_data = df_processed.randomSplit([0.8, 0.2], seed=42)
for _split_df in (train_data, test_data):
    _split_df.cache()
print(f"Training data count: {train_data.count()}, Test data count: {test_data.count()}")

### ML modeling

In [None]:
# --- Define Models, Pipelines, and Tuning ---
# Both classifiers predict the "Cancelled" label from the "features" vector.
_classifier_cols = dict(labelCol="Cancelled", featuresCol="features")
lr = LogisticRegression(**_classifier_cols)
rf = RandomForestClassifier(seed=42, **_classifier_cols)

In [None]:
# Shared preprocessing: index + one-hot the categoricals, assemble and scale the
# numerics, then build the final feature vector. The same stage objects feed
# both model pipelines.
preprocessing_stages = [*indexers, *encoders, temp_numerical_assembler, scaler, vector_assembler]

pipeline_lr = Pipeline(stages=[*preprocessing_stages, lr])
pipeline_rf = Pipeline(stages=[*preprocessing_stages, rf])

# Model-selection metric: area under the precision-recall curve.
evaluator = BinaryClassificationEvaluator(
    labelCol="Cancelled",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderPR",
)

# Hyper-parameter grids (2 x 2 combinations per model).
paramGrid_lr = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5])
    .build()
)
paramGrid_rf = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [20, 50])
    .addGrid(rf.maxDepth, [5, 10])
    .build()
)

# Seeded 3-fold cross-validation, evaluating up to 4 candidates in parallel.
_cv_settings = dict(evaluator=evaluator, numFolds=3, parallelism=4, seed=42)
cv_lr = CrossValidator(estimator=pipeline_lr, estimatorParamMaps=paramGrid_lr, **_cv_settings)
cv_rf = CrossValidator(estimator=pipeline_rf, estimatorParamMaps=paramGrid_rf, **_cv_settings)

In [None]:
print("--- Training Logistic Regression ---")
# Grid-search the LR pipeline with cross-validation on the training split;
# `bestModel` is the pipeline refit with the best-scoring parameters.
cv_model_lr = cv_lr.fit(train_data)
best_model_lr = cv_model_lr.bestModel

In [None]:
print("--- Training Random Forest ---")
# Grid-search the RF pipeline with cross-validation on the training split;
# `bestModel` is the pipeline refit with the best-scoring parameters.
cv_model_rf = cv_rf.fit(train_data)
best_model_rf = cv_model_rf.bestModel

In [None]:
# --- Evaluate Models ---
print("--- Evaluating Models on Test Data ---")
# Score the held-out split with each tuned pipeline (LR first, then RF).
predictions_lr, predictions_rf = (
    fitted.transform(test_data) for fitted in (best_model_lr, best_model_rf)
)

auc_pr_lr = evaluator.evaluate(predictions_lr)

In [None]:
def get_detailed_metrics(predictions_df, label_col="Cancelled", pred_col="prediction"):
    """Compute per-class and overall classification metrics.

    Args:
        predictions_df: DataFrame holding the predicted and true label columns.
        label_col: name of the true-label column.
        pred_col: name of the predicted-label column.

    Returns:
        dict with the confusion matrix (nested lists), precision / recall / F1
        for class 0.0 (``*_0``) and class 1.0 (``*_1``, i.e. Cancelled), and
        overall accuracy.
    """
    # MulticlassMetrics consumes an RDD of (prediction, label) float pairs.
    prediction_label_rdd = (
        predictions_df.select(pred_col, label_col)
        .rdd
        .map(lambda row: (float(row[0]), float(row[1])))
    )
    mc_metrics = MulticlassMetrics(prediction_label_rdd)

    negative, positive = 0.0, 1.0
    return {
        "confusion_matrix": mc_metrics.confusionMatrix().toArray().tolist(),
        "precision_0": mc_metrics.precision(negative),
        "recall_0": mc_metrics.recall(negative),
        "f1_0": mc_metrics.fMeasure(negative),
        "precision_1": mc_metrics.precision(positive),  # Cancelled class
        "recall_1": mc_metrics.recall(positive),
        "f1_1": mc_metrics.fMeasure(positive),
        "accuracy": mc_metrics.accuracy,
    }

In [None]:
# Report the logistic-regression test scores for the Cancelled class.
detailed_metrics_lr = get_detailed_metrics(predictions_lr)
precision_lr, recall_lr, f1_lr = (
    detailed_metrics_lr[key] for key in ("precision_1", "recall_1", "f1_1")
)
print(f"Logistic Regression - Test AreaUnderPR: {auc_pr_lr}")
print(f"Logistic Regression - Test Precision (Cancelled): {precision_lr}")
print(f"Logistic Regression - Test Recall (Cancelled): {recall_lr}")
print(f"Logistic Regression - Test F1 (Cancelled): {f1_lr}")

In [None]:
# Evaluate and report the random-forest test scores for the Cancelled class.
auc_pr_rf = evaluator.evaluate(predictions_rf)
detailed_metrics_rf = get_detailed_metrics(predictions_rf)
precision_rf, recall_rf, f1_rf = (
    detailed_metrics_rf[key] for key in ("precision_1", "recall_1", "f1_1")
)
print(f"Random Forest - Test AreaUnderPR: {auc_pr_rf}")
print(f"Random Forest - Test Precision (Cancelled): {precision_rf}")
print(f"Random Forest - Test Recall (Cancelled): {recall_rf}")
print(f"Random Forest - Test F1 (Cancelled): {f1_rf}")

In [None]:
# --- Save Models and Predictions ---
print("--- Saving Models and Outputs ---")
model1_path_hdfs = "project/models/flight_cancellation_lr_model"
model2_path_hdfs = "project/models/flight_cancellation_rf_model"

# Persist the tuned pipelines, replacing any output of a previous run.
best_model_lr.write().overwrite().save(model1_path_hdfs)
print(f"Saved Logistic Regression model to: {model1_path_hdfs}")
best_model_rf.write().overwrite().save(model2_path_hdfs)
print(f"Saved Random Forest model to: {model2_path_hdfs}")


def _save_label_and_prediction_csv(predictions, out_dir):
    """Overwrite `out_dir` with a headered CSV of the Cancelled and prediction columns."""
    label_and_pred = predictions.select("Cancelled", "prediction")
    (
        label_and_pred
        .coalesce(1)  # single partition -> single part file
        .write
        .mode("overwrite")
        .format("csv")
        .option("header", "true")
        .save(out_dir)
    )


_save_label_and_prediction_csv(predictions_lr, "project/output/model1_lr_predictions")
print("Saved LR predictions to project/output/model1_lr_predictions")

_save_label_and_prediction_csv(predictions_rf, "project/output/model2_rf_predictions")
print("Saved RF predictions to project/output/model2_rf_predictions")

In [None]:
# --- Save Evaluation Comparison ---
# One summary row per model: test AUC-PR plus precision / recall / F1 for the
# Cancelled class.
evaluation_summary_data = [
    (model_name, auc_pr, scores['precision_1'], scores['recall_1'], scores['f1_1'])
    for model_name, auc_pr, scores in (
        ("Logistic Regression", auc_pr_lr, detailed_metrics_lr),
        ("Random Forest", auc_pr_rf, detailed_metrics_rf),
    )
]
eval_schema = ["ModelName", "AreaUnderPR", "Precision_Cancelled", "Recall_Cancelled", "F1_Score_Cancelled"]
evaluation_df = spark.createDataFrame(evaluation_summary_data, schema=eval_schema)
evaluation_df.show(truncate=False)
(
    evaluation_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .format("csv")
    .option("header", "true")
    .save("project/output/model_evaluation_comparison")
)
print("Saved evaluation comparison to project/output/model_evaluation_comparison")

In [None]:
# Release the cached splits, then shut the Spark session down.
for _cached_df in (train_data, test_data):
    _cached_df.unpersist()
spark.stop()
print("--- Pipeline Finished ---")