# 1. Load data and model

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
import pyspark
import pyspark.sql  as pyspark_sql
import pyspark.sql.types as pyspark_types
import pyspark.sql.functions  as pyspark_functions
from pyspark import SparkContext, SparkConf

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=60726dd7dcf462d13e9654f1b2cee19f2bffc3c72f4362108bb36dde10dc9973
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

In [7]:
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = pyspark_sql.SparkSession.builder.getOrCreate()

In [9]:
!pip install recommenders

Collecting recommenders
  Downloading recommenders-1.2.0-py3-none-any.whl.metadata (13 kB)
Collecting category-encoders<3,>=2.6.0 (from recommenders)
  Downloading category_encoders-2.6.4-py2.py3-none-any.whl.metadata (8.0 kB)
Collecting cornac<2,>=1.15.2 (from recommenders)
  Downloading cornac-1.18.0-cp310-cp310-manylinux1_x86_64.whl.metadata (23 kB)
Collecting lightfm<2,>=1.17 (from recommenders)
  Downloading lightfm-1.17.tar.gz (316 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.4/316.4 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting locust<3,>=2.12.2 (from recommenders)
  Downloading locust-2.31.8-py3-none-any.whl.metadata (7.7 kB)
Collecting memory-profiler<1,>=0.61.0 (from recommenders)
  Downloading memory_profiler-0.61.0-py3-none-any.whl.metadata (20 kB)
Collecting notebook<8,>=7.0.0 (from recommenders)
  Downloading notebook-7.2.2-py3-none-any.whl.metadata (10 kB)
Collecting retrying

In [10]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
from pyspark.sql.functions import col

from recommenders.utils.timer import Timer
# from recommenders.datasets import movielens # This line caused the error
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.utils.notebook_utils import store_metadata

print(f"System version: {sys.version}")
print("Spark version: {}".format(pyspark.__version__))

System version: 3.10.12 (main, Sep 11 2024, 15:47:36) [GCC 11.4.0]
Spark version: 3.5.3


# 2. Split the data using the Spark random splitter

In [56]:
train, test = spark_random_split(data, ratio=0.75, seed=123)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 749899
N test 250310


# 3. Train the ALS model

In [57]:
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [58]:
with Timer() as train_time:
    model = als.fit(train)

print("Took {} seconds for training.".format(train_time.interval))

Took 30.49780547399996 seconds for training.


In [59]:
with Timer() as test_time:

    # Get the cross join of all user-item pairs and score them.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items.
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred[COL_USER] == train[COL_USER]) & (dfs_pred[COL_ITEM] == train[COL_ITEM]),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_RATING}"].isNull()) \
        .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

    # In Spark, transformations are lazy evaluation
    # Use an action to force execute and measure the test time
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

Took 94.46163341600004 seconds for prediction.


In [60]:
top_all.show()

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|     1|    587|  4.231573|
|     1|    869| 3.2675095|
|     1|   1208| 3.7568426|
|     1|   1348| 3.8881567|
|     1|   1357| 3.7257655|
|     1|   1677|  3.529725|
|     1|   1702|  2.806115|
|     1|   1720| 1.4069778|
|     1|   1892| 3.8566494|
|     1|   2086| 3.5733469|
|     1|   2202| 3.6929028|
|     1|   2324|  4.193265|
|     1|   2483| 1.8657755|
|     1|   2545|  4.628024|
|     1|   2667| 2.2472289|
|     1|   2870| 3.7266073|
|     1|   3304| 3.0608695|
|     1|   3452| 2.9702458|
|     1|   3468| 4.1978874|
|     1|   3477| 2.5803072|
+------+-------+----------+
only showing top 20 rows



In [64]:
top_recommendations = dfs_pred.groupBy("UserID") \
    .agg(F.collect_list(F.struct("MovieID", "prediction")).alias("recommendations")) \
    .withColumn("recommendations", F.expr(f"slice(recommendations, 1, {TOP_K})")) \
    .select("UserID", "recommendations")

top_recommendations.show(truncate=False)

+------+-----------------------------------------------------------------------------------------------+
|UserID|recommendations                                                                                |
+------+-----------------------------------------------------------------------------------------------+
|12    |[{3403, 2.4361722}, {3203, 3.9045157}, {3795, 2.0347874}, {2430, 3.4136176}, {2697, 2.4681168}]|
|22    |[{3203, 3.2323434}, {299, 2.9920409}, {2430, 2.2762494}, {1250, 3.433843}, {753, 2.6781292}]   |
|26    |[{1580, 3.1775737}, {299, 3.6808794}, {2366, 2.2891934}, {1250, 3.255144}, {1959, 3.4794157}]  |
|27    |[{519, 0.5744513}, {3403, 1.8615115}, {1942, 3.7517517}, {3795, 3.1788442}, {2181, 2.5507228}] |
|28    |[{1406, 2.8047717}, {2671, 2.8821023}, {1123, 3.4905834}, {293, 3.8796809}, {3687, 1.1582792}] |
|31    |[{3716, 2.6835275}, {3403, 2.8930357}, {2868, 1.7162561}, {3795, 2.8103352}, {1153, 2.8175373}]|
|34    |[{2040, 3.15895}, {2935, 2.9862275}, {1783, 2.8

# 4. Evaluate

In [65]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=COL_USER, col_item=COL_ITEM,
                                    col_rating=COL_RATING, col_prediction="prediction",
                                    relevancy_method="top_k")

In [66]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	5
MAP:	0.006823
NDCG:	0.016373
Precision@K:	0.020762
Recall@K:	0.003265


# 5. Evaluate rating prediction

In [67]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()

+------+-------+------+---------+----------+
|UserId|MovieId|Rating|Timestamp|prediction|
+------+-------+------+---------+----------+
|   148|      1|   5.0|977335193| 4.3743463|
|   148|      2|   5.0|979578366|  3.746474|
|   148|     11|   5.0|977334939|  4.149197|
|   148|     50|   2.0|979577217| 3.8229523|
|   148|     60|   3.0|979578136| 3.4684987|
|   148|    100|   2.0|977353410| 3.4215586|
|   148|    110|   5.0|977333311|  4.329249|
|   148|    150|   5.0|977352714|  4.372152|
|   148|    165|   3.0|977333786| 4.0510697|
|   148|    168|   3.0|977334225|  4.090685|
|   148|    231|   3.0|979578136| 3.5014732|
|   148|    239|   4.0|979578936| 3.5418477|
|   148|    257|   4.0|979578268|  3.466506|
|   148|    258|   3.0|977354311|  3.529729|
|   148|    316|   5.0|977333666|  4.043011|
|   148|    318|   5.0|977352608| 4.3612757|
|   148|    329|   3.0|977354207|   3.97779|
|   148|    339|   4.0|977335069| 3.9191005|
|   148|    364|   5.0|979577657| 4.3447757|
|   148|  

In [68]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_ITEM,
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

Model:	ALS rating prediction
RMSE:	0.857651
MAE:	0.676218
Explained variance:	0.414675
R squared:	0.409071
