In [None]:
# set the environment 
import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from recommenders.utils.timer import Timer
from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

# Record the interpreter and Spark versions that produced the outputs in this notebook.
print(f"System version: {sys.version}")
print(f"Spark version: {pyspark.__version__}")


System version: 3.8.0 (default, Nov  6 2019, 21:49:08) 
[GCC 7.3.0]
Spark version: 3.2.0


In [None]:
# Number of top-scored items per user considered by the ranking metrics.
TOP_K = 10

# MovieLens variant passed to `movielens.load_spark_df` (e.g. '100k').
DATA_SIZE = '100k'
# Same value under the name the data-loading cell reads; defining it here keeps
# that cell from failing with a NameError on a fresh kernel.
MOVIELENS_DATA_SIZE = DATA_SIZE

# Column names given to the MovieLens ratings via the explicit load schema.
# They must describe MovieLens data: the second column holds movie ids and the
# fourth holds Unix timestamps (see the `data.show()` output).
COL_USER = "UserId"
COL_ITEM = "MovieId"
COL_RATING = "Rating"
COL_TIMESTAMP = "Timestamp"

In [None]:
# Start (or reuse) the notebook's Spark session with 16g of memory.
spark = start_or_get_spark("ALS PySpark", memory="16g")

# Spark 3.x can reject joins between DataFrames that share lineage when columns
# are referenced as df[col]; the seen-item filter in the prediction cell joins
# data derived from `train` back to `train`, so this check is turned off.
spark.conf.set(
    "spark.sql.analyzer.failAmbiguousSelfJoin",
    "false",
)

In [None]:
# Explicit schema for the MovieLens ratings: (user id, item id, rating, timestamp).
# Field names come from the config constants so later cells can refer to the
# columns symbolically.
schema = StructType(
    [
        StructField(COL_USER, IntegerType()),
        StructField(COL_ITEM, IntegerType()),
        StructField(COL_RATING, FloatType()),
        StructField(COL_TIMESTAMP, LongType()),
    ]
)

# DATA_SIZE is the size constant defined in the config cell.
data = movielens.load_spark_df(spark, size=DATA_SIZE, schema=schema)

# Cheap sanity check (no Spark job): the loaded frame carries the schema's names.
assert data.columns == [COL_USER, COL_ITEM, COL_RATING, COL_TIMESTAMP], data.columns
data.show()

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4.81k/4.81k [00:05<00:00, 882KB/s]
                                                                                

+------+-------+------+---------+
|UserId|MovieId|Rating|Timestamp|
+------+-------+------+---------+
|   196|    242|   3.0|881250949|
|   186|    302|   3.0|891717742|
|    22|    377|   1.0|878887116|
|   244|     51|   2.0|880606923|
|   166|    346|   1.0|886397596|
|   298|    474|   4.0|884182806|
|   115|    265|   2.0|881171488|
|   253|    465|   5.0|891628467|
|   305|    451|   3.0|886324817|
|     6|     86|   3.0|883603013|
|    62|    257|   2.0|879372434|
|   286|   1014|   5.0|879781125|
|   200|    222|   5.0|876042340|
|   210|     40|   3.0|891035994|
|   224|     29|   3.0|888104457|
|   303|    785|   3.0|879485318|
|   122|    387|   5.0|879270459|
|   194|    274|   2.0|879539794|
|   291|   1042|   4.0|874834944|
|   234|   1184|   2.0|892079237|
+------+-------+------+---------+
only showing top 20 rows



In [None]:
# 75/25 random split with a fixed seed so the split is reproducible.
train, test = spark_random_split(data, ratio=0.75, seed=123)

# Cache each split and report its size; count() materializes the cache.
for split_label, split_df in (("N train", train), ("N test", test)):
    print(split_label, split_df.cache().count())

N train 75018
N test 24982


In [None]:
# Map ALS's expected column roles onto this notebook's column names.
header = dict(
    userCol=COL_USER,
    itemCol=COL_ITEM,
    ratingCol=COL_RATING,
)

# Explicit-feedback ALS (implicitPrefs=False). coldStartStrategy='drop' makes
# transform() drop rows whose prediction would be NaN instead of returning NaN.
als = ALS(
    **header,
    rank=10,
    maxIter=15,
    regParam=0.05,
    implicitPrefs=False,
    nonnegative=False,
    coldStartStrategy="drop",
    seed=42,
)

In [None]:
# Fit the ALS model on the training split and report wall-clock time.
with Timer() as train_time:
    model = als.fit(train)
print(f"Took {train_time.interval} seconds for training.")

Took 7.5410127229988575 seconds for training.


In [None]:
with Timer() as test_time:
    # Score every (user, item) pair built from the users and items seen in training.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items: keep only scored pairs that have no row in `train`.
    # A left-anti join on the key column names expresses this directly. It
    # replaces the previous full outer join + null filter, which shuffled more
    # data. It also avoids the df[col] references that Spark treats as an
    # ambiguous self-join (dfs_pred is derived from train). The result columns
    # are dfs_pred's: user, item, prediction.
    top_all = dfs_pred.join(train, on=[COL_USER, COL_ITEM], how="left_anti") \
        .select(COL_USER, COL_ITEM, "prediction")

    # Materialize the cache so the timing covers the actual computation.
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))



Took 25.246142672998758 seconds for prediction.


                                                                                

In [None]:
# Preview the first rows of the unseen-item predictions (20 is show()'s default).
top_all.show(n=20)

+------+-------+----------+
|UserId|MovieId|prediction|
+------+-------+----------+
|     1|    587| 4.1602826|
|     1|    869| 2.7732863|
|     1|   1208|  2.033383|
|     1|   1348| 1.0019257|
|     1|   1357| 0.9430026|
|     1|   1677| 2.8777318|
|     2|     80|  2.351385|
|     2|    472| 2.5865319|
|     2|    582| 3.9548612|
|     2|    838| 0.9482963|
|     2|    975| 3.1133535|
|     2|   1260| 1.9871743|
|     2|   1325| 1.2368056|
|     2|   1381| 3.5477588|
|     2|   1530|   2.08829|
|     3|     22| 3.1524537|
|     3|     57| 3.6980162|
|     3|     89| 3.9733813|
|     3|    367| 3.6629045|
|     3|   1091| 0.9144474|
+------+-------+----------+
only showing top 20 rows



In [None]:
# Ranking evaluation: with relevancy_method="top_k", each user's TOP_K
# highest-scored unseen items in `top_all` are compared with that user's
# held-out ratings in `test`.
rank_eval = SparkRankingEvaluation(
    test, top_all, k=TOP_K,
    col_user=COL_USER, col_item=COL_ITEM,
    col_rating=COL_RATING, col_prediction="prediction",
    relevancy_method="top_k",
)

# The evaluator computes nothing until its metric methods are called.
print("Model:\tALS",
      "Top K:\t%d" % TOP_K,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

                                                                                

In [None]:
# Predict ratings for the held-out (user, item) pairs. The model was built with
# coldStartStrategy='drop', so pairs it cannot score are dropped rather than
# returned as NaN, which keeps RMSE/MAE defined.
prediction = model.transform(test)

rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_ITEM,
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')



Model:	ALS rating prediction
RMSE:	0.967434
MAE:	0.753340
Explained variance:	0.265916
R squared:	0.259532


                                                                                

In [None]:
# Shut down the Spark session started at the top of the notebook and release its resources.
spark.stop()