SET UP

In [1]:
# %pip install mysql-connector-python
# %pip install pyspark # may need to install java as well and set java home and spark home
# %pip install pandas
# %pip install numpy

import pandas as pd
import numpy as np
from collections import defaultdict

import boto3
from botocore.exceptions import ClientError
import json

import mysql.connector
from mysql.connector import Error

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


In [2]:

def get_secret():

    secret_name = "rds!db-f8407841-d834-4490-a67a-a994b71bc2e1"
    region_name = "ca-central-1"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
        return get_secret_value_response
    except ClientError as e:
        raise e

secret = json.loads(get_secret()['SecretString'])

host = "ecommerce.c7eeeeuega91.ca-central-1.rds.amazonaws.com"
user = secret['username']
password = secret['password']
port = 3306
database = "ecommerce"


In [None]:
#functions to establish and close connection to RDS 

def connect_to_rds():
    try: #can add a while loop to re-run for failure
        connection = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            port=port,
            database=database
        )
        if connection.is_connected():
            print("Successfully connected to RDS!")
            cursor = connection.cursor()
            cursor.execute("SELECT DATABASE();")
            record = cursor.fetchone()
            print(f"You're connected to: {record}")
            return connection, cursor
        else:
            return None, None
    except Error as e:
        print("Error while connecting to RDS:", e)
        return None, None

def close_connection(connection, cursor):
    if connection!=None and connection.is_connected():
        cursor.close()
        connection.close()
        print("Connection closed.")


In [4]:
# Executing queries to get our the reviews table, the purchases table, and all OUR users.

connection, cursor = connect_to_rds()

if connection.is_connected() and cursor != None:
    print("Attempting query execution")
    try:       
        get_reviews_query = "SELECT userid, product_asin, rating FROM Reviews;" 
        cursor.execute(get_reviews_query)
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description] # Fetch column names from the cursor
        
        get_our_users_query = "SELECT userid FROM Users WHERE email IS NOT NULL;"
        cursor.execute(get_our_users_query)
        our_users = cursor.fetchall()

        get_purchases_query = "SELECT DISTINCT userid, product_asin FROM Purchases;"
        cursor.execute(get_purchases_query)
        purchases = cursor.fetchall()

    except Error as e:
        print("Error while executing MySQL queries:", e)

# close_connection(connection, cursor)
# cursor = None
# connection = None # to avoid dangling pointer

Successfully connected to RDS!
You're connected to: ('ecommerce',)
Attempting query execution


In [5]:
print(columns)
ratings_df = pd.DataFrame(rows, columns=columns)
ratings_df

['userid', 'product_asin', 'rating']


Unnamed: 0,userid,product_asin,rating
0,AFTYGYAECGAWJRDXIIXSO3PZ45KA,B08GCR1G1R,5.0
1,AE57MRF2R2ALCC6H5WQLFKT7KSSA,B01MRV9Z1Y,5.0
2,AFXF3EGQTQDXMRLDWFU7UBFQZB7Q,B0896Q9WZY,4.0
3,AHPFHP43AXWRYZZ4HPNCW7I7J3ZQ,B0896Q9WZY,5.0
4,AGUFDVO4TYIFQHQUMDXFHSHH43BA,B0C5BMZ1K9,2.0
...,...,...,...
81390,1cbd15f8-4021-70b7-c332-5b8938bd4417,B0000WS5EQ,4.0
81391,1cbd15f8-4021-70b7-c332-5b8938bd4417,B0000WS5EQ,4.0
81392,1cbd15f8-4021-70b7-c332-5b8938bd4417,B0006HHNOI,4.0
81393,1cbd15f8-4021-70b7-c332-5b8938bd4417,B00067AVGK,4.0


In [6]:
our_users_df = pd.DataFrame(our_users, columns=['userid'])
print(our_users_df)

recommendations_dict = {user[0]: list() for user in our_users}
print(recommendations_dict)

                                 userid
0  1cbd15f8-4021-70b7-c332-5b8938bd4417
1  3cad2568-b0c1-7000-07b0-b98f8f2c5010
2  8c5d7538-90d1-70a0-efb1-64a13ebc70e0
{'1cbd15f8-4021-70b7-c332-5b8938bd4417': [], '3cad2568-b0c1-7000-07b0-b98f8f2c5010': [], '8c5d7538-90d1-70a0-efb1-64a13ebc70e0': []}


In [7]:
purchases_df = pd.DataFrame(purchases, columns=['userid', 'product_asin'])
print(purchases_df)

purchases_dict = defaultdict(list)
for _, row in purchases_df.iterrows():
    purchases_dict[row['userid']].append(row['product_asin'])

print(purchases_dict)

max_purchases_by_a_user = 0
for key in purchases_dict.keys():
    n_purchases = len(purchases_dict[key])
    if n_purchases > max_purchases_by_a_user:
        max_purchases_by_a_user = n_purchases
print(max_purchases_by_a_user)



                                 userid product_asin
0  3cad2568-b0c1-7000-07b0-b98f8f2c5010   6025006393
defaultdict(<class 'list'>, {'3cad2568-b0c1-7000-07b0-b98f8f2c5010': ['6025006393']})
1


RUNNING THE COLLABORATIVE FILTERING ML MODEL FROM SPARK - BASED ON ALS

In [8]:
spark = SparkSession.builder \
    .appName("ALSRecommendationSystem") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/03 07:19:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
num_unique_users = len(np.unique(ratings_df["userid"]))
num_unique_items = len(np.unique(ratings_df["product_asin"]))

# Spark's ALS cannot handle the users and items if they're strings - they have to be integers 
user_mapper = dict(zip(np.unique(ratings_df["userid"]), list(range(num_unique_users))))
item_mapper = dict(zip(np.unique(ratings_df["product_asin"]), list(range(num_unique_items))))
user_inverse_mapper = dict(zip(list(range(num_unique_users)), np.unique(ratings_df["userid"])))
item_inverse_mapper = dict(zip(list(range(num_unique_items)), np.unique(ratings_df["product_asin"])))


ratings_transformed_df = ratings_df.copy()
ratings_transformed_df["userid"] = ratings_transformed_df["userid"].map(user_mapper)
ratings_transformed_df["product_asin"] = ratings_transformed_df["product_asin"].map(item_mapper)
print(ratings_transformed_df)
ratings_spark = spark.createDataFrame(ratings_transformed_df)


our_users_transformed_df = our_users_df.copy()
our_users_transformed_df["userid"] = our_users_df["userid"].map(user_mapper)
print(our_users_transformed_df)
# !!! Some users have not rated any items. These users will have NaN in mapped id in the above df. These users cannot get recommendations. 
our_users_transformed_df = our_users_transformed_df.dropna()
our_users_spark = spark.createDataFrame(our_users_transformed_df)

       userid  product_asin  rating
0       35805          2307     5.0
1        1983          1209     5.0
2       37867          2196     4.0
3       72315          2196     5.0
4       55627          2986     2.0
...       ...           ...     ...
81390       0             4     4.0
81391       0             4     4.0
81392       0            43     4.0
81393       0            40     4.0
81394       1            89     4.0

[81395 rows x 3 columns]
   userid
0     0.0
1     1.0
2     NaN


In [10]:
# HYPERPARAMETERS
num_recommendations_per_user = 5
num_recommendations_per_user += max_purchases_by_a_user
training_split_ratio = 0.8

maxIter=10
regParam=0.01
coldStartStrategy="drop"
implicitPrefs=False 

metricName="rmse"
labelCol="rating"
predictionCol="prediction"

In [11]:
(training, test) = ratings_spark.randomSplit([training_split_ratio, 1-training_split_ratio])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=maxIter, regParam=regParam, userCol="userid", itemCol="product_asin", ratingCol="rating", coldStartStrategy=coldStartStrategy, implicitPrefs=implicitPrefs)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName=metricName, labelCol=labelCol, predictionCol=predictionCol)
error = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(error))
# consistently getting 4.33 rmse which is really bad - the data is EXTREMELY sparse

# Generate top num_recommendations_per_user product recommendations for OUR users
userSubsetRecs = model.recommendForUserSubset(our_users_spark.select("userid"), num_recommendations_per_user)

24/12/03 07:19:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/03 07:19:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/12/03 07:19:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

Root-mean-square error = 3.577522569275012


In [12]:
user_recommendations_pd = pd.DataFrame(userSubsetRecs.collect(), columns=userSubsetRecs.columns)

print(user_recommendations_pd)
user_recommendations_pd['recommended_products'] = user_recommendations_pd['recommendations'].apply(
    lambda recs: [rec[0] for rec in recs]
)

user_recommendations_pd = user_recommendations_pd[['userid', 'recommended_products']]

user_recommendations_pd["userid"] = user_recommendations_pd["userid"].map(user_inverse_mapper)
user_recommendations_pd['recommended_products'] = user_recommendations_pd['recommended_products'].apply(
    lambda rec_products: [item_inverse_mapper[product] for product in rec_products]
)

user_recommendations_pd.head(10)

for index, row in user_recommendations_pd.iterrows():
    recommendations_dict[row['userid']]=row['recommended_products']

print(recommendations_dict)

   userid                                    recommendations
0       1  [(2714, 7.125075340270996), (2488, 6.932002544...
1       0  [(2913, 5.738107681274414), (1583, 5.105199337...
{'1cbd15f8-4021-70b7-c332-5b8938bd4417': ['B0BJ656TGW', 'B07DMW8WXR', 'B096WCPSHC', 'B07FB68DNX', 'B07GP54Q23', 'B00E1TI0WQ'], '3cad2568-b0c1-7000-07b0-b98f8f2c5010': ['B09JC2GTVD', 'B08TLS33KT', 'B07MBD7XQW', 'B00CYVUAA8', 'B07MDHZTTX', 'B07Y29C8BG'], '8c5d7538-90d1-70a0-efb1-64a13ebc70e0': []}


In [13]:
# We don't want to recommend products that have already been bought
for user in recommendations_dict.keys():
    if user in purchases_dict:
        recommendations_dict[user] = [product for product in recommendations_dict[user] if product not in purchases_dict[user]]
        # recommendations_dict[key] - purchases_dict[key]  # Subtract corresponding sets !!!

num_recommendations_per_user -= max_purchases_by_a_user
for user in recommendations_dict.keys():
    recommendations_dict[user] = recommendations_dict[user][:num_recommendations_per_user]

print(recommendations_dict)

{'1cbd15f8-4021-70b7-c332-5b8938bd4417': ['B0BJ656TGW', 'B07DMW8WXR', 'B096WCPSHC', 'B07FB68DNX', 'B07GP54Q23'], '3cad2568-b0c1-7000-07b0-b98f8f2c5010': ['B09JC2GTVD', 'B08TLS33KT', 'B07MBD7XQW', 'B00CYVUAA8', 'B07MDHZTTX'], '8c5d7538-90d1-70a0-efb1-64a13ebc70e0': []}


UPDATING THE RECOMMENDATIONS ON RDS

In [14]:
# connection, cursor = connect_to_rds()
if connection.is_connected() and cursor != None:
    print("Attempting query execution")
    try:       
        delete_existing_recommendations_query = "DELETE FROM Recommendations;" #can also use TRUNCATE TABLE recommendations;
        cursor.execute(delete_existing_recommendations_query)
        connection.commit()
        # _ = cursor.fetchall()
    
    except Error as e:
        print("Error while executing MySQL queries:", e)

Attempting query execution


In [15]:
if connection.is_connected() and cursor != None:
    print("Attempting query execution")
    try:       
        for user in recommendations_dict:
            for recommendation in recommendations_dict[user]:
                insertion_query = f"INSERT INTO Recommendations (userid, product_asin) VALUES ('{user}', '{recommendation}');"
                print(insertion_query)
                cursor.execute(insertion_query)
                connection.commit()
        # _ = cursor.fetchall()
    except Error as e:
        print("Error while executing MySQL queries:", e)

Attempting query execution
INSERT INTO Recommendations (userid, product_asin) VALUES ('1cbd15f8-4021-70b7-c332-5b8938bd4417', 'B0BJ656TGW');
INSERT INTO Recommendations (userid, product_asin) VALUES ('1cbd15f8-4021-70b7-c332-5b8938bd4417', 'B07DMW8WXR');
INSERT INTO Recommendations (userid, product_asin) VALUES ('1cbd15f8-4021-70b7-c332-5b8938bd4417', 'B096WCPSHC');
INSERT INTO Recommendations (userid, product_asin) VALUES ('1cbd15f8-4021-70b7-c332-5b8938bd4417', 'B07FB68DNX');
INSERT INTO Recommendations (userid, product_asin) VALUES ('1cbd15f8-4021-70b7-c332-5b8938bd4417', 'B07GP54Q23');
INSERT INTO Recommendations (userid, product_asin) VALUES ('3cad2568-b0c1-7000-07b0-b98f8f2c5010', 'B09JC2GTVD');
INSERT INTO Recommendations (userid, product_asin) VALUES ('3cad2568-b0c1-7000-07b0-b98f8f2c5010', 'B08TLS33KT');
INSERT INTO Recommendations (userid, product_asin) VALUES ('3cad2568-b0c1-7000-07b0-b98f8f2c5010', 'B07MBD7XQW');
INSERT INTO Recommendations (userid, product_asin) VALUES ('3

In [16]:
close_connection(connection, cursor)
spark.stop()

Connection closed.
