In [1]:
# 1) Environment setup
!pip install pyspark findspark matplotlib seaborn pandas

[0mDefaulting to user installation because normal site-packages is not writeable


In [2]:
import os, pickle, glob
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import lit, length, col, count, when, isnan, avg
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.types import DoubleType, FloatType
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import col, length, isnan, when, count, from_unixtime, date_format, date_trunc, year, avg
import pandas as pd
import matplotlib.dates as mdates
import matplotlib.ticker as mtick

Matplotlib created a temporary cache directory at /scratch/jsease/job_39780153/matplotlib-r89b0794 because the default path (/home/jovyan/.cache/matplotlib) is not a writable directory; it is highly recommended to set the MPLCONFIGDIR environment variable to a writable directory, in particular to speed up the import of Matplotlib and to better support multiprocessing.


In [3]:
sc = SparkSession.builder \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .config('spark.executor.instances', 19) \
    .getOrCreate()

In [4]:
# Load data in

folder_path = "/expanse/lustre/projects/uci150/mzidell/reviews_parquet"
all_files = os.listdir(folder_path)
parquet_files = [f for f in all_files if f.endswith('.parquet')]

all_df = []
for file in parquet_files:
    file_path = os.path.join(folder_path, file)
    category = file.strip('review_').strip('.parquet')
    df = sc.read.parquet(file_path).withColumn("category", lit(category))
    all_df.append(df)
    print(f"Loaded: {file}")

reviews = reduce(DataFrame.unionByName, all_df)
reviews = reviews.withColumn("text_len", length(col("text")))

Loaded: review_Cell_Phones_and_Accessories.parquet
Loaded: review_Grocery_and_Gourmet_Food.parquet
Loaded: review_Health_and_Personal_Care.parquet
Loaded: review_Arts_Crafts_and_Sewing.parquet
Loaded: review_Pet_Supplies.parquet
Loaded: review_Toys_and_Games.parquet
Loaded: review_Industrial_and_Scientific.parquet
Loaded: review_Patio_Lawn_and_Garden.parquet
Loaded: review_Movies_and_TV.parquet
Loaded: review_Gift_Cards.parquet
Loaded: review_Video_Games.parquet
Loaded: review_Tools_and_Home_Improvement.parquet
Loaded: review_Office_Products.parquet
Loaded: review_Digital_Music.parquet
Loaded: review_Automotive.parquet
Loaded: review_Sports_and_Outdoors.parquet
Loaded: review_All_Beauty.parquet
Loaded: review_Musical_Instruments.parquet
Loaded: review_Amazon_Fashion.parquet
Loaded: review_CDs_and_Vinyl.parquet
Loaded: review_Kindle_Store.parquet
Loaded: review_Magazine_Subscriptions.parquet
Loaded: review_Beauty_and_Personal_Care.parquet
Loaded: review_Health_and_Household.parquet
Load

In [22]:
# Attempting to go on the whole dataset but still undersampling 
import time
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, StringIndexer, OneHotEncoder, HashingTF, IDF, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import round


# Timer start
total_start = time.time()

# Step 1: Filter and feature engineering
print("Filtering and preparing features...")
t1 = time.time()

reviews_clean = reviews.filter(col("rating").isNotNull() & col("text").isNotNull())

fraction_to_keep = 0.4
majority_df = df.filter(col("rating") == 5)
minority_df = df.filter(col("rating") != 5)
majority_sampled = majority_df.sample(withReplacement=False, fraction=fraction_to_keep, seed=42)
reviews_clean = minority_df.union(majority_sampled).cache()

#reviews_clean = reviews_undersampled.sample(withReplacement=False, fraction=0.01, seed=42).cache()

reviews_fe = reviews_clean.selectExpr(
    "rating", "title", "text", "verified_purchase", "helpful_vote", "timestamp", "category",
    "length(text) as text_length",
    "cast(verified_purchase as double) as verified_numeric"
)

print(f"Step 1 complete in {time.time() - t1:.2f} seconds")

print("Defining pipeline stages...")
t2 = time.time()

text_tokenizer = RegexTokenizer(inputCol="text", outputCol="text_tokens")
text_remover = StopWordsRemover(inputCol="text_tokens", outputCol="text_filtered")
text_hashingTF = HashingTF(inputCol="text_filtered", outputCol="text_rawFeatures", numFeatures=10000)
text_idf = IDF(inputCol="text_rawFeatures", outputCol="textFeatures")

title_tokenizer = RegexTokenizer(inputCol="title", outputCol="title_tokens")
title_remover = StopWordsRemover(inputCol="title_tokens", outputCol="title_filtered")
title_hashingTF = HashingTF(inputCol="title_filtered", outputCol="title_rawFeatures", numFeatures=5000)
title_idf = IDF(inputCol="title_rawFeatures", outputCol="titleFeatures")

category_indexer = StringIndexer(inputCol="category", outputCol="category_index", handleInvalid="keep")
category_encoder = OneHotEncoder(inputCol="category_index", outputCol="category_encoded")

assembler = VectorAssembler(
    inputCols=["textFeatures", "titleFeatures", "text_length", "verified_numeric", "helpful_vote", "timestamp", "category_encoded"],
    outputCol="features"
)


lr = LinearRegression(
    featuresCol="features",
    labelCol="rating",
    maxIter=100,
    regParam=0.005,      
    elasticNetParam=0.0   
)


pipeline = Pipeline(stages=[
    text_tokenizer, text_remover, text_hashingTF, text_idf,
    title_tokenizer, title_remover, title_hashingTF, title_idf,
    category_indexer, category_encoder,
    assembler,
    lr
])
print(f"Step 2 complete in {time.time() - t2:.2f} seconds")

# Step 3: Train-test split
print("Splitting data...")
t3 = time.time()
train_df, test_df = reviews_fe.randomSplit([0.8, 0.2], seed=42)
train_df = train_df.repartition(100)
test_df = test_df.repartition(50)
print(f"Step 3 complete in {time.time() - t3:.2f} seconds")

# Step 4: Train model
print("Training model...")
t4 = time.time()
model = pipeline.fit(train_df)
print(f"Model training complete in {time.time() - t4:.2f} seconds")

# Step 5: Make predictions
print("Making predictions...")
t5 = time.time()
predictions = model.transform(test_df)
print(f"Predictions complete in {time.time() - t5:.2f} seconds")

# Step 6: Evaluate accuracy
print("Evaluating accuracy...")
t6 = time.time()

rounded_preds = predictions.withColumn("rounded_prediction", 
    when(col("prediction") < 2.5, 1.0)
   .when(col("prediction") < 2.85, 2.0)
   .when(col("prediction") < 3.3, 3.0)
   .when(col("prediction") < 4.05, 4.0)
   .otherwise(5.0)
)


evaluator = MulticlassClassificationEvaluator(
    labelCol="rating",
    predictionCol="rounded_prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(rounded_preds)

print(f"Evaluation complete in {time.time() - t6:.2f} seconds")

# Final output
print(f"Test accuracy: {accuracy:.4f}")
print(f"Total time elapsed: {time.time() - total_start:.2f} seconds")


Filtering and preparing features...
Step 1 complete in 0.02 seconds
Defining pipeline stages...
Step 2 complete in 0.03 seconds
Splitting data...
Step 3 complete in 0.00 seconds
Training model...
Model training complete in 69.59 seconds
Making predictions...
Predictions complete in 0.11 seconds
Evaluating accuracy...
Evaluation complete in 3.70 seconds
Test accuracy: 0.5231
Total time elapsed: 73.45 seconds


In [24]:
rounded_preds.groupBy("rounded_prediction").count().orderBy("rounded_prediction").show()
rounded_preds.groupBy("rating").count().orderBy("rating").show()

+------------------+------+
|rounded_prediction| count|
+------------------+------+
|               1.0|136333|
|               2.0| 45963|
|               3.0| 87322|
|               4.0|174213|
|               5.0|212769|
+------------------+------+

+------+------+
|rating| count|
+------+------+
|   1.0|138758|
|   2.0| 47918|
|   3.0| 83963|
|   4.0|171332|
|   5.0|214629|
+------+------+

