In [None]:
import os
import sys
import json
import traceback
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, HashingTF, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import re


In [None]:

# Run Spark's Python workers (and driver) with the same interpreter as this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

# Windows only: if a local winutils install exists, expose it to Hadoop.
winutils_path = "C:\\hadoop"
if os.path.exists(winutils_path):
    os.environ.update(
        HADOOP_HOME=winutils_path,
        PATH=f"{os.environ['PATH']};{winutils_path}\\bin",
    )

# Local two-core session with generous timeouts and a small shuffle partition count,
# sized for a single-machine notebook run.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Amazon Reviews Sentiment Analysis")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.python.worker.memory", "512m")
    .config("spark.python.worker.timeout", "600")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "60s")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print("Spark session initialized successfully")

Spark session initialized successfully


In [None]:
def load_and_clean_data(spark, data_path="Data.json"):
    """Load review JSON, derive a cleaned text column and a sentiment label.

    Ratings are bucketed as ``overall < 3`` -> "negative", ``overall == 3`` ->
    "neutral", anything higher -> "positive". Rows with a missing ``reviewText``
    or ``overall``, or whose text is empty after cleaning, are dropped.

    Args:
        spark: active SparkSession used to read the data.
        data_path: path of the JSON review data.

    Returns:
        A cached DataFrame with columns ``cleaned_reviews`` and ``label`` (both
        strings), or ``None`` if loading/cleaning raised (the traceback is printed).
    """
    print(f"Loading data from {data_path}...")
    try:
        reviews_df = spark.read.json(data_path)
        print(f"Number of raw records: {reviews_df.count()}")

        # A missing rating must be dropped explicitly: otherwise it falls through
        # to the `otherwise` branch below and is silently labeled "positive".
        reviews_df = reviews_df.filter(
            col("reviewText").isNotNull() & col("overall").isNotNull()
        )

        # Keep letters and whitespace only.
        cleaned = regexp_replace(col("reviewText"), "[^a-zA-Z\\s]", " ")
        # Collapse whitespace runs to a single space. Spark's Tokenizer splits on
        # single whitespace characters, so consecutive spaces would otherwise
        # produce empty-string tokens that StopWordsRemover keeps and
        # CountVectorizer counts as a vocabulary term.
        cleaned = regexp_replace(cleaned, "\\s+", " ")
        reviews_df = reviews_df.withColumn("cleaned_reviews", trim(lower(cleaned)))
        # Reviews made only of digits/punctuation carry no usable text.
        reviews_df = reviews_df.filter(col("cleaned_reviews") != "")

        # Textual labels, for consistency with best_model_training.
        reviews_df = reviews_df.withColumn(
            "label",
            when(col("overall") < 3, "negative")
            .when(col("overall") == 3, "neutral")
            .otherwise("positive")
        )

        df = reviews_df.select("cleaned_reviews", "label").cache()
        print(f"Number of valid records after cleaning: {df.count()}")
        df.show(3, truncate=50)
        return df
    except Exception as e:
        print(f"Error loading or cleaning data: {str(e)}")
        traceback.print_exc()
        return None

In [None]:
from pyspark.sql.functions import col, broadcast
import builtins

def balance_data_alt(df, seed=42):
    """Oversample minority classes so every ``label`` has as many rows as the largest class.

    Each minority class with ``count`` rows is replicated ``max_count // count``
    whole times and then topped up with ``max_count % count`` distinct rows
    chosen at random (seeded). Every class therefore ends with exactly
    ``max_count`` rows, and every original example appears at least once.
    Classes already at ``max_count`` are kept unchanged.

    Args:
        df: DataFrame with a ``label`` column.
        seed: seed for choosing the top-up rows.

    Returns:
        The balanced DataFrame with the same columns as ``df``. If ``df`` has no
        rows, ``df`` is returned unchanged.
    """
    import builtins
    from functools import reduce
    from pyspark.sql.functions import array_repeat, explode, lit, rand

    print("Balancing data using oversampling...")

    class_counts = df.groupBy("label").count()
    class_counts.show()
    counts_dict = {row["label"]: row["count"] for row in class_counts.collect()}

    if not counts_dict:
        print("No rows to balance; returning input unchanged")
        return df

    # builtins.max: the notebook's `from pyspark.sql.functions import *` shadows max().
    max_count = builtins.max(counts_dict.values())
    print(f"Maximum class count: {max_count}")

    balanced_parts = []
    for label, count in counts_dict.items():
        class_df = df.filter(col("label") == label)
        if count >= max_count:
            print(f"Keeping class '{label}' as is (majority class)")
            balanced_parts.append(class_df)
            continue

        copies, remainder = divmod(max_count, count)
        print(f"Oversampling class '{label}': {copies} full copies + {remainder} random rows")
        # Replicate every row `copies` times by exploding a length-`copies` array.
        oversampled_df = (
            class_df
            .withColumn("__balance_copy__", explode(array_repeat(lit(1), copies)))
            .drop("__balance_copy__")
        )
        if remainder:
            # Seeded random subset of distinct rows, sized to hit max_count exactly.
            top_up_df = class_df.orderBy(rand(seed)).limit(remainder)
            oversampled_df = oversampled_df.union(top_up_df)
        balanced_parts.append(oversampled_df)

    balanced_df = reduce(lambda left, right: left.union(right), balanced_parts)

    print("Class distribution after balancing:")
    balanced_df.groupBy("label").count().show()

    return balanced_df

In [None]:
def split_data(df, weights=(0.8, 0.1, 0.1), seed=42):
    """Randomly split ``df`` into training, validation and test DataFrames.

    A single ``randomSplit`` call produces the three disjoint parts. Each part is
    cached because it is counted here and then reused many times (cross-validation,
    evaluation, parquet writes). Caching keeps every reuse on the same materialized
    rows instead of recomputing, and possibly re-sampling, the upstream lineage.

    Args:
        df: DataFrame to split.
        weights: relative sizes of (train, validation, test); Spark normalizes them.
        seed: random seed for the split.

    Returns:
        ``(train_df, val_df, test_df)``.

    Raises:
        ValueError: if ``weights`` does not have exactly three entries.
    """
    if len(weights) != 3:
        raise ValueError("weights must have exactly three entries (train, validation, test)")

    print("Splitting data...")
    train_df, val_df, test_df = [
        part.cache() for part in df.randomSplit(list(weights), seed=seed)
    ]
    print(f"Training data: {train_df.count()} examples")
    print(f"Validation data: {val_df.count()} examples")
    print(f"Test data: {test_df.count()} examples")
    return train_df, val_df, test_df

In [None]:
def save_model_locally(model, model_name, output_dir="models"):
    """Write a Spark ML model under ``output_dir``, overwriting any existing copy.

    The target directory is ``<output_dir>/<model_name lower-cased, spaces
    replaced by underscores>_model``.

    Returns:
        True when the save succeeds; False if any step raised (the traceback
        is printed rather than propagated).
    """
    try:
        os.makedirs(output_dir, exist_ok=True)
        slug = model_name.lower().replace(" ", "_")
        destination = os.path.join(output_dir, slug + "_model")
        print(f"Saving model to: {destination}")
        writer = model.write().overwrite()
        writer.save(destination)
        print(f"Model saved successfully: {destination}")
        return True
    except Exception as exc:
        print(f"Error saving model: {exc}")
        traceback.print_exc()
        return False

def save_test_data_locally(test_data, output_file="data/processed/test_data.json"):
    """Collect ``test_data`` to the driver and write it as an indented JSON array of records.

    Args:
        test_data: DataFrame exposing ``toPandas()``. It is collected in full,
            so it must fit in driver memory.
        output_file: destination path. Its parent directory is created if needed;
            a bare filename writes to the current working directory.

    Returns:
        True on success, False if any step raised (the traceback is printed).
    """
    try:
        parent_dir = os.path.dirname(output_file)
        # os.makedirs("") raises, so only create a directory when the path has one.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        test_data_pandas = test_data.toPandas()
        test_data_pandas.to_json(output_file, orient="records", force_ascii=False, indent=4)
        print(f"Test data saved to: {output_file}")
        return True
    except Exception as e:
        print(f"Error saving test data: {e}")
        traceback.print_exc()
        return False

In [None]:
def _classification_metrics(evaluator, predictions,
                            metric_names=("accuracy", "f1", "weightedPrecision", "weightedRecall")):
    """Return ``{metric_name: value}`` for ``predictions``, one evaluation per name.

    Each metric is applied through an ``evaluate`` param-map override, so
    ``evaluator`` keeps its configured default metric.
    """
    return {
        metric: evaluator.evaluate(predictions, {evaluator.metricName: metric})
        for metric in metric_names
    }


def train_and_evaluate_models(train_df, val_df, test_df):
    """Cross-validate three text classifiers, keep the best on validation accuracy, and save it.

    Every candidate pipeline is Tokenizer -> StopWordsRemover -> CountVectorizer
    -> IDF -> (pre-fitted) StringIndexer -> classifier. Each is tuned with 3-fold
    cross-validation on ``train_df``. The candidate with the highest validation
    accuracy is then evaluated once on ``test_df``. Finally the best model,
    ``test_df``, and a standalone TF-IDF feature pipeline are saved through
    ``save_model_locally`` / ``save_test_data_locally``.

    Args:
        train_df: training DataFrame with ``cleaned_reviews`` and string ``label`` columns.
        val_df: validation DataFrame with the same columns, used for model selection.
        test_df: held-out DataFrame with the same columns, evaluated once.

    Returns:
        ``(best_model, best_model_name)``, where ``best_model`` is the fitted
        CrossValidatorModel of the winning candidate.

    Raises:
        RuntimeError: if no candidate produced a comparable validation accuracy.
    """
    print("Configuring pipelines...")

    tokenizer = Tokenizer(inputCol="cleaned_reviews", outputCol="words")
    remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    # Named `count_vectorizer` (not `cv`) so the CrossValidator below doesn't shadow it.
    count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features", minDF=2.0)
    idf = IDF(inputCol="raw_features", outputCol="features", minDocFreq=2)
    # Fitted once on the training labels so all candidates share one label -> index mapping.
    label_indexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(train_df)
    shared_stages = [tokenizer, remover, count_vectorizer, idf, label_indexer]

    # Constructor values are starting points only: any parameter that also appears
    # in a ParamGridBuilder below is overridden by the grid during cross-validation.
    lr = LogisticRegression(
        featuresCol="features",
        labelCol="indexedLabel",
        maxIter=100,
        regParam=1/6866.498,  # From best_model_training (superseded by lr_param_grid)
        elasticNetParam=0.0,
        family="multinomial"
    )
    rf = RandomForestClassifier(
        featuresCol="features",
        labelCol="indexedLabel",
        numTrees=100,
        maxDepth=10,
        seed=42
    )
    # Multinomial NB needs non-negative features; TF-IDF weights satisfy that.
    nb = NaiveBayes(
        featuresCol="features",
        labelCol="indexedLabel",
        smoothing=1.0,
        modelType="multinomial"
    )

    lr_param_grid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.0001, 0.001, 0.01]) \
        .addGrid(lr.elasticNetParam, [0.0, 0.5]) \
        .build()
    rf_param_grid = ParamGridBuilder() \
        .addGrid(rf.numTrees, [50, 100]) \
        .addGrid(rf.maxDepth, [5, 10]) \
        .build()
    nb_param_grid = ParamGridBuilder() \
        .addGrid(nb.smoothing, [0.5, 1.0, 2.0]) \
        .build()

    evaluator = MulticlassClassificationEvaluator(
        labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
    )

    candidates = [
        ("Logistic Regression", Pipeline(stages=shared_stages + [lr]), lr_param_grid),
        ("Random Forest", Pipeline(stages=shared_stages + [rf]), rf_param_grid),
        ("Naive Bayes", Pipeline(stages=shared_stages + [nb]), nb_param_grid),
    ]

    best_model = None
    best_model_name = None
    best_val_accuracy = float('-inf')
    best_metrics = {}

    for name, pipeline, param_grid in candidates:
        print(f"Training {name} with cross-validation...")
        start_time = datetime.now()
        cross_validator = CrossValidator(
            estimator=pipeline,
            estimatorParamMaps=param_grid,
            evaluator=evaluator,
            numFolds=3,
            seed=42
        )
        model = cross_validator.fit(train_df)
        print(f"Training time for {name}: {datetime.now() - start_time}")

        # Cached: these predictions feed several evaluations and a groupBy below.
        val_predictions = model.transform(val_df).cache()
        val_accuracy = evaluator.evaluate(val_predictions)
        print(f"Validation accuracy for {name}: {val_accuracy:.4f}")

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            best_model = model
            best_model_name = name
            best_metrics = _classification_metrics(evaluator, val_predictions)

        # True label vs. predicted label counts (a compact confusion table).
        print(f"\nValidation label/prediction counts for {name}:")
        val_predictions.groupBy("indexedLabel", "prediction").count() \
            .orderBy("indexedLabel", "prediction").show()
        val_predictions.unpersist()

    if best_model is None:
        raise RuntimeError("No model produced a comparable validation accuracy")

    print(f"\nBest model: {best_model_name} with validation accuracy: {best_val_accuracy:.4f}")
    print("Metrics for best model:")
    for metric, value in best_metrics.items():
        print(f"{metric}: {value:.4f}")

    # Single held-out evaluation of the selected model.
    test_predictions = best_model.transform(test_df).cache()
    test_metrics = _classification_metrics(evaluator, test_predictions)
    test_predictions.unpersist()
    print(f"Test accuracy for {best_model_name}: {test_metrics['accuracy']:.4f}")
    print("Test metrics for best model:")
    for metric, value in test_metrics.items():
        print(f"{metric}: {value:.4f}")

    save_model_locally(best_model, best_model_name)
    save_test_data_locally(test_df)

    # First four fitted stages: Tokenizer, StopWordsRemover, CountVectorizerModel, IDFModel.
    tfidf_stages = best_model.bestModel.stages[0:4]
    tfidf_model = Pipeline(stages=tfidf_stages).fit(train_df)
    save_model_locally(tfidf_model, "TFIDF")

    return best_model, best_model_name

In [None]:
def main():
    """Run the end-to-end flow: load, split, balance the training split, train, persist.

    The data is split *before* oversampling. Oversampling first (as this script
    originally did) copies minority-class rows into the validation and test
    splits as exact duplicates of training rows, which inflates every reported
    metric. Validation and test therefore keep the real class distribution.

    Uses the module-level ``spark`` session and stops it in ``finally``.
    """
    start_time = datetime.now()
    print(f"Starting process at {start_time.strftime('%H:%M:%S')}")

    try:
        df = load_and_clean_data(spark)
        if df is None:
            print("ERROR: Data not loaded")
            return

        # Split first so no oversampled duplicate can cross the train/eval boundary.
        train_df, val_df, test_df = split_data(df)

        # Oversample the training split only. Cached because cross-validation
        # reads it many times.
        balanced_train_df = balance_data_alt(train_df).cache()

        print("\nStatistics on balanced training data:")
        balanced_train_df.describe().show()

        _, best_model_name = train_and_evaluate_models(balanced_train_df, val_df, test_df)

        # train_data is the balanced set the models were actually fit on.
        balanced_train_df.write.mode("overwrite").parquet("data/processed/train_data.parquet")
        val_df.write.mode("overwrite").parquet("data/processed/val_data.parquet")
        test_df.write.mode("overwrite").parquet("data/processed/test_data.parquet")

        print(f"\n=== Training completed successfully with best model: {best_model_name} ===")
    except Exception as e:
        print(f"ERROR: {str(e)}")
        traceback.print_exc()
    finally:
        print("\nCleaning up resources...")
        spark.stop()
        print("Spark session closed")
        print(f"Total execution time: {datetime.now() - start_time}")

if __name__ == "__main__":
    main()

Starting process at 19:26:31
Loading data from Data.json...
Number of raw records: 10261
Number of valid records after cleaning: 10261
+--------------------------------------------------+--------+
|                                   cleaned_reviews|   label|
+--------------------------------------------------+--------+
|not much to write about here  but it does exact...|positive|
|the product does exactly as it should and is qu...|positive|
|the primary job of this device is to block the ...|positive|
+--------------------------------------------------+--------+
only showing top 3 rows

Balancing data using oversampling...
+--------+-----+
|   label|count|
+--------+-----+
| neutral|  772|
|positive| 9022|
|negative|  467|
+--------+-----+

Maximum class count: 9022
Oversampling class 'neutral' with fraction 11.686528497409327
Adjusting sample size for class 'neutral'
Keeping class 'positive' as is (majority class)
Oversampling class 'negative' with fraction 19.319057815845824
Adjustin

In [None]:
# Release the module-level Spark session. main() already calls spark.stop() in
# its `finally` block; this cell covers runs where main() was not executed.
# NOTE(review): assumes a repeated stop() is a harmless no-op — confirm for the installed Spark version.
spark.stop()