In [2]:
import recommenders
# set the environment path to find Recommenders
import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from recommenders.utils.timer import Timer
#from recommenders.datasets import movielens
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

# Report the interpreter and PySpark versions used for this run.
print(f"System version: {sys.version}")
print(f"Spark version: {pyspark.__version__}")

System version: 3.8.8 (default, Apr 13 2021, 12:59:45) 
[Clang 10.0.0 ]
Spark version: 3.1.2


## PySpark Environment Setup

In [3]:
# Session settings suited to local debugging on a VM; change them when running on a cluster.
SPARK_APP_NAME = "ALS PySpark"
SPARK_MEMORY = "16g"  # memory cap handed to start_or_get_spark

spark = start_or_get_spark(SPARK_APP_NAME, memory=SPARK_MEMORY)

# Disable the analyzer's failure on ambiguous self-joins for this session.
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

In [8]:
import pandas as pd

# NOTE: `spark` is the session created by start_or_get_spark in the setup cell.
# It is reused here instead of being rebuilt with SparkSession.builder, so the
# memory setting and the failAmbiguousSelfJoin override applied there are the
# ones in effect. StructType, StructField, IntegerType and FloatType come from
# the notebook's import cell.

# Load the data from the CSV file
data = pd.read_csv("../data/df_for_CF_cleaned.csv")

# Keep only the required columns and reset the index
df_all = data.loc[:, ["iid", "pid", "dec"]].reset_index(drop=True)

# Assign the new column names
p_col = ['userID', 'itemID', 'rating']
df_all.columns = p_col

# Define the schema
schema = StructType([
    StructField("userID", IntegerType()),
    StructField("itemID", IntegerType()),
    StructField("rating", FloatType())
])

# Convert the pandas DataFrame into a Spark DataFrame
# (the columns are cast first so they match the schema's types)
spark_df = spark.createDataFrame(
    df_all.astype({"userID": int, "itemID": int, "rating": float}),
    schema=schema,
)


In [11]:
# Display the Spark DataFrame's repr (column names and types)
spark_df

DataFrame[userID: int, itemID: int, rating: float]

In [9]:
# Random 75% / 25% split of the ratings with a fixed seed.
data = spark_df
train, test = spark_random_split(data, ratio=0.75, seed=123)

# Cache both splits; count() is the action that triggers evaluation.
n_train = train.cache().count()
print("N train", n_train)
n_test = test.cache().count()
print("N test", n_test)

N train 6084
N test 1938


## Train the ALS model on the training data, and get the top-k recommendations for our testing data

To predict ratings, we use the rating data in the training set (the `rating` column loaded from `df_for_CF_cleaned.csv`) as users' explicit feedback. The hyperparameters used in building the model are referenced from [here](http://mymedialite.net/examples/datasets.html). We do not constrain the latent factors (`nonnegative = False`) in order to allow for both positive and negative preferences.

In [12]:
# Number of items to recommend
TOP_K = 10

# MovieLens data size selector: 100k, 1m, 10m, or 20m
# NOTE(review): not referenced by the visible cells, which load a CSV instead - confirm before relying on it
MOVIELENS_DATA_SIZE = '100k'

# Dataset column names
COL_USER, COL_ITEM, COL_RATING, COL_TIMESTAMP = "userID", "itemID", "rating", "Timestamp"

# Mapping from ALS column parameters to the dataset's column names
header = dict(userCol=COL_USER, itemCol=COL_ITEM, ratingCol=COL_RATING)

# ALS hyperparameters: explicit feedback, unconstrained factors, fixed seed
als_params = dict(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
)

als = ALS(**als_params, **header)

In [13]:
# Fit ALS on the training split and time the fit.
with Timer() as train_time:
    model = als.fit(train)

print(f"Took {train_time.interval} seconds for training.")

Took 4.106173681999962 seconds for training.


In [14]:
with Timer() as test_time:

    # Score every (user, item) combination built from the users and items
    # that appear in the training split.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items. A left anti join keeps only the scored pairs that have
    # no matching (user, item) row in the training split. Joining on column
    # names, rather than on Column objects taken from two frames that share the
    # same lineage, also avoids an ambiguous self-join.
    seen_pairs = train.select(COL_USER, COL_ITEM)
    dfs_pred_exclude_train = dfs_pred.join(
        seen_pairs,
        on=[COL_USER, COL_ITEM],
        how='left_anti'
    )

    top_all = dfs_pred_exclude_train.select(COL_USER, COL_ITEM, "prediction")

    # Spark transformations are lazy, so use an action to force execution
    # and make the measured time include the actual scoring.
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

Took 20.73038315200006 seconds for prediction.


In [15]:
# Print the first rows of the unseen-item predictions
top_all.show()

+------+------+------------+
|userID|itemID|  prediction|
+------+------+------------+
|     2|    80| -0.36090994|
|     2|   303|   0.7053553|
|     2|   472| -0.52474487|
|     3|    22|         0.0|
|     3|    57|         0.0|
|     3|    89|         0.0|
|     3|   367|         0.0|
|     4|   185| -0.17475829|
|     4|   405|  0.05618856|
|     4|   457|  0.12476618|
|     5|   225|  0.19171292|
|     6|   117| 0.093515545|
|     6|   274|   0.5912022|
|     6|   327|-0.098053694|
|     6|   393| 0.024363607|
|     6|   408|  -0.3395221|
|     6|   520|  -0.5121326|
|     7|    55| -0.04255647|
|     7|   132|   0.6185148|
|     7|   475|  0.45173293|
+------+------+------------+
only showing top 20 rows



## Evaluate how well ALS performs

In [16]:
# Ranking evaluator: compares the test split against the scored unseen items,
# using the top-k relevancy method with k = TOP_K.
ranking_eval_kwargs = dict(
    k=TOP_K,
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction="prediction",
    relevancy_method="top_k",
)
rank_eval = SparkRankingEvaluation(test, top_all, **ranking_eval_kwargs)

In [17]:
# Assemble the ranking report line by line, then print it in one go.
ranking_report = [
    "Model:\tALS",
    "Top K:\t%d" % rank_eval.k,
    "MAP:\t%f" % rank_eval.map_at_k(),
    "NDCG:\t%f" % rank_eval.ndcg_at_k(),
    "Precision@K:\t%f" % rank_eval.precision_at_k(),
    "Recall@K:\t%f" % rank_eval.recall_at_k(),
]
print("\n".join(ranking_report))

Model:	ALS
Top K:	10
MAP:	0.035411
NDCG:	0.069272
Precision@K:	0.036364
Recall@K:	0.098300


In [18]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()



+------+------+------+----------+
|userID|itemID|rating|prediction|
+------+------+------+----------+
|   163|   148|   0.0|0.23417708|
|   173|   148|   0.0|0.97054243|
|   494|   471|   1.0| 0.3461488|
|   490|   471|   1.0| 0.7004903|
|   488|   471|   0.0|0.47787538|
|   483|   471|   0.0|0.11732125|
|   503|   496|   1.0| 0.5846836|
|   235|   243|   1.0|       0.0|
|   400|   392|   0.0| 0.5791444|
|   399|   392|   0.0|       0.0|
|   401|   392|   0.0|       0.0|
|   530|   540|   1.0|0.20571373|
|   519|   540|   0.0|       0.0|
|   509|   540|   1.0|0.14940831|
|    54|    31|   1.0| 0.6736362|
|    48|    31|   1.0| 0.3251678|
|    43|    31|   1.0|0.93497974|
|    50|    31|   1.0| 0.9269944|
|   539|   516|   1.0| 0.5122782|
|   531|   516|   1.0|0.14309825|
+------+------+------+----------+
only showing top 20 rows



In [19]:
# Rating evaluator: compares the test split's ratings with the model's predictions.
rating_eval_kwargs = dict(
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction="prediction",
)
rating_eval = SparkRatingEvaluation(test, prediction, **rating_eval_kwargs)

# Assemble the rating report line by line, then print it in one go.
rating_report = [
    "Model:\tALS rating prediction",
    "RMSE:\t%f" % rating_eval.rmse(),
    "MAE:\t%f" % rating_eval.mae(),
    "Explained variance:\t%f" % rating_eval.exp_var(),
    "R squared:\t%f" % rating_eval.rsquared(),
]
print("\n".join(rating_report))

Model:	ALS rating prediction
RMSE:	0.476986
MAE:	0.364778
Explained variance:	0.114680
R squared:	0.063770


In [20]:
# Clean up: stop the Spark session used by this notebook.
# Cells that use `spark` need the setup cell re-run after this.
spark.stop()