****1. Spark Setup & Data Loading****

In [0]:
spark

In [0]:
# Load dataset
df = spark.read.format("json").load("dbfs:/FileStore/tables/reviews_Electronics_5.json")
df.printSchema()
df.show(10)

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|0528881469|  [0, 0]|    5.0|We got this GPS f...| 06 2, 2013| AO94DHGC771SJ|             amazdnu|     Gotta have GPS!|    1370131200|
|0528881469|[12, 15]|    1.0|I'm a profess

**2. Data Cleaning**

In [0]:
print(f"Original row count: {df.count()}")

# Handling Missing Values
# Drop rows where important fields are null
important_cols = ["reviewText", "overall", "reviewerID", "asin"]
df_cleaned = df.dropna(subset=important_cols)

# Fill NAs in non-critical fields with default values
df_cleaned = df_cleaned.fillna({
    "summary": "No summary",
    "reviewTime": "Unknown",
    "reviewerName": "Anonymous"})

Original row count: 1689188


In [0]:
df_cleaned = df_cleaned.dropDuplicates()

In [0]:
from pyspark.sql.functions import col,length

df_cleaned = df_cleaned.filter((length(col("reviewText")) > 10) & (length(col("reviewText")) < 5000))


In [0]:
index_cols = ["reviewerID", "asin"]
for col_name in index_cols:
    from pyspark.ml.feature import StringIndexer
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_idx")
    df_cleaned = indexer.fit(df_cleaned).transform(df_cleaned).drop(col_name)

# Check cleaned data
df_cleaned.select("overall", "summary", "reviewText").show(5, truncate=80)

# Cache for performance
df_cleaned.cache()

# Final number of rows after cleaning
print(f"Rows after cleaning: {df_cleaned.count()} | Columns: {len(df_cleaned.columns)}")

+-------+--------------------------------------+--------------------------------------------------------------------------------+
|overall|                               summary|                                                                      reviewText|
+-------+--------------------------------------+--------------------------------------------------------------------------------+
|    3.0|                        1st impression|Well, what can I say.  I've had this unit in my truck for about four days now...|
|    2.0|               Great grafics, POOR GPS|Not going to write a long review, even thought this unit deserves one. I've d...|
|    5.0|                       Gotta have GPS!|We got this GPS for my husband who is an (OTR) over the road trucker.  Very I...|
|    1.0|Major issues, only excuses for support|I've had mine for a year and here's what we got. It tries to route be down no...|
|    1.0|                     Very Disappointed|I'm a professional OTR truck driver, and I

**3. Exploratory Data Analysis (EDA)**

In [0]:
from pyspark.sql.functions import count, avg, max, min, length, col

# Register the cleaned DataFrame as a temporary SQL view
df_cleaned.createOrReplaceTempView("reviews")

# 1. Summary Statistics
# Overall rating distribution
display(df_cleaned.groupBy("overall").count().orderBy("overall"))

# Basic stats using Spark SQL
summary_stats = spark.sql("""
    SELECT 
        COUNT(*) as total_reviews,
        AVG(overall) as avg_rating,
        MIN(overall) as min_rating,
        MAX(overall) as max_rating
    FROM reviews
""")
display(summary_stats)

# Review text length analysis
length_stats = df_cleaned.withColumn("review_length", length(col("reviewText")))
display(length_stats.select("review_length").summary())

# 2. Text Length Visualization
display(length_stats.select("review_length"))

# 3. Top 10 Most Reviewed Products (by ASIN)
if "asin_idx" in df_cleaned.columns:
    top_products = df_cleaned.groupBy("asin_idx").agg(count("overall").alias("review_count")).orderBy(col("review_count").desc()).limit(10)
    display(top_products)


from pyspark.sql.functions import from_unixtime, year

# Extract year from unixReviewTime
df_yearly = df_cleaned.withColumn("review_year", year(from_unixtime("unixReviewTime")))

# Group by year and count reviews # Generated with the ChatGPT prompt: Asked chatgpt for a groupby code for year and review count.
yearly_review_counts = df_yearly.groupBy("review_year").agg(count("*").alias("review_count")).orderBy("review_year")

# Display as bar chart
display(yearly_review_counts)


# Persist EDA result for feature engineering
length_stats.cache()


overall,count
1.0,108109
2.0,81483
3.0,140808
4.0,343146
5.0,1002592


Databricks visualization. Run in Databricks to view.

total_reviews,avg_rating,min_rating,max_rating
1676138,4.223424920859738,1.0,5.0


summary,review_length
count,1676138.0
mean,590.6244569361234
stddev,678.2905501049955
min,11.0
25%,177.0
50%,339.0
75%,712.0
max,4999.0


review_length
462
1651
124
185
197
113
163
185
150
51


Databricks visualization. Run in Databricks to view.

asin_idx,review_count
0.0,4901
1.0,4138
2.0,3789
3.0,3435
4.0,2813
5.0,2648
6.0,2597
7.0,2542
8.0,2089
9.0,2080


Databricks visualization. Run in Databricks to view.

review_year,review_count
1999,71
2000,811
2001,1602
2002,2292
2003,3525
2004,5107
2005,9519
2006,15227
2007,35616
2008,49279


Databricks visualization. Run in Databricks to view.

Out[8]: DataFrame[helpful: array<bigint>, overall: double, reviewText: string, reviewTime: string, reviewerName: string, summary: string, unixReviewTime: bigint, reviewerID_idx: double, asin_idx: double, review_length: int]

**4. Feature Analysis**

In [0]:
# Import necessary functions
from pyspark.sql.functions import col, length
from pyspark.ml.feature import (
    StandardScaler, VectorAssembler,
    Tokenizer, StopWordsRemover, HashingTF
)

# 1. Add Review Length as a New Feature
# Creating a new numerical feature: the length of the review text
if "reviewText" in df_cleaned.columns:
    df_cleaned = df_cleaned.withColumn("review_length", length(col("reviewText")))
    print("Added review_length column based on reviewText")
else:
    print("reviewText column not found — skipping review_length feature")

# 2. Correlation Analysis
# Find numeric columns in the dataset (automatically)
numeric_columns = [
    field.name for field in df_cleaned.schema.fields
    if str(field.dataType) in ['DoubleType', 'IntegerType', 'LongType']
]

# Include review_length if it's present
if "review_length" in df_cleaned.columns:
    numeric_columns.append("review_length")

# Remove duplicates if any
numeric_columns = list(set(numeric_columns))

# Display numeric columns
print("Numeric columns available for analysis:", numeric_columns)

# Correlation between 'overall' rating and review length # Generated with the ChatGPT prompt: Asked chatgpt to provide a correlation sample.
if "overall" in df_cleaned.columns and "review_length" in numeric_columns:
    corr_value = df_cleaned.stat.corr("overall", "review_length")
    print(f"Correlation between overall rating and review length: {corr_value:.4f}")
else:
    print("Either 'overall' or 'review_length' column missing — skipping correlation analysis")

# 3. Standardize Numerical Features
# Prepare numeric features for machine learning using VectorAssembler + StandardScaler
features_to_scale = [col for col in numeric_columns if col != "overall"]

if features_to_scale:
    assembler = VectorAssembler(inputCols=features_to_scale, outputCol="numerical_features_vec")
    df_vector = assembler.transform(df_cleaned)

    scaler = StandardScaler(inputCol="numerical_features_vec", outputCol="scaled_features", withMean=True, withStd=True)
    df_scaled = scaler.fit(df_vector).transform(df_vector)
    print("Numerical features standardized")
else:
    df_scaled = df_cleaned
    print("No numeric features available to scale")

# 4. Text Feature Extraction (Tokenization + Stopword Removal + Vectorization)
# Processing the reviewText column into numerical vectors
if "reviewText" in df_scaled.columns:
    tokenizer = Tokenizer(inputCol="reviewText", outputCol="tokens")
    df_tokenized = tokenizer.transform(df_scaled)

    remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
    df_filtered = remover.transform(df_tokenized)

    hashingTF = HashingTF(inputCol="filtered_tokens", outputCol="text_features", numFeatures=1000)
    df_final = hashingTF.transform(df_filtered)

    print("Text features vectorized into 'text_features'")
else:
    df_final = df_scaled
    print("'reviewText' not found — skipping text feature extraction")


Added review_length column based on reviewText
Numeric columns available for analysis: ['review_length']
Correlation between overall rating and review length: -0.0959
Numerical features standardized
Text features vectorized into 'text_features'


**5. Model Training & Evaluation**

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Step 1: Create the label column
df_final = df_final.withColumn(
    "label",
    ((col("review_length") < 20) & ((col("overall") == 1) | (col("overall") == 5))).cast("integer")
)

# Step 2: Assemble features
feature_inputs = []
if "scaled_features" in df_final.columns:
    feature_inputs.append("scaled_features")
if "text_features" in df_final.columns:
    feature_inputs.append("text_features")

# Check that features are available
if not feature_inputs:
    raise ValueError("No features found. Ensure 'scaled_features' or 'text_features' exist in the DataFrame.")

assembler = VectorAssembler(inputCols=feature_inputs, outputCol="features")

# Step 3: Define classifier
classifier = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
    maxDepth=3,
    numTrees=10,
    seed=42
)

# Step 4: # Generated with the ChatGPT prompt: How to build pipeline
pipeline = Pipeline(stages=[assembler, classifier])

# Step 5: Sampling (optional)
sampled_df = df_final.sample(fraction=0.3, seed=42)

# Step 6: Generated with the ChatGPT prompt: How to split training and test data
train_data, test_data = sampled_df.randomSplit([0.8, 0.2], seed=42)

# Step 7: Train model
model = pipeline.fit(train_data)

# Step 8: Predictions
predictions = model.transform(test_data)

# Step 9: Evaluation
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print(f" Model Accuracy: {accuracy:.4f}")

# Step 10: Show predictions
predictions.select("overall", "review_length", "label", "prediction").show(10)


 Model Accuracy: 0.9991
+-------+-------------+-----+----------+
|overall|review_length|label|prediction|
+-------+-------------+-----+----------+
|    1.0|          257|    0|       0.0|
|    1.0|          307|    0|       0.0|
|    1.0|          104|    0|       0.0|
|    1.0|          947|    0|       0.0|
|    1.0|          171|    0|       0.0|
|    1.0|          183|    0|       0.0|
|    1.0|          222|    0|       0.0|
|    1.0|          309|    0|       0.0|
|    1.0|          144|    0|       0.0|
|    1.0|         1143|    0|       0.0|
+-------+-------------+-----+----------+
only showing top 10 rows



**6. Model Training & Optimization**

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, length

# Step 1: Ensure review_length and label exist
if "review_length" not in df_final.columns:
    df_final = df_final.withColumn("review_length", length(col("reviewText")))

df_final = df_final.withColumn("label", (col("overall") >= 4).cast("integer"))
df_final.groupBy("label").count().show()

# Step 2: Assemble features (review_length only)
assembler = VectorAssembler(inputCols=["review_length"], outputCol="features")

# Step 3: Random Forest Classifier & Pipeline
classifier = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
    seed=42
)

pipeline = Pipeline(stages=[assembler, classifier])

# Generated with the ChatGPT prompt: How to split training and test data
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

# Speed optimization for CE: Sample & cache
train_data = train_data.sample(fraction=0.2, seed=42).cache()
test_data = test_data.cache()

print(f"Train Count : {train_data.count()} | Test Count: {test_data.count()}")

# Step 5: Evaluator and Param Grid
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"  
)

# Use minimal grid for faster execution
paramGrid = ParamGridBuilder().build()  

# Step 6: Cross-validation and Training
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=2,          
    parallelism=2        
)

cv_model = crossval.fit(train_data)
print("Model training with cross-validation completed.")

# Step 7: Evaluate on Test Set #Generated with the ChatGPT prompt: How to evaluate test set.
tuned_predictions = cv_model.transform(test_data)
accuracy = evaluator.evaluate(tuned_predictions)
print(f" Final Tuned Model Accuracy: {accuracy:.4f}")

# Show sample predictions
tuned_predictions.select("overall", "review_length", "label", "prediction").show(5)

# Best model parameters
best_model = cv_model.bestModel.stages[-1]
print("Best Model Parameters:")
print(f"  • numTrees: {best_model.getNumTrees}")
print(f"  • maxDepth: {best_model.getOrDefault('maxDepth')}")


+-----+-------+
|label|  count|
+-----+-------+
|    1|1345738|
|    0| 330400|
+-----+-------+

Train Count : 268822 | Test Count: 334990
Model training with cross-validation completed.
 Final Tuned Model Accuracy: 0.8029
+-------+-------------+-----+----------+
|overall|review_length|label|prediction|
+-------+-------------+-----+----------+
|    1.0|         1491|    0|       1.0|
|    1.0|          308|    0|       1.0|
|    1.0|         1973|    0|       1.0|
|    1.0|          111|    0|       1.0|
|    1.0|           99|    0|       1.0|
+-------+-------------+-----+----------+
only showing top 5 rows

Best Model Parameters:
  • numTrees: 20
  • maxDepth: 5
