# Importing Libraries and Modules

In [1]:
%load_ext autoreload
%autoreload 2

import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, IntegerType, LongType, StructType, StructField
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import HashingTF, CountVectorizer, VectorAssembler
import warnings

from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkDiversityEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

from pyspark.sql.window import Window
import pyspark.sql.functions as F
import numpy as np
import pandas as pd

warnings.simplefilter(action='ignore', category=FutureWarning)
print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))



System version: 3.10.13 (main, Sep 11 2023, 08:16:02) [Clang 14.0.6 ]
Spark version: 3.5.0


## Spark Setup

In [2]:
# top k items to recommend
TOP_K = 10
# user, item column names
COL_USER="UserId"
COL_ITEM="MovieId"
COL_RATING="Rating"
COL_TITLE="Title"
COL_GENRE="Genre"

In [3]:
# setting up spark
spark = start_or_get_spark("ALS PySpark", memory="16g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")
spark.conf.set("spark.sql.crossJoin.enabled", "true")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/26 16:46:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Diversity Metrics

In [4]:
def get_diversity_results(diversity_eval):
    metrics = {
        "catalog_coverage":diversity_eval.catalog_coverage(),
        "distributional_coverage":diversity_eval.distributional_coverage(), 
        "novelty": diversity_eval.novelty(), 
        "diversity": diversity_eval.diversity(), 
        "serendipity": diversity_eval.serendipity()
    }
    return metrics 

In [5]:
movielens_data_sizes = ["100k", "1m", "10m", "20m"]

# ALS Model

In [6]:
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

# Experiment on MOVIE LENS 100K Dataset

In [7]:
MOVIELENS_DATA_SIZE = "20m"

schema = StructType(
(
    StructField(COL_USER, IntegerType()),
    StructField(COL_ITEM, IntegerType()),
    StructField(COL_RATING, FloatType()),
    StructField("Timestamp", LongType()),
)
)

data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema, title_col=COL_TITLE, genres_col=COL_GENRE)
data.show()
    

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 194k/194k [00:09<00:00, 20.4kKB/s]
23/11/26 16:46:58 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+-------+------+------+----------+--------------------+--------------------+
|MovieId|UserId|Rating| Timestamp|               Title|               Genre|
+-------+------+------+----------+--------------------+--------------------+
|     29|     1|   3.5|1112484676|City of Lost Chil...|Adventure|Drama|F...|
|   2529|     3|   5.0| 945175680|Planet of the Ape...| Action|Drama|Sci-Fi|
|   2529|     7|   4.0|1011206732|Planet of the Ape...| Action|Drama|Sci-Fi|
|   2529|    10|   3.0| 943497376|Planet of the Ape...| Action|Drama|Sci-Fi|
|    474|    13|   4.0| 849082697|In the Line of Fi...|     Action|Thriller|
|    474|    15|   4.0| 840207465|In the Line of Fi...|     Action|Thriller|
|  45726|    18|   2.5|1236293298|You, Me and Dupre...|              Comedy|
|    474|    20|   2.0|1126539515|In the Line of Fi...|     Action|Thriller|
|    474|    21|   4.0| 992189970|In the Line of Fi...|     Action|Thriller|
|    474|    22|   3.0| 994638573|In the Line of Fi...|     Action|Thriller|

### Split the data using the Spark random splitter provided in utilities

In [8]:
train_df, test_df = spark_random_split(data.select(COL_USER, COL_ITEM, COL_RATING), ratio=0.75, seed=123)
print ("N train_df", train_df.cache().count())
print ("N test_df", test_df.cache().count())

                                                                                

N train_df 14998529


[Stage 21:>                                                         (0 + 8) / 8]

N test_df 5001734


                                                                                

### Get all possible user-item pairs

In [9]:
users = train_df.select(COL_USER).distinct()
items = train_df.select(COL_ITEM).distinct()
user_item = users.crossJoin(items)

### Train the ALS model on the training data, and get the top-k recommendations for our testing data

To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used in building the model are referenced from here. We do not constrain the latent factors (nonnegative = False) in order to allow for both positive and negative preferences towards movies. Timing will vary depending on the machine being used to train.

In [10]:
with Timer() as train_time:
    model = als.fit(train_df)

print("Took {} seconds for training.".format(train_time.interval))

23/11/26 16:47:17 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/26 16:47:17 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/11/26 16:47:17 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Took 16.946659542001726 seconds for training.


In the movie recommendation use case, recommending movies that have been rated by the users does not make sense. Therefore, the rated movies are removed from the recommended items.
In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset.

In [11]:
# Score all user-item pairs
dfs_pred = model.transform(user_item)

# Remove seen items.
dfs_pred_exclude_train = dfs_pred.alias("pred").join(
    train_df.alias("train"),
    (dfs_pred[COL_USER] == train_df[COL_USER]) & (dfs_pred[COL_ITEM] == train_df[COL_ITEM]),
    how='outer'
)

top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train.Rating"].isNull()) \
    .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

print(top_all.count())
    
window = Window.partitionBy(COL_USER).orderBy(F.col("prediction").desc())    
top_k_reco = top_all.select("*", F.row_number().over(window).alias("rank")).filter(F.col("rank") <= TOP_K).drop("rank")
 
print(top_k_reco.count())

23/11/26 16:47:28 WARN Column: Constructing trivially true equals predicate, 'UserId#6 = UserId#6'. Perhaps you need to use aliases.
ERROR:root:KeyboardInterrupt while sending command.              (16 + 8) / 200]
Traceback (most recent call last):
  File "/Users/aryanjadon/miniconda3/envs/recommendations_experiments/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/aryanjadon/miniconda3/envs/recommendations_experiments/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/aryanjadon/miniconda3/envs/recommendations_experiments/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

### Performance Results

In [None]:
als_diversity_eval = SparkDiversityEvaluation(
    train_df = train_df, 
    reco_df = top_k_reco,
    col_user = COL_USER, 
    col_item = COL_ITEM
)

als_diversity_metrics = get_diversity_results(als_diversity_eval)
als_diversity_metrics