In [2]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from pyspark.mllib.util import Saveable
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType

from recommenders.utils.timer import Timer
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.utils.notebook_utils import store_metadata

print(f"System version: {sys.version}")
print("Spark version: {}".format(pyspark.__version__))


System version: 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]
Spark version: 3.5.0


In [3]:
# top k items to recommend
TOP_K = 10

# Column names for the dataset
COL_USER = "user_id"
COL_ITEM = "item_id"
COL_RATING = "rating"

In [4]:
# the following settings work well for debugging locally on VM - change when running on a cluster
# set up a giant single executor with many threads and specify memory cap
spark = start_or_get_spark("ALS PySpark", memory="10g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

24/01/12 01:28:22 WARN Utils: Your hostname, QuangHieu resolves to a loopback address: 127.0.1.1; using 192.168.1.5 instead (on interface wlp1s0)
24/01/12 01:28:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/12 01:28:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
sc = spark.sparkContext
sc.setCheckpointDir('checkpoint')

In [7]:
minio_access_key = "minioadmin"
minio_secret_key = "minioadmin"
minio_endpoint = "localhost:9000"
minio_bucket = "data-for-recommend"


In [6]:
df = spark.read.csv('./data/cleaned_ratings.csv', inferSchema=True, header=True)
df.show(10)

                                                                                

+-----+-------+----------+------+
|  _c0|user_id|      ISBN|rating|
+-----+-------+----------+------+
|14854|   2276|0020960808|    10|
|14856|   2276|0030632366|     9|
|14858|   2276|0061030643|     8|
|14860|   2276|0061098353|     8|
|14861|   2276|0061099155|     9|
|14867|   2276|0071407944|     7|
|14869|   2276|0131953621|     8|
|14870|   2276|0133143023|     9|
|14871|   2276|0139634479|     6|
|14875|   2276|031209261X|    10|
+-----+-------+----------+------+
only showing top 10 rows



24/01/12 01:28:42 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , user_id, ISBN, rating
 Schema: _c0, user_id, ISBN, rating
Expected: _c0 but found: 
CSV file: file:///media/qhieu/01DA1E046C32C520/AADocker/jovyan/Another/data/cleaned_ratings.csv


In [9]:
indexer = StringIndexer(inputCol="ISBN", outputCol="item_id")
data = indexer.fit(df).transform(df)

In [10]:
data.show(10)

+---+-------+----------+------+--------+
|_c0|user_id|      ISBN|rating| item_id|
+---+-------+----------+------+--------+
|  1| 276726|0155061224|     5| 67111.0|
|  3| 276729|052165615X|     3| 98858.0|
|  4| 276729|0521795028|     6| 98875.0|
|  6| 276736|3257224281|     8| 19512.0|
|  7| 276737|0600570967|     6|105804.0|
|  8| 276744|038550120X|     7|   217.0|
|  9| 276745| 342310538|    10|163094.0|
| 16| 276747|0060517794|     9|  1085.0|
| 19| 276747|0671537458|     9|  2662.0|
| 20| 276747|0679776818|     8|  1996.0|
+---+-------+----------+------+--------+
only showing top 10 rows



# Split data

In [14]:
train, test = spark_random_split(data, ratio=0.75, seed=123)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 325373
N test 108298


## Train the ALS model on the training data, get the top-k recommendations for our testing data

In [15]:
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [16]:
with Timer() as train_time:
    model = als.fit(train)

print("Took {} seconds for training.".format(train_time.interval))

Took 87.96339289999742 seconds for training.


In [17]:
with Timer() as test_time:

    # Get the cross join of all user-item pairs and score them.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items.
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred[COL_USER] == train[COL_USER]) & (dfs_pred[COL_ITEM] == train[COL_ITEM]),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_RATING}"].isNull()) \
        .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

    # In Spark, transformations are lazy evaluation
    # Use an action to force execute and measure the test time 
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [39]:
top_all.schema

StructType([StructField('User-ID', IntegerType(), True), StructField('Item-ID', DoubleType(), True), StructField('prediction', FloatType(), True)])

## Evaluate ALS model

In [34]:
test.show(2)

+---+-------+----------+-----------+--------+
|_c0|User-ID|      ISBN|Book-Rating| Item-ID|
+---+-------+----------+-----------+--------+
|  4| 276729|0521795028|          6| 98875.0|
|  9| 276745| 342310538|         10|163094.0|
+---+-------+----------+-----------+--------+
only showing top 2 rows



In [None]:
top_all.show(2)

In [33]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=COL_USER, col_item="Item-ID", 
                                    col_rating=COL_RATING, col_prediction="prediction", 
                                    relevancy_method="top_k")

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Item` cannot be resolved. Did you mean one of the following? [`ISBN`, `Item-ID`, `_c0`, `User-ID`, `Book-Rating`].; line 1 pos 13;
'Aggregate [User-ID#18], [User-ID#18, 'collect_list(('Item - 'ID)) AS ground_truth#2973]
+- Sample 0.75, 1.0, false, 123
   +- Sort [_c0#17 ASC NULLS FIRST, User-ID#18 ASC NULLS FIRST, ISBN#19 ASC NULLS FIRST, Book-Rating#20 ASC NULLS FIRST, Item-ID#557 ASC NULLS FIRST], false
      +- Project [_c0#17, User-ID#18, ISBN#19, Book-Rating#20, UDF(cast(ISBN#19 as string)) AS Item-ID#557]
         +- Relation [_c0#17,User-ID#18,ISBN#19,Book-Rating#20] csv


In [None]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

## Evaluate rating prediction

In [None]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()


In [None]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')