In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import os

In [None]:
# 1. Create the Spark session (or reuse one that is already running)
spark = (
    SparkSession.builder
    .appName("Amazon Sentiment Classification with Tuning")
    .getOrCreate()
)


In [None]:
# 2. Load the raw review data: the training set and the first test set.
# Both are read from JSON files located relative to the notebook's working directory.
train_df = spark.read.json("../Data/train_data.json")
test1_df = spark.read.json("../Data/test1_data.json")


In [None]:
# 3. Prepare function to label
def prepare(df):
    """Keep the review text and rating, and derive a 3-class sentiment label.

    Label encoding, derived from the ``overall`` rating:
        0 -- negative (overall < 3)
        1 -- neutral  (overall == 3)
        2 -- positive (everything else)

    Rows whose ``reviewText`` or ``overall`` is missing are dropped *before*
    the label is computed. A null rating matches neither ``when`` branch, so
    without this filter it would fall through to ``otherwise(2)`` and be
    silently labeled as positive; filtering on ``label`` afterwards cannot
    catch that because ``label`` is never null.

    Args:
        df: DataFrame containing at least ``reviewText`` and ``overall``.

    Returns:
        DataFrame with columns ``reviewText``, ``overall`` and ``label``.
    """
    reviews = (
        df.select("reviewText", "overall")
          .dropna(subset=["reviewText", "overall"])
    )
    sentiment = (
        when(col("overall") < 3, 0)
        .when(col("overall") == 3, 1)
        .otherwise(2)
    )
    return reviews.withColumn("label", sentiment)

# Apply the same selection/labeling step to both datasets so they share one schema.
train_data = prepare(train_df)
test1_data = prepare(test1_df)


In [None]:
# 4. Pipeline stages, each one feeding its output column to the next stage

# reviewText -> words
tokenizer = Tokenizer(
    inputCol="reviewText",
    outputCol="words",
)

# words -> filtered
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
)

# filtered -> rawFeatures (numFeatures is tuned by the parameter grid)
hashingTF = HashingTF(
    inputCol="filtered",
    outputCol="rawFeatures",
)

# rawFeatures -> features
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)

# features + label -> prediction (regParam / elasticNetParam are tuned by the grid)
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=20,
)

pipeline = Pipeline(
    stages=[
        tokenizer,
        remover,
        hashingTF,
        idf,
        lr,
    ]
)


In [None]:
# 5. Hyperparameter grid: 3 x 3 x 3 = 27 candidate parameter combinations
paramGrid = (
    ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [1000, 5000, 10000])
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)


In [None]:
# 6. Evaluator: scores the "prediction" column against "label" using accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)


In [None]:
# 7. CrossValidator: 3-fold cross-validation over the data passed to fit().
# test1_data is NOT used for model selection here; it is only scored after
# fitting, as a held-out check of the selected model.
# An explicit seed pins the fold assignment so that a Restart & Run All
# selects among the candidates on the same splits every time.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3,
                    parallelism=2,
                    seed=42)


In [None]:
# 8. Fit the cross-validator on train_data.
# This tries every parameter combination in paramGrid across the folds, so it is
# the expensive cell of the notebook. The result exposes the selected pipeline
# as cv_model.bestModel.
cv_model = cv.fit(train_data)


In [None]:
# 9. Evaluate on test1_data
# Score the held-out test1 set with the fitted cross-validation model, then
# compute the evaluator's metric (accuracy) on those predictions.
predictions = cv_model.transform(test1_data)
accuracy = evaluator.evaluate(predictions)
print(f"✅ Test1 Accuracy: {accuracy:.4f}")


In [None]:
# 10. Save best model
# write().overwrite() already replaces whatever exists at the target path, so
# no manual cleanup is needed beforehand. A local os.path/shutil check would
# also look only at the driver's local filesystem, which is not necessarily
# the filesystem Spark resolves this path against.
model_path = "../Data/Best_SentimentModel"
cv_model.bestModel.write().overwrite().save(model_path)

print("🎯 Best model saved successfully!")

# 11. Stop Spark
# Releases the session's resources; cv_model can no longer be used afterwards.
spark.stop()
