
# Recommender System

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ALS Recommender").getOrCreate()

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator  # Evaluation metric (RMSE)
from pyspark.ml.recommendation import ALS  # Alternating least squares model for collaborative filtering
from pyspark.sql.functions import count  # Aggregate count function
from pyspark.sql.functions import col  # Reference a DataFrame column by name
from pyspark.ml.feature import StringIndexer  # Converts string columns to numeric indexes
from pyspark.sql.functions import min, max  # Min/max of the rating column (note: these shadow Python's built-in min and max)
import random  # Used to sample random users



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
df = spark.read.csv("/content/drive/My Drive/amazon_data_2023/cleaned_amazon_data.csv", header=True, inferSchema=True) #input cleaned dataset


In [None]:
df.columns  # Lists the columns to confirm the required ones (user_id, asin, rating) are present

['rating',
 'title_x',
 'text',
 'images_x',
 'asin',
 'parent_asin',
 'user_id',
 'timestamp',
 'helpful_vote',
 'verified_purchase',
 'main_category',
 'title_y',
 'average_rating',
 'rating_number',
 'features',
 'description',
 'price',
 'images_y',
 'videos',
 'store',
 'categories',
 'details',
 'bought_together',
 'subtitle',
 'author',
 'brand',
 'review_length',
 'year']

In [None]:
review_count = df.count()
print(f"Csv contains {review_count} reviews")

Csv contains 476684 reviews


# Data Setup
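Only `user_id`, `asin` and `rating` are needed by the model. Users with fewer than 5 reviews are dropped, because collaborative filtering has too little information to learn the preferences of users with very few ratings.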

In [None]:
user_counts = df.groupBy('user_id').count()  # Number of reviews per user (one row per unique user)


In [None]:
users_to_keep = user_counts.filter(user_counts['count'] >= 5).select('user_id')  # Users with at least 5 reviews
filtered_df = df.join(users_to_keep, on='user_id', how='inner')  # Keep only reviews written by those users
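The group-by, filter and inner join above keep every review written by a user with at least five reviews. The following is a minimal plain-Python sketch of the same logic. It assumes the reviews are held in memory as hypothetical `(user_id, asin, rating)` tuples rather than in a Spark DataFrame, and `keep_active_users` is an illustrative helper, not part of Spark.

```python
from collections import Counter

def keep_active_users(reviews, min_reviews=5):
    """Keep only reviews whose user wrote at least `min_reviews` reviews.

    `reviews` is assumed to be a list of (user_id, asin, rating) tuples.
    """
    counts = Counter(user for user, _, _ in reviews)                     # like groupBy('user_id').count()
    active = {user for user, n in counts.items() if n >= min_reviews}    # like filter(count >= 5)
    return [row for row in reviews if row[0] in active]                  # like the inner join

# Hypothetical data: u1 has 5 reviews, u2 has only 1
reviews = [("u1", f"a{k}", 4.0) for k in range(5)] + [("u2", "a1", 3.0)]
kept = keep_active_users(reviews)
print(len(kept))  # 5: all of u1's reviews, none of u2's
```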

In [None]:
#Cast rating to float
filtered_df = filtered_df.withColumn("rating", col("rating").cast("float"))

In [None]:
filtered_count = filtered_df.count()
print(f"Amount of rows in the filtered dataframe is {filtered_count}")

Amount of rows in the filtered dataframe is 38384


After filtering, 38,384 rows remain (about 8% of the original 476,684 reviews), which is still enough to train the recommender system.

In [None]:
#Shows the min and max of the ratings
filtered_df.select(min("rating"), max("rating")).show()

+-----------+-----------+
|min(rating)|max(rating)|
+-----------+-----------+
|        1.0|        5.0|
+-----------+-----------+



# Split
The dataset is split into an 80% training set, so the model has enough data to generalize, and a 20% test set, held out to measure how well it performs.

The code below converts the string `user_id` and `asin` values to numeric indexes, because Spark's ALS implementation requires numeric user and item IDs.
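By default (`stringOrderType="frequencyDesc"`), `StringIndexer` gives index 0.0 to the most frequent string, 1.0 to the next most frequent, and so on. The plain-Python sketch below shows that mapping. It assumes ties are broken alphabetically, as in recent Spark versions, and `string_index` is a hypothetical helper rather than Spark's implementation.

```python
from collections import Counter

def string_index(values):
    """Map each distinct string to a float index, most frequent first.

    Assumption: mirrors StringIndexer's default frequencyDesc ordering,
    with equal-frequency strings ordered alphabetically.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))  # frequency desc, then alphabetical
    return {s: float(idx) for idx, s in enumerate(ordered)}

# Hypothetical ASINs: B0 appears 3 times, A1 twice, C2 once
print(string_index(["B0", "A1", "B0", "C2", "A1", "B0"]))  # {'B0': 0.0, 'A1': 1.0, 'C2': 2.0}
```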

In [None]:
#Convert string columns to usable indexes
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_index",handleInvalid="skip")
item_indexer = StringIndexer(inputCol="asin", outputCol="item_index",handleInvalid="skip")

# Fit each indexer and add its index column to the filtered dataset
indexed_data = user_indexer.fit(filtered_df).transform(filtered_df)
indexed_data = item_indexer.fit(indexed_data).transform(indexed_data)
#Randomly splits dataset into an 80% train set and 20% test set
(training, test) = indexed_data.randomSplit([0.8, 0.2], seed=42)
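`randomSplit` assigns rows to the splits at random, so the 80/20 proportions are approximate rather than exact. The plain-Python sketch below illustrates the idea by assigning each row independently. `random_split` is a hypothetical helper and does not reproduce the rows Spark places in each split for `seed=42`.

```python
import random

def random_split(rows, train_fraction=0.8, seed=42):
    """Assign each row independently to train (probability train_fraction) or test."""
    rng = random.Random(seed)  # seeded for reproducibility within this sketch only
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

train, test = random_split(list(range(10_000)))
print(len(train), len(test))  # roughly 8000 and 2000
```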

# Alternating Least Squares
The Spark ML library was chosen to implement alternating least squares (ALS) because it scales to large datasets, processes data quickly, and is straightforward to use.
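ALS learns a `rank`-dimensional factor vector for every user and every item, so that a rating is approximated by the dot product of the two vectors. It alternates between two easy subproblems. With the item factors fixed, each user's vector is the solution of a small regularized least-squares (ridge regression) problem, and then the roles swap.

The plain-Python sketch below shows this alternation on hypothetical toy data. It is an illustration, not Spark's implementation. The penalty is scaled by each user's or item's rating count, which is how the Spark documentation describes `regParam`. However, `nonnegative=True` (non-negative least squares) and Spark's distributed blocking are not reproduced, and all function names are hypothetical.

```python
import random
from collections import Counter

def solve(a, b):
    """Solve the small dense system a x = b by Gaussian elimination with partial pivoting."""
    n = len(b)
    m = [row[:] + [b[i]] for i, row in enumerate(a)]  # augmented matrix [a | b]
    for c in range(n):
        # Partial pivoting: move the row with the largest |entry| in column c up
        p = c
        for r in range(c + 1, n):
            if abs(m[r][c]) > abs(m[p][c]):
                p = r
        m[c], m[p] = m[p], m[c]
        for r in range(c + 1, n):
            f = m[r][c] / m[c][c]
            for j in range(c, n + 1):
                m[r][j] -= f * m[c][j]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):  # back substitution
        x[r] = (m[r][n] - sum(m[r][j] * x[j] for j in range(r + 1, n))) / m[r][r]
    return x

def update(fixed, ratings_by_entity, rank, reg):
    """One ALS half-step: a ridge regression per entity with the other side's factors fixed."""
    new = {}
    for entity, pairs in ratings_by_entity.items():
        # Normal equations: (sum q q^T + reg * n * I) x = sum r q
        a = [[0.0] * rank for _ in range(rank)]
        b = [0.0] * rank
        for other, r in pairs:
            q = fixed[other]
            for i in range(rank):
                b[i] += r * q[i]
                for j in range(rank):
                    a[i][j] += q[i] * q[j]
        for i in range(rank):
            a[i][i] += reg * len(pairs)  # regularization scaled by the entity's rating count
        new[entity] = solve(a, b)
    return new

def als(ratings, rank=2, reg=0.05, iters=10, seed=0):
    """Factorize (user, item, rating) triples into user and item factor vectors."""
    rng = random.Random(seed)
    by_user, by_item = {}, {}
    for u, i, r in ratings:
        by_user.setdefault(u, []).append((i, r))
        by_item.setdefault(i, []).append((u, r))
    items = {i: [rng.random() for _ in range(rank)] for i in by_item}  # random initial item factors
    users = {}
    for _ in range(iters):
        users = update(items, by_user, rank, reg)  # fix items, solve for users
        items = update(users, by_item, rank, reg)  # fix users, solve for items
    return users, items

def predict(users, items, u, i):
    """Predicted rating = dot product of the user and item factor vectors."""
    return sum(x * y for x, y in zip(users[u], items[i]))

def objective(ratings, users, items, reg):
    """Squared error plus the count-weighted L2 penalty that each half-step minimizes."""
    n_user = Counter(u for u, _, _ in ratings)
    n_item = Counter(i for _, i, _ in ratings)
    err = sum((r - predict(users, items, u, i)) ** 2 for u, i, r in ratings)
    pen = sum(n_user[u] * sum(x * x for x in vec) for u, vec in users.items())
    pen += sum(n_item[i] * sum(x * x for x in vec) for i, vec in items.items())
    return err + reg * pen

# Hypothetical toy data: (user, item, rating) triples on a 1 to 5 scale
toy = [
    (0, 0, 5.0), (0, 1, 3.0), (0, 2, 4.0),
    (1, 0, 4.0), (1, 1, 2.0),
    (2, 1, 5.0), (2, 2, 2.0),
    (3, 0, 1.0), (3, 2, 1.0),
]
users, items = als(toy, rank=2, reg=0.05, iters=10)
print(round(predict(users, items, 1, 2), 2))  # score for a (user, item) pair not in the data
```

Each half-step exactly minimizes `objective` over one side's factors, so the objective never increases from one iteration to the next.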

The code below tunes ALS manually with a grid search. It trains a model for every combination of rank, regularization parameter (regParam) and maximum number of iterations (maxIter), evaluates each model on the test set, and keeps the one with the lowest RMSE.

In [None]:
#Define the evaluator to assess model performance using RMSE
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
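`RegressionEvaluator` with `metricName="rmse"` computes the root mean squared error, RMSE = sqrt((1/N) · Σ (r − r̂)²), over the N rows of the predictions DataFrame. Because the ALS models use `coldStartStrategy="drop"`, rows whose prediction would be NaN (users or items not seen during training) are removed before evaluation, so they do not turn the RMSE into NaN. In the plain-Python sketch below, `None` stands in for a dropped prediction (an assumption of this sketch), and `rmse` is a hypothetical helper.

```python
import math

def rmse(pairs):
    """Root mean squared error over (rating, prediction) pairs.

    Pairs whose prediction is None are skipped, loosely mimicking coldStartStrategy="drop".
    """
    kept = [(r, p) for r, p in pairs if p is not None]
    if not kept:
        raise ValueError("no predictions left to evaluate")
    return math.sqrt(sum((r - p) ** 2 for r, p in kept) / len(kept))

# Errors 1.0 and 0.0 on the two scored rows; the third row is dropped
print(rmse([(5.0, 4.0), (3.0, 3.0), (1.0, None)]))  # sqrt(0.5), about 0.7071
```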

In [None]:
def als_tuning(training, test, ranks, regParams, maxIters):
    #Initialize variables to store best parameters and model
    best_model = None
    best_rmse = float("inf") #Stores large value so any RMSE can be better than it
    best_params = None
    #Loops through the different combinations of parameters
    for rank in ranks:
        for reg in regParams:
            for max_iter in maxIters:
                als = ALS(
                    userCol="user_index",
                    itemCol="item_index",
                    ratingCol="rating",
                    coldStartStrategy="drop",
                    nonnegative=True,
                    implicitPrefs=False,
                    rank=rank,
                    maxIter=max_iter,
                    regParam=reg
                )

                model = als.fit(training) #Fit the ALS model on the training data
                predictions = model.transform(test) #Make predictions on the test data
                rmse = evaluator.evaluate(predictions) #Evaluate model using RMSE

                print(f"Rank: {rank}, RegParam: {reg}, MaxIter: {max_iter} || RMSE: {rmse}") #Print current combination of parameters and corresponding RMSE value
                #Update the best model and parameters if this RMSE is lower than the current best RMSE
                if rmse < best_rmse:
                    best_rmse = rmse
                    best_model = model
                    best_params = (rank, reg, max_iter)

    print(f"\nBest RMSE: {best_rmse} with Rank: {best_params[0]}, RegParam: {best_params[1]}, MaxIter: {best_params[2]}")
    return best_model


In [None]:
#List of hyperparameters for tuning the ALS model
ranks = [8, 10, 12]
regParams = [0.05, 0.1, 0.15]
maxIters = [10, 15]
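The three nested loops in `als_tuning` visit the Cartesian product of the candidate lists. The sketch below writes the same search as a single loop with `itertools.product`. The `score` callable is hypothetical and stands in for "fit ALS, predict on the test set, compute RMSE". Within Spark itself, `pyspark.ml.tuning.ParamGridBuilder` combined with `TrainValidationSplit` or `CrossValidator` is the built-in way to build such a grid. Those tools can score candidates on a validation split of the training data instead of on the final test set.

```python
from itertools import product

# Same candidate values as the tuning cell above
ranks = [8, 10, 12]
reg_params = [0.05, 0.1, 0.15]
max_iters = [10, 15]

def grid_search(score, ranks, reg_params, max_iters):
    """Return (best_score, best_params) over every combination; a lower score is better.

    `score` is a hypothetical callable (rank, reg, max_iter) -> RMSE.
    """
    best_score, best_params = float("inf"), None
    for params in product(ranks, reg_params, max_iters):  # the last list varies fastest
        s = score(*params)
        if s < best_score:
            best_score, best_params = s, params
    return best_score, best_params

grid = list(product(ranks, reg_params, max_iters))
print(len(grid))  # 18
```

`product` yields combinations in the same order as the tuning log, starting with (8, 0.05, 10) and then (8, 0.05, 15).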

# Evaluation
Every combination of the hyperparameters above (3 ranks × 3 regParam values × 2 maxIter values = 18 models) is trained and evaluated. The model with the lowest RMSE is kept for making predictions.

In [None]:
best_model = als_tuning(training, test, ranks, regParams, maxIters)

Rank: 8, RegParam: 0.05, MaxIter: 10 || RMSE: 0.9626941988288471
Rank: 8, RegParam: 0.05, MaxIter: 15 || RMSE: 0.9658879752223287
Rank: 8, RegParam: 0.1, MaxIter: 10 || RMSE: 0.9940081117607227
Rank: 8, RegParam: 0.1, MaxIter: 15 || RMSE: 1.0035237992723878
Rank: 8, RegParam: 0.15, MaxIter: 10 || RMSE: 1.0330511399638678
Rank: 8, RegParam: 0.15, MaxIter: 15 || RMSE: 1.0479245933433168
Rank: 10, RegParam: 0.05, MaxIter: 10 || RMSE: 0.9640274068999336
Rank: 10, RegParam: 0.05, MaxIter: 15 || RMSE: 0.9669068285722151
Rank: 10, RegParam: 0.1, MaxIter: 10 || RMSE: 0.9943001757165887
Rank: 10, RegParam: 0.1, MaxIter: 15 || RMSE: 1.0037625979709999
Rank: 10, RegParam: 0.15, MaxIter: 10 || RMSE: 1.0331387718763871
Rank: 10, RegParam: 0.15, MaxIter: 15 || RMSE: 1.049047060584209
Rank: 12, RegParam: 0.05, MaxIter: 10 || RMSE: 0.9649358774209971
Rank: 12, RegParam: 0.05, MaxIter: 15 || RMSE: 0.9663680178728038
Rank: 12, RegParam: 0.1, MaxIter: 10 || RMSE: 0.9933270762978792
Rank: 12, RegParam: 0.

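From the tuning results above, the best RMSE was 0.9627, obtained with a rank of 8, a regParam of 0.05 and a maxIter of 10. Across the configurations shown, RMSE rises as regParam increases, 15 iterations are slightly worse than 10 in every case, and rank has little effect.
An RMSE of about 0.96 means predictions are typically off by roughly one star on the 1 to 5 scale. This suggests the ratings are partly predictable, but on its own it does not establish strong predictive performance. Comparing against a simple baseline, such as always predicting the mean training rating, would show how much ALS actually adds. Because the hyperparameters were selected using the same test set on which the RMSE is reported, the figure is also likely somewhat optimistic. Finer tuning and a separate train/validation/test split would address both points.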
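One way to put an RMSE of about 0.96 in context is a trivial baseline that predicts the mean training rating for every test rating. If ALS does not clearly beat it, its predictions add little. The plain-Python sketch below uses hypothetical rating lists. In the notebook, these lists would come from the `rating` column of `training` and `test`.

```python
import math

def mean_baseline_rmse(train_ratings, test_ratings):
    """RMSE of predicting the mean training rating for every test rating."""
    mean = sum(train_ratings) / len(train_ratings)
    return math.sqrt(sum((r - mean) ** 2 for r in test_ratings) / len(test_ratings))

# Hypothetical ratings: the training mean is 4.0, and each test rating is 1 star away from it
print(mean_baseline_rmse([5.0, 4.0, 3.0, 4.0], [5.0, 3.0]))  # 1.0
```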


# Demo

In [None]:
from pyspark.sql.functions import col, lit
import random

def top_recommendations(model, test, indexed_data, num_users=3, top_n=5):
    """Print the top_n highest-scoring items for num_users randomly chosen test-set users.

    Note: items a user has already rated are not excluded from their list.
    """
    # Distinct user indexes that appear in the test set
    unique_users = test.select("user_index").distinct()
    user_ids = [row["user_index"] for row in unique_users.collect()]

    random_users = random.sample(user_ids, num_users)
    print(f"Randomly selected user indices: {random_users}")

    # Every item the model could recommend
    all_items = indexed_data.select("item_index").distinct()

    for user_id in random_users:
        # Pair this user with every item
        user_df = all_items.withColumn("user_index", lit(user_id))

        # Score every (user, item) pair; items unseen in training get NaN and are
        # removed by the model's coldStartStrategy="drop"
        user_recommendations = model.transform(user_df).filter(col("prediction").isNotNull())

        # Keep the highest-scoring items
        top_items = user_recommendations.orderBy(col("prediction").desc()).limit(top_n)

        # Display results
        print(f"\nTop {top_n} recommendations for user_index {user_id}:")
        top_items.select("item_index", "prediction").show()
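An explicit ALS model predicts a rating as the dot product of a user's and an item's latent factor vectors. Spark exposes these vectors as the `userFactors` and `itemFactors` DataFrames of the fitted model. The plain-Python sketch below uses hypothetical in-memory dictionaries of factors. It scores every item for one user, skips items the user has already rated (which the function above does not do), and keeps the top n with `heapq.nlargest`. For large catalogs, Spark's built-in `ALSModel.recommendForUserSubset` is likely a better fit than scoring a hand-built DataFrame.

```python
import heapq

def top_n_for_user(user_factors, item_factors, seen_items, n=5):
    """Score every unseen item by the dot product of user and item factors; keep the top n."""
    scores = (
        (item, sum(u * v for u, v in zip(user_factors, vec)))
        for item, vec in item_factors.items()
        if item not in seen_items  # do not re-recommend items the user already rated
    )
    return heapq.nlargest(n, scores, key=lambda pair: pair[1])

# Hypothetical rank-2 factors; item "d" scores highest but is excluded as already rated
user = [1.0, 2.0]
items = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0], "d": [2.0, 2.0]}
print(top_n_for_user(user, items, seen_items={"d"}, n=2))  # [('c', 3.0), ('b', 2.0)]
```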



In [None]:
top_recommendations(best_model, test, indexed_data)

Randomly selected user indices: [1019.0, 2304.0, 4084.0]

Top 5 recommendations for user_index 1019.0:
+----------+----------+
|item_index|prediction|
+----------+----------+
|   12121.0| 5.2859592|
|   18416.0| 5.2859592|
|   11024.0| 5.2577105|
|   17536.0| 5.2577105|
|   21716.0| 5.2370405|
+----------+----------+


Top 5 recommendations for user_index 2304.0:
+----------+----------+
|item_index|prediction|
+----------+----------+
|   28516.0| 5.7482314|
|   16031.0| 5.7482314|
|   24047.0| 5.7482314|
|    7035.0| 5.7482314|
|   11991.0|  5.636387|
+----------+----------+


Top 5 recommendations for user_index 4084.0:
+----------+----------+
|item_index|prediction|
+----------+----------+
|   15292.0|  5.999779|
|   16534.0|  5.999779|
|    8724.0|  5.999779|
|    8729.0|  5.999779|
|   24262.0|  5.995158|
+----------+----------+



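The demo shows that the model can produce a ranked list of items for each user based on their past ratings. The scores should be read with care, though. Several predictions exceed the top of the 1 to 5 rating scale (up to about 6.0), because explicit ALS predicts ratings as unbounded dot products of latent factors. Many items also share identical scores, likely because they ended up with identical learned factors (for example, items rated by very few users). The lists may also include items the user has already rated, since the function does not exclude them. High predicted scores therefore show which items the model ranks highest for a user, but on their own they do not show that those items are relevant. A ranking metric such as precision@k on held-out data would be needed to judge that.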
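Because ALS scores are unbounded, values such as 5.2859592 or 5.999779 in the output above fall outside the 1 to 5 rating scale. If predicted ratings are shown to users, a simple option is to clamp them to the scale, as in the sketch below. `clip_rating` is a hypothetical helper, written with plain comparisons because the notebook rebinds `min` and `max` to Spark column functions. Ranking should still use the raw scores, since clamping turns every score above 5 into a tie at 5.0.

```python
def clip_rating(prediction, low=1.0, high=5.0):
    """Clamp a raw ALS score to the rating scale for display."""
    if prediction < low:
        return low
    if prediction > high:
        return high
    return prediction

print([clip_rating(p) for p in [5.2859592, 4.8, 0.3]])  # [5.0, 4.8, 1.0]
```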