In [2]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import sys
import findspark
findspark.init()
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType


from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.eval.spark_eval import SparkRatingEvaluation, SparkRankingEvaluation

from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.utils.notebook_utils import store_metadata

print(f"System version: {sys.version}")
print("Spark version: {}".format(pyspark.__version__))

System version: 3.11.11 (main, Dec 11 2024, 16:28:39) [GCC 11.2.0]
Spark version: 3.5.5


In [1]:
import sys
import os

# Automatically add the parent directory to sys.path
BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), ".."))  # or any path that contains "recommenders"
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

In [3]:
# top k items to recommend
TOP_K = 10

# Select MovieLens data size: 100k, 1m, 10m, or 20m
MOVIELENS_DATA_SIZE = '100k'

# Column names for the dataset
COL_USER = "UserId"
COL_ITEM = "MovieId"
COL_RATING = "Rating"
COL_TIMESTAMP = "Timestamp"

In [4]:

# the following settings work well for debugging locally on VM - change when running on a cluster
# set up a giant single executor with many threads and specify memory cap
spark = start_or_get_spark("ALS PySpark", memory="16g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

25/05/01 23:56:31 WARN Utils: Your hostname, codespaces-be3971 resolves to a loopback address: 127.0.0.1; using 10.0.0.151 instead (on interface eth0)
25/05/01 23:56:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/01 23:56:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.
schema = StructType(
    (
        StructField(COL_USER, IntegerType()),
        StructField(COL_ITEM, IntegerType()),
        StructField(COL_RATING, FloatType()),
        StructField(COL_TIMESTAMP, LongType()),
    )
)

data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)
data.show()

100%|██████████| 4.81k/4.81k [00:00<00:00, 13.3kKB/s]
                                                                                

+------+-------+------+---------+
|UserId|MovieId|Rating|Timestamp|
+------+-------+------+---------+
|   196|    242|   3.0|881250949|
|   186|    302|   3.0|891717742|
|    22|    377|   1.0|878887116|
|   244|     51|   2.0|880606923|
|   166|    346|   1.0|886397596|
|   298|    474|   4.0|884182806|
|   115|    265|   2.0|881171488|
|   253|    465|   5.0|891628467|
|   305|    451|   3.0|886324817|
|     6|     86|   3.0|883603013|
|    62|    257|   2.0|879372434|
|   286|   1014|   5.0|879781125|
|   200|    222|   5.0|876042340|
|   210|     40|   3.0|891035994|
|   224|     29|   3.0|888104457|
|   303|    785|   3.0|879485318|
|   122|    387|   5.0|879270459|
|   194|    274|   2.0|879539794|
|   291|   1042|   4.0|874834944|
|   234|   1184|   2.0|892079237|
+------+-------+------+---------+
only showing top 20 rows



In [6]:
train, test = spark_random_split(data, ratio=0.75, seed=123)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

25/05/01 23:56:46 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

N train 75018
N test 24982


In [7]:
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [8]:
with Timer() as train_time:
    model = als.fit(train)

print("Took {} seconds for training.".format(train_time.interval))

25/05/01 23:56:55 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/05/01 23:56:55 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/05/01 23:56:55 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Took 10.252358210998864 seconds for training.


In [9]:
with Timer() as test_time:

    # Get the cross join of all user-item pairs and score them.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items.
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred[COL_USER] == train[COL_USER]) & (dfs_pred[COL_ITEM] == train[COL_ITEM]),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_RATING}"].isNull()) \
        .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

    # In Spark, transformations are lazy evaluation
    # Use an action to force execute and measure the test time 
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

25/05/01 23:57:17 WARN Column: Constructing trivially true equals predicate, 'UserId#0 = UserId#0'. Perhaps you need to use aliases.

Took 12.856408866000493 seconds for prediction.


                                                                                

In [10]:
top_all.show()

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|     1|    587| 4.1602826|
|     1|    869| 2.7732866|
|     1|   1208| 2.0333834|
|     1|   1348| 1.0019258|
|     1|   1357| 0.9430025|
|     1|   1677| 2.8777318|
|     2|     80|  2.351385|
|     2|    472| 2.5865324|
|     2|    582| 3.9548614|
|     2|    838| 0.9482964|
|     2|    975| 3.1133537|
|     2|   1260| 1.9871742|
|     2|   1325| 1.2368056|
|     2|   1381| 3.5477588|
|     2|   1530| 2.0882902|
|     3|     22| 3.1524534|
|     3|     57| 3.6980166|
|     3|     89| 3.9733818|
|     3|    367| 3.6629043|
|     3|   1091| 0.9144471|
+------+-------+----------+
only showing top 20 rows



In [11]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction", 
                                    relevancy_method="top_k")

                                                                                

In [12]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

[Stage 182:>                                                        (0 + 2) / 2]

Model:	ALS
Top K:	10
MAP:	0.020018
NDCG:	0.051718
Precision@K:	0.051274
Recall@K:	0.018840


                                                                                

In [13]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()



+------+-------+------+---------+----------+
|UserId|MovieId|Rating|Timestamp|prediction|
+------+-------+------+---------+----------+
|   148|     69|   5.0|877019101| 3.4735334|
|   148|     70|   5.0|877021271| 3.6578934|
|   148|     78|   1.0|877399018| 1.2749909|
|   148|    127|   1.0|877399351| 3.7908807|
|   148|    169|   5.0|877020297| 5.0153956|
|   148|    172|   5.0|877016513| 4.9947286|
|   148|    175|   4.0|877016259| 4.4411917|
|   148|    181|   5.0|877399135|  5.302301|
|   148|    190|   2.0|877398586| 4.9187717|
|   148|    194|   5.0|877015066| 4.1845074|
|   148|    214|   5.0|877019882|  4.658432|
|   148|    238|   4.0|877398586| 3.6696792|
|   148|    474|   5.0|877019882|  4.647562|
|   148|    496|   3.0|877015066|  4.130228|
|   148|    507|   5.0|877398587| 2.8675237|
|   148|    529|   5.0|877398901|  4.845461|
|   148|    588|   4.0|877399018| 3.1328173|
|   148|    713|   3.0|877021535|  4.235163|
|   148|    969|   4.0|877398513|  5.099872|
|   148|  

                                                                                

In [14]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

                                                                                

Model:	ALS rating prediction
RMSE:	0.967434
MAE:	0.753340
Explained variance:	0.265916
R squared:	0.259532


                                                                                

In [15]:
# Record results for tests - ignore this cell
if is_jupyter():
    store_metadata("map", rank_eval.map_at_k())
    store_metadata("ndcg", rank_eval.ndcg_at_k())
    store_metadata("precision", rank_eval.precision_at_k())
    store_metadata("recall", rank_eval.recall_at_k())
    store_metadata("rmse", rating_eval.rmse())
    store_metadata("mae", rating_eval.mae())
    store_metadata("exp_var", rating_eval.exp_var())
    store_metadata("rsquared", rating_eval.rsquared())
    store_metadata("train_time", train_time.interval)
    store_metadata("test_time", test_time.interval)

                                                                                

                                                                                

                                                                                

                                                                                

                                                                                

In [16]:

# cleanup spark instance
spark.stop()