In [1]:
# Set the PySpark environment variables
import os
os.environ['SPARK_HOME'] = "/Users/joey/Downloads/Spark"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import mean, col, when
# Regression Evaluator  is used to evaluate the performance of the recommendation model. 
# It assesses how well the model's predicted ratings align with the actual ratings in the test dataset.

In [3]:
# Because of how big the 'ratings.csv' file is, I configured the spark driver memory  to allocate 4gb
SparkSession.builder.appName("PrismDeck").config("spark.driver.memory", "4g").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/17 08:41:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Creating a SparkSession
spark = SparkSession.builder.appName("PrismDeck").getOrCreate()

In [5]:
# Read CSV file into a DataFrame
ratings_file_path = "/Users/joey/Downloads/ml-25m/ratings.csv"

In [6]:
# Load CSV file
ratings = spark.read.csv('/Users/joey/Downloads/ml-25m/ratings.csv', header=True, inferSchema=True)

24/01/17 08:41:22 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [7]:
# Load a sample of the ratings data; 
# Specifying the fraction of the data to sample (10% in this case)
ratings_sample = ratings.sample(fraction=0.1, seed=123)

In [8]:
# Display the first 10 rows of the 'ratings' DataFrame
ratings.show(10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
+------+-------+------+----------+
only showing top 10 rows



In [9]:
# ALS Model Setup
# ALS (Alternating Least Squares) is a collaborative filtering algorithm used for recommendation.
# 'userCol' specifies the column name in the DataFrame representing user IDs.
# In collaborative filtering, ALS predicts the ratings of items for users based on their preferences.
# 'itemCol' specifies the column name in the DataFrame representing item (movie) IDs.
# This is the identifier for the items that users might rate or interact with.
# 'ratingCol' specifies the column name in the DataFrame representing the actual ratings given by users.
# The algorithm will try to predict ratings in this column.
# 'coldStartStrategy' is a strategy for handling new users or items not present in the training dataset.
# 'drop' is a strategy that drops any rows in the DataFrame of predictions that contain NaN values.


In [10]:
# The configured ALS model will be used for training and making predictions.
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop'
)

In [11]:
# Split the data into training and testing sets
# 'combined_df' contains my feature-rich data for building the recommendation model.
# 'randomsplit' divides the data into two sets: Training data and Testing Data
# List inside randomsplit represents the split: 80% = Training data, 20% = Testing data
# seed parameter ensures reproducibility. If you use the same seed, you'll get the same split each time you run the code.
# This helps with the consistency in model evaluation.
# Tuple contains the two split DataFrames

(train_data, test_data) = ratings.randomSplit([0.8, 0.2], seed=123)

In [12]:
# Persist the DataFrames
# Which means the code following is storing or saving some DataFrames in a way that makes future operations on that data faster.
# persistence can happen in the computer's memory (RAM), on disk, or a combination of both.

train_data.persist()
test_data.persist()

DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

In [13]:
# Repartition the DataFrames
# partitioning can improve parallelism and performance.
# Depending on the size of 'ratings.csv' data and the available memory, repartitioning can help in managing memory more efficiently.
# It prevents out-of-memory errors or excessive memory usage

train_data = train_data.repartition(10)  # Adjust the number of partitions
test_data = test_data.repartition(10)

In [14]:
# Train the ALS model on the 'Training data'
# Training data DataFrame includes the user-item interactions (user ratings)
# 'fit' trains the ALS model on the training data
# It learns the underlying patterns and relationships in the data to make predictions.

model = als.fit(train_data)

24/01/17 08:42:33 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/01/17 08:42:33 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/01/17 08:42:33 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [15]:
# Make predictions on the test data
predictions = model.transform(test_data)

In [16]:
# Evaluate the model using a regression evaluator
# - metricName: 'rmse' indicates that the evaluation metric is Root Mean Squared Error (RMSE)
# - labelCol: 'rating' specifies the column in the DataFrame containing the actual ratings
# - predictionCol: 'prediction' specifies the column in the DataFrame containing the predicted ratings

evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction'
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")


[Stage 147:>                                                        (0 + 8) / 8]

Root Mean Squared Error (RMSE) on test data: 0.8036803070756547


                                                                                

In [17]:
# Generate top 5 movie recommendations for each user
user_recommendations = model.recommendForAllUsers(10)
# Generate top 5 user recommendations for each movie
movie_recommendations = model.recommendForAllItems(10)

# Generate top 5 movie recommendations for a specified set of users
selected_users = ratings.select(als.getUserCol()).distinct().limit(5)
user_subset_recommendations = model.recommendForUserSubset(selected_users, 10)

# Generate top 5 user recommendations for a specified set of movies
selected_movies = ratings.select(als.getItemCol()).distinct().limit(5)
movie_subset_recommendations = model.recommendForItemSubset(selected_movies, 10)

# Display the generated recommendations
user_recommendations.show(truncate=False)
movie_recommendations.show(truncate=False)
user_subset_recommendations.show(truncate=False)
movie_subset_recommendations.show(truncate=False)

                                                                                

+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                                |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26    |[{127252, 5.799946}, {203882, 5.4500513}, {203086, 5.3906593}, {144202, 5.3087993}, {196787, 5.1712666}, {194434, 4.973962}, {126941, 4.940329}, {149484, 4.90486}, {166812, 4.8869233}, {151615, 4.8856316}]  |
|27    |[{127252, 6.05012}, {175625, 5.850381}, {194334, 5.844524}, {54758, 5.650063}, {179985, 5.6348896}, {203086, 5.6253676}, {18

                                                                                

+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                                                            |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|12     |[{87426, 5.4314575}, {149507, 4.750429}, {129669, 4.7246842}, {8583, 4.7114887}, {142811, 4.652433}, {93649, 4.6407547}, {159335, 4.629198}, {85938, 4.599441}, {10861, 4.568414}, {110061, 4.5407877}]    |
|26     |[{105130, 4.9823847}, {105801, 4.941435}, {96471, 4.9353614}, {66365, 4.934801}, {4395, 4.926381}, {112919, 4.90449}, {19143, 4.9015417

                                                                                

+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                                |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|471   |[{127252, 5.9139094}, {203086, 5.5853567}, {192089, 5.3456798}, {203882, 5.283688}, {156414, 5.197998}, {151615, 5.191771}, {198841, 5.1819134}, {196717, 5.1694903}, {194434, 5.16531}, {187951, 5.1023607}]  |
|463   |[{127252, 7.1203847}, {193257, 6.704946}, {190707, 6.5830116}, {203086, 6.5639505}, {203882, 6.5409117}, {196787, 6.518819},



+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                                                             |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1580   |[{87426, 5.6314783}, {105130, 5.3395433}, {149507, 5.287355}, {18230, 5.2767253}, {61667, 5.2740593}, {156252, 5.2701845}, {156318, 5.2649546}, {91940, 5.233714}, {105946, 5.212908}, {129440, 5.2123775}] |
|44022  |[{87426, 5.555072}, {52924, 5.28932}, {71207, 5.2047014}, {127635, 5.057534}, {54382, 5.047116}, {96471, 5.044008}, {45953, 5.04363

                                                                                

In [19]:
model.save('/Users/joey/Downloads/model_training')

                                                                                

In [None]:
spark.stop()