In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
from pyspark.ml import PipelineModel
from pyspark.ml.classification import (
    RandomForestClassifier, 
    LogisticRegression, 
    NaiveBayes,
    GBTClassifier,     
    OneVsRest           
)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F

In [None]:
import os
import sys

# Point Spark's Python worker (and driver) processes at the interpreter running this
# kernel, so they use the same environment/packages as the notebook. This replaces a
# hard-coded, machine-specific path (C:/Users/<name>/miniconda3/...) that broke the
# notebook on every other machine.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [None]:
# Initialize Spark Session (local mode, 2 cores, 4 GB driver memory)
spark = (
    SparkSession.builder
    .appName("EnsembleModelTraining")
    .master("local[2]")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Define file paths. All inputs live in the `notebook_data` folder under BASE_PATH.
# BASE_PATH is the project root; "" means paths are relative to the current working
# directory. A trailing "/" on BASE_PATH is optional.
BASE_PATH = ""
DATA_DIR = f"{BASE_PATH.rstrip('/')}/notebook_data" if BASE_PATH else "notebook_data"
TRAIN_DATA_PATH = f"{DATA_DIR}/Mendeley_cleaned_train.csv"
TEST_DATA_PATH = f"{DATA_DIR}/Mendeley_cleaned_test.csv"
PIPELINE_PATH = f"{DATA_DIR}/feature_pipeline_lyrics_only"

# Load the saved feature engineering pipeline
pipeline_model = PipelineModel.load(PIPELINE_PATH)

# Load the raw cleaned data
df_train = spark.read.csv(TRAIN_DATA_PATH, header=True, inferSchema=True)
df_test = spark.read.csv(TEST_DATA_PATH, header=True, inferSchema=True)

# Transform the data using the pipeline. Both outputs are reused by every model below,
# so cache them instead of re-running the pipeline for each fit/evaluation.
train_transformed = pipeline_model.transform(df_train).cache()
test_transformed = pipeline_model.transform(df_test).cache()

# Every downstream model reads "features" and "label"; fail fast here (a schema lookup,
# not a data scan) if the pipeline output does not contain them.
for split_name, frame in (("train", train_transformed), ("test", test_transformed)):
    missing = {"features", "label"} - set(frame.columns)
    assert not missing, f"{split_name} data is missing pipeline output columns: {sorted(missing)}"

print("Data loaded and transformed successfully.")
train_transformed.select("features", "label").show(5, truncate=False)


In [None]:
# Baseline: train and score each candidate model on its own, so the voting ensemble
# built later can be compared against its individual members.
gbt = GBTClassifier(featuresCol="features", labelCol="label", seed=42)
ovr_gbt = OneVsRest(classifier=gbt)  # wraps the GBT so it is trained one-vs-rest

models = {
    #"Random Forest": RandomForestClassifier(featuresCol="features", labelCol="label", seed=42),
    "Logistic Regression": LogisticRegression(featuresCol="features", labelCol="label"),
    "Naive Bayes": NaiveBayes(featuresCol="features", labelCol="label"),
    "GBT with One-vs-Rest": ovr_gbt,
}

# Shared evaluator: F1 metric computed from the "label" and "prediction" columns.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Keep fitted models and scores (keyed by display name) instead of discarding them, so
# later cells can reuse or compare them without retraining.
fitted_models = {}
individual_f1 = {}

print("--- Evaluating Individual Models ---")
for name, model in models.items():
    print(f"Training {name}...")

    fitted_model = model.fit(train_transformed)
    predictions = fitted_model.transform(test_transformed)
    f1_score = evaluator.evaluate(predictions)

    fitted_models[name] = fitted_model
    individual_f1[name] = f1_score

    print(f" F1-Score for {name}: {f1_score:.4f}")
    

In [None]:
# Hard-voting ensemble over OvR-GBT, Logistic Regression and Naive Bayes.
# (The commented-out lines show how a Random Forest member was wired in.)

# 1. Define the base models for the ensemble
#rf_model = RandomForestClassifier(featuresCol="features", labelCol="label", seed=42)
lr_model = LogisticRegression(featuresCol="features", labelCol="label")
nb_model = NaiveBayes(featuresCol="features", labelCol="label")
ovr_gbt_model = OneVsRest(classifier=GBTClassifier(featuresCol="features", labelCol="label", seed=42))

# 2. Train each model
print("Training base models for the ensemble...")
#fitted_rf = rf_model.fit(train_transformed)
fitted_lr = lr_model.fit(train_transformed)
fitted_nb = nb_model.fit(train_transformed)
fitted_ovr_gbt = ovr_gbt_model.fit(train_transformed)
print("Base models trained.")

# 3. Tag every test row with an ID ONCE, before scoring, and cache it so the IDs are
# materialized a single time. Previously a fresh monotonically_increasing_id() was
# generated on each model's output; those IDs depend on partition layout, so nothing
# guaranteed that the same row received the same ID in every frame, and the inner joins
# below could silently mis-pair or drop rows.
test_with_id = test_transformed.withColumn("id", F.monotonically_increasing_id()).cache()

# 4. Get predictions from each model, renaming the prediction column for clarity.
# Every frame inherits "id" from test_with_id, so the joins are aligned by construction.
print("Generating predictions from each model...")
#preds_rf = fitted_rf.transform(test_with_id).withColumnRenamed("prediction", "rf_pred")
preds_lr = fitted_lr.transform(test_with_id).withColumnRenamed("prediction", "lr_pred")
preds_nb = fitted_nb.transform(test_with_id).withColumnRenamed("prediction", "nb_pred")
preds_ovr_gbt = fitted_ovr_gbt.transform(test_with_id).withColumnRenamed("prediction", "ovr_gbt_pred")

# 5. Join the predictions into a single DataFrame.
# Select only the necessary columns before joining to avoid column-name conflicts.
# combined_preds = preds_rf.select("id", "label", "rf_pred") \
#     .join(preds_lr.select("id", "lr_pred"), "id") \
#     .join(preds_nb.select("id", "nb_pred"), "id")

# Cached because it is shown, voted on, and then evaluated more than once downstream;
# without the cache each action would re-run all three models' transforms.
combined_preds = preds_ovr_gbt.select("id", "label", "ovr_gbt_pred") \
    .join(preds_lr.select("id", "lr_pred"), "id") \
    .join(preds_nb.select("id", "nb_pred"), "id") \
    .cache()

# Sanity check: every test row must survive the joins exactly once.
assert combined_preds.count() == test_with_id.count(), \
    "prediction frames did not align on 'id' - rows were dropped or duplicated by the join"

print("Combined predictions from all models:")
combined_preds.show(10)

# 6. Define and apply the majority vote UDF
def majority_vote(*predictions):
    """Return the most common value among `predictions` (hard majority voting).

    Ties (e.g. all three models disagree) resolve to the prediction passed first:
    Counter.most_common orders equal counts by first occurrence, so the argument
    order of the UDF call sets the tie-break priority.
    """
    from collections import Counter
    return Counter(predictions).most_common(1)[0][0]

vote_udf = udf(majority_vote, DoubleType())

# Add a new column with the final ensemble prediction.
# Argument order = tie-break priority: OvR-GBT, then LR, then NB.
# final_preds = combined_preds.withColumn(
#     "prediction",
#     vote_udf(col("rf_pred"), col("lr_pred"), col("nb_pred"))
# )
final_preds = combined_preds.withColumn(
    "prediction",
    vote_udf(col("ovr_gbt_pred"), col("lr_pred"), col("nb_pred"))
)

print("Final predictions after majority vote:")
# final_preds.select("label", "rf_pred", "lr_pred", "nb_pred", "prediction").show(10)
final_preds.select("label", "ovr_gbt_pred", "lr_pred", "nb_pred", "prediction").show(10)

In [None]:
# Score the majority-vote predictions with the shared F1 evaluator and report it,
# framed by a horizontal rule.
ensemble_f1 = evaluator.evaluate(final_preds)

rule = "-" * 50
print(rule)
print(f"F1-Score for Voting Ensemble (OGB+LR+NB): {ensemble_f1:.4f}")
print(rule)

In [None]:
# Report the voting ensemble's F1 together with its accuracy.
# This cell previously duplicated the F1 evaluation verbatim but labelled the ensemble
# "RF+LR+NB". The ensemble actually votes over OvR-GBT, LR and NB (no Random Forest),
# so that label misreported which models produced the score.
ensemble_f1 = evaluator.evaluate(final_preds)
# Override the metric for this call only; the shared evaluator keeps metricName="f1".
ensemble_accuracy = evaluator.evaluate(final_preds, {evaluator.metricName: "accuracy"})

print("-" * 50)
print(f"F1-Score for Voting Ensemble (OvR-GBT+LR+NB): {ensemble_f1:.4f}")
print(f"Accuracy for Voting Ensemble (OvR-GBT+LR+NB): {ensemble_accuracy:.4f}")
print("-" * 50)