In [36]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    RegexTokenizer, StopWordsRemover, CountVectorizer,
    IDF, ChiSqSelector, Normalizer, StringIndexer
)
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit # Changed from CrossValidator
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import CountVectorizerModel

import numpy as np
import pandas as pd # For results summary, keep pandas import
import random
import os # For checking file existence

# Set a fixed random seed for reproducibility (Python stdlib and legacy NumPy global RNG).
seed = 42
random.seed(seed)
np.random.seed(seed)

# Initialize SparkSession with optimized configuration.
# NOTE(review): spark.driver.memory presumably only takes effect when this call launches the
# driver JVM (i.e. no SparkContext exists yet in this kernel) — confirm for your deployment.
spark = (
    SparkSession.builder
    .appName("DIC EX 2_2 - group 36 - TVS with Custom Vocab")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "1g")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.default.parallelism", "200")
    .config("spark.rdd.compress", "true")
    .getOrCreate()
)

# Set log level to reduce warnings. "spark.logLevel" is not a Spark configuration key, so
# passing it to the builder had no effect; the SparkContext API is the way to do this here.
spark.sparkContext.setLogLevel("ERROR")

print("Spark session initialized with optimized configuration")



Spark session initialized with optimized configuration


In [None]:
#spark.stop()

In [28]:
# Load the development dataset and cache it: the frame is counted, grouped and split
# repeatedly in the following cells, so re-reading it from HDFS each time would be wasteful.
print("Loading review data...")
data_path = "hdfs:///user/dic25_shared/amazon-reviews/full/reviews_devset.json" # Ensure this path is correct
df = spark.read.json(data_path).cache()

# Display schema and a short preview of the two columns the classifier uses
preview_columns = ("reviewText", "category")
print("Dataset Schema:")
df.printSchema()
print("\nSample data:")
df.select(*preview_columns).show(5, truncate=True)

# Function to check dataset statistics
def print_stats(df_to_stat):
    """Print the total row count and the per-category review counts of a DataFrame.

    Shows at most 20 categories (most frequent first, values untruncated).

    Args:
        df_to_stat: DataFrame with a ``category`` column.

    Returns:
        DataFrame of (category, count) rows sorted by descending count.
    """
    n_reviews = df_to_stat.count()
    per_category = (
        df_to_stat
        .groupBy("category")
        .count()
        .orderBy("count", ascending=False)
    )

    print(f"Total number of reviews: {n_reviews}")
    print("Category distribution:")
    per_category.show(20, truncate=False)

    return per_category

# Print dataset statistics; the sorted (category, count) frame is reused later to pick
# the categories shown in the confusion matrix.
category_counts = print_stats(df_to_stat=df)



Loading review data...


                                                                                

Dataset Schema:
root
 |-- asin: string (nullable = true)
 |-- category: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)


Sample data:


                                                                                

+--------------------+--------------------+
|          reviewText|            category|
+--------------------+--------------------+
|This was a gift f...|Patio_Lawn_and_Garde|
|This is a very ni...|Patio_Lawn_and_Garde|
|The metal base wi...|Patio_Lawn_and_Garde|
|For the most part...|Patio_Lawn_and_Garde|
|This hose is supp...|Patio_Lawn_and_Garde|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

Total number of reviews: 78829
Category distribution:




+--------------------------+-----+
|category                  |count|
+--------------------------+-----+
|Book                      |22507|
|Electronic                |7825 |
|Clothing_Shoes_and_Jewelry|5749 |
|Movies_and_TV             |4607 |
|Home_and_Kitche           |4254 |
|CDs_and_Vinyl             |3749 |
|Cell_Phones_and_Accessorie|3447 |
|Sports_and_Outdoor        |3269 |
|Kindle_Store              |3205 |
|Health_and_Personal_Care  |2982 |
|Apps_for_Android          |2638 |
|Toys_and_Game             |2253 |
|Beauty                    |2023 |
|Tools_and_Home_Improvement|1926 |
|Automotive                |1374 |
|Grocery_and_Gourmet_Food  |1297 |
|Office_Product            |1243 |
|Pet_Supplie               |1235 |
|Patio_Lawn_and_Garde      |994  |
|Baby                      |916  |
+--------------------------+-----+
only showing top 20 rows



                                                                                

In [29]:
# Data Preparation: 70% goes to TrainValidationSplit (which splits it again internally);
# the remaining 30% is halved into an explicit validation set and a held-out test set.
print("Splitting data into training, validation, and test sets...")
train_data, temp_data = df.randomSplit([0.7, 0.3], seed=seed)
validation_data, test_data = temp_data.randomSplit([0.5, 0.5], seed=seed)

# Cache every split: each one is scanned several times during fitting and evaluation.
train_data, validation_data, test_data = [
    part.cache() for part in (train_data, validation_data, test_data)
]

print(f"Training set size (for TrainValidationSplit input): {train_data.count()}")
print(f"Validation set size (for intermediate model check): {validation_data.count()}")
print(f"Test set size (for final model evaluation): {test_data.count()}")

# Load the top features selected in Part 2 (to be used as vocabulary).
# The file is read as whitespace-separated terms; their order defines the vector indices
# produced by CountVectorizerModel.from_vocabulary in the pipeline cell.
print("Loading selected features from output_ds.txt to use as vocabulary...")
selected_vocabulary_path = "output_ds.txt"
if not os.path.exists(selected_vocabulary_path):
    # Raise rather than spark.stop() + exit(): exit() inside a Jupyter kernel requests a
    # kernel shutdown (losing all state), and stopping Spark would break every later cell.
    # An exception halts "Run All" here and leaves the session usable once the file exists.
    raise FileNotFoundError(
        f"{selected_vocabulary_path} not found. Please ensure it's in the correct location."
    )

# Explicit UTF-8, matching how the stopwords file is read.
with open(selected_vocabulary_path, "r", encoding="utf-8") as f:
    selected_vocabulary = f.read().strip().split()

# Broadcast the selected vocabulary
broadcast_vocabulary = spark.sparkContext.broadcast(selected_vocabulary)

print(f"Loaded {len(selected_vocabulary)} features from output_ds.txt to be used as vocabulary.")
if selected_vocabulary:
    print(f"Sample vocabulary features: {selected_vocabulary[:10]}")
else:
    print("Warning: Loaded vocabulary is empty!")


# Efficiently load stopwords once and broadcast them
def load_stopwords(path: str) -> list[str]:
    """Read one stopword per line from a UTF-8 text file.

    Surrounding whitespace is stripped, blank lines are skipped and duplicates
    are removed; the order of the returned list is unspecified (built from a set).

    Args:
        path: Location of the stopwords file.

    Returns:
        The unique stopwords, or an empty list (after printing an error) when the
        file does not exist, so the caller can proceed without stopword removal.
    """
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as handle:
            unique_words = {word.strip() for word in handle if word.strip()}
        return list(unique_words)

    print(f"ERROR: Stopwords file {path} not found.")
    return [] # Return empty list to avoid error, but flag it

# Load stopwords a single time; an empty result means the file was missing (or empty).
stopwords_list = load_stopwords("stopwords.txt")
if len(stopwords_list) == 0:
    print("Warning: Stopwords list is empty. Proceeding without stopword removal if file was not found.")

# Broadcast stopwords to all executors
broadcast_stopwords = spark.sparkContext.broadcast(stopwords_list)



Splitting data into training, validation, and test sets...


                                                                                

Training set size (for TrainValidationSplit input): 55332


                                                                                

Validation set size (for intermediate model check): 11805


                                                                                

Test set size (for final model evaluation): 11692
Loading selected features from output_ds.txt to use as vocabulary...
Loaded 75 features from output_ds.txt to be used as vocabulary.
Sample vocabulary features: ['amazon', 'author', 'back', 'bad', 'big', 'bit', 'bought', 'buy', 'character', 'characters']


In [35]:
# Confirm the PySpark client library and the running Spark runtime report the same version.
import pyspark
pyspark_client_version = pyspark.__version__
print(pyspark_client_version)
spark.version


3.3.4


'3.3.4'

In [39]:
print("Building the ML Pipeline for classification...")

# 1. Convert category to numeric labels (index assignment is learned when the pipeline is fit)
label_indexer = StringIndexer(inputCol="category", outputCol="label")

# 2. Text preprocessing: split on whitespace, digits and punctuation
tokenizer = RegexTokenizer(
    inputCol="reviewText", 
    outputCol="tokens", 
    pattern="[\\s\\t\\d\\(\\)\\[\\]\\{\\}\\.\\!\\?\\,\\;\\:\\+\\=\\-\\_\\\"\\'`\\~\\#\\@\\&\\*\\%\\€\\$\\§\\\\\\/]+"
)

# 3. Remove stopwords. The list is only needed on the driver to configure the stage;
#    an empty list (stopwords.txt missing) should make this stage remove nothing.
stopwords_remover = StopWordsRemover(
    inputCol="tokens",
    outputCol="tokens_filtered",
    stopWords=broadcast_stopwords.value or []
)

# 4. Create term frequency vectors using the pre-defined vocabulary (no vocabulary fitting)
count_vectorizer = CountVectorizerModel.from_vocabulary(
    vocabulary=broadcast_vocabulary.value,
    inputCol="tokens_filtered",
    outputCol="tf"
)

# 5. Calculate IDF (fit as part of each pipeline fit, i.e. on training rows only)
idf = IDF(inputCol="tf", outputCol="tf_idf")

# Number of columns CountVectorizer emits = vocabulary size
num_vocab_features = len(selected_vocabulary) if selected_vocabulary else 0


def _chi_sq_top_k(max_features: int, vocab_size: int, fallback: int) -> int:
    """numTopFeatures for ChiSqSelector, capped at the vocabulary size.

    ``fallback`` is used only when the vocabulary is empty.
    """
    return min(max_features, vocab_size) if vocab_size > 0 else fallback


def _make_chi_sq_selector(max_features: int, fallback: int) -> ChiSqSelector:
    """Chi-square selector over the TF-IDF vector keeping at most ``max_features`` features."""
    return ChiSqSelector(
        numTopFeatures=_chi_sq_top_k(max_features, num_vocab_features, fallback),
        featuresCol="tf_idf",
        outputCol="selected_features",
        labelCol="label"
    )


# 6. Feature selection using Chi-Square - reduced dimensionality
chi_sq_selector_2000 = _make_chi_sq_selector(2000, fallback=100)
chi_sq_selector_500 = _make_chi_sq_selector(500, fallback=50)

# Test the smaller threshold first so the "< 500" warning is reachable. When the vocabulary
# has fewer than 500 terms, both selectors keep every feature and the two pipelines below
# are effectively the same model.
if num_vocab_features == 0:
    print("WARNING: Vocabulary size is 0. ChiSqSelector numTopFeatures is set to a small default.")
elif num_vocab_features < 500:
    print(f"WARNING: Vocabulary size ({num_vocab_features}) is less than 500. ChiSqSelectors will select all {num_vocab_features} features.")
elif num_vocab_features < 2000:
    print(f"WARNING: Vocabulary size ({num_vocab_features}) is less than 2000. ChiSqSelector_2000 will select all {num_vocab_features} features.")

# 7. Vector normalization (unit L2 norm)
normalizer = Normalizer(inputCol="selected_features", outputCol="normalized_features", p=2.0)

# 8. Create SVM classifier; LinearSVC is binary, so OneVsRest handles the multi-class case
svm = LinearSVC(featuresCol="normalized_features", labelCol="label")
ovr = OneVsRest(classifier=svm, featuresCol="normalized_features", labelCol="label", predictionCol="prediction")


def _build_pipeline(chi_sq_selector: ChiSqSelector) -> Pipeline:
    """Shared preprocessing + OneVsRest(LinearSVC) pipeline around one feature selector."""
    return Pipeline(stages=[
        label_indexer,
        tokenizer,
        stopwords_remover,
        count_vectorizer,  # Uses custom vocabulary
        idf,
        chi_sq_selector,
        normalizer,
        ovr,
    ])


# Pipelines with up to 2000 / 500 selected features (fewer if the vocabulary is smaller)
pipeline_2000 = _build_pipeline(chi_sq_selector_2000)
pipeline_500 = _build_pipeline(chi_sq_selector_500)



Building the ML Pipeline for classification...


In [40]:
# Define the SVM parameter grid: 3 regParam x 2 standardization x 2 maxIter = 12 candidates
print("Setting up parameter grid for SVM optimization...")
param_grid = (
    ParamGridBuilder()
    .addGrid(svm.regParam, [0.01, 0.1, 1.0])
    .addGrid(svm.standardization, [True, False])
    .addGrid(svm.maxIter, [10, 50])
    .build()
)

# F1-based evaluator shared by hyperparameter search and the explicit evaluations
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")


def _make_tvs(pipeline):
    """TrainValidationSplit over ``param_grid``.

    fit() trains each candidate on 80% of its input and scores it on the other 20%.
    """
    return TrainValidationSplit(
        estimator=pipeline,
        estimatorParamMaps=param_grid,
        evaluator=evaluator_f1,
        trainRatio=0.8,
        parallelism=4,
        seed=seed,
    )


tvs_2000 = _make_tvs(pipeline_2000)
tvs_500 = _make_tvs(pipeline_500)

# One summary dict per trained configuration is appended here
results = []



Setting up parameter grid for SVM optimization...


In [41]:
# --- Train model with "2000" features ---
n_top_2000 = chi_sq_selector_2000.getNumTopFeatures()
print(f"\nTraining model with ChiSqSelector for up to {n_top_2000} features...")
print("This may take some time (but faster than CrossValidator)...")
tvs_model_2000 = tvs_2000.fit(train_data)  # TVS splits train_data again internally
best_pipeline_model_2000 = tvs_model_2000.bestModel

# Recover the winning SVM hyperparameters from the param map with the highest metric
svm_param_names = ("regParam", "standardization", "maxIter")
best_index_2000 = np.argmax(tvs_model_2000.validationMetrics)
best_params_map_2000 = tvs_model_2000.getEstimatorParamMaps()[best_index_2000]
params_2000 = {name: best_params_map_2000[getattr(svm, name)] for name in svm_param_names}

# Evaluate on the explicit VALIDATION set
print(f"\nEvaluating model (up to {n_top_2000} features) on the VALIDATION set...")
predictions_validation_2000 = best_pipeline_model_2000.transform(validation_data).cache()
f1_score_validation_2000 = evaluator_f1.evaluate(predictions_validation_2000)
print(f"F1 score on VALIDATION set: {f1_score_validation_2000:.4f}")

# Evaluate on the TEST set
print(f"Evaluating model (up to {n_top_2000} features) on the TEST set...")
predictions_test_2000 = best_pipeline_model_2000.transform(test_data).cache()
f1_score_test_2000 = evaluator_f1.evaluate(predictions_test_2000)

results.append({
    "feature_set": f"{n_top_2000} features (max)",
    **params_2000,
    "f1_score_validation": f1_score_validation_2000,
    "f1_score_test": f1_score_test_2000,
    "predictions_on_test_set": predictions_test_2000,  # Keep for later best model selection
})

print(f"\nBest parameters for model with up to {n_top_2000} features:")
for name in svm_param_names:
    print(f"  {name}: {params_2000[name]}")
print(f"  F1 score on VALIDATION set: {f1_score_validation_2000:.4f}")
print(f"  F1 score on TEST set: {f1_score_test_2000:.4f}")





Training model with ChiSqSelector for up to 75 features...
This may take some time (but faster than CrossValidator)...


IOPub message rate exceeded.                                                    4]
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_msg_rate_limit`.

Current values:
ServerApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [None]:
# --- Train model with "500" features ---
n_top_500 = chi_sq_selector_500.getNumTopFeatures()
print(f"\nTraining model with ChiSqSelector for up to {n_top_500} features...")
print("This may take some time...")
tvs_model_500 = tvs_500.fit(train_data)  # TVS splits train_data again internally
best_pipeline_model_500 = tvs_model_500.bestModel

# Recover the winning SVM hyperparameters from the param map with the highest metric
svm_param_names = ("regParam", "standardization", "maxIter")
best_index_500 = np.argmax(tvs_model_500.validationMetrics)
best_params_map_500 = tvs_model_500.getEstimatorParamMaps()[best_index_500]
params_500 = {name: best_params_map_500[getattr(svm, name)] for name in svm_param_names}

# Evaluate on the explicit VALIDATION set
print(f"\nEvaluating model (up to {n_top_500} features) on the VALIDATION set...")
predictions_validation_500 = best_pipeline_model_500.transform(validation_data).cache()
f1_score_validation_500 = evaluator_f1.evaluate(predictions_validation_500)
print(f"F1 score on VALIDATION set: {f1_score_validation_500:.4f}")

# Evaluate on the TEST set
print(f"Evaluating model (up to {n_top_500} features) on the TEST set...")
predictions_test_500 = best_pipeline_model_500.transform(test_data).cache()
f1_score_test_500 = evaluator_f1.evaluate(predictions_test_500)

results.append({
    "feature_set": f"{n_top_500} features (max)",
    **params_500,
    "f1_score_validation": f1_score_validation_500,
    "f1_score_test": f1_score_test_500,
    "predictions_on_test_set": predictions_test_500,  # Keep for later best model selection
})

print(f"\nBest parameters for model with up to {n_top_500} features:")
for name in svm_param_names:
    print(f"  {name}: {params_500[name]}")
print(f"  F1 score on VALIDATION set: {f1_score_validation_500:.4f}")
print(f"  F1 score on TEST set: {f1_score_test_500:.4f}")




In [None]:
# Find which model performed better based on F1 score on the TEST set
best_result_config = max(results, key=lambda x: x['f1_score_test'])
best_predictions_df = best_result_config["predictions_on_test_set"]

print("\nBest Overall Configuration (based on Test Set F1 Score):")
print(f"Feature Set: {best_result_config['feature_set']}")
print(f"regParam: {best_result_config['regParam']}")
print(f"standardization: {best_result_config['standardization']}")
print(f"maxIter: {best_result_config['maxIter']}")
print(f"F1 Score on Validation Set: {best_result_config['f1_score_validation']:.4f}")
print(f"F1 Score on Test Set: {best_result_config['f1_score_test']:.4f}")

# Save the best model parameters
with open("best_model_params_tvs.txt", "w") as f:
    f.write(f"Best Model (based on Test Set F1 from TrainValidationSplit)\n")
    f.write(f"Feature Set: {best_result_config['feature_set']}\n")
    f.write(f"regParam: {best_result_config['regParam']}\n")
    f.write(f"standardization: {best_result_config['standardization']}\n")
    f.write(f"maxIter: {best_result_config['maxIter']}\n")
    f.write(f"F1 Score on Validation Set: {best_result_config['f1_score_validation']:.4f}\n")
    f.write(f"F1 Score on Test Set: {best_result_config['f1_score_test']:.4f}\n")

print("\nBest model parameters saved to 'best_model_params_tvs.txt'")



In [None]:
# --- Confusion Matrix for the best model on the test set ---
# Fit the label_indexer on the original data to get all labels for consistent mapping
label_mapping_model = label_indexer.fit(df) # Fit on full df to get all labels
idx_to_label_udf = udf(lambda idx: label_mapping_model.labels[int(idx)], StringType()) # StringType might be needed

# Get category mapping
# Using the already fitted label_indexer_model from the pipeline training is generally fine
# but fitting on full 'df' ensures all categories are known if 'train_data' was sparse.
# Here, best_pipeline_model_2000 or best_pipeline_model_500 contains a fitted label_indexer.
# Let's use the one from the overall best model's pipeline.
# We need the pipeline that generated 'best_predictions_df'.
if best_result_config["feature_set"] == f"{chi_sq_selector_2000.getNumTopFeatures()} features (max)":
    final_best_pipeline_model = best_pipeline_model_2000
else:
    final_best_pipeline_model = best_pipeline_model_500

label_indexer_in_best_model = final_best_pipeline_model.stages[0] # First stage is label_indexer
category_labels_ordered = label_indexer_in_best_model.labels

# Convert predictions to a pandas DataFrame - limit columns to only what's needed
# Best_predictions_df already has 'category', 'label' (numeric true), 'prediction' (numeric)
pred_pd_df = best_predictions_df.select("label", "prediction").limit(20000).toPandas() # Limit for safety
true_labels_pd = pred_pd_df["label"].astype(int)
pred_labels_pd = pred_pd_df["prediction"].astype(int)

# Get top categories for confusion matrix display (can be all if not too many)
top_n_categories_for_cm = 10 # or len(category_labels_ordered)
category_counts_pd = category_counts.limit(top_n_categories_for_cm).toPandas()
top_categories_list = category_counts_pd['category'].tolist()

# Map top category names to their numerical indices used in the model
# The 'label_indexer_in_best_model.labels' gives index -> label_string. We need label_string -> index.
string_to_idx_map = {label: float(idx) for idx, label in enumerate(category_labels_ordered)}
top_indices = [string_to_idx_map[cat] for cat in top_categories_list if cat in string_to_idx_map]


if top_indices:
    conf_matrix = np.zeros((len(top_indices), len(top_indices)), dtype=int)
    # Fill confusion matrix efficiently
    for i, true_cat_idx_numeric in enumerate(top_indices):
        for j, pred_cat_idx_numeric in enumerate(top_indices):
            conf_matrix[i, j] = sum((true_labels_pd == true_cat_idx_numeric) & (pred_labels_pd == pred_cat_idx_numeric))
    
    conf_matrix_pd = pd.DataFrame(conf_matrix, index=top_categories_list[:len(top_indices)], columns=top_categories_list[:len(top_indices)])
    print("\nConfusion Matrix for Best Model on Test Set (Top Categories):")
    print(conf_matrix_pd)
else:
    print("\nCould not generate confusion matrix: No top categories found or mapping issue.")


# Conclusion
print("\nConclusion:")
print("We have successfully implemented an optimized text classification pipeline using Spark ML.")
print("Key changes: Used a pre-defined vocabulary with CountVectorizer and TrainValidationSplit for hyperparameter tuning.")
print("The pipeline includes text preprocessing, feature extraction with TF-IDF, and SVM classification.")
print(f"The best model (based on Test F1) used '{best_result_config['feature_set']}' and achieved an F1 score of {best_result_config['f1_score_test']:.4f} on the test set.")
print(f"This model also scored {best_result_config['f1_score_validation']:.4f} on the separate validation set.")

# Release the cached input splits
for cached_split in (train_data, validation_data, test_data):
    cached_split.unpersist()

# Release any prediction DataFrames that were created and are still cached
# (a configuration whose training cell was skipped simply has no such name).
for prediction_name in (
    "predictions_validation_2000",
    "predictions_test_2000",
    "predictions_validation_500",
    "predictions_test_500",
):
    if prediction_name in locals() and locals()[prediction_name].is_cached:
        locals()[prediction_name].unpersist()

df.unpersist()

# Stop Spark session
spark.stop()
print("\nSpark session stopped and resources released")