In [9]:
!pip install recommenders

Collecting recommenders
  Using cached recommenders-0.7.0-py3-none-manylinux1_x86_64.whl (314 kB)
Collecting transformers<5,>=2.5.0
  Using cached transformers-4.15.0-py3-none-any.whl (3.4 MB)
Collecting scikit-surprise<=1.1.1,>=0.19.1
  Using cached scikit-surprise-1.1.1.tar.gz (11.8 MB)
Collecting memory-profiler<1,>=0.54.0
  Using cached memory_profiler-0.60.0.tar.gz (38 kB)
Collecting scikit-learn<1,>=0.22.1
  Using cached scikit_learn-0.24.2-cp37-cp37m-manylinux2010_x86_64.whl (22.3 MB)
Collecting pymanopt<1,>=0.2.5
  Using cached pymanopt-0.2.5-py3-none-any.whl (59 kB)
Collecting lightfm<2,>=1.15
  Using cached lightfm-1.16.tar.gz (310 kB)
Collecting nltk<4,>=3.4
  Using cached nltk-3.6.7-py3-none-any.whl (1.5 MB)
Collecting pydocumentdb>=2.3.3<3
  Using cached pydocumentdb-2.3.5-py3-none-any.whl (93 kB)
Collecting category-encoders<2,>=1.3.0
  Using cached category_encoders-1.3.0-py2.py3-none-any.whl (61 kB)
Collecting pyyaml<6,>=5.4.1
  Using cached PyYAML-5.4.1-cp37-cp37m-many

In [10]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 66.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=c957e08848c902d4fba4000d93db1586063b752c08e56f7961bdf45ca8c82504
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [11]:
import sys
import pyspark

In [12]:
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

In [13]:
top_k = 10 #top number of items to recommend
movielens_size = '100k' #number of movies to retrieve
col_user = 'UserId'
col_item = 'MovieId'
col_rating = 'Rating'
col_timestamp = 'Timestamp'

In [14]:
#SETTING UP THE SPARK CONTEXT
spark = start_or_get_spark('ALS PySpark', memory='16g')

In [15]:
schema = StructType(
    (
        StructField(col_user, IntegerType()),
        StructField(col_item, IntegerType()),
        StructField(col_rating, FloatType()),
        StructField(col_timestamp, LongType()),
    )
)

In [16]:
data = movielens.load_spark_df(spark, size=movielens_size, schema=schema)

100%|██████████| 4.81k/4.81k [00:00<00:00, 7.65kKB/s]


In [17]:
data.show()

+------+-------+------+---------+
|UserId|MovieId|Rating|Timestamp|
+------+-------+------+---------+
|   196|    242|   3.0|881250949|
|   186|    302|   3.0|891717742|
|    22|    377|   1.0|878887116|
|   244|     51|   2.0|880606923|
|   166|    346|   1.0|886397596|
|   298|    474|   4.0|884182806|
|   115|    265|   2.0|881171488|
|   253|    465|   5.0|891628467|
|   305|    451|   3.0|886324817|
|     6|     86|   3.0|883603013|
|    62|    257|   2.0|879372434|
|   286|   1014|   5.0|879781125|
|   200|    222|   5.0|876042340|
|   210|     40|   3.0|891035994|
|   224|     29|   3.0|888104457|
|   303|    785|   3.0|879485318|
|   122|    387|   5.0|879270459|
|   194|    274|   2.0|879539794|
|   291|   1042|   4.0|874834944|
|   234|   1184|   2.0|892079237|
+------+-------+------+---------+
only showing top 20 rows



In [18]:
#Split into train and test 
train, test = spark_random_split(data, ratio=0.75, seed=123)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 75018
N test 24982


In [19]:
header = {'userCol': col_user, 'itemCol': col_item, 'ratingCol': col_rating}
als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [20]:
with Timer() as train_time:
  model = als.fit(train)
print(f'Took {train_time.interval} seconds to train')

Took 14.903468817999965 seconds to train


In [27]:
with Timer() as test_time:
  users = train.select(col_user).distinct()
  items = train.select(col_item).distinct()
  user_item = users.crossJoin(items)
  dfs_pred = model.transform(user_item)

  dfs_pred_exclude_train = dfs_pred.alias("pred").join(
      train.alias("train"), 
      (F.col(f'pred.{col_user}') == F.col(f'train.{col_user}')) & (F.col(f'pred.{col_item}') == F.col(f'train.{col_item}')), 
      how='outer'
      )
  
  top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{col_rating}"].isNull()) \
        .select('pred.' + col_user, 'pred.' + col_item, 'pred.' + "prediction")
  top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))


Took 40.56677972499983 seconds for prediction.


In [28]:
top_all.show()

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|     1|    587| 4.1602826|
|     1|    869| 2.7732863|
|     1|   1208|  2.033383|
|     1|   1348| 1.0019257|
|     1|   1357| 0.9430026|
|     1|   1677| 2.8777318|
|     2|     80|  2.351385|
|     2|    472| 2.5865319|
|     2|    582| 3.9548612|
|     2|    838| 0.9482963|
|     2|    975| 3.1133535|
|     2|   1260| 1.9871743|
|     2|   1325| 1.2368056|
|     2|   1381| 3.5477588|
|     2|   1530|   2.08829|
|     3|     22| 3.1524537|
|     3|     57| 3.6980162|
|     3|     89| 3.9733813|
|     3|    367| 3.6629045|
|     3|   1091| 0.9144474|
+------+-------+----------+
only showing top 20 rows



In [30]:
rank_eval = SparkRankingEvaluation(test, top_all, k = top_k, col_user=col_user, col_item=col_item, 
                                    col_rating=col_rating, col_prediction="prediction", 
                                    relevancy_method="top_k")



In [31]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	10
MAP:	0.006527
NDCG:	0.051718
Precision@K:	0.051274
Recall@K:	0.018840


In [32]:
prediction = model.transform(test)
prediction.cache().show()

+------+-------+------+---------+----------+
|UserId|MovieId|Rating|Timestamp|prediction|
+------+-------+------+---------+----------+
|   580|    148|   4.0|884125773| 3.4059548|
|   406|    148|   3.0|879540276| 2.7134619|
|   916|    148|   2.0|880843892| 2.2241986|
|   663|    148|   4.0|889492989|  2.714362|
|   330|    148|   4.0|876544781|   4.52321|
|   935|    148|   4.0|884472892| 4.3838587|
|   308|    148|   3.0|887740788| 2.6169493|
|    20|    148|   5.0|879668713| 4.3721194|
|   923|    148|   4.0|880387474| 3.9818575|
|   455|    148|   3.0|879110346| 3.0764186|
|    15|    148|   3.0|879456049| 2.9913845|
|   374|    148|   4.0|880392992| 3.2223384|
|   880|    148|   2.0|880167030| 2.8111982|
|   677|    148|   4.0|889399265| 3.8451843|
|    49|    148|   1.0|888068195| 1.3751594|
|   244|    148|   2.0|880605071| 2.6781514|
|    84|    148|   4.0|883452274| 3.6721768|
|   627|    148|   3.0|879530463| 2.6362069|
|   434|    148|   3.0|886724797| 3.0973828|
|   793|  

In [34]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user=col_user, col_item=col_user, 
                                    col_rating=col_rating, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')
'''
Why is r-squared beloew zero??
'''



Model:	ALS rating prediction
RMSE:	1.247362
MAE:	0.987191
Explained variance:	-0.176321
R squared:	-0.180425


In [35]:
if is_jupyter():
    # Record results with papermill for tests
    import papermill as pm
    import scrapbook as sb
    sb.glue("map", rank_eval.map_at_k())
    sb.glue("ndcg", rank_eval.ndcg_at_k())
    sb.glue("precision", rank_eval.precision_at_k())
    sb.glue("recall", rank_eval.recall_at_k())
    sb.glue("rmse", rating_eval.rmse())
    sb.glue("mae", rating_eval.mae())
    sb.glue("exp_var", rating_eval.exp_var())
    sb.glue("rsquared", rating_eval.rsquared())
    sb.glue("train_time", train_time.interval)
    sb.glue("test_time", test_time.interval)

In [36]:
spark.stop()