In [51]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark import SparkContext

# Local import: SparkConf is only needed to configure the context created below.
from pyspark import SparkConf

# Fixed seed so the train/test split and the ALS initialisation are reproducible
SEED = 42

# Increase Memory if data is big
# SparkContext.setSystemProperty('spark.driver.memory', '10g')
# SparkContext.setSystemProperty('spark.executor.memory', '10g')


def compute_mse(model, ratings_rdd):
    """Return the mean squared error of `model` on an RDD of Rating objects.

    Parameters:
        model: object exposing predictAll(rdd_of_(user, movie)_pairs).
        ratings_rdd: RDD whose elements index as (user, movie, rating).

    Returns:
        Mean of the squared differences between actual and predicted ratings.
        The join is an inner join, so only (user, movie) pairs that appear in
        both the actual ratings and the predictions contribute.
    """
    user_movie_pairs = ratings_rdd.map(lambda r: (r[0], r[1]))
    # Key both sides by (user, movie) so they can be joined
    predicted = model.predictAll(user_movie_pairs).map(lambda r: ((r[0], r[1]), r[2]))
    actual = ratings_rdd.map(lambda r: ((r[0], r[1]), r[2]))
    squared_errors = actual.join(predicted).map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
    return squared_errors.mean()


# Start (or reuse) the Spark Context on the local machine.
# getOrCreate keeps this cell re-runnable while a context is still alive.
conf = SparkConf().setMaster("local").setAppName("MF mllib implementation")
sc = SparkContext.getOrCreate(conf)

# Load data
raw_lines = sc.textFile("small_ratings.csv")
header = raw_lines.first()                              # extract header
data = raw_lines.filter(lambda row: row != header)      # filter out header

# Convert the rows into Rating objects - Rating(user, movie, rating)
ratings = data.map(lambda row: row.split(',')).map(
    lambda row: Rating(int(row[0]), int(row[1]), float(row[2])))

# Split into Train and Test set; both are reused for every model below, so cache them
train, test = (part.cache() for part in ratings.randomSplit([0.8, 0.2], seed=SEED))

# Train and Evaluate accuracy for different values of params
for rank in [10, 15]:
    for iterations in [10, 15]:
        print(f"{rank} latent features - {iterations} Epochs")
        # Define ALS Model
        model = ALS.train(train, rank, iterations, seed=SEED)
        # Evaluate the model on the data it was fitted on, then on held-out data
        print("\nTrain MSE: %s" % compute_mse(model, train))
        print("Test MSE: %s\n" % compute_mse(model, test))

# The context is intentionally left running: the next cell saves `model`
# through `sc`, which would fail on a stopped context.

10 latent features - 10 Epochs

Train MSE: 0.5086526541757123
Test MSE: 0.5444256163426991

10 latent features - 15 Epochs

Train MSE: 0.5048889300331068
Test MSE: 0.5406031146346947

15 latent features - 10 Epochs

Train MSE: 0.4753186994795162
Test MSE: 0.5277953325266466

15 latent features - 15 Epochs

Train MSE: 0.47338385709648656
Test MSE: 0.5261933265332943



In [None]:
# Save the model left in `model` by the training cell above.
# Requires the `sc` that trained it to still be active.
import os
import shutil

myModelPath = 'als_recommender'
# Clear any artifact left by a previous run so the cell can be re-run
# (same approach as train_recommender.py).
if os.path.isdir(myModelPath):
    shutil.rmtree(myModelPath)
model.save(sc, myModelPath)

#### Create training script

In [1]:
%%writefile train_recommender.py
# Load required packages
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark import SparkContext
import argparse
import os
import shutil

def parse_args(argv=None):
    """Parse the command-line options of the training script.

    Parameters:
        argv: list of argument strings; None makes argparse read sys.argv.

    Returns:
        argparse.Namespace with filename, rank, epochs, output_path and seed.
    """
    parser = argparse.ArgumentParser(description='Train and save an ALS recommender')
    parser.add_argument('--filename', type=str, default='small_ratings.csv',
                        help='CSV file with a header row and user,movie,rating columns')
    parser.add_argument('--rank', type=int, default=15,
                        help='number of latent features')
    parser.add_argument('--epochs', type=int, default=15,
                        help='number of ALS iterations')
    parser.add_argument('--output_path', type=str, default='als_recommender',
                        help='directory to save the model to (replaced if it exists)')
    parser.add_argument('--seed', type=int, default=None,
                        help='optional seed for the train/test split and ALS')
    return parser.parse_args(argv)


def compute_mse(model, ratings_rdd):
    """Return the mean squared error of `model` on an RDD of Rating objects.

    Parameters:
        model: object exposing predictAll(rdd_of_(user, movie)_pairs).
        ratings_rdd: RDD whose elements index as (user, movie, rating).

    Returns:
        Mean of the squared differences between actual and predicted ratings.
        The join is an inner join, so only (user, movie) pairs that appear in
        both the actual ratings and the predictions contribute.
    """
    user_movie_pairs = ratings_rdd.map(lambda r: (r[0], r[1]))
    # Key both sides by (user, movie) so they can be joined
    predicted = model.predictAll(user_movie_pairs).map(lambda r: ((r[0], r[1]), r[2]))
    actual = ratings_rdd.map(lambda r: ((r[0], r[1]), r[2]))
    squared_errors = actual.join(predicted).map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
    return squared_errors.mean()


def main(argv=None):
    """Train an ALS model, print train/test MSE and save the model."""
    args = parse_args(argv)

    # Increase Memory if data is big
    # SparkContext.setSystemProperty('spark.driver.memory', '10g')
    # SparkContext.setSystemProperty('spark.executor.memory', '10g')

    # Start Spark Context
    sc = SparkContext("local", "MF mllib implementation")  # local machine
    try:
        # Load data
        raw_lines = sc.textFile(args.filename)
        header = raw_lines.first()                            # extract header
        data = raw_lines.filter(lambda row: row != header)    # filter out header

        # Convert the rows into Rating objects - Rating(user, movie, rating)
        ratings = data.map(lambda row: row.split(',')).map(
            lambda row: Rating(int(row[0]), int(row[1]), float(row[2])))

        # Split into Train and Test set; each is read more than once, so cache them
        train, test = (part.cache()
                       for part in ratings.randomSplit([0.8, 0.2], seed=args.seed))

        # Create ALS model
        model = ALS.train(train, args.rank, args.epochs, seed=args.seed)

        # Evaluate the model on the fitted data, then on held-out data
        print("Train MSE: %s" % compute_mse(model, train))
        print("Test MSE: %s" % compute_mse(model, test))

        # Remove a previous artifact at the target location before saving
        if os.path.isdir(args.output_path):
            shutil.rmtree(args.output_path)
        model.save(sc, args.output_path)
    finally:
        # Always release the context, even if loading or training failed
        sc.stop()


if __name__ == "__main__":
    main()

Writing train_recommender.py


In [37]:
# Train with rank 15 for 15 epochs and save the model to the directory given by --output_path.
# NOTE(review): '!5' in the directory name looks like a typo for '15'; the recommend cell
# below loads the same directory name, so rename both together if this is changed.
!python train_recommender.py --rank 15 --epochs 15 --output_path 'recommender_15_!5'

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/27 08:43:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/27 08:43:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/02/27 08:43:12 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 2 (TID 2): Attempting to kill Python Worker
23/02/27 08:43:16 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 3 (TID 3): Attempting to kill Python Worker
Train MSE: 0.4763550563982868
Test MSE: 0.5273373229301271


In [2]:
%%writefile recommend.py 
from pyspark.mllib.recommendation import  MatrixFactorizationModel
from pyspark import SparkContext
import argparse

def parse_args(argv=None):
    """Parse the command-line options of the recommendation script.

    Parameters:
        argv: list of argument strings; None makes argparse read sys.argv.

    Returns:
        argparse.Namespace with model_path, user_id and numRecs.
    """
    parser = argparse.ArgumentParser(description='Print top recommendations from a saved model')
    # Required: without it the model loader would be handed None
    parser.add_argument('--model_path', type=str, required=True, help='path of model artifact')
    parser.add_argument('--user_id', type=int, default=90, help='id of the current user')
    parser.add_argument('--numRecs', type=int, default=10, help='number of recommendations')
    return parser.parse_args(argv)


def main(argv=None):
    """Load the saved model and print the top recommendations for one user."""
    args = parse_args(argv)
    userId = args.user_id
    numRecs = args.numRecs  # number of movies to recommend

    # Start Spark Context
    sc = SparkContext("local", "MF Recommender from saved model")  # local machine
    try:
        # Load the model
        savedModel = MatrixFactorizationModel.load(sc, args.model_path)

        # Generate top recommendations for a user
        topRecommendations = savedModel.recommendProducts(userId, numRecs)
        print("\n\nTop recommendations for user " + str(userId) + ":")
        for recommendation in topRecommendations:
            print(recommendation.product, recommendation.rating)
    finally:
        # Always release the context, even if loading or recommending failed
        sc.stop()


if __name__ == "__main__":
    main()

Writing recommend.py


In [47]:
# Load the model from the directory written by the training cell above (relative to the
# notebook's working directory rather than a hard-coded absolute path).
!python recommend.py --model_path 'recommender_15_!5' --user_id 90 --numRecs 15

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/27 08:53:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/27 08:53:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/02/27 08:53:25 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow.
23/02/27 08:53:25 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow.
23/02/27 08:53:25 WARN MatrixFactorizationModel: Product factor does not have a partitioner. Prediction on individual records could be slow.
23/02/27 08:53:25 WARN MatrixFactorizationModel: Product factor is not cached. Prediction could be slow.
23/02/27 08:53:25 WARN MatrixFactorizationModelWrapper: User factor does not have a partitioner. Prediction on individual records could be slo