## ALS

Alternating Least Squares

https://medium.com/analytics-vidhya/model-based-recommendation-system-with-matrix-factorization-als-model-and-the-math-behind-fdce8b2ffe6d

In [None]:
# Install the required packages
# be sure you have Java installed on your machine, https://www.java.com/en/download/
! pip install pyspark  
! pip install findspark

! pip install pandas

! pip install numpy
! pip install pymongo
! pip install python-dotenv

In [1]:
# Import the required packages
import pandas as pd
import numpy as np
import json

import findspark
findspark.init()

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import os
java_home = os.environ.get('JAVA_HOME')
print(java_home)

from dotenv import dotenv_values
import pymongo

None


In [2]:
# clear out the predicted ratings collection

# specify the name of the .env file
env_name = "myconfig.env" # copy the example.env template and change this to your own .env file name
config = dotenv_values(env_name)

# Azure Cosmos DB for MongoDB
cosmos_conn = config['cosmos_connection_string']
cosmos_client = pymongo.MongoClient(cosmos_conn)

db = cosmos_client["ProductRecommendation"]

# Load actual ratings
collection_actual_rating = db['ActualRating']

# specify the collection to put the predicted ratings in
collection_predicted_rating = db['PredictedRating']

# delete all documents in the collection to clear it out
collection_predicted_rating.delete_many({})

DeleteResult({'ok': 1, 'n': 0}, acknowledged=True)

The data we start with here is a set of ratings that have been generated by users for products they have previously purchased.

This cell reads the user-item ratings from the Augmented Ratings file into a pandas DataFrame and keeps only the relevant columns (UserId, ProductId, Rating). The DataFrame is then converted into a Spark DataFrame, which is required to run the ALS model.

In [3]:
# Create a Spark session
conf = SparkConf()
conf.set("spark.executor.memory","6g")
conf.set("spark.driver.memory", "6g")
conf.set("spark.driver.cores", "8")
sc = SparkContext.getOrCreate(conf)
spark = SparkSession.builder.getOrCreate()

# Load the data from the json file
full_pd_data = pd.read_json("./data/ratings/AugmentedRating.json")  # TO-DO change the name of this file
# just keep the required columns
pd_data = full_pd_data[['UserId', 'ProductId', 'Rating']]
# convert the data to spark dataframe, required to run the ALS model
data = spark.createDataFrame(pd_data)
# count the number of rows in the data
data.count()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/07 14:14:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

119454

We next need to split the dataset into training and testing sets with an 80-20 split, ensuring that models are trained on a majority of the data while having a separate subset for evaluation. We then cache both to improve performance by storing them in memory.

In [4]:
# Split to create train (80%) and test (20%) datasets
train, test = data.randomSplit([0.8,0.2],10001)

#cache the train and test datasets
train.cache()
test.cache()

DataFrame[UserId: bigint, ProductId: bigint, Rating: double]

We now need to set up the ALS model, a collaborative filtering technique that learns from the ratings users have given to products they purchased and uses them to predict, for each user, the rating they might give to products they have not yet rated.
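To make the idea behind ALS more concrete, here is a minimal NumPy sketch of the alternating step on a tiny ratings matrix. It is an illustration only, not Spark's implementation: the function name `als_factorize`, the matrix `R`, and the `mask` of observed ratings are all assumptions made for this example. The sketch holds the item vectors fixed and solves a small ridge regression for each user, then holds the user vectors fixed and does the same for each item.

```python
import numpy as np

def als_factorize(R, mask, rank=2, reg=0.1, iters=20, seed=0):
    """Approximate R by U @ V.T, fitting only entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    ridge = reg * np.eye(rank)
    for _ in range(iters):
        # Fix V and solve a regularized least-squares problem per user
        for u in range(n_users):
            Vu = V[mask[u]]                       # vectors of items this user rated
            U[u] = np.linalg.solve(Vu.T @ Vu + ridge, Vu.T @ R[u, mask[u]])
        # Fix U and solve the same kind of problem per item
        for i in range(n_items):
            Ui = U[mask[:, i]]                    # vectors of users who rated this item
            V[i] = np.linalg.solve(Ui.T @ Ui + ridge, Ui.T @ R[mask[:, i], i])
    return U, V

# Toy data: 3 users x 3 products, 0.0 marks a missing rating
R = np.array([[5.0, 3.0, 0.0],
              [4.0, 0.0, 1.0],
              [1.0, 1.0, 5.0]])
mask = R > 0
U, V = als_factorize(R, mask)
predicted = U @ V.T                               # includes the missing cells
print(np.round(predicted, 2))
```

The cells that were missing in `R` are filled in by `predicted`, which is the behavior the recommender relies on.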

Hyperparameter tuning is performed through a grid search over a defined parameter space, combined with cross-validation to check that the model generalizes. The CrossValidator in PySpark automates this process, evaluating each candidate model with the RMSE (Root Mean Square Error) metric. RMSE is the square root of the mean squared difference between predicted and actual values, so it is expressed in the same units as the ratings and penalizes large errors more heavily than small ones.
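As a small worked illustration of the metric, the sketch below computes RMSE by hand with NumPy on four made-up ratings and compares it with the mean absolute error. The helper name `rmse` and the sample values are assumptions for this example and are not taken from the dataset.

```python
import numpy as np

def rmse(actual, predicted):
    """Square root of the mean squared difference between two sequences."""
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    return float(np.sqrt(np.mean((predicted - actual) ** 2)))

actual = [6.0, 7.0, 9.0, 5.0]        # illustrative ratings
predicted = [6.5, 6.0, 8.0, 5.5]     # illustrative predictions

score = rmse(actual, predicted)
mae = float(np.mean(np.abs(np.array(predicted) - np.array(actual))))

# Errors are 0.5, -1.0, -1.0, 0.5, so the mean squared error is 0.625
print(f"RMSE = {score:.4f}, MAE = {mae:.4f}")
```

Because the errors are squared before averaging, RMSE is never smaller than the mean absolute error on the same data.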

After training, the best model is selected based on its performance, and its hyperparameters are printed for inspection. The model is then used to make predictions on the test set, and the RMSE of these predictions is calculated to assess the model's accuracy.

In [5]:
# we use the cross validator to tune the hyperparameters
als = ALS(
         userCol="UserId", 
         itemCol="ProductId",
         ratingCol="Rating", 
         coldStartStrategy="drop"
)

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 100]) \
            .addGrid(als.regParam, [.1]) \
            .addGrid(als.maxIter, [10]) \
            .build()

evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="Rating", 
           predictionCol="prediction")

cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3, parallelism = 6)

In [6]:
# train the model itself
model = cv.fit(train)

# return the best model from those that were trained above
best_model = model.bestModel

24/04/07 14:14:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/04/07 14:14:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/04/07 14:14:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
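One way to see which grid entry won is to pair each parameter map with its average cross-validation metric and pick the best one. The helper below is a hypothetical sketch (`best_param_map` is not part of PySpark), and the metric values in the example are made-up stand-ins, not results from this notebook.

```python
def best_param_map(avg_metrics, param_maps, larger_is_better=False):
    """Return (metric, params) for the best-scoring entry of a parameter grid."""
    pick = max if larger_is_better else min
    best_index = pick(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
    return avg_metrics[best_index], param_maps[best_index]

# Stand-in inputs shaped like the grid above; the metric values are invented
avg_metrics = [0.71, 0.66]
param_maps = [
    {"rank": 10, "regParam": 0.1, "maxIter": 10},
    {"rank": 100, "regParam": 0.1, "maxIter": 10},
]

metric, params = best_param_map(avg_metrics, param_maps)
print(f"best average metric: {metric}, params: {params}")
```

With the fitted cross-validator from this notebook, the same helper could be called as `best_param_map(model.avgMetrics, model.getEstimatorParamMaps())`. The keys of the returned map are then Spark `Param` objects, so `{p.name: v for p, v in params.items()}` gives a readable dictionary. RMSE is a lower-is-better metric, which is why `larger_is_better` defaults to `False` here.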
                                                                                

In [7]:
# now take the held-out test data and calculate how well the best model predicts its ratings.
prediction = best_model.transform(test)
rmse = evaluator.evaluate(prediction)
print(f'RMSE = {rmse}. This is the average difference between the actual and predicted ratings. Lower values are better.')



RMSE = 0.6452022049222121. This is the average difference between the actual and predicted ratings. Lower values are better.


                                                                                

Gather the latent-factor vectors learned by the model. As an example, the next cell takes the first user vector and the first product vector.

In [8]:
# vector to describe what user attributes are important in making a prediction for a rating
user_v = best_model.userFactors.collect()[0].features

# vector to describe what product attributes are important in making a prediction for a rating
item_v = best_model.itemFactors.collect()[0].features

print('User Vector: ' + str(user_v))
print('Product Vector: ' + str(item_v))

User Vector: [0.2528408169746399, -0.047741763293743134, 0.399604469537735, 0.5014708638191223, -0.02424103394150734, 0.004867623560130596, 0.08064773678779602, -0.293194055557251, -0.7540773749351501, -0.3363887667655945, -0.04997250437736511, -0.7089352607727051, 0.06902622431516647, 0.10350262373685837, 0.2637403607368469, -0.4069480001926422, -0.2657521665096283, -0.4132049083709717, -0.30096641182899475, -0.006876070983707905, -0.26336440443992615, 0.9074861407279968, 0.23711714148521423, -0.03726669400930405, 0.7248176336288452, 0.07234722375869751, 0.0077450149692595005, -0.19445198774337769, 0.1358102709054947, -0.270291268825531, -0.20880624651908875, -0.251738578081131, 0.44534462690353394, 0.555341362953186, 0.11355116218328476, -0.2772083580493927, 0.23267744481563568, -0.27411848306655884, -0.1341416835784912, -0.24743595719337463, -0.18482233583927155, 0.23662665486335754, 0.3361949026584625, 0.0602228119969368, 0.3474673330783844, 0.05968641862273216, -0.2937221825122833

The **dot product** of a user vector and an item vector gives the predicted rating for that user and item, based on their latent features. The next cell computes it for the two vectors above as an example.

In [9]:
np.dot((user_v),(item_v))

6.080652576709776
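The same calculation extends from one pair to every pair. The sketch below uses small made-up vectors (the names `user_factors` and `item_factors` and their values are assumptions for this example) to show that a single matrix product yields the predicted rating for each user and product at once.

```python
import numpy as np

# One user and one product, three latent features each (illustrative values)
user_vec = np.array([0.5, -1.0, 2.0])
item_vec = np.array([1.0, 0.5, 1.5])
single = float(np.dot(user_vec, item_vec))   # 0.5 - 0.5 + 3.0

# Stack several vectors as rows: one row per user, one row per product
user_factors = np.array([[0.5, -1.0, 2.0],
                         [1.0,  0.0, 1.0]])
item_factors = np.array([[1.0, 0.5, 1.5],
                         [0.0, 2.0, 0.5]])

# Entry [u, i] is the dot product of user u's vector with product i's vector
all_scores = user_factors @ item_factors.T
print(single)
print(all_scores)
```

In the notebook, `best_model.userFactors` and `best_model.itemFactors` each carry an `id` column next to `features`, which identifies the user or product a given vector belongs to.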

Rather than computing a dot product every time a prediction is needed, we precompute the recommendations for all users across all products and save them in Azure Cosmos DB for MongoDB. This keeps the e-commerce app as fast as possible, because serving a recommendation becomes a simple lookup.
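Conceptually, precomputing recommendations means scoring every product for every user, ranking the scores, and dropping products the user has already rated. The NumPy sketch below illustrates that idea on toy data. It is not how Spark computes `recommendForAllUsers`, and the function name `top_n_unrated` along with all values are assumptions for this example.

```python
import numpy as np

def top_n_unrated(user_factors, item_factors, rated_mask, n):
    """For each user, return up to n (item_index, score) pairs, best first,
    skipping items where rated_mask is True."""
    scores = user_factors @ item_factors.T
    # Push already-rated items to the bottom of the ranking
    scores = np.where(rated_mask, -np.inf, scores)
    order = np.argsort(-scores, axis=1)[:, :n]
    return [
        [(int(j), float(scores[u, j])) for j in row if np.isfinite(scores[u, j])]
        for u, row in enumerate(order)
    ]

user_factors = np.array([[1.0, 0.0],
                         [0.0, 1.0]])
item_factors = np.array([[3.0, 1.0],
                         [2.0, 5.0],
                         [1.0, 4.0]])
# True where the user has already rated the product
rated_mask = np.array([[True, False, False],
                       [False, True, False]])

recs = top_n_unrated(user_factors, item_factors, rated_mask, n=2)
print(recs)
```

Storing one such ranked list per user is what allows the application to answer a recommendation request without touching the model.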

In [11]:
# This line uses the best model obtained from the tuning process to generate recommendations for all users.
val_recommendations = best_model.recommendForAllUsers(10001)

In [15]:
val_recommendations.collect()[0]

Row(UserId=26, recommendations=[Row(ProductId=42, rating=8.003403663635254), Row(ProductId=72, rating=8.00214958190918), Row(ProductId=92, rating=7.087383270263672), Row(ProductId=22, rating=7.025835037231445), Row(ProductId=62, rating=6.746711254119873), Row(ProductId=32, rating=6.671200752258301), Row(ProductId=53, rating=6.631563186645508), Row(ProductId=73, rating=6.580449104309082), Row(ProductId=60, rating=6.4144816398620605), Row(ProductId=28, rating=6.324685096740723), Row(ProductId=12, rating=6.269505977630615), Row(ProductId=65, rating=6.221710205078125), Row(ProductId=83, rating=6.196121692657471), Row(ProductId=43, rating=6.109328269958496), Row(ProductId=45, rating=5.990270614624023), Row(ProductId=55, rating=5.982399940490723), Row(ProductId=15, rating=5.915400505065918), Row(ProductId=10, rating=5.906833648681641), Row(ProductId=82, rating=5.8972649574279785), Row(ProductId=49, rating=5.882152080535889), Row(ProductId=25, rating=5.80623722076416), Row(ProductId=95, ratin

In [17]:
pymongo_cursor = collection_actual_rating.find({})
all_data = list(pymongo_cursor)
df = pd.DataFrame(all_data)


| | _id | UserId | ProductId | Rating |
|---|---|---|---|---|
| 0 | 6612e0a616c8f03e15c1ea36 | 1 | 49 | 6.59 |
| 1 | 6612e0a616c8f03e15c1ea37 | 1 | 70 | 6.41 |
| 2 | 6612e0a616c8f03e15c1ea38 | 1 | 15 | 6.71 |
| 3 | 6612e0a616c8f03e15c1ea39 | 1 | 60 | 7.31 |
| 4 | 6612e0a616c8f03e15c1ea3a | 1 | 81 | 4.47 |
| 5 | 6612e0a616c8f03e15c1ea3b | 1 | 8 | 6.04 |
| 6 | 6612e0a616c8f03e15c1ea3c | 1 | 12 | 7.2 |
| 7 | 6612e0a616c8f03e15c1ea3d | 1 | 42 | 9.1 |
| 8 | 6612e0a616c8f03e15c1ea3e | 1 | 48 | 5.75 |
| 9 | 6612e0a616c8f03e15c1ea3f | 1 | 68 | 5.99 |


In [18]:
rec_sys_final_predictions = []

# Load every rating users have already given. These (user, product) pairs are
# filtered out of the predictions before insertion into Azure Cosmos DB.
pymongo_cursor = collection_actual_rating.find({})
all_data = list(pymongo_cursor)
df = pd.DataFrame(all_data)

# Build the set of already-rated pairs once, so each check below is a fast
# set lookup instead of a scan over the whole DataFrame
rated_pairs = set(zip(df['UserId'], df['ProductId']))

for user in val_recommendations.collect():
    to_insert = {'UserId': user.UserId, 'Predictions': []}
    for rec in user.recommendations:
        # keep only products this user has not rated yet
        if (user.UserId, rec.ProductId) not in rated_pairs:
            to_insert['Predictions'].append({"ProductId": rec.ProductId, "rating": rec.rating})
    rec_sys_final_predictions.append(to_insert)
    print("Collated predictions for:", to_insert['UserId'])

# Insert all predictions into Azure Cosmos DB
collection_predicted_rating.insert_many(rec_sys_final_predictions)

Uploaded predictions for: 26
Uploaded predictions for: 27
Uploaded predictions for: 28
Uploaded predictions for: 31
Uploaded predictions for: 34
Uploaded predictions for: 53
Uploaded predictions for: 65
Uploaded predictions for: 76
Uploaded predictions for: 78
Uploaded predictions for: 81
Uploaded predictions for: 85
Uploaded predictions for: 101
Uploaded predictions for: 108
Uploaded predictions for: 115
Uploaded predictions for: 126
Uploaded predictions for: 133
Uploaded predictions for: 137
Uploaded predictions for: 148
Uploaded predictions for: 155
Uploaded predictions for: 183
Uploaded predictions for: 193
Uploaded predictions for: 210
Uploaded predictions for: 211
Uploaded predictions for: 243
Uploaded predictions for: 251
Uploaded predictions for: 255
Uploaded predictions for: 296
Uploaded predictions for: 300
Uploaded predictions for: 321
Uploaded predictions for: 322
Uploaded predictions for: 332
Uploaded predictions for: 362
Uploaded predictions for: 368
Uploaded predictions 

InsertManyResult([ObjectId('6612e4b0cf41d60aeab90030'), ObjectId('6612e4b0cf41d60aeab90031'), ObjectId('6612e4b0cf41d60aeab90032'), ObjectId('6612e4b0cf41d60aeab90033'), ObjectId('6612e4b0cf41d60aeab90034'), ObjectId('6612e4b0cf41d60aeab90035'), ObjectId('6612e4b0cf41d60aeab90036'), ObjectId('6612e4b0cf41d60aeab90037'), ObjectId('6612e4b0cf41d60aeab90038'), ObjectId('6612e4b0cf41d60aeab90039'), ObjectId('6612e4b0cf41d60aeab9003a'), ObjectId('6612e4b0cf41d60aeab9003b'), ObjectId('6612e4b0cf41d60aeab9003c'), ObjectId('6612e4b0cf41d60aeab9003d'), ObjectId('6612e4b0cf41d60aeab9003e'), ObjectId('6612e4b0cf41d60aeab9003f'), ObjectId('6612e4b0cf41d60aeab90040'), ObjectId('6612e4b0cf41d60aeab90041'), ObjectId('6612e4b0cf41d60aeab90042'), ObjectId('6612e4b0cf41d60aeab90043'), ObjectId('6612e4b0cf41d60aeab90044'), ObjectId('6612e4b0cf41d60aeab90045'), ObjectId('6612e4b0cf41d60aeab90046'), ObjectId('6612e4b0cf41d60aeab90047'), ObjectId('6612e4b0cf41d60aeab90048'), ObjectId('6612e4b0cf41d60aeab900

In [19]:
val_recommendations.show()



+------+--------------------+
|UserId|     recommendations|
+------+--------------------+
|    26|[{42, 8.003404}, ...|
|    27|[{11, 4.5141315},...|
|    28|[{42, 8.752557}, ...|
|    31|[{72, 7.5373335},...|
|    34|[{42, 6.9014306},...|
|    53|[{42, 7.347647}, ...|
|    65|[{42, 8.476885}, ...|
|    76|[{42, 7.3496275},...|
|    78|[{72, 7.279582}, ...|
|    81|[{42, 7.0681143},...|
|    85|[{42, 8.5074415},...|
|   101|[{72, 7.1265635},...|
|   108|[{42, 6.6497765},...|
|   115|[{42, 8.8110695},...|
|   126|[{42, 7.1991296},...|
|   133|[{42, 7.4181867},...|
|   137|[{42, 8.878123}, ...|
|   148|[{72, 7.386739}, ...|
|   155|[{42, 8.548487}, ...|
|   183|[{72, 7.771553}, ...|
+------+--------------------+
only showing top 20 rows



                                                                                

# Verify predictions inserted into Azure Cosmos DB

In [29]:
len(list(collection_predicted_rating.find({})))


10000
