In [5]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, length, size, split, col, trim
from pyspark.sql.types import FloatType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import when, col
from pyspark.ml.classification import LinearSVC, LogisticRegression, NaiveBayes

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.tuning import CrossValidator

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
# pd.set_option('display.float_format', '{:.0f}'.format)

import time
import pandas as pd
import pyspark
from itertools import islice
from google.cloud import storage
from operator import add

import warnings
warnings.filterwarnings("ignore")
warnings.filterwarnings("once")

In [None]:
# Create (or reuse, via getOrCreate) the Spark session used by the whole notebook.
# NOTE(review): with master 'local[*]' the executors run inside the driver JVM, so
# 'spark.executor.memory' is likely ineffective, and 'spark.driver.memory' only applies
# if no Spark JVM was started yet in this kernel — confirm on the target cluster.
spark = (
    SparkSession.builder
    .appName("read_tsv")
    .master('local[*]')
    .config('spark.sql.execution.arrow.pyspark.enabled', True)
    .config('spark.sql.session.timeZone', 'UTC')
    .config('spark.driver.memory', '32G')
    .config('spark.executor.memory', '16G')
    .config('spark.network.timeout', '800s')
    .config('spark.executor.heartbeatInterval', '60s')
    .config('spark.ui.showConsoleProgress', True)
    .config('spark.sql.repl.eagerEval.enabled', True)
    .getOrCreate()
)

# Raise the broadcast-join size limit (adjust as needed).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500MB")

#spark = SparkSession.builder.appName("read_tsv").getOrCreate()


## Load Data

In [None]:
# Read tsv file
#data = spark.read.option("delimiter", "\t").csv("gs://msca-bdp-student-gcs/group10-amazon-review/amazon_reviews_us_Baby_v1_00.tsv", header=True, inferSchema=True)

In [7]:
from functools import reduce

# Amazon US review category files to combine; edit the list to change scope.
REVIEWS_DIR = "gs://msca-bdp-student-gcs/group10-amazon-review"
REVIEW_CATEGORIES = ["Baby", "Beauty", "Camera", "Electronics", "Furniture"]


def read_review_tsv(category):
    """Read one category's tab-separated review file (header row, inferred schema).

    NOTE(review): Spark's CSV reader treats '"' as a quote character by default. An
    unbalanced quote inside a title/review may swallow the following tab-separated
    fields, which could explain the rows that are null from product_category onward
    in the missing-value report. If so, `.option("quote", "")` disables quoting —
    confirm before changing.
    """
    path = f"{REVIEWS_DIR}/amazon_reviews_us_{category}_v1_00.tsv"
    return spark.read.option("delimiter", "\t").csv(path, header=True, inferSchema=True)


# unionByName aligns columns by header name rather than by position, so a file whose
# columns appear in a different order cannot silently shift values between fields.
data = reduce(
    lambda left, right: left.unionByName(right),
    map(read_review_tsv, REVIEW_CATEGORIES),
)

                                                                                

In [8]:
# Preview the first 20 combined rows (long strings truncated).
data.show(n=20, truncate=True)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    9970739| R8EWA1OFT84NX|B00GSP5D94|     329991347|Summer Infant Swa...|            Baby|          5|            0|          0|   N|                Y|Great swaddled bl...|Loved these swadd...| 2015-08-31|
|         US|   23538442|R2JWY4YRQD4FOP|B00YYDDZGU|     646108902|Pacifier Clip Gir...|            Baby|          5|    

## Data Cleaning

In [9]:
# Data cleaning

from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Count nulls for every column in ONE aggregation job, instead of launching a separate
# filter().count() job (a full scan of the source files) per column.
# count(when(cond, 1)) counts only rows where cond is true, since count() skips nulls.
null_counts_row = data.select(
    [F.count(F.when(col(c).isNull(), 1)).alias(c) for c in data.columns]
).first()
missing_count = list(null_counts_row)
missing_values = dict(zip(data.columns, missing_count))

# missing value counts
for column, count in missing_values.items():
    print(f"Missing values in '{column}': {count}")
    

# Remove rows that have a null in ANY column.
# NOTE(review): this also drops rows missing only fields the recommender never uses
# (review_headline, review_date, ...); dropna(subset=[...]) on the needed columns
# would keep more ratings.
data_1 = data.dropna()

# Drop duplicate reviews (same review_body and review_id).
data_dup = data_1.dropDuplicates(subset=["review_body", "review_id"])
# Report the count: a bare `.count()` mid-cell runs a full job and then discards the result.
print(f"Rows after dropna + dropDuplicates: {data_dup.count()}")

from pyspark.sql.functions import length

# Keep only reviews whose body is longer than this many characters.
MIN_REVIEW_BODY_LENGTH = 5
data_text = data_dup.filter(length(data_dup["review_body"]) > MIN_REVIEW_BODY_LENGTH)

print(f"Rows after review_body length filter: {data_text.count()}")


# Normalize free-text columns for consistency: lowercase, then strip every
# character that is not an ASCII letter, digit or whitespace.

from pyspark.sql.functions import lower, regexp_replace

cols_lower = ["review_body", "product_title", "product_category", "review_headline"]

# Pattern for text cleaning (Java regex): any char NOT in a-z, A-Z, 0-9 or whitespace.
pattern = "[^a-zA-Z0-9\\s]"

# The loop variable must not be named `col`: that would rebind pyspark's `col()`
# function to a plain string for every cell run after this one.
for text_col in cols_lower:
    data_text = data_text.withColumn(
        text_col, regexp_replace(lower(data_text[text_col]), pattern, "")
    )


                                                                                

Missing values in 'marketplace': 0
Missing values in 'customer_id': 0
Missing values in 'review_id': 0
Missing values in 'product_id': 0
Missing values in 'product_parent': 0
Missing values in 'product_title': 0
Missing values in 'product_category': 223
Missing values in 'star_rating': 223
Missing values in 'helpful_votes': 224
Missing values in 'total_votes': 224
Missing values in 'vine': 224
Missing values in 'verified_purchase': 224
Missing values in 'review_headline': 237
Missing values in 'review_body': 1579
Missing values in 'review_date': 836


                                                                                

## Filter Data

In [10]:
from pyspark.sql.functions import when, col

# Keep only the (user, item, rating) triple the ALS recommender needs.
data_r = data_text.select("customer_id", "product_id", "star_rating")
# Cast ratings to float; values that cannot be parsed as numbers become null.
data_r = data_r.withColumn("star_rating", col("star_rating").cast("float"))
# ALS needs a non-null numeric rating on every row, so drop rows whose cast failed.
data_r = data_r.dropna(subset=["star_rating"])

In [38]:
# # Write to Parquet
# data_r.write.parquet("gs://msca-bdp-student-gcs/group10-amazon-review/Parquet")

In [None]:
# # Read from Parquet
# parquet_df = spark.read.parquet("gs://msca-bdp-student-gcs/group10-amazon-review/Parquet")

# # star_rating will come back as float, since it was cast to float in the Filter Data step
# parquet_df.printSchema()

In [None]:
# Transform DataFrame to RDD
# parquet_df = parquet_df.repartition(10)
# parquet_df_rdd = parquet_df.rdd

# Map data to key-value pairs
#key_value_mes_rdd_1 = message_results_1_rdd.map(lambda x: ((x['message_A'], x['message_B'], x['repo_name.A'], x['repo_name.B']), 1))

## Product Recommender System - Collaborative Filtering

In [11]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col


# Convert string identifiers to numeric indices, which ALS requires for user/item columns.
indexer = StringIndexer(inputCols=["customer_id", "product_id"], outputCols=["userIndex", "productIndex"])
# Drop index columns left over from a previous run of this cell so re-running it works
# (StringIndexer rejects an output column that already exists). No-op on a first run.
data_r = data_r.drop("userIndex", "productIndex")
# Keep the fitted model: its learned labels map userIndex/productIndex values back to
# the original customer_id / product_id strings when reading recommendations.
indexer_model = indexer.fit(data_r)
data_r = indexer_model.transform(data_r)



                                                                                

In [12]:
# Data split
# 80/20 train/test split. A fixed seed makes the split, and therefore the reported
# RMSE, reproducible under Restart & Run All.
SPLIT_SEED = 42
(train, test) = data_r.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [13]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import expr
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window

# ALS estimator over the indexed user/product columns with star_rating as the target
# (implicitPrefs left at its default).
# coldStartStrategy="drop": rows whose user/item was unseen at fit time are dropped
# from predictions, so the RMSE evaluator never sees NaN.
# nonnegative=True: constrain the latent factors to be non-negative.
als = ALS(
    userCol="userIndex",
    itemCol="productIndex",
    ratingCol="star_rating",
    nonnegative=True,
    coldStartStrategy="drop",
)

## ParamGridBuilder
#### Create a grid of hyperparameters to search over

In [14]:
# Hyper-parameter grid for cross-validation: 2 ranks x 1 maxIter x 2 regParams = 4 param maps.
# NOTE(review): the two regParam values are far apart (0.05 vs 1) and the recorded run
# picked 1.0; a finer grid around the winner may be worth trying.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.rank, [5, 10])
    .addGrid(als.maxIter, [5])
    .addGrid(als.regParam, [0.05, 1])
    .build()
)

## Prediction Performance

In [15]:
from pyspark.sql.functions import expr
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window

In [16]:

# Root-mean-square error between observed star_rating and the model's prediction
# column (smaller is better).
evaluator = RegressionEvaluator(
    labelCol="star_rating",
    predictionCol="prediction",
    metricName="rmse",
)


In [17]:
from pyspark.ml.tuning import CrossValidator

# k-fold cross-validation of `als` over every param map in paramGrid, scored by `evaluator`.
# 4 folds x 4 param maps = 16 ALS fits, plus a final refit of the winner on all of `train`.
crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=4,
)


In [None]:
# Fit ALS with cross-validation over paramGrid on the training split
cvModel = crossval.fit(train)

# Best model: the winning param map refit on the whole training split
bestModel = cvModel.bestModel

# Recover the winning hyper-parameters through the public API instead of the private
# `_java_obj` handle: avgMetrics[i] is the mean fold metric for estimatorParamMaps[i],
# and the best index follows the evaluator's direction (RMSE: smaller is better).
avg_metrics = cvModel.avgMetrics
pick_best = max if evaluator.isLargerBetter() else min
best_idx = pick_best(range(len(avg_metrics)), key=avg_metrics.__getitem__)
best_params = cvModel.getEstimatorParamMaps()[best_idx]

# Generate test predictions
predictions = bestModel.transform(test)

# Evaluate best model on the held-out split
rmse = evaluator.evaluate(predictions)
print("Rank:", bestModel.rank)
print("MaxIter:", best_params.get(als.maxIter, als.getMaxIter()))
# float() so an integer grid value (e.g. 1) prints as 1.0, like the Java getter did
print("RegParam:", float(best_params.get(als.regParam, als.getRegParam())))
print("Root-mean-square error (RMSE) for cross validator model = " + str(rmse))


23/11/28 21:23:43 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/28 21:24:24 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/28 21:30:06 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/28 21:34:01 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/28 21:34:38 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/28 21:36:40 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/28 21:37:02 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/28 21:37:21 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/28 21:37:39 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting lar

Rank: 5
MaxIter: 5
RegParam: 1.0
Root-mean-square error (RMSE) for cross validator model = 1.6978622681113018


                                                                                

In [None]:
# Peek at the first 10 test-set predictions next to the true ratings.
predictions.show(n=10)

23/11/29 02:16:02 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/29 02:16:13 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/29 02:16:36 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.0 MiB
23/11/29 02:18:22 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 273.1 MiB
[Stage 1579:>                                                       (0 + 1) / 1]

+-----------+----------+-----------+---------+------------+----------+
|customer_id|product_id|star_rating|userIndex|productIndex|prediction|
+-----------+----------+-----------+---------+------------+----------+
|   42609937|B005URY734|        5.0|    148.0|    351282.0| 3.8449013|
|   42609937|B000XFFW64|        5.0|    148.0|     17948.0| 3.4707494|
|   42609937|B000Z94V2O|        4.0|    148.0|    162370.0|  3.517347|
|   42609937|B004L0H030|        4.0|    148.0|    346281.0| 3.4900675|
|   42609937|B00J0OJJC2|        5.0|    148.0|    633131.0| 3.6921678|
|   42609937|B000FL5OUA|        5.0|    148.0|    326116.0|  3.007879|
|   42609937|B000WCIOYU|        4.0|    148.0|    394863.0| 3.2452035|
|   42609937|B003VPK4M0|        4.0|    148.0|     47049.0|  3.495666|
|   42609937|B000QBQ55C|        5.0|    148.0|    134164.0| 3.3087392|
|   42609937|B00JMRS0E0|        4.0|    148.0|    635741.0| 1.7272252|
+-----------+----------+-----------+---------+------------+----------+
only s

                                                                                

## Predictions: Generate Recommendations

In [None]:
# Top-5 products for every user known to the model. Recommended items are
# productIndex values, not raw product_id strings.
warnings.filterwarnings("ignore")
userRecs = bestModel.recommendForAllUsers(numItems=5)
userRecs.show(n=10)

In [None]:
# Example: top-10 product recommendations for a small sample of (3) users.
warnings.filterwarnings("ignore")
users = (
    data_r
    .select(als.getUserCol())
    .distinct()
    .limit(3)
)
userSubsetRecs = bestModel.recommendForUserSubset(dataset=users, numItems=10)

userSubsetRecs.show(n=10, truncate=False)

In [None]:
from pyspark.sql import Row

def get_recommendation(user_id, item_id):
    """Return the rating the trained ALS model predicts for one (user, item) pair.

    Despite its name, this scores a single pair; it does not produce a ranked list.

    :param user_id: userIndex value (StringIndexer-encoded customer_id, not the raw ID).
    :param item_id: productIndex value (StringIndexer-encoded product_id, not the raw ID).
    :return: the predicted rating, or None when the model returns no row for the pair
        (the ALS estimator uses coldStartStrategy="drop", so an unseen user or item is
        expected to produce no row).
    """
    # One-row request frame with the column names the model was trained on
    request_df = spark.createDataFrame([Row(userIndex=user_id, productIndex=item_id)])

    # Evaluate the prediction plan once: first() returns None for an empty result,
    # whereas count() followed by collect() ran the whole job twice.
    first_row = bestModel.transform(request_df).select("prediction").first()
    return first_row["prediction"] if first_row is not None else None

In [None]:
# Example pair to score. These are userIndex / productIndex values from the
# StringIndexer step, not raw customer_id / product_id strings — replace as needed.
user_id, item_id = 54153, 130316

predicted_rating = get_recommendation(user_id, item_id)
print(f"Predicted rating for user {user_id} and item {item_id}: {predicted_rating}")


In [None]:
def get_recommendations_for_user(user_id, num_recommendations=10):
    """
    Return the top-N product recommendations the trained ALS model makes for one user.

    :param user_id: userIndex value to recommend for (the StringIndexer-encoded
        customer_id, not the raw customer_id string).
    :param num_recommendations: how many products to recommend.
    :return: DataFrame with two array columns, ``productIndex`` and ``rating``, pulled
        out of the ``recommendations`` struct array. Entries are productIndex values,
        not raw product_id strings. Presumably empty when the user index was not seen
        during training — confirm.
    """
    single_user = spark.createDataFrame([(user_id,)], ["userIndex"])
    recs = bestModel.recommendForUserSubset(single_user, num_recommendations)
    # Selecting a field of an array<struct> column yields one array per field.
    return recs.select("recommendations.productIndex", "recommendations.rating")

In [None]:
# Example: top-N recommendations for one userIndex value (not a raw customer_id) —
# replace the values as needed.
user_id, num_recommendations = 54153, 10

recommended_products = get_recommendations_for_user(user_id, num_recommendations)
recommended_products.show(truncate=False)
