In [0]:
%pip install pyspark

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, count, when

In [0]:
# Set the access key for your Azure Blob Storage account
spark.conf.set(
    "fs.azure.account.key.aladdimbigdata.blob.core.windows.net", 
    "accesskey_removed"  
)


In [0]:
from pyspark.sql import SparkSession

# Create or get your SparkSession
spark = SparkSession.builder.appName("AmazonReviewsEDA").getOrCreate()


data_path = "wasbs://datafiles@aladdimbigdata.blob.core.windows.net/*.csv"

df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("multiLine", "true") \
    .option("escape", '"') \
    .csv(data_path)

df.printSchema()
df.show(10, truncate=False)

root
 |-- asin: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- category: string (nullable = true)
 |-- summary: string (nullable = true)


## Dataset Overview

The dataset consists of Amazon product reviews across multiple categories with the following columns:

| Column Name | Data Type          | Description                                            | 
|-------------|--------------------|--------------------------------------------------------|
|asin         | String             | Unique Product ID                                      |
|reviewText   | String             | Full customer review text                              |
|overall      | Double (inferred)  | Star rating (1 to 5)                                   |
|category     | String             | Product category (e.g., Beauty, Fashion, Appliances)   |
|summary      | String             | Brief review summary                                   |


In [0]:
print(f"Total Columns: {len(df.columns)}")
print(f"Columns: {df.columns}")
print(f"Total Rows: {df.count()}")
print(f"Total Categories: {df.select('category').distinct().count()}")

Total Columns: 5
Columns: ['asin', 'reviewText', 'overall', 'category', 'summary']
Total Rows: 18637932
Total Categories: 13


### Missing Counts

In [0]:
# Count NULLs for each column
missing_values = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
missing_values.show()

+----+----------+-------+--------+-------+
|asin|reviewText|overall|category|summary|
+----+----------+-------+--------+-------+
|   0|         0|      0|       0|      0|
+----+----------+-------+--------+-------+



In [0]:
df.groupBy("overall").count().orderBy("overall").show()

+-------+--------+
|overall|   count|
+-------+--------+
|    1.0| 1003006|
|    2.0|  749579|
|    3.0| 1393843|
|    4.0| 3102498|
|    5.0|12389006|
+-------+--------+



In [0]:
# Drop rows where 'overall' is 0.0
df = df.filter(col("overall") != 0.0)

# Verify that 0.0 ratings are removed
df.groupBy("overall").count().orderBy("overall").show()

+-------+--------+
|overall|   count|
+-------+--------+
|    1.0| 1003006|
|    2.0|  749579|
|    3.0| 1393843|
|    4.0| 3102498|
|    5.0|12389006|
+-------+--------+



### Balancing the dataset 

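As seen above, the rating counts are far from equal: 5-star reviews make up roughly two thirds of the dataset (12,389,006 of 18,637,932 rows), so the data is heavily skewed toward positive ratings.

Downsampling every class to the size of the smallest one (2-star, 749,579 rows) helps correct this skew.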

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

# Sample size per class
target_size = 749579

# Cache original df to avoid recomputation
df = df.cache()

# Get all class counts in one shot
class_counts = df.groupBy("overall").count().collect()

# Precompute fractions
sampling_map = {row["overall"]: target_size / row["count"] for row in class_counts}

# Create sampled DataFrames list
sampled_dfs = []

for rating, fraction in sampling_map.items():
    sampled = df.filter(col("overall") == rating).sample(withReplacement=False, fraction=fraction, seed=42)
    sampled_dfs.append(sampled)

# Merge all sampled DataFrames efficiently
balanced_df = sampled_dfs[0]
for df_part in sampled_dfs[1:]:
    balanced_df = balanced_df.unionByName(df_part)

# ✅ Show final class distribution
balanced_df.groupBy("overall").count().orderBy("overall").show()


+-------+------+
|overall| count|
+-------+------+
|    1.0|749647|
|    2.0|749579|
|    3.0|750397|
|    4.0|750987|
|    5.0|749067|
+-------+------+

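The sampling fractions used above follow directly from the class counts. As a minimal sketch in plain Python (the dictionary is copied from the rating distribution printed earlier), each fraction is the target size divided by the class count. Spark's `sample` keeps each row independently with that probability instead of taking an exact number of rows, which is why the balanced class sizes in the table above are close to, but not exactly, 749,579.

```python
# Class counts copied from the rating distribution shown earlier.
class_counts = {1.0: 1003006, 2.0: 749579, 3.0: 1393843, 4.0: 3102498, 5.0: 12389006}

# Downsample every class to the size of the smallest one.
target_size = min(class_counts.values())

# Fraction of each class to keep.
sampling_map = {rating: target_size / n for rating, n in class_counts.items()}

for rating, fraction in sorted(sampling_map.items()):
    print(f"rating {rating}: keep {fraction:.4f} of {class_counts[rating]} rows")
```

The smallest class gets a fraction of exactly 1.0, so all of its rows are kept.
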


In [0]:
# Add review length column
balanced_df = balanced_df.withColumn("review_length", length(col("reviewText")))

# Get summary statistics
balanced_df.select("review_length").describe().show()

+-------+-----------------+
|summary|    review_length|
+-------+-----------------+
|  count|          3749677|
|   mean|363.3758283713504|
| stddev|580.9268160841771|
|    min|                1|
|    max|            32632|
+-------+-----------------+



In [0]:
balanced_df.selectExpr("mean(overall) as mean_rating", "stddev(overall) as std_rating").show()

+------------------+------------------+
|       mean_rating|        std_rating|
+------------------+------------------+
|3.0000661390301087|1.4138429562969868|
+------------------+------------------+



## Goals

Our goal is to predict the product rating (overall) based on the review text (reviewText).

| Column     | Role            |
|------------|-----------------|
| reviewText | Feature (Input) |
| overall    | Target (Output) |

Since overall is an ordinal variable (1-5 stars), we can treat this problem in two ways:

1. Classification: Predict one of 5 classes (1, 2, 3, 4, or 5 stars).
2. Regression: Predict a continuous rating (e.g., 3.8) and round to the nearest star.

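For the regression variant, a continuous prediction has to be mapped back to a valid star rating. The sketch below is one possible way to do this, assuming predictions outside the 1 to 5 range should be clipped; `to_star` is a hypothetical helper name, not part of the notebook.

```python
import math

def to_star(prediction: float) -> int:
    """Map a continuous prediction to a valid star rating in 1..5."""
    # Clip to the valid range first, so 0.2 becomes 1 and 7.1 becomes 5.
    clipped = min(5.0, max(1.0, prediction))
    # floor(x + 0.5) rounds halves up; Python's round() rounds halves to even.
    return int(math.floor(clipped + 0.5))

for raw in (3.8, 0.2, 7.1, 2.5):
    print(raw, "->", to_star(raw))
```
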

## Evaluation Metrics

The choice of metric depends on whether we treat the task as classification or regression.

### Classification Metrics (if treating as a multi-class problem)

If we classify reviews into 5-star categories, we use:

1. Accuracy: Measures how often the model correctly predicts the exact rating.
    - Accuracy = Correct Predictions / Total Predictions

2. Precision, Recall, and F1-Score: Useful if we want balanced predictions across all ratings.
    - Precision = True Positives / (True Positives + False Positives)
    - Recall = True Positives / (True Positives + False Negatives)
    - F1-Score = 2 × Precision × Recall / (Precision + Recall)

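To make the classification metrics concrete, here is a small plain-Python sketch that computes accuracy and per-class precision, recall, and F1 in a one-vs-rest fashion. The toy labels and the function names are illustrative assumptions; in the notebook's setting these values would come from model predictions.

```python
from collections import Counter

def accuracy(y_true, y_pred):
    """Share of predictions that match the true rating exactly."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def per_class_metrics(y_true, y_pred, labels):
    """Return {label: (precision, recall, f1)}, treating each label one-vs-rest."""
    pairs = Counter(zip(y_true, y_pred))
    metrics = {}
    for label in labels:
        tp = pairs[(label, label)]
        fp = sum(n for (t, p), n in pairs.items() if p == label and t != label)
        fn = sum(n for (t, p), n in pairs.items() if t == label and p != label)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        metrics[label] = (precision, recall, f1)
    return metrics

# Toy example: eight reviews, two of them misclassified.
y_true = [1, 2, 3, 4, 5, 5, 4, 3]
y_pred = [1, 2, 3, 5, 5, 4, 4, 3]

metrics = per_class_metrics(y_true, y_pred, labels=[1, 2, 3, 4, 5])
macro_f1 = sum(f1 for _, _, f1 in metrics.values()) / len(metrics)
print(accuracy(y_true, y_pred), macro_f1)
```

Averaging the per-class F1 scores (macro F1) weights every rating equally, which fits a balanced dataset like `balanced_df`.
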
### Regression Metrics (if treating as a continuous problem)

If we predict ratings as continuous values, we use:

1. Mean Squared Error (MSE): Penalizes large errors more than small ones.
    - MSE = (1/N) * Σ (yi - ŷi)²
2. Root Mean Squared Error (RMSE): More interpretable because it has the same unit as the target variable.
    - RMSE = sqrt(MSE)
3. Mean Absolute Error (MAE): Measures the average absolute difference between predicted and actual ratings.
    - MAE = (1/N) * Σ |yi - ŷi|
4. R² Score (Coefficient of Determination): Measures how well predictions match the actual ratings.
    - R² = 1 - (Σ (yi - ŷi)² / Σ (yi - ȳ)²)

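The regression formulas above can be checked on a small worked example. This is a plain-Python sketch with made-up ratings and predictions; `regression_metrics` is an illustrative name.

```python
import math

def regression_metrics(y_true, y_pred):
    """Compute MSE, RMSE, MAE and R² from paired true and predicted values."""
    n = len(y_true)
    squared_errors = [(y - p) ** 2 for y, p in zip(y_true, y_pred)]
    mse = sum(squared_errors) / n
    mae = sum(abs(y - p) for y, p in zip(y_true, y_pred)) / n
    mean_y = sum(y_true) / n
    ss_tot = sum((y - mean_y) ** 2 for y in y_true)
    r2 = 1 - sum(squared_errors) / ss_tot
    return {"mse": mse, "rmse": math.sqrt(mse), "mae": mae, "r2": r2}

# Toy example: true star ratings and continuous predictions.
y_true = [1, 2, 3, 4, 5]
y_pred = [1.5, 2.0, 2.5, 4.0, 4.0]

result = regression_metrics(y_true, y_pred)
print(result)
```

Here the squared errors sum to 1.5, so MSE is 0.3, MAE is 0.4, and R² is 1 - 1.5/10 = 0.85.
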

## Choosing the Right Approach

| Approach                             | Pros                          | Cons                                                             | 
|--------------------------------------|-------------------------------|------------------------------------------------------------------|
| Classification (1,2,3,4,5 stars)     | Simple, interpretable, direct | Ignores the ordinal nature of ratings                            |
| Regression (predict rating as float) | Captures ordinal structure    | Harder to optimize, may predict invalid values (e.g., 3.8 stars) |

### Best Choice?
Since ratings are ordinal, we can try both approaches and compare results.

## Data Ingestion

Since we have 39 CSV files spread across multiple categories, we need an efficient ingestion pipeline.

### PySpark Pipeline for Loading Data
1. Read all CSV files from a directory
2. Infer schema & ensure consistent column types
3. Combine files into a single DataFrame
4. Store in an optimized format (Parquet, Delta, etc.) for fast access

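The four steps above could be sketched as follows. This is an illustration, not the notebook's exact pipeline: it assumes an existing `SparkSession` is passed in, that every CSV file has the five columns in the order shown by `printSchema()`, and that `source_glob` and `target_path` are placeholder paths. Passing an explicit schema instead of `inferSchema` keeps column types consistent across files and avoids an extra pass over the data.

```python
# Explicit schema as a DDL string, so every file is read with the same column types.
REVIEW_SCHEMA_DDL = (
    "asin STRING, reviewText STRING, overall DOUBLE, category STRING, summary STRING"
)

def ingest_reviews(spark, source_glob, target_path):
    """Read all CSV files matching source_glob and store them as Parquet.

    `spark` is an existing SparkSession; both paths are placeholders.
    """
    df = (
        spark.read
        .schema(REVIEW_SCHEMA_DDL)      # steps 1-2: one schema for all files
        .option("header", "true")
        .option("multiLine", "true")    # review text may contain newlines
        .option("escape", '"')
        .csv(source_glob)               # step 3: a glob yields one DataFrame
    )
    df.write.mode("overwrite").parquet(target_path)  # step 4: columnar storage
    return spark.read.parquet(target_path)
```

A call such as `ingest_reviews(spark, data_path, "<some parquet path>")` would then return a DataFrame backed by the Parquet copy.
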
## Handling multiple Data Files

Since we are working with Amazon product reviews, we have one main dataset consisting of reviews across multiple categories. The per-category files are concatenated row-wise into a single DataFrame, and no further joins are needed.


In [0]:
# spark.stop()


## Checkpointing data

In [0]:
# # Define checkpoint path
# checkpoint_path = "./amazon_reviews_checkpoint"

# # Save as Parquet (compressed format)
# df.write.mode("overwrite").parquet(checkpoint_path)
# # df.write.mode("overwrite").option("compression", "none").parquet(checkpoint_path)


## Data Splitting Strategy

Since our dataset consists of Amazon product reviews, we need to ensure that:

1. Stratified sampling is used to maintain the rating distribution across all splits.
2. No product (asin) appears in both train and test sets to prevent leakage.

### Split Ratio

| Set            | Purpose                       | Size |
|----------------|-------------------------------|------|
| Training Set   | Used to train the model       | 70%  |
| Validation Set | Used to tune hyperparameters  | 15%  |
| Test Set       | Used for final evaluation     | 15%  |

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def stratified_split(df: DataFrame, label_col: str, weights: list, seed: int = 42):
    """
    Stratified train/val/test split for a classification dataset in Spark.
    Returns (train_df, val_df, test_df)
    """
    train_df = val_df = test_df = None

    # Loop through each class and split separately
    for label in [1.0, 2.0, 3.0, 4.0, 5.0]:
        class_df = df.filter(col(label_col) == label)
        splits = class_df.randomSplit(weights, seed=seed)

        train_df = splits[0] if train_df is None else train_df.unionByName(splits[0])
        val_df   = splits[1] if val_df is None   else val_df.unionByName(splits[1])
        test_df  = splits[2] if test_df is None  else test_df.unionByName(splits[2])

    return train_df, val_df, test_df

# ✅ Use the function on balanced_df
train_df, val_df, test_df = stratified_split(balanced_df, label_col="overall", weights=[0.7, 0.15, 0.15])

# ✅ Verify counts
print(f"Training Rows: {train_df.count()}")
print(f"Validation Rows: {val_df.count()}")
print(f"Test Rows: {test_df.count()}")

# ✅ Verify class distribution
train_df.groupBy("overall").count().orderBy("overall").show()
val_df.groupBy("overall").count().orderBy("overall").show()
test_df.groupBy("overall").count().orderBy("overall").show()


Training Rows: 2626465
Validation Rows: 561074
Test Rows: 562138
+-------+------+
|overall| count|
+-------+------+
|    1.0|524954|
|    2.0|525178|
|    3.0|525650|
|    4.0|526004|
|    5.0|524679|
+-------+------+

+-------+------+
|overall| count|
+-------+------+
|    1.0|112190|
|    2.0|112134|
|    3.0|112244|
|    4.0|112416|
|    5.0|112090|
+-------+------+

+-------+------+
|overall| count|
+-------+------+
|    1.0|112503|
|    2.0|112267|
|    3.0|112503|
|    4.0|112567|
|    5.0|112298|
+-------+------+



## Preventing Data Leakage

To avoid data leakage, we must ensure that:
- No product (asin) appears in both Train & Test.
- Text normalization is applied only using Training data statistics.

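An alternative way to keep products from crossing splits is to assign every `asin` to a split deterministically, so that all reviews of a product land in the same set. The sketch below is not what this notebook does (it uses a random split followed by an anti-join); it is a plain-Python illustration, with `split_for_asin` and the sample ASINs as hypothetical names. The trade-off is that split sizes and rating distributions are then only approximately preserved.

```python
import hashlib

def split_for_asin(asin: str, train_pct: int = 70, val_pct: int = 15) -> str:
    """Deterministically map a product ID to 'train', 'val' or 'test'."""
    digest = hashlib.md5(asin.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100  # stable bucket in 0..99
    if bucket < train_pct:
        return "train"
    if bucket < train_pct + val_pct:
        return "val"
    return "test"

# Hypothetical (asin, rating) pairs; two reviews share the same product.
reviews = [("B0001", 5.0), ("B0001", 1.0), ("B0002", 3.0), ("B0003", 4.0)]
assignment = {asin: split_for_asin(asin) for asin, _ in reviews}
print(assignment)
```

Because the assignment depends only on the product ID, the three sets are disjoint by product, including validation versus test.
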
In [0]:
# Get unique ASINs from the training set
train_asins = train_df.select("asin").distinct()

# Ensure validation & test sets do not contain ASINs from train set
# Why Use left_anti Join? - Removes any products (asin) in train_df from val_df and test_df.
val_df = val_df.join(train_asins, on="asin", how="left_anti")
test_df = test_df.join(train_asins, on="asin", how="left_anti")

# Verify final counts
# print(f"Final Validation Rows: {val_df.count()}")
# print(f"Final Test Rows: {test_df.count()}")

## Normalizing Data Using Training Statistics

Since review lengths and text embeddings may have large variations, we normalize:
- Review length → Standardize using mean & std from training data.
- Text embeddings (TF-IDF, Word2Vec, etc.) → Fit only on training data.

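As a small illustration of why the statistics come from the training set only, the following plain-Python sketch standardizes made-up review lengths. The numbers are invented for the example; `statistics.stdev` is the sample standard deviation, which matches what Spark's `stddev` computes.

```python
from statistics import mean, stdev

# Made-up review lengths (characters) for illustration.
train_lengths = [120, 80, 400, 50, 350]
val_lengths = [100, 900]

# Fit: statistics come from the training data only.
mu = mean(train_lengths)
sigma = stdev(train_lengths)

def standardize(values, mu, sigma):
    """Apply z-score scaling with externally supplied statistics."""
    return [(v - mu) / sigma for v in values]

# Transform: the same mu and sigma are reused for every split.
train_norm = standardize(train_lengths, mu, sigma)
val_norm = standardize(val_lengths, mu, sigma)
print(mu, round(sigma, 2), [round(v, 2) for v in val_norm])
```

The validation values are not centered on zero, and that is expected: they are expressed relative to the training distribution.
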
In [0]:
from pyspark.sql.functions import mean, stddev

# Compute mean & std dev on training data
stats = train_df.selectExpr("mean(review_length) as mean_length", "stddev(review_length) as std_length").collect()[0]

mean_length = stats["mean_length"]
std_length = stats["std_length"]

# Normalize review length
train_df = train_df.withColumn("review_length_normalized", (col("review_length") - mean_length) / std_length)
val_df = val_df.withColumn("review_length_normalized", (col("review_length") - mean_length) / std_length)
test_df = test_df.withColumn("review_length_normalized", (col("review_length") - mean_length) / std_length)

In [0]:
cb_base_path = "wasbs://datafiles@aladdimbigdata.blob.core.windows.net/catboost_data"

catboost_cols = ["reviewText", "review_length_normalized", "category", "overall"]

train_df.select(*catboost_cols) \
    .write.mode("overwrite").parquet(f"{cb_base_path}/train")

val_df.select(*catboost_cols) \
    .write.mode("overwrite").parquet(f"{cb_base_path}/val")

test_df.select(*catboost_cols) \
    .write.mode("overwrite").parquet(f"{cb_base_path}/test")


### EDA and Handling Categorical Features

In [0]:
import numpy

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Drop existing 'category_index' and 'category_onehot' columns if they exist
train_df = train_df.drop("category_index", "category_onehot")
val_df = val_df.drop("category_index", "category_onehot")
test_df = test_df.drop("category_index", "category_onehot")

# Convert categorical 'category' column to numerical using StringIndexer
indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_onehot")

# Fit the indexer on the training data and then transform it
indexer_model = indexer.fit(train_df)
train_df = indexer_model.transform(train_df)

# Fit the encoder on the training data and then transform it
encoder_model = encoder.fit(train_df)
train_df = encoder_model.transform(train_df)

# Apply the same transformations to validation and test sets using the fitted models
val_df = indexer_model.transform(val_df)
val_df = encoder_model.transform(val_df)

test_df = indexer_model.transform(test_df)
test_df = encoder_model.transform(test_df)

# Verify the transformation
# train_df.select("category", "category_index", "category_onehot").show(5)
# val_df.select("category", "category_index", "category_onehot").show(5)
# test_df.select("category", "category_index", "category_onehot").show(5)


In [0]:
# Check the column names to ensure 'category' exists
print("Columns in the training dataset:", train_df.columns)

Columns in the training dataset: ['asin', 'reviewText', 'overall', 'category', 'summary', 'review_length', 'review_length_normalized', 'category_index', 'category_onehot']


In [0]:
# Check the column names to ensure 'category' exists
print("Columns in the validation dataset:", val_df.columns)

Columns in the validation dataset: ['asin', 'reviewText', 'overall', 'category', 'summary', 'review_length', 'review_length_normalized', 'category_index', 'category_onehot']


In [0]:
# Check the column names to ensure 'category' exists
print("Columns in the test dataset:", test_df.columns)

Columns in the test dataset: ['asin', 'reviewText', 'overall', 'category', 'summary', 'review_length', 'review_length_normalized', 'category_index', 'category_onehot']


## Feature Engineering

### Text Feature Engineering 

This section implements a text preprocessing pipeline for our product review classification task. The pipeline handles:

1. **Data Cleaning**: Filling missing values in review text and summaries to prevent null errors
2. **Column Management**: Removing any conflicting columns from previous runs
3. **Tokenization**: Converting the raw review text into word tokens for feature extraction

This is the first part of our feature engineering process, which will be followed by:
- Feature extraction using TF-IDF
- Category encoding
- Feature assembly into vectors for model training

In [0]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql import functions as F

# Step 1: Handle Missing Data (Optional)
train_df = train_df.fillna({'reviewText': 'Unknown', 'summary': 'Unknown'})
val_df = val_df.fillna({'reviewText': 'Unknown', 'summary': 'Unknown'})
test_df = test_df.fillna({'reviewText': 'Unknown', 'summary': 'Unknown'})

# Step 2: Drop any existing columns that might conflict
train_df = train_df.drop("words", "raw_features", "features", "category_index", "category_onehot")
val_df = val_df.drop("words", "raw_features", "features", "category_index", "category_onehot")
test_df = test_df.drop("words", "raw_features", "features", "category_index", "category_onehot")

# Step 3: Tokenize the 'reviewText' column into words
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
train_df = tokenizer.transform(train_df)
val_df = tokenizer.transform(val_df)
test_df = tokenizer.transform(test_df)


In [0]:
# Show the first few rows of the 'words' column to verify tokenization
train_df.select("reviewText", "words").show(5, truncate=False)
# val_df.select("reviewText", "words").show(5, truncate=False)
# test_df.select("reviewText", "words").show(5, truncate=False)

# Try selecting a smaller number of rows to debug
# val_df.limit(5).show()




### Feature Extraction with HashingTF

In this step, we convert our tokenized text into numerical features using the HashingTF transformer:

1. **Term Frequency Calculation**: The HashingTF transformer maps each word to a fixed-size feature vector using a hashing function
   
2. **Dimensionality Control**: We set `numFeatures=1000` to limit the feature space to 1000 dimensions, balancing between model complexity and computational efficiency
   
3. **Feature Generation**: This creates the "raw_features" column containing term frequency vectors for each review

This transformation is a critical step in converting text data into the numerical format required for machine learning algorithms. The term frequency vectors represent how often each word appears in the document.

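The hashing trick behind HashingTF can be sketched in a few lines of plain Python. This is a simplified illustration: Spark uses MurmurHash3 internally, while the sketch substitutes MD5 from the standard library, so the indices will differ from Spark's. The function name `hashing_tf` is illustrative.

```python
import hashlib

def hashing_tf(tokens, num_features=1000):
    """Map tokens to a sparse term-frequency vector {index: count}."""
    vector = {}
    for token in tokens:
        # Hash the token and fold it into the fixed feature space.
        index = int(hashlib.md5(token.encode("utf-8")).hexdigest(), 16) % num_features
        vector[index] = vector.get(index, 0) + 1
    return vector

tokens = "great product great price".split()
tf = hashing_tf(tokens)
print(tf)
```

No vocabulary has to be stored, but distinct words can collide on the same index. A smaller `numFeatures` makes collisions more likely, which is the cost of the lower dimensionality.
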
In [0]:
# Step 4: Apply HashingTF (Term Frequency) to convert words into numerical form
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=1000)
train_df = hashing_tf.transform(train_df)
val_df = hashing_tf.transform(val_df)
test_df = hashing_tf.transform(test_df)

### TF-IDF Weighting with IDF Transformer

This step applies the Inverse Document Frequency (IDF) transformation to our term frequency vectors:

1. **IDF Calculation**: The IDF transformer weights term frequencies based on how rare or common words are across the entire document collection
   
2. **Model Fitting**: We fit the IDF model on the training data only to prevent data leakage from validation and test sets
   
3. **Feature Transformation**: The model is then applied to transform all datasets, creating the final "features" column

This completes our TF-IDF (Term Frequency-Inverse Document Frequency) implementation, which gives higher weights to terms that are rare across documents but frequent within specific documents. This helps distinguish important, discriminative words from common, less informative ones.

P.S. Because this had to run locally for now, the IDF model was fitted on a sample of the training data. For a full run, leave out the `.sample(...)` call, which is shown commented out in the cell below.

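The fit-on-train, transform-everywhere pattern can be illustrated in plain Python. The sketch uses the smoothed formula from Spark's IDF documentation, idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of training documents and df(t) the number of training documents containing term t. It works on raw words rather than hashed indices to stay readable, and the toy documents are invented.

```python
import math

def fit_idf(train_docs):
    """Learn IDF weights from training documents only."""
    m = len(train_docs)
    doc_freq = {}
    for doc in train_docs:
        for term in set(doc):  # count each term once per document
            doc_freq[term] = doc_freq.get(term, 0) + 1
    idf = {term: math.log((m + 1) / (d + 1)) for term, d in doc_freq.items()}
    return idf, m

def tfidf(doc, idf, m):
    """Weight term counts with the fitted IDF; unseen terms have df = 0."""
    unseen = math.log(m + 1)
    weights = {}
    for term in doc:
        weights[term] = weights.get(term, 0.0) + idf.get(term, unseen)
    return weights

train_docs = [["good", "product"], ["bad", "product"], ["good", "value", "product"]]
idf, m = fit_idf(train_docs)

# A validation document is transformed with the training-set weights.
val_vector = tfidf(["bad", "bad", "product"], idf, m)
print(val_vector)
```

A term that appears in every training document, such as "product" here, gets a weight of zero, while rarer terms are weighted up.
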

In [0]:
# Step 5: Apply IDF (Inverse Document Frequency) to weight the words
idf = IDF(inputCol="raw_features", outputCol="features")

# Optional: for faster local runs, fit on a 1% sample of the training data by
# using train_df.sample(fraction=0.01, seed=42) in place of train_df below

idf_model = idf.fit(train_df)

# Transform all datasets using the fitted model
train_df = idf_model.transform(train_df)
val_df = idf_model.transform(val_df)
test_df = idf_model.transform(test_df)

### Feature Engineering for Categories and Feature Assembly

This section completes our feature engineering pipeline with three important steps:

1. **Category Indexing**: Convert the text category labels to numerical indices
   - We use StringIndexer to transform categorical text values into numeric indices
   - This transformation is fitted only on the training dataset to avoid data leakage

2. **One-Hot Encoding**: Transform numerical category indices into one-hot encoded vectors
   - One-hot encoding creates binary vectors that machine learning algorithms can process
   - Each category gets its own dimension in the feature space

3. **Feature Assembly**: Combine all engineered features into a single vector
   - We combine category features, normalized review length, and TF-IDF text features
   - The VectorAssembler creates our final feature vectors ("final_features") for model training
   - We verify the transformation by displaying samples of our processed data

This completes our feature engineering pipeline, preparing the data for model training.

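The three steps can be mirrored in plain Python to show where the vector sizes come from. This is a simplified sketch under two assumptions about Spark's defaults: StringIndexer assigns index 0 to the most frequent label, and OneHotEncoder drops the last category (`dropLast=True`), so k categories produce k - 1 dimensions. The helper names and toy values are illustrative.

```python
from collections import Counter

def fit_string_indexer(values):
    """Assign indices by descending frequency (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}

def one_hot(index, num_categories):
    """One-hot encode with the last category dropped (all zeros)."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector

def assemble(category_vec, length_z, text_vec):
    """Concatenate features in the same order as feature_cols."""
    return category_vec + [length_z] + text_vec

# Fit on training categories only.
train_categories = ["Beauty", "Beauty", "Fashion", "Appliances", "Beauty", "Fashion"]
index_map = fit_string_indexer(train_categories)

final = assemble(one_hot(index_map["Fashion"], len(index_map)), 0.25, [0.0, 1.4, 0.0])
print(index_map, final)
```

The same arithmetic explains a 1013-dimensional `final_features` vector for this dataset: 13 categories give 12 one-hot dimensions, plus 1 for the normalized review length, plus 1000 TF-IDF dimensions.
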
In [0]:
# Step 6: Convert the categorical 'category' column to numerical using StringIndexer
indexer = StringIndexer(inputCol="category", outputCol="category_index")

indexer_model = indexer.fit(train_df) 

train_df = indexer_model.transform(train_df)
val_df = indexer_model.transform(val_df)
test_df = indexer_model.transform(test_df)



In [0]:
from pyspark.ml.feature import OneHotEncoder, VectorAssembler

# Step 7: Apply OneHotEncoder to the 'category_index' column (fit on train, transform on all)
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_onehot")
encoder_model = encoder.fit(train_df)  # Fit on training data

# Transform all datasets using the trained encoder model
train_df = encoder_model.transform(train_df)
val_df = encoder_model.transform(val_df)
test_df = encoder_model.transform(test_df)

# Step 8: Combine all features using VectorAssembler
feature_cols = ["category_onehot", "review_length_normalized", "features"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="final_features")

train_df = assembler.transform(train_df)
val_df = assembler.transform(val_df)
test_df = assembler.transform(test_df)

# Check the transformed dataframe to confirm the changes
train_df.select("final_features", "category_onehot", "features").show(5)


+--------------------+---------------+--------------------+
|      final_features|category_onehot|            features|
+--------------------+---------------+--------------------+
|(1013,[0,12,30,61...| (12,[0],[1.0])|(1000,[17,48,61,6...|
|(1013,[0,12,25,30...| (12,[0],[1.0])|(1000,[12,17,25,2...|
|(1013,[0,12,19,25...| (12,[0],[1.0])|(1000,[6,12,17,25...|
|(1013,[0,12,30,34...| (12,[0],[1.0])|(1000,[17,21,209,...|
|(1013,[0,12,30,66...| (12,[0],[1.0])|(1000,[17,53,66,7...|
+--------------------+---------------+--------------------+
only showing top 5 rows



In [0]:
rf_base_path = "wasbs://datafiles@aladdimbigdata.blob.core.windows.net/rf_data"

train_df.select("final_features", "overall") \
    .write.mode("overwrite").parquet(f"{rf_base_path}/train")

val_df.select("final_features", "overall") \
    .write.mode("overwrite").parquet(f"{rf_base_path}/val")

test_df.select("final_features", "overall") \
    .write.mode("overwrite").parquet(f"{rf_base_path}/test")



### **📌 Raw vs. Derived Features Table**
| Feature Name               | Type         | Raw/Derived | Description |
|----------------------------|-------------|------------|-------------|
| `reviewText`               | Text        | Raw        | Original review text from dataset |
| `category`                 | Categorical | Raw        | Product category of the review |
| `overall`                   | Numeric     | Raw        | Target variable (1-5 stars) |
| `review_length`            | Numeric     | Derived    | Length of the review text (character count) |
| `review_length_normalized` | Numeric     | Derived    | Normalized length of the review |
| `words`                    | List[String]| Derived    | Tokenized words from `reviewText` |
| `raw_features`             | Vector      | Derived    | HashingTF applied to `words` (Term Frequency) |
| `features`                 | Vector      | Derived    | IDF transformation on `raw_features` (TF-IDF representation) |
| `category_index`           | Numeric     | Derived    | Indexed categorical value for `category` (StringIndexer applied) |
| `category_onehot`          | Vector      | Derived    | OneHotEncoded vector for `category_index` |
| `final_features`           | Vector      | Derived    | Combined feature vector (`category_onehot` + `review_length_normalized` + `features`) |



### Absence of Regularization and PCA

Dimensionality Reduction:

PCA: The feature set is not very large or highly correlated. In the current setup, dimensionality is already capped by HashingTF (by specifying numFeatures=1000), so PCA is not applied.

LASSO: Applying L1 regularization (LASSO) in a logistic regression model can help with feature selection. However, since the feature set is already small, there is no need for it here.



### Random Forest Model Training and Prediction

This section implements the machine learning model using Random Forest classification:

1. **Model Initialization**: 
   - We create a Random Forest Classifier with 10 decision trees
   - The model will use our "final_features" column as input and predict the "overall" rating

2. **Model Training**:
   - We fit the Random Forest model on our prepared training data
   - The model learns patterns between our engineered features and review ratings

3. **Prediction Generation**:
   - We apply the trained model to make predictions on all three datasets
   - This creates new dataframes with prediction columns for evaluation
   - Generating predictions on training data allows us to assess potential overfitting

The Random Forest algorithm is well-suited for this task as it handles high-dimensional data effectively and can capture non-linear relationships in the feature space.
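
The three steps above could look roughly like the sketch below. It is an assumption-laden illustration rather than the notebook's own cell: `train_and_score` is a hypothetical helper, the `seed` value is arbitrary, and the function expects DataFrames that already contain the `final_features` and `overall` columns. The PySpark imports sit inside the function so that the parameter dictionary can be inspected without a Spark installation.

```python
# Parameters taken from the description above; the seed is an added assumption.
RF_PARAMS = {
    "featuresCol": "final_features",
    "labelCol": "overall",
    "numTrees": 10,
    "seed": 42,
}

def train_and_score(train_df, val_df):
    """Fit a Random Forest and return (model, train accuracy, validation accuracy)."""
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    model = RandomForestClassifier(**RF_PARAMS).fit(train_df)

    evaluator = MulticlassClassificationEvaluator(
        labelCol="overall", predictionCol="prediction", metricName="accuracy"
    )
    # Scoring the training set as well makes an overfitting gap visible.
    train_acc = evaluator.evaluate(model.transform(train_df))
    val_acc = evaluator.evaluate(model.transform(val_df))
    return model, train_acc, val_acc
```

A training accuracy far above the validation accuracy would suggest overfitting; the test set is best left untouched until the final evaluation.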