In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from mleap.pyspark.spark_support import SimpleSparkSerializer


# 1. Initialize Spark Session
spark = SparkSession.builder.appName("SteamReviewsTuning").getOrCreate()

# -- Configuration: Using your specified column names --
TEXT_COLUMN = "review_text"
LABEL_COLUMN = "review_score"
# ----------------------------------------------------

# 2. Load Data and add label/weight columns
print("📖 Loading data and preparing for training...")
file_path = "gs://steam-reviews-bucket-0/steam_reviews_cleaned.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
# Drop only rows that are unusable for training (missing text or label).
# A bare na.drop() would also discard reviews whose *other* columns happen to
# be null, even though the pipeline never reads those columns.
# The frame is cached because it is scanned repeatedly below (class counts,
# train/test split, every cross-validation fit); without it each pass
# re-reads and re-parses the CSV from GCS.
df = (
    df.withColumn("label", col(LABEL_COLUMN).cast("double"))
    .na.drop(subset=[TEXT_COLUMN, "label"])
    .cache()
)

# Class-balancing weights: weight_c = N / (2 * N_c), so both classes
# contribute equally to the loss regardless of how skewed the data is.
# NOTE(review): this assumes review_score is encoded as 0/1 -- confirm
# against the cleaned CSV. Any other label value would receive the class-0
# weight in the `otherwise` branch below.
balance_ratio = df.groupBy("label").count()
# A single aggregation job yields every per-class count (and the total).
label_counts = {row["label"]: row["count"] for row in balance_ratio.collect()}
count_total = sum(label_counts.values())
count_class_0 = label_counts.get(0.0, 0)
count_class_1 = label_counts.get(1.0, 0)
if count_class_0 == 0 or count_class_1 == 0:
    # Fail with an actionable message instead of an IndexError or a
    # ZeroDivisionError when one of the two classes is absent.
    raise ValueError(
        "Both label classes 0 and 1 must be present to compute class weights; "
        f"found label counts: {label_counts}"
    )
weight_class_0 = count_total / (2.0 * count_class_0)
weight_class_1 = count_total / (2.0 * count_class_1)
df = df.withColumn("classWeight", when(col("label") == 1, weight_class_1).otherwise(weight_class_0))


# 3. Create the base pipeline (before tuning)
# Stages: raw text -> tokens -> hashed term frequencies -> TF-IDF features
# -> logistic regression weighted by the per-row "classWeight" column.
tokenizer = Tokenizer(inputCol=TEXT_COLUMN, outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label", weightCol="classWeight")

pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, lr])


# 4. Set up the Hyperparameter Grid
# 2 feature-space sizes x 2 regularisation strengths = 4 candidate models.
print("🛠️ Setting up the hyperparameter grid for tuning...")
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [10000, 50000])
             .addGrid(lr.regParam, [0.1, 0.01])
             .build())


# 5. Set up the Cross-Validator
# Candidates are ranked by area under the ROC curve.
evaluator = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction', metricName='areaUnderROC')

# The seed pins the fold assignment so repeated runs select the same best
# model; it matches the seed used for the train/test split.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=42)


# 6. Hold out 20% of the rows for final testing, then let the cross-validator
#    search the parameter grid on the remaining 80%.
split_weights = [0.8, 0.2]
training_data, test_data = df.randomSplit(split_weights, seed=42)

print("\n🧠 Starting cross-validation... (This will take a long time)")
# fit() trains every grid candidate on every fold and keeps the best one.
cvModel = crossval.fit(training_data)
print("✅ Cross-validation complete. Best model found.")


# 7. Use the BEST model found by the CrossValidator for evaluation
print("\n📊 Evaluating the BEST model on the test set...")
bestModel = cvModel.bestModel
# Cached because the predictions are consumed by eight separate actions below
# (ROC AUC, six per-class metrics, confusion matrix); without it every action
# would re-run the whole pipeline over the test set.
predictions = bestModel.transform(test_data).cache()

# --- General Evaluation ---
roc_auc = evaluator.evaluate(predictions)
print(f"\n🏅 ROC AUC of Best Model: {roc_auc:.4f}")

# --- Detailed Per-Class Evaluation ---
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")


def _per_class_metric(metric_name, label):
    """Evaluate a by-label metric of `predictions` for one class.

    `metric_name` must be one of the "...ByLabel" metrics: only those honour
    `metricLabel`. The plain "f1" metric is a weighted average over all
    classes and silently ignores the requested label.
    """
    return multi_evaluator.evaluate(
        predictions,
        {multi_evaluator.metricName: metric_name, multi_evaluator.metricLabel: label},
    )


# Metrics for Negative Reviews (Class 0)
precision_0 = _per_class_metric("precisionByLabel", 0.0)
recall_0 = _per_class_metric("recallByLabel", 0.0)
f1_0 = _per_class_metric("fMeasureByLabel", 0.0)

# Metrics for Positive Reviews (Class 1)
precision_1 = _per_class_metric("precisionByLabel", 1.0)
recall_1 = _per_class_metric("recallByLabel", 1.0)
f1_1 = _per_class_metric("fMeasureByLabel", 1.0)

print("\n--- Metrics for Negative Reviews (Class 0) ---")
print(f"🎯 Precision: {precision_0:.4f}")
print(f"🔁 Recall:    {recall_0:.4f}")
print(f"🎯 F1 Score:  {f1_0:.4f}")

print("\n--- Metrics for Positive Reviews (Class 1) ---")
print(f"🎯 Precision: {precision_1:.4f}")
print(f"🔁 Recall:    {recall_1:.4f}")
print(f"🎯 F1 Score:  {f1_1:.4f}")

# Confusion Matrix
# The explicit pivot values guarantee both prediction columns appear even if
# the model never predicts one of the classes; empty cells are shown as 0.
print("\n\n📋 Confusion Matrix")
print("Rows: True Label, Columns: Predicted Label")
predictions.groupBy('label').pivot('prediction', [0.0, 1.0]).count().na.fill(0).show()


# 8. Save the best model in both Spark and MLeap formats
import os

print("\n💾 Saving the best model to GCS...")

# -- Original Spark format (good for batch jobs) --
spark_model_path = "gs://steam-reviews-bucket-0/models/spark_lr_model_tuned"
bestModel.write().overwrite().save(spark_model_path)
print(f"✅ Best Spark model saved to: {spark_model_path}")

# -- MLeap format (for serving) --
# MLeap derives the bundle's schema from a DataFrame that has already been
# passed through the fitted pipeline, so the text-only input is transformed
# first rather than handed over raw.
df_schema = df.select(TEXT_COLUMN) # Your model only needs the text column as input
transformed_sample = bestModel.transform(df_schema)

mleap_model_path = "gs://steam-reviews-bucket-0/models/mleap_lr_model_tuned.zip"
# "jar:file:" URIs address the driver's local filesystem; a gs:// location
# cannot be written that way. Build the bundle locally, then upload it.
local_bundle_path = "/tmp/mleap_lr_model_tuned.zip"
if os.path.exists(local_bundle_path):
    # Start from a clean file so a re-run does not collide with a stale bundle.
    os.remove(local_bundle_path)
bestModel.serializeToBundle(f"jar:file:{local_bundle_path}", transformed_sample)

# Upload through the Hadoop FileSystem API, i.e. the same GCS connector that
# already serves the gs:// reads and writes above.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_path_cls = spark.sparkContext._jvm.org.apache.hadoop.fs.Path
gcs_bundle_path = hadoop_path_cls(mleap_model_path)
# copyFromLocalFile(delSrc=False, overwrite=True, src, dst)
gcs_bundle_path.getFileSystem(hadoop_conf).copyFromLocalFile(
    False, True, hadoop_path_cls(local_bundle_path), gcs_bundle_path
)
print(f"✅ Best MLeap model successfully saved to: {mleap_model_path}")

# 9. Stop the SparkSession
print("\n🛑 Stopping the Spark session.")
spark.stop()


In [None]:
# ... (after your training and evaluation is complete) ...
# This cell repeats steps 8-9 of the training cell. It relies on `spark`,
# `df`, `bestModel` and `TEXT_COLUMN` from that cell, and needs a Spark
# session that has not been stopped yet.
import os

from mleap.pyspark.spark_support import SimpleSparkSerializer

# 8. Save the best model in both Spark and MLeap formats
print("\n💾 Saving the best model to GCS...")

# -- Original Spark format (good for batch jobs) --
spark_model_path = "gs://steam-reviews-bucket-0/models/spark_lr_model_tuned"
bestModel.write().overwrite().save(spark_model_path)
print(f"✅ Best Spark model saved to: {spark_model_path}")

# -- MLeap format (for serving) --
# MLeap derives the bundle's schema from a DataFrame that has already been
# passed through the fitted pipeline, so the text-only input is transformed
# first rather than handed over raw.
df_schema = df.select(TEXT_COLUMN) # Your model only needs the text column as input
transformed_sample = bestModel.transform(df_schema)

mleap_model_path = "gs://steam-reviews-bucket-0/models/mleap_lr_model_tuned.zip"
# "jar:file:" URIs address the driver's local filesystem; a gs:// location
# cannot be written that way. Build the bundle locally, then upload it.
local_bundle_path = "/tmp/mleap_lr_model_tuned.zip"
if os.path.exists(local_bundle_path):
    # Start from a clean file so a re-run does not collide with a stale bundle.
    os.remove(local_bundle_path)
bestModel.serializeToBundle(f"jar:file:{local_bundle_path}", transformed_sample)

# Upload through the Hadoop FileSystem API, i.e. the same GCS connector that
# serves the gs:// model write above.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_path_cls = spark.sparkContext._jvm.org.apache.hadoop.fs.Path
gcs_bundle_path = hadoop_path_cls(mleap_model_path)
# copyFromLocalFile(delSrc=False, overwrite=True, src, dst)
gcs_bundle_path.getFileSystem(hadoop_conf).copyFromLocalFile(
    False, True, hadoop_path_cls(local_bundle_path), gcs_bundle_path
)
print(f"✅ Best MLeap model successfully saved to: {mleap_model_path}")

# 9. Stop the SparkSession
print("\n🛑 Stopping the Spark session.")
spark.stop()