In [1]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from pyspark.mllib.util import Saveable
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType

from recommenders.utils.timer import Timer
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.utils.notebook_utils import store_metadata

# Record the interpreter and Spark versions this run was produced with.
print(f"System version: {sys.version}")
print(f"Spark version: {pyspark.__version__}")




System version: 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]
Spark version: 3.5.0


In [2]:
# Number of items ranked per user when computing the ranking metrics.
TOP_K = 10

# Column names handed to the ALS estimator and to both evaluators.
COL_USER, COL_ITEM = "user_id", "item_id"
COL_RATING = "rating"
# Declared for completeness; no cell visible in this notebook reads it.
COL_TIMESTAMP = "timestamp"

## Set up Spark context

In [3]:
# Settings tuned for local debugging on a single VM (one large executor with a
# 6 GB memory cap); revisit them before running on a cluster.
spark = start_or_get_spark(app_name="ALS PySpark", memory="6g")

# Do not fail analysis on ambiguous self-joins for this session.
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

24/01/02 23:24:55 WARN Utils: Your hostname, QuangHieu resolves to a loopback address: 127.0.1.1; using 192.168.0.100 instead (on interface wlp1s0)
24/01/02 23:24:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/02 23:24:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Read the ratings CSV: the first row is the header and column types are inferred.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('./data/user_item_rating.csv')
)

                                                                                

In [5]:
# Preview the first 5 rows of the frame loaded from the CSV.
df.show(5)

+---+-------+----------+------+--------------------+-----------------+----+--------------------+--------------------+-----------------+
|_c0|user_id|      ISBN|rating|               title|           author|year|           publisher|             img_url|number_of_ratings|
+---+-------+----------+------+--------------------+-----------------+----+--------------------+--------------------+-----------------+
|  0| 277427|002542730X|    10|Politically Corre...|James Finn Garner|1994|John Wiley &amp; ...|http://images.ama...|               82|
|  1|   3363|002542730X|     0|Politically Corre...|James Finn Garner|1994|John Wiley &amp; ...|http://images.ama...|               82|
|  2|  11676|002542730X|     6|Politically Corre...|James Finn Garner|1994|John Wiley &amp; ...|http://images.ama...|               82|
|  3|  12538|002542730X|    10|Politically Corre...|James Finn Garner|1994|John Wiley &amp; ...|http://images.ama...|               82|
|  4|  13552|002542730X|     0|Politically Corre

24/01/02 23:25:11 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , user_id, ISBN, rating, title, author, year, publisher, img_url, number_of_ratings
 Schema: _c0, user_id, ISBN, rating, title, author, year, publisher, img_url, number_of_ratings
Expected: _c0 but found: 
CSV file: file:///media/qhieu/01DA1E046C32C520/Downloads/book_recommendation/data/user_item_rating.csv


In [6]:
# Keep only the columns the recommender uses. The user and rating columns are
# addressed through the notebook-level constants so this cell stays in sync
# with the ALS configuration and the evaluators, which use the same constants.
# NOTE(review): the previewed rows contain many ratings equal to 0. If 0 marks
# "no explicit rating" rather than a real score, those rows should be filtered
# out (or modelled with implicitPrefs=True) -- confirm against the dataset docs.
df_1 = df.select(COL_USER, 'ISBN', COL_RATING)

# ISBNs are strings; map each one to a numeric index stored in COL_ITEM so it
# can be used as the ALS item column.
indexer = StringIndexer(inputCol="ISBN", outputCol=COL_ITEM)
data = indexer.fit(df_1).transform(df_1)

                                                                                

In [7]:
# Preview the first 5 rows after the ISBN -> item_id indexing step.
data.show(5)

+-------+----------+------+-------+
|user_id|      ISBN|rating|item_id|
+-------+----------+------+-------+
| 277427|002542730X|    10|  167.0|
|   3363|002542730X|     0|  167.0|
|  11676|002542730X|     6|  167.0|
|  12538|002542730X|    10|  167.0|
|  13552|002542730X|     0|  167.0|
+-------+----------+------+-------+
only showing top 5 rows



## Split data

In [8]:
# Random 75/25 split with a fixed seed.
train, test = spark_random_split(data, ratio=0.75, seed=123)

# Mark each split as cached and report its size; count() is the action that
# actually evaluates the split.
for label, split in (("N train", train), ("N test", test)):
    print(label, split.cache().count())

                                                                                

N train 44916
N test 14934


## Train the ALS model on the training data and get the top-k recommendations for the test data

In [10]:
# Column-role arguments for the ALS estimator.
header = dict(userCol=COL_USER, itemCol=COL_ITEM, ratingCol=COL_RATING)

# Explicit-feedback ALS; rows whose prediction cannot be computed are dropped
# (coldStartStrategy='drop').
als = ALS(
    **header,
    rank=10,
    maxIter=15,
    regParam=0.05,
    implicitPrefs=False,
    nonnegative=False,
    coldStartStrategy='drop',
    seed=42,
)

In [11]:
# Fit the ALS model on the training split, timing only the fit itself.
with Timer() as train_time:
    model = als.fit(train)

print(f"Took {train_time.interval} seconds for training.")

Took 10.87290737699999 seconds for training.


To generate recommendations, we score every book for every user and then remove the user-book pairs that already exist in the training dataset.

In [22]:
with Timer() as test_time:

    # Score every (user, item) combination built from the users and items
    # that appear in the training split.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items: a left anti-join keeps exactly the scored pairs that
    # have no matching (user, item) row in the training split. Joining on
    # column names (instead of dfs_pred[...] == train[...]) avoids comparing
    # columns of two frames that share lineage with `train`, and it does not
    # depend on the rating value being non-null to detect a seen pair.
    dfs_pred_exclude_train = dfs_pred.join(
        train.select(COL_USER, COL_ITEM),
        on=[COL_USER, COL_ITEM],
        how='left_anti',
    )

    top_all = dfs_pred_exclude_train.select(COL_USER, COL_ITEM, "prediction")

    # Spark transformations are lazy: run an action so the scoring work is
    # executed (and cached) inside the timed block.
    top_all.cache().count()

print(f"Took {test_time.interval} seconds for prediction.")

Took 15.266388000003644 seconds for prediction.


In [23]:
# Preview the scored unseen (user, item) pairs; show() prints the first 20 rows.
top_all.show()

+-------+-------+-----------+
|user_id|item_id| prediction|
+-------+-------+-----------+
|    254|    4.0|  2.4980075|
|    254|    5.0|  1.2135823|
|    254|    9.0| -1.7044722|
|    254|   10.0|   4.778119|
|    254|   11.0| -2.0966806|
|    254|   15.0|  1.0929846|
|    254|   17.0|  2.0610933|
|    254|   24.0|  1.6685146|
|    254|   26.0|  4.1553884|
|    254|   33.0|0.039255977|
|    254|   36.0|  2.3925767|
|    254|   47.0|0.023091853|
|    254|   56.0|  0.9483514|
|    254|   59.0|  2.1068928|
|    254|   62.0|-0.25018203|
|    254|   66.0|  1.9257193|
|    254|   68.0|     1.5458|
|    254|   86.0| 0.19778377|
|    254|   89.0| 0.22504199|
|    254|  101.0|  4.9174776|
+-------+-------+-----------+
only showing top 20 rows



## Evaluate ALS model

In [24]:
# Ranking evaluation of the unseen-item predictions against the test split,
# using the top-k relevancy method with k = TOP_K.
rank_eval = SparkRankingEvaluation(
    test,
    top_all,
    k=TOP_K,
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction="prediction",
    relevancy_method="top_k",
)

In [25]:
# One "name<TAB>value" line per ranking metric.
ranking_report = [
    "Model:\tALS",
    "Top K:\t%d" % rank_eval.k,
    "MAP:\t%f" % rank_eval.map_at_k(),
    "NDCG:\t%f" % rank_eval.ndcg_at_k(),
    "Precision@K:\t%f" % rank_eval.precision_at_k(),
    "Recall@K:\t%f" % rank_eval.recall_at_k(),
]
print("\n".join(ranking_report))

Model:	ALS
Top K:	10
MAP:	0.001807
NDCG:	0.005973
Precision@K:	0.005910
Recall@K:	0.003719


## Evaluate rating prediction

In [26]:
# Score the (user, item) pairs of the test split and cache the result,
# since the rating evaluation reuses it.
prediction = model.transform(test).cache()
prediction.show()


+-------+----------+------+-------+-----------+
|user_id|      ISBN|rating|item_id| prediction|
+-------+----------+------+-------+-----------+
|  15957|0804106304|     0|   12.0|   1.888555|
|  26583|0971880107|     0|    0.0|        0.0|
|  69042|0345380371|     0|   83.0|  0.4556758|
|  69042|034538475X|     0|  160.0|   1.643925|
|  69042|0425140032|     0|  677.0|-0.03700009|
|  69042|0425155404|     0|  434.0| -0.4238061|
|  69042|0425158632|     0|  465.0| 0.85212326|
|  69042|042516098X|     0|  171.0| -1.1480521|
|  69042|0440206154|     0|   31.0|  1.2551156|
|  69042|0440221471|     0|   29.0| 0.32294464|
|  69042|0446359866|     0|  183.0| 0.16515827|
|  69042|0451160525|     0|  345.0|   1.556454|
|  69042|051513287X|     0|  100.0| 0.34791058|
|  69042|055356451X|     0|  202.0|  1.6637728|
|  69042|0553571818|     0|  508.0|-0.21547653|
|  69042|055357230X|     0|  471.0|  0.7807924|
|  69042|0553579606|     0|  177.0|   0.271469|
|  69042|0671024248|     0|  284.0|0.020

In [27]:
# Rating-prediction evaluation: compare predicted and true ratings on the test split.
rating_eval = SparkRatingEvaluation(
    test,
    prediction,
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction="prediction",
)

# One "name<TAB>value" line per rating metric.
rating_report = [
    "Model:\tALS rating prediction",
    "RMSE:\t%f" % rating_eval.rmse(),
    "MAE:\t%f" % rating_eval.mae(),
    "Explained variance:\t%f" % rating_eval.exp_var(),
    "R squared:\t%f" % rating_eval.rsquared(),
]
print("\n".join(rating_report))

Model:	ALS rating prediction
RMSE:	4.526363
MAE:	3.144447
Explained variance:	-0.600920
R squared:	-0.610986


In [28]:
# Clean up: stop the Spark session created at the top of the notebook.
# Cells that use `spark` cannot be re-run after this without recreating it.
spark.stop()

## To Sum up

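Base model with rank = 10, iter = 15, regParam = 0.05

Model:	ALS rating prediction

RMSE:	4.526363

MAE:	3.144447

Explained variance:	-0.600920

R squared:	-0.610986


We get a pretty bad model: the negative explained variance and R squared mean its rating predictions fit the test data worse than always predicting the mean rating.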