# Packages
Run this cell before running any other cell.

In [0]:
#import necessary packages

import pyspark as ps
import pandas as pd
from pyspark.sql.functions import to_date, dayofmonth, month, year, regexp_replace, regexp_extract, col, trim, when, length, lower

# Data Preprocessing
This section only needs to be run once. To skip it, use the cleaned df10 that was saved to DBFS; the later sections reload it from there.

In [0]:
# Load the uploaded CDs & Vinyl review file (JSON) into a Spark DataFrame
raw_reviews_path = "dbfs:/FileStore/tables/CDs_and_Vinyl_5.json"
df = spark.read.format("json").load(raw_reviews_path)

# Preview the first rows as-is, before any cleaning
df.show()

In [0]:
#Display data to see what it looks like and what data type is assigned
# display() is the Databricks notebook renderer: an interactive table of df with its column types
display(df)

In [0]:
#count number of observations
# count() is a Spark action, so this scans the raw data; the row count is shown as the cell output
df.count()

In [0]:
# Remove rows where 'style' is NULL: without it we cannot tell whether the
# product is a CD, DVD, MP3, etc.
df2 = df.na.drop(subset=['style'])

# Rows remaining after the filter
df2.count()

In [0]:
# Drop columns we will not use:
#  - image: cannot be inspected inside a DataFrame
#  - reviewerName: personal data; reviewerID already identifies reviewers
#  - unixReviewTime: duplicates reviewTime, which is cleaned below
df3 = df2.drop('image', 'reviewerName', 'unixReviewTime')

# Clean the review date and convert it to a date type.
# Step 1: collapse runs of whitespace into a single space so the date pattern below
#   matches consistently. The regex is a raw string: "\s" in a normal string literal
#   is an invalid escape sequence (SyntaxWarning on Python 3.12+).
# Generated with ChatGPT prompts:
#  "convert this data in pyspark to date/time type" (with image of data)
#  "getting an error because one of the days has a single digit"
df4 = df3.withColumn("reviewTime_cleaned", regexp_replace("reviewTime", r"\s+", " "))

# Step 2: parse with single-letter M/d, intended to accept both 1- and 2-digit months/days
df4 = df4.withColumn("reviewTime", to_date("reviewTime_cleaned", "M d, yyyy"))

# Step 3: drop the temporary helper column
df4 = df4.drop("reviewTime_cleaned")


In [0]:
# Break the parsed review date into separate day / month / year integer columns,
# then drop the original date column.
#generated with ChatGPT prompt:
#"convert date/time object into 3 separate columns day month year in pyspark"

review_date = col("reviewTime")
df45 = (
    df4
    .withColumn("reviewDay", dayofmonth(review_date))
    .withColumn("reviewMonth", month(review_date))
    .withColumn("reviewYear", year(review_date))
    .drop("reviewTime")
)

#check data
df45.show()

In [0]:
# Flatten the 'style' struct: keep only its "Format:" field (the product type,
# e.g. CD or MP3), trimmed of surrounding whitespace, and overwrite 'style' with it.
format_field = col("style").getField("Format:")
df5 = df45.withColumn("style", trim(format_field))
display(df5)

In [0]:
#change column "vote" from string to double, and replace null with 0.0
#generated with following ChatGPT prompt:
# "Convert a "string" column to "double" in PySpark and switch "null" values to "0" in "helpful" column"

# The raw vote strings may contain thousands separators (e.g. "1,234" -- verify against
# the data). Casting such a string straight to double yields null (or raises under ANSI
# mode), which would silently turn the most-voted reviews into 0-vote reviews.
# So strip the commas first, then default anything still null (missing vote) to 0.0.
vote_as_double = regexp_replace(col("vote"), ",", "").cast("double")
df6 = df5.withColumn(
    "vote",
    when(vote_as_double.isNull(), 0.0).otherwise(vote_as_double)
)

In [0]:
# Give columns clearer names (old name -> new name)
rename_map = {
    "asin": "amazon_productid",
    "overall": "rating",
    "vote": "helpful",
    "summary": "reviewSummary",
}

for source_name, target_name in rename_map.items():
    df6 = df6.withColumnRenamed(source_name, target_name)

df6.show()

In [0]:
# Put the columns in reading order: product, rating, date, reviewer, then text
new_order = [
    "amazon_productid", "style", "rating",
    "reviewDay", "reviewMonth", "reviewYear",
    "reviewerID", "verified", "helpful",
    "reviewSummary", "reviewText",
]
df7 = df6.select(new_order)

In [0]:
# Remove duplicate reviews (same text, reviewer and product), then keep only
# non-null reviews longer than 20 characters.
# adapted from ChatGPT prompt to suggest further ideas for data-cleaning
dedup_keys = ["reviewText", "reviewerID", "amazon_productid"]
df8 = df7.dropDuplicates(dedup_keys)

review_text = col("reviewText")
df9 = df8.where(review_text.isNotNull() & (length(review_text) > 20))
df9.count()

In [0]:
# Standardise the free-text columns: delete every character that is not an ASCII
# letter, digit or whitespace, then lowercase. (No whitespace trimming is applied.)
# adapted from ChatGPT prompt to suggest further ideas for data-cleaning
# The pattern is a raw string: "\s" in a normal string literal is an invalid escape
# sequence (SyntaxWarning on Python 3.12+); Spark receives the identical regex.
punctuation_pattern = r"[^a-zA-Z0-9\s]"
df10 = df9
for text_column in ("reviewText", "reviewSummary"):
    df10 = df10.withColumn(
        text_column,
        lower(regexp_replace(col(text_column), punctuation_pattern, "")),
    )
display(df10)

In [0]:
# Persist the cleaned DataFrame as Parquet so later sections can reload it instead of
# repeating the preprocessing; any previous output at this path is overwritten.
#ChatGPT prompt: how to save df10, a spark dataframe, to a dbfs in databricks community edition
# NOTE(review): Spark resolves "/dbfs/..." as a DBFS path (dbfs:/dbfs/df10_output), not
# the /dbfs FUSE mount -- the reads below use the same string, so they agree; confirm.

df10.write.format("parquet").mode("overwrite").save("/dbfs/df10_output")

Column descriptors:
- Amazon Product ID = refers to a particular product on Amazon
- Style = type of product i.e. CD, MP3, etc
- Rating = from 1 to 5, 5 being highest and 1 being lowest
- reviewDay/Month/Year = day, month and year the review was posted (self-explanatory)
- reviewerID = anonymised identifier of reviewer.
- verified = whether review is verified by Amazon.
- helpful = number of 'helpful' votes the review received (0 if none were recorded)
- reviewSummary = title of review
- reviewText = full review

# Exploratory Data Analysis - Insightful visualizations and statistical analysis

In [0]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import seaborn as sns
import re
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS
from pyspark.ml.feature import VectorAssembler, StandardScaler, Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.sql import functions as F
from pyspark.ml.stat import Correlation

In [0]:
# Read the saved df10 from DBFS
df10 = spark.read.parquet("/dbfs/df10_output")

# 0. SAMPLE DATA FROM SPARK
# Aim for roughly this many rows so the sample fits comfortably in pandas for plotting.
target_sample_rows = 10000
total_rows = df10.count()
# DataFrame.sample() without replacement requires 0 <= fraction <= 1: cap at 1.0 so a
# table smaller than the target is taken whole, and avoid ZeroDivisionError when empty.
sample_fraction = min(1.0, target_sample_rows / total_rows) if total_rows else 1.0

# Sample a subset of the data (size is approximate; seeded for reproducibility)
#generated with following ChatGPT prompt: How to make the subset of the large data for visualisation.
df_sampled = df10.sample(withReplacement=False, fraction=sample_fraction, seed=42)

# Cache the sampled data for subsequent transformations
df_sampled.cache()

# Display sample using Databricks built-in visualizer
display(df_sampled)

# Convert to Pandas for local analysis
pdf = df_sampled.toPandas()

In [0]:
# 1. DISTRIBUTION ANALYSIS
plt.style.use('fivethirtyeight')
sns.set_palette("deep")
plt.rcParams['figure.figsize'] = [12, 8]

fig, axs = plt.subplots(2, 2, figsize=(16, 12))

# Step 1: Distribution of Ratings (discrete=True gives one bar per rating value)
sns.histplot(pdf['rating'], kde=True, ax=axs[0, 0], bins=5, discrete=True)
axs[0, 0].set_title('Distribution of Ratings')
axs[0, 0].set_xlabel('Rating')
axs[0, 0].set_ylabel('Count')
axs[0, 0].set_xticks(range(1, 6))

# Step 2: Pie chart: Verified vs Non-Verified
# value_counts() orders categories by frequency, not by value, so labels and colours
# must be derived from the resulting index. Hard-coding ['Not Verified', 'Verified']
# mislabels the slices whenever verified reviews are the majority, and fails outright
# when only one category appears in the sample.
verified_counts = pdf['verified'].value_counts()
verified_labels = ['Verified' if is_verified else 'Not Verified' for is_verified in verified_counts.index]
verified_colors = ['#fc8d62' if is_verified else '#66c2a5' for is_verified in verified_counts.index]
axs[0, 1].pie(verified_counts, labels=verified_labels, autopct='%1.1f%%', startangle=90, colors=verified_colors)
axs[0, 1].set_title('Verified vs Non-Verified Reviews')

# Step 3: Top 10 Product Styles
style_counts = pdf['style'].value_counts().head(10)
style_counts.sort_values().plot(kind='barh', ax=axs[1, 0])
axs[1, 0].set_title('Top 10 Product Styles')
axs[1, 0].set_xlabel('Count')
axs[1, 0].set_ylabel('Style')

# Step 4: Average Helpful Votes by Rating
avg_helpful_by_rating = pdf.groupby('rating')['helpful'].mean()

# Plot the bar chart for Average Helpful Votes by Rating
axs[1, 1].bar(avg_helpful_by_rating.index, avg_helpful_by_rating.values, color=sns.color_palette("deep")[0])
axs[1, 1].set_title('Average Helpful Votes by Rating')
axs[1, 1].set_xlabel('Rating')
axs[1, 1].set_ylabel('Avg Helpful Votes')

# Set the x-ticks to be from 1 to 5
axs[1, 1].set_xticks(range(1, 6))
axs[1, 1].set_xticklabels(range(1, 6))

# Adjust layout to ensure all plots fit well
plt.tight_layout()
# Show the plots
plt.show()

In [0]:
# 2. TEMPORAL ANALYSIS
# Rebuild a real datetime column from the separate year / month / day columns
date_parts = dict(year=pdf['reviewYear'], month=pdf['reviewMonth'], day=pdf['reviewDay'])
pdf['review_date'] = pd.to_datetime(date_parts)

# Number of reviews per calendar year
review_years = pdf['review_date'].dt.year
annual_reviews = pdf.groupby(review_years).size()

fig, ax = plt.subplots()
annual_reviews.plot(ax=ax)
ax.set_title('Annual Review Volume')
ax.set_xlabel('Year')
ax.set_ylabel('Number of Reviews')

# Remove decimal points from x-axis (prompt: remove decimal points from this graph)
ax.xaxis.set_major_locator(ticker.MaxNLocator(integer=True))

plt.tight_layout()
plt.show()

In [0]:
# 3. TEXT ANALYSIS

# NOTE: TfidfVectorizer is imported here but not used in the cells below
from sklearn.feature_extraction.text import TfidfVectorizer

# Words that are frequent in these reviews but carry little sentiment; they are
# added on top of scikit-learn's generic English stop-word list.
domain_stopwords = {
    'product', 'amazon', 'buy', 'purchase', 'purchased', 'use', 'used', 'using',
    'thing', 'really', 'get', 'got', 'would', 'one', 'also', 'could', 'even',
    'item', 'well', 'still', 'time', 'album', 'music', 'songs', 'im',
}
custom_stopwords = set(ENGLISH_STOP_WORDS) | domain_stopwords

# Normalise a review for word counting
def clean_text(text):
    """Return ``text`` lowercased, letters-only, with whitespace collapsed.

    After lowercasing, every character that is neither a-z nor whitespace is
    deleted; runs of whitespace become a single space and the ends are stripped.
    """
    letters_and_spaces = re.sub(r'[^a-z\s]', '', text.lower())
    return ' '.join(letters_and_spaces.split())

# Plot top words function
#generated with following ChatGPT prompt: How to plot most frequent words in the review for positive and negative
def plot_top_words(text_series, title, color, top_n=10, ax=None):
    """Draw a horizontal bar chart of the most frequent words in ``text_series``.

    Each text is normalised with ``clean_text``, and words in ``custom_stopwords``
    are ignored before counting.

    Parameters
    ----------
    text_series : pandas.Series
        Review texts; missing values are treated as empty strings.
    title : str
        Title for the axes.
    color : str
        Seaborn palette name passed to ``sns.barplot`` (e.g. 'Greens_r').
    top_n : int, default 10
        How many of the most frequent words to show.
    ax : matplotlib.axes.Axes, optional
        Axes to draw on; defaults to the current axes (``plt.gca()``).
    """
    # With ax=None the ax.set_title(...) calls below would raise AttributeError
    if ax is None:
        ax = plt.gca()

    text = ' '.join(text_series.fillna('').astype(str).map(clean_text))
    words = [word for word in text.split() if word not in custom_stopwords]
    # dtype=object keeps an empty word list from triggering pandas' empty-Series dtype warning
    word_counts = pd.Series(words, dtype=object).value_counts().head(top_n)

    if word_counts.empty:
        # e.g. no reviews with this rating in the sample: show a note instead of an empty barplot
        ax.text(0.5, 0.5, 'No words to display', ha='center', va='center', transform=ax.transAxes)
    else:
        sns.barplot(x=word_counts.values, y=word_counts.index, palette=color, ax=ax)
    ax.set_title(title, fontsize=16)
    ax.set_xlabel('Word Frequency')
    ax.set_ylabel('Word')

# Split sampled reviews by polarity: 5-star = positive, 1-star = negative
positive_reviews = pdf.loc[pdf['rating'] == 5, 'reviewText']
negative_reviews = pdf.loc[pdf['rating'] == 1, 'reviewText']

# One row, two panels: positive on the left, negative on the right
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
panels = [
    (positive_reviews, 'Top 10 Words in Positive Reviews', 'Greens_r'),
    (negative_reviews, 'Top 10 Words in Negative Reviews', 'Reds_r'),
]
for panel_ax, (reviews, panel_title, palette_name) in zip(axes, panels):
    plot_top_words(reviews, panel_title, palette_name, ax=panel_ax)

plt.tight_layout()
plt.show()

In [0]:
# 4. SUMMARY METRICS
# Headline numbers computed on the pandas sample (not the full dataset)
summary_stats = {}
summary_stats['Total Reviews'] = len(pdf)
summary_stats['Average Rating'] = pdf['rating'].mean()
summary_stats['Verified Reviews (%)'] = (pdf['verified'] == True).mean() * 100
summary_stats['Average Helpful Votes'] = pdf['helpful'].mean()
summary_stats['Most Common Style'] = pdf['style'].value_counts().index[0]
summary_stats['Most Active Year'] = pdf['reviewYear'].value_counts().index[0]

print("\n===== DATASET SUMMARY =====")
for metric_name, metric_value in summary_stats.items():
    # floats (including numpy floats) get 2 decimals; counts and labels print as-is
    if isinstance(metric_value, float):
        print(f"{metric_name}: {metric_value:.2f}")
    else:
        print(f"{metric_name}: {metric_value}")

In [0]:
# 5. SPARKSQL SUMMARY
# Same statistics over the FULL cleaned dataset, via a temporary SQL view
df10.createOrReplaceTempView("reviews")

rating_helpful_query = """
    SELECT 
        AVG(rating) AS avg_rating,
        STDDEV(rating) AS stddev_rating,
        MIN(helpful) AS min_helpful,
        MAX(helpful) AS max_helpful,
        AVG(helpful) AS avg_helpful,
        PERCENTILE(helpful, 0.5) AS median_helpful
    FROM reviews
"""
spark.sql(rating_helpful_query).show()

# Feature Analysis

In [0]:
# 1. CORRELATION ANALYSIS
from pyspark.sql.types import DoubleType

# Read the saved df10 from DBFS
df10 = spark.read.parquet("/dbfs/df10_output")

# Keep rows where both values are present, and make sure both are doubles
#generated with following ChatGPT prompt: How can I find correlation between rating & helpful by using optimisation 
both_present = col("rating").isNotNull() & col("helpful").isNotNull()
df_filtered = (
    df10
    .filter(both_present)
    .withColumn("rating", col("rating").cast(DoubleType()))
    .withColumn("helpful", col("helpful").cast(DoubleType()))
)

# Pack [rating, helpful] into a single vector column, as Correlation.corr expects
assembler = VectorAssembler(outputCol='features', inputCols=['rating', 'helpful'])
assembled_df = assembler.transform(df_filtered).select("features")

# Cache: the same vectors are reused by the normalisation cell
assembled_df.cache()

# 2x2 Pearson correlation matrix
correlation_matrix = Correlation.corr(assembled_df, 'features', 'pearson')
correlation_matrix.show(truncate=False)

In [0]:
#2. NORMALIZATION
# Standardise the [rating, helpful] vectors to zero mean and unit standard deviation
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=True,
    withStd=True,
)
scaler_model = scaler.fit(assembled_df)
scaled_data = scaler_model.transform(assembled_df)

# Keep the scaled output in memory in case it is reused
scaled_data.cache()

scaled_data.select("scaledFeatures").show(n=20, truncate=False)

In [0]:
# 3. TEXT VECTORISATION WITH TF-IDF
#generated with following ChatGPT prompt: Add tokenizer and count vectorisation with TF_IDF
# Stages, applied one by one:
#   reviewText -> words -> filtered -> rawFeatures (term counts) -> features_text (TF-IDF)

tokenizer = Tokenizer(outputCol="words", inputCol="reviewText")
words_data = tokenizer.transform(df10)

remover = StopWordsRemover(outputCol="filtered", inputCol="words")
filtered_data = remover.transform(words_data)

# No vocabSize given, so CountVectorizer uses its default vocabulary size
vectorizer = CountVectorizer(outputCol="rawFeatures", inputCol="filtered")
vector_model = vectorizer.fit(filtered_data)
featurized_data = vector_model.transform(filtered_data)

idf = IDF(outputCol="features_text", inputCol="rawFeatures")
idf_model = idf.fit(featurized_data)
tfidf_data = idf_model.transform(featurized_data)

# Peek at two TF-IDF vectors
tfidf_data.select("features_text").show(n=2, truncate=False)

# Machine Learning Model Implementation (Using SparkML)

In [0]:
# import 
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [0]:
# STEP 1: Label Creation
# Binary sentiment label: rating >= 4 -> 1 (positive), rating <= 2 -> 0 (negative).
# Rating 3 matches neither branch, gets a null label and is filtered out (neutral ignored).
from pyspark.sql.functions import when

# Read the saved df10 from DBFS
df10 = spark.read.parquet("/dbfs/df10_output")

sentiment_label = (
    when(col("rating") >= 4, 1)
    .when(col("rating") <= 2, 0)
)
df_labeled = df10.withColumn("label", sentiment_label).where(col("label").isNotNull())

In [0]:
# STEP 2: Text Feature Extraction (TF-IDF)

from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF

# reviewText -> words: lowercase and split on whitespace
tokenizer = Tokenizer().setInputCol("reviewText").setOutputCol("words")
tokenized = tokenizer.transform(df_labeled)

# words -> filtered: drop Spark's default English stop words
remover = StopWordsRemover().setInputCol("words").setOutputCol("filtered")
filtered = remover.transform(tokenized)

# filtered -> rawFeatures: term counts over the top 5000 terms of the corpus
vectorizer = CountVectorizer().setInputCol("filtered").setOutputCol("rawFeatures").setVocabSize(5000)
vector_model = vectorizer.fit(filtered)
featurized = vector_model.transform(filtered)

# rawFeatures -> features: IDF re-weighting of the raw counts
idf = IDF().setInputCol("rawFeatures").setOutputCol("features")
idf_model = idf.fit(featurized)
tfidf_data = idf_model.transform(featurized)

In [0]:
# STEP 3: Train/Test Split

# Keep only the model inputs: TF-IDF vector and binary label
final_data = tfidf_data.select(["features", "label"])

# 80/20 split, seeded for reproducibility
train_data, test_data = final_data.randomSplit(weights=[0.8, 0.2], seed=42)

In [0]:
# STEP 4: Model Training

from pyspark.ml.classification import LogisticRegression

# Baseline logistic regression: default regParam/elasticNetParam, at most 10 iterations
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

lr_model = lr.fit(dataset=train_data)

In [0]:
# STEP 5: Predictions and Evaluation

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out test set with the baseline model
predictions = lr_model.transform(test_data)

# Both evaluators read the same label/prediction columns; only the metric differs
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

accuracy, f1 = evaluator_acc.evaluate(predictions), evaluator_f1.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")

# Optimization & Tuning

In [0]:
# Step 1: Import required libraries

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [0]:
# Step 2: Create a binary classification evaluator
# - Scores each candidate model during cross-validation
# - Uses area under the ROC curve, computed from the model's "rawPrediction"
#   column (the evaluator's default, spelled out here for clarity)
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)

In [0]:
# Step 3: Build a parameter grid (3 x 3 = 9 combinations)
#   regParam: regularisation strength (smaller = weaker penalty)
#   elasticNetParam: 0 = L2 (Ridge), 1 = L1 (Lasso), 0.5 = a mix of both
reg_param_values = [0.01, 0.1, 0.5]
elastic_net_values = [0.0, 0.5, 1.0]

param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, reg_param_values)
    .addGrid(lr.elasticNetParam, elastic_net_values)
    .build()
)

In [0]:
# Step 4: Create a CrossValidator
# - It performs k-fold cross-validation (here, 3-fold)
# - It will test all combinations from the parameter grid
# - It returns the best model based on evaluator metric
# - seed fixes the random fold assignment so results are reproducible across runs,
#   consistent with the seeded randomSplit used for the train/test split

crossval = CrossValidator(
    estimator=lr,                      # Our base model (Logistic Regression)
    estimatorParamMaps=param_grid,     # Our hyperparameter grid
    evaluator=evaluator,               # Evaluation strategy
    numFolds=3,                        # Number of folds for cross-validation
    seed=42                            # Reproducible fold assignment
)

In [0]:
# Step 5: Train the model using cross-validation
# - Fits one model per parameter combination per fold, so this can take a few minutes
cv_model = crossval.fit(dataset=train_data)

In [0]:
# Step 6: Score the held-out test set with the best cross-validated model
predictions_cv = cv_model.transform(dataset=test_data)

In [0]:
# Step 7: Evaluate the final tuned model

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_acc = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="label", predictionCol="prediction")
evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1", labelCol="label", predictionCol="prediction")

accuracy = evaluator_acc.evaluate(predictions_cv)
f1_score = evaluator_f1.evaluate(predictions_cv)

# Print final results
report_lines = [
    "===== Tuned Model Performance =====",
    f"Accuracy: {accuracy:.4f}",
    f"F1 Score: {f1_score:.4f}",
]
for report_line in report_lines:
    print(report_line)

In [0]:
# Step 8: Print the best hyperparameters

best_model = cv_model.bestModel

# Read the winning values through the public Params getters on the fitted
# LogisticRegressionModel rather than the private `_java_obj` JVM handle, which is an
# internal implementation detail. NOTE: model-side getters need PySpark >= 3.0.
print("\n===== Best Hyperparameters Found =====")
print("Best Regularization Parameter (regParam):", best_model.getRegParam())
print("Best ElasticNet Parameter (elasticNetParam):", best_model.getElasticNetParam())

# ML Visualisations

In [0]:
# Caching / partitioning for further work on these DataFrames.
# NOTE: the baseline and cross-validated fits have already run by this point, so this
# only speeds up later reuse (e.g. re-running the training cells), not those fits.

# Repartition first, then cache the result. Calling cache() before repartition() marks
# the old DataFrame for caching while `train_data` is rebound to a new, uncached one.
train_data = train_data.repartition(4).cache()

# Persist final TF-IDF dataset if reused often
tfidf_data.persist()

In [0]:
from sklearn.metrics import confusion_matrix

# Bring the true/predicted label columns of the tuned model's test predictions to the driver
pdf_cm = predictions_cv.select("label", "prediction").toPandas()

# Rows = true class, columns = predicted class, in the fixed order [0 (neg), 1 (pos)]
cm = confusion_matrix(pdf_cm["label"], pdf_cm["prediction"], labels=[0, 1])

fig, ax = plt.subplots(figsize=(6, 5))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=["Pred Neg", "Pred Pos"],
    yticklabels=["True Neg", "True Pos"],
    ax=ax,
)
ax.set_xlabel("Predicted Label")
ax.set_ylabel("True Label")
ax.set_title("Confusion Matrix: Tuned Logistic Regression")
plt.tight_layout()
display(plt)

In [0]:
from sklearn.metrics import roc_curve, auc

# True labels and positive-class scores (element 1 of each probability vector)
pdf_roc = predictions_cv.select("label", "probability").toPandas()
y_true = pdf_roc["label"]
y_score = pdf_roc["probability"].apply(lambda prob_vec: float(prob_vec[1]))

# ROC points and the area under them
fpr, tpr, _ = roc_curve(y_true, y_score)
roc_auc = auc(fpr, tpr)

fig, ax = plt.subplots(figsize=(6, 5))
ax.plot(fpr, tpr, label=f"AUC = {roc_auc:.3f}")
ax.plot([0, 1], [0, 1], linestyle='--', color='gray')  # chance diagonal
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title("ROC Curve: Tuned Logistic Regression")
ax.legend(loc="lower right")
plt.tight_layout()
display(plt)


In [0]:
from sklearn.metrics import precision_recall_curve, average_precision_score

# Reuses y_true / y_score extracted for the ROC curve above
precision, recall, _ = precision_recall_curve(y_true, y_score)
avg_prec = average_precision_score(y_true, y_score)

fig, ax = plt.subplots(figsize=(6, 5))
ax.plot(recall, precision, label=f"AP = {avg_prec:.3f}")
ax.set_xlabel("Recall")
ax.set_ylabel("Precision")
ax.set_title("Precision–Recall Curve")
ax.legend(loc="lower left")
plt.tight_layout()
display(plt)


In [0]:
# Compare F1 of the baseline and the cross-validated model on the same test set

# Baseline model (recomputed so this cell stands on its own)
pred_base = lr_model.transform(test_data)
base_f1 = evaluator_f1.evaluate(pred_base)

# Tuned model
pred_tuned = predictions_cv.cache()
tuned_f1 = evaluator_f1.evaluate(pred_tuned)

import pandas as pd
df_cmp = pd.DataFrame(
    {"Model": ["Baseline LR", "Tuned LR"], "F1 Score": [base_f1, tuned_f1]}
)

fig, ax = plt.subplots(figsize=(6, 4))
ax.bar(df_cmp["Model"], df_cmp["F1 Score"])
ax.set_ylim(0, 1)
ax.set_ylabel("F1 Score")
ax.set_title("Baseline vs. Tuned Logistic Regression")
plt.tight_layout()
display(plt)


In [0]:
# 1. Extract vocabulary & coefficients of the baseline LR model
vocab = vector_model.vocabulary            # list of terms, in feature-index order
coeffs = lr_model.coefficients.toArray()   # NumPy array, one weight per feature index

# `vector_model` is assigned by two cells: the Feature Analysis TF-IDF cell (default
# vocabSize) and the ML "STEP 2" cell (vocabSize=5000). If cells ran out of order, the
# terms would not correspond to the coefficients, so fail here with a clear message.
if len(vocab) != len(coeffs):
    raise ValueError(
        f"Vocabulary has {len(vocab)} terms but the model has {len(coeffs)} coefficients; "
        "re-run the 'STEP 2: Text Feature Extraction (TF-IDF)' cell before this one."
    )

# 2. Build Pandas DataFrame and keep the 20 terms with the largest |coefficient|
import pandas as pd
df_feat = pd.DataFrame({
    "term": vocab,
    "coef": coeffs
})
top_index = df_feat["coef"].abs().sort_values(ascending=False).index[:20]
df_top = df_feat.loc[top_index]

# 3. Plot (largest |coefficient| at the top; sign shows positive/negative sentiment weight)
plt.figure(figsize=(8,6))
plt.barh(df_top["term"], df_top["coef"])
plt.axvline(0, color='black', linewidth=0.8)
plt.xlabel("Coefficient")
plt.title("Top 20 Influential Terms")
plt.gca().invert_yaxis()
plt.tight_layout()
display(plt)
