<i>Copyright (c) Microsoft Corporation. All rights reserved.</i>

<i>Licensed under the MIT License.</i>

# Running ALS on MovieLens (PySpark)

Matrix factorization by [ALS](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/recommendation.html#ALS) (Alternating Least Squares) is a well-known collaborative filtering algorithm.

This notebook shows how to use and evaluate the ALS implementation in PySpark ML (the DataFrame-based API), which is designed for large-scale distributed datasets. We use a smaller dataset in this example so that ALS runs efficiently on the multiple cores of a [Data Science Virtual Machine](https://azure.microsoft.com/en-gb/services/virtual-machines/data-science-virtual-machines/).

**Note**: This notebook requires a PySpark environment to run properly. Please follow the steps in [SETUP.md](../../SETUP.md) to install the PySpark environment.

In [2]:
# PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON must be set in the environment
# before starting Jupyter, for example on Windows:
#   set PYSPARK_PYTHON=C:\Users\Dave\anaconda3\envs\CudaPythonPySpark\python.exe
#   set PYSPARK_DRIVER_PYTHON=C:\Users\Dave\anaconda3\envs\CudaPythonPySpark\python.exe

import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType

from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))


System version: 3.7.10 (default, Feb 26 2021, 13:06:18) [MSC v.1916 64 bit (AMD64)]
Spark version: 3.2.0


Set the default parameters.

In [3]:
# top k items to recommend
TOP_K = 10

# Select MovieLens data size: 100k, 1m, 10m, or 20m
MOVIELENS_DATA_SIZE = '100k'

# Column names for the dataset
COL_USER = "UserId"
COL_ITEM = "MovieId"
COL_RATING = "Rating"
COL_TIMESTAMP = "Timestamp"

### 0. Set up Spark context

The following settings work well for debugging locally on a VM; change them when running on a cluster. We set up one large executor with many threads and specify a memory cap.

In [4]:
# These settings work well for debugging locally on a VM; change them when running on a cluster.
# Set up one large executor with many threads and specify a memory cap.
spark = start_or_get_spark("ALS PySpark", memory="16g")

### 1. Download the MovieLens dataset

In [5]:
# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.
schema = StructType(
    (
        StructField(COL_USER, IntegerType()),
        StructField(COL_ITEM, IntegerType()),
        StructField(COL_RATING, FloatType()),
        StructField(COL_TIMESTAMP, LongType()),
    )
)

data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)
data.show()

+------+-------+------+---------+
|UserId|MovieId|Rating|Timestamp|
+------+-------+------+---------+
|   196|    242|   3.0|881250949|
|   186|    302|   3.0|891717742|
|    22|    377|   1.0|878887116|
|   244|     51|   2.0|880606923|
|   166|    346|   1.0|886397596|
|   298|    474|   4.0|884182806|
|   115|    265|   2.0|881171488|
|   253|    465|   5.0|891628467|
|   305|    451|   3.0|886324817|
|     6|     86|   3.0|883603013|
|    62|    257|   2.0|879372434|
|   286|   1014|   5.0|879781125|
|   200|    222|   5.0|876042340|
|   210|     40|   3.0|891035994|
|   224|     29|   3.0|888104457|
|   303|    785|   3.0|879485318|
|   122|    387|   5.0|879270459|
|   194|    274|   2.0|879539794|
|   291|   1042|   4.0|874834944|
|   234|   1184|   2.0|892079237|
+------+-------+------+---------+
only showing top 20 rows



### 2. Split the data using the Spark random splitter provided in utilities

In [6]:
train, test = spark_random_split(data, ratio=0.75, seed=123)
print("N train", train.cache().count())
print("N test", test.cache().count())

N train 75018
N test 24982
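
The split sizes above are close to, but not exactly, 75,000 and 25,000. This is what one would expect from a splitter that assigns each row to a partition independently at random rather than cutting at a fixed index. The pure-Python sketch below illustrates that idea under this assumption; `random_split` is a hypothetical helper written for illustration, not the `spark_random_split` implementation.

```python
import random


def random_split(rows, ratio=0.75, seed=123):
    """Assign each row to train with probability `ratio`, otherwise to test."""
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    train_rows, test_rows = [], []
    for row in rows:
        if rng.random() < ratio:
            train_rows.append(row)
        else:
            test_rows.append(row)
    return train_rows, test_rows


rows = list(range(10_000))  # stand-in for the rating rows
train_rows, test_rows = random_split(rows)

# The sizes are only approximately 7,500 and 2,500 because every row is drawn independently.
print(len(train_rows), len(test_rows))
```

Because the assignment is probabilistic, the realized ratio fluctuates slightly around the requested one, while reusing the same seed reproduces the same partition.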


### 3. Train the ALS model on the training data, and get the top-k recommendations for our testing data

To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used to build the model are taken from [here](http://mymedialite.net/examples/datasets.html). We do not constrain the latent factors to be non-negative (`nonnegative=False`), which allows both positive and negative preferences towards movies.
Timing will vary depending on the machine being used to train.
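
To make the "alternating" part of ALS concrete, here is a deliberately simplified, pure-Python sketch with a single latent factor per user and item. It is not Spark's implementation: Spark solves the same kind of regularized least-squares problems for `rank` factors in a distributed fashion and applies its own regularization scaling. The toy ratings, the helper names `solve_side` and `loss`, and the initial values are all assumptions made for illustration.

```python
# Toy explicit ratings as (user, item, rating); the values are made up.
ratings = [
    (0, 0, 5.0), (0, 1, 3.0),
    (1, 0, 4.0), (1, 2, 1.0),
    (2, 1, 2.0), (2, 2, 1.0),
]
n_users, n_items = 3, 3
reg = 0.05  # plays the role of regParam, without any per-user scaling


def solve_side(fixed, n_free, free_idx, fixed_idx):
    """Closed-form ridge solution for each free factor while the other side is fixed."""
    num = [0.0] * n_free
    den = [reg] * n_free
    for row in ratings:
        f, g = row[free_idx], row[fixed_idx]
        num[f] += row[2] * fixed[g]
        den[f] += fixed[g] ** 2
    return [n / d for n, d in zip(num, den)]


def loss(user_f, item_f):
    """Regularized squared error over the observed ratings."""
    squared = sum((r - user_f[u] * item_f[i]) ** 2 for u, i, r in ratings)
    penalty = reg * (sum(x * x for x in user_f) + sum(y * y for y in item_f))
    return squared + penalty


user_f = [1.0] * n_users
item_f = [1.0] * n_items
history = [loss(user_f, item_f)]
for _ in range(15):  # mirrors maxIter=15
    user_f = solve_side(item_f, n_users, free_idx=0, fixed_idx=1)  # update users, items fixed
    item_f = solve_side(user_f, n_items, free_idx=1, fixed_idx=0)  # update items, users fixed
    history.append(loss(user_f, item_f))

print(history[0], history[-1])
```

Each half-step minimizes the objective exactly with respect to one set of factors, so the recorded loss never increases from one iteration to the next.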

In [7]:
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [8]:
with Timer() as train_time:
    model = als.fit(train)

print("Took {} seconds for training.".format(train_time.interval))

Took 5.368141399999999 seconds for training.


In the movie recommendation use case, recommending movies that a user has already rated does not make sense. Therefore, the rated movies are removed from the recommended items.

In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset.
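
The logic of this step can be sketched in plain Python before looking at the Spark version. The training pairs and the `score` function below are hypothetical stand-ins (the notebook uses `model.transform` for scoring); only the cross-join-then-filter structure is the point.

```python
# Hypothetical (user, item) pairs observed in training.
train_pairs = {(1, 10), (1, 20), (2, 10), (3, 30)}

users = sorted({u for u, _ in train_pairs})
items = sorted({i for _, i in train_pairs})


def score(user, item):
    """Made-up deterministic score standing in for the model's prediction."""
    return float((user * 7 + item * 3) % 5 + 1)


# Cross join: every user paired with every item.
candidates = [(u, i) for u in users for i in items]

# Anti-join: keep only the pairs that were not seen in training.
unseen = [(u, i, score(u, i)) for u, i in candidates if (u, i) not in train_pairs]

for row in unseen:
    print(row)
```

Note that the cross join grows with the number of users times the number of items, which is why this approach is practical here mainly because the dataset is small.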

In [27]:
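# This version fails with the AnalysisException shown below (Spark 3.2.0). The most likely
# cause: the join condition refers to columns through the DataFrame objects (dfs_pred[...]
# and train[...]) rather than through the "pred" and "train" aliases, and dfs_pred is
# derived from train, so Spark cannot tell which side of the join each column belongs to.
# Renaming the prediction columns before the join (see the next cell) avoids the ambiguity.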

with Timer() as test_time:

    # Get the cross join of all user-item pairs and score them.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items.
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred[COL_USER] == train[COL_USER]) & (dfs_pred[COL_ITEM] == train[COL_ITEM]),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_RATING}"].isNull()) \
        .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

    # Spark transformations are evaluated lazily, so use an action
    # to force execution and measure the test time.
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

AnalysisException:  Column MovieId#1519 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.        

In [9]:
with Timer() as test_time:

    # Get the cross join of all user-item pairs and score them.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    PRED_COL_USER = 'pred_' + COL_USER
    PRED_COL_ITEM = 'pred_' + COL_ITEM

    dfs_pred = dfs_pred.withColumnRenamed(COL_USER, PRED_COL_USER)
    dfs_pred = dfs_pred.withColumnRenamed(COL_ITEM, PRED_COL_ITEM)
    print(dfs_pred.columns)
    
    # Remove seen items.
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred[PRED_COL_USER] == train[COL_USER]) & (dfs_pred[PRED_COL_ITEM] == train[COL_ITEM]),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_RATING}"].isNull()) \
        .select('pred.' + PRED_COL_USER, 'pred.' + PRED_COL_ITEM, 'pred.' + "prediction")

    # Spark transformations are evaluated lazily, so use an action
    # to force execution and measure the test time.
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

['pred_UserId', 'pred_MovieId', 'prediction']
Took 25.026073199999992 seconds for prediction.


In [10]:
top_all.show()

+-----------+------------+----------+
|pred_UserId|pred_MovieId|prediction|
+-----------+------------+----------+
|          1|         587| 4.1602826|
|          1|         869| 2.7732866|
|          1|        1208| 2.0333834|
|          1|        1348| 1.0019258|
|          1|        1357| 0.9430025|
|          1|        1677| 2.8777318|
|          2|          80|  2.351385|
|          2|         472| 2.5865324|
|          2|         582| 3.9548614|
|          2|         838| 0.9482964|
|          2|         975| 3.1133537|
|          2|        1260| 1.9871742|
|          2|        1325| 1.2368056|
|          2|        1381| 3.5477588|
|          2|        1530| 2.0882902|
|          3|          22| 3.1524534|
|          3|          57| 3.6980166|
|          3|          89| 3.9733818|
|          3|         367| 3.6629043|
|          3|        1091| 0.9144471|
+-----------+------------+----------+
only showing top 20 rows



In [11]:
top_all = top_all.withColumnRenamed(PRED_COL_USER, COL_USER)
top_all = top_all.withColumnRenamed(PRED_COL_ITEM, COL_ITEM)
top_all.show()

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|     1|    587| 4.1602826|
|     1|    869| 2.7732866|
|     1|   1208| 2.0333834|
|     1|   1348| 1.0019258|
|     1|   1357| 0.9430025|
|     1|   1677| 2.8777318|
|     2|     80|  2.351385|
|     2|    472| 2.5865324|
|     2|    582| 3.9548614|
|     2|    838| 0.9482964|
|     2|    975| 3.1133537|
|     2|   1260| 1.9871742|
|     2|   1325| 1.2368056|
|     2|   1381| 3.5477588|
|     2|   1530| 2.0882902|
|     3|     22| 3.1524534|
|     3|     57| 3.6980166|
|     3|     89| 3.9733818|
|     3|    367| 3.6629043|
|     3|   1091| 0.9144471|
+------+-------+----------+
only showing top 20 rows



### 4. Evaluate how well ALS performs

In [12]:
rank_eval = SparkRankingEvaluation(test, top_all, k=TOP_K, col_user=COL_USER, col_item=COL_ITEM,
                                   col_rating=COL_RATING, col_prediction="prediction",
                                   relevancy_method="top_k")



In [13]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	10
MAP:	0.006527
NDCG:	0.051718
Precision@K:	0.051274
Recall@K:	0.018840
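
For intuition about two of these numbers, the sketch below computes precision@k and recall@k for a single user using the common textbook definitions. `precision_recall_at_k` and the item IDs are hypothetical; `SparkRankingEvaluation` aggregates over all users and may differ in details such as how relevant items are selected.

```python
def precision_recall_at_k(recommended, relevant, k):
    """Per-user precision@k and recall@k for a ranked recommendation list."""
    top_k = recommended[:k]
    hits = sum(1 for item in top_k if item in relevant)
    precision = hits / k
    recall = hits / len(relevant) if relevant else 0.0
    return precision, recall


recommended = [10, 20, 30, 40, 50]  # ranked by predicted rating, best first
relevant = {20, 50, 60}             # items the user interacted with in the test set

precision, recall = precision_recall_at_k(recommended, relevant, k=5)
print(precision, recall)
```

Here two of the five recommended items are relevant, so precision@5 is 2/5, and two of the three relevant items were retrieved, so recall@5 is 2/3.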


### 5. Evaluate rating prediction

In [14]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()


+------+-------+------+---------+----------+
|UserId|MovieId|Rating|Timestamp|prediction|
+------+-------+------+---------+----------+
|   580|    148|   4.0|884125773| 3.4059544|
|   406|    148|   3.0|879540276| 2.7134616|
|   916|    148|   2.0|880843892|  2.224198|
|   663|    148|   4.0|889492989| 2.7143617|
|   330|    148|   4.0|876544781|   4.52321|
|   935|    148|   4.0|884472892| 4.3838587|
|   308|    148|   3.0|887740788|  2.616949|
|    20|    148|   5.0|879668713|   4.37212|
|   923|    148|   4.0|880387474| 3.9818575|
|   455|    148|   3.0|879110346|  3.076419|
|    15|    148|   3.0|879456049| 2.9913845|
|   374|    148|   4.0|880392992| 3.2223387|
|   880|    148|   2.0|880167030| 2.8111987|
|   677|    148|   4.0|889399265| 3.8451848|
|    49|    148|   1.0|888068195| 1.3751595|
|   244|    148|   2.0|880605071| 2.6781514|
|    84|    148|   4.0|883452274| 3.6721768|
|   627|    148|   3.0|879530463|  2.636207|
|   434|    148|   3.0|886724797| 3.0973825|
(remaining output truncated)

In [15]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

Model:	ALS rating prediction
RMSE:	0.967434
MAE:	0.753340
Explained variance:	0.265916
R squared:	0.259532
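
The four rating metrics can be written out in a few lines of plain Python. This is a sketch using the usual definitions, with explained variance taken as one minus the ratio of error variance to rating variance; that definition, the helper name `rating_metrics`, and the toy values are assumptions, not a description of how `SparkRatingEvaluation` is implemented.

```python
import math


def rating_metrics(actual, predicted):
    """RMSE, MAE, explained variance, and R squared for paired ratings."""
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_actual = sum(actual) / n
    mean_error = sum(errors) / n
    var_actual = sum((a - mean_actual) ** 2 for a in actual) / n
    var_error = sum((e - mean_error) ** 2 for e in errors) / n
    return {
        "rmse": math.sqrt(mse),
        "mae": mae,
        "exp_var": 1 - var_error / var_actual,
        "rsquared": 1 - mse / var_actual,
    }


metrics = rating_metrics(actual=[4.0, 3.0, 2.0, 5.0], predicted=[3.5, 3.5, 2.5, 4.0])
for name, value in metrics.items():
    print(f"{name}:\t{value:f}")
```

Under these definitions the error variance can never exceed the mean squared error, so explained variance is at least as large as R squared, which matches the ordering of the two values reported above.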


In [17]:
if is_jupyter():
    # Record results with scrapbook for tests
    import scrapbook as sb
    sb.glue("map", rank_eval.map_at_k())
    sb.glue("ndcg", rank_eval.ndcg_at_k())
    sb.glue("precision", rank_eval.precision_at_k())
    sb.glue("recall", rank_eval.recall_at_k())
    sb.glue("rmse", rating_eval.rmse())
    sb.glue("mae", rating_eval.mae())
    sb.glue("exp_var", rating_eval.exp_var())
    sb.glue("rsquared", rating_eval.rsquared())
    sb.glue("train_time", train_time.interval)
    sb.glue("test_time", test_time.interval)


In [18]:
# cleanup spark instance
spark.stop()