# CF - ALS (by Spark)

## Algorithm Theory

### Matrix factorization for collaborative filtering problem

Matrix factorization tries to find latent factors that represent intrinsic user and item attributes in a lower dimension. 

1. **Prediction Formula**

$$\hat r_{u,i} = q_{i}^{T}p_{u}$$

where:
- $\hat r_{u,i}$ is the predicted ratings for user $u$ and item $i$
- $q_{i}$ and $p_{u}$ are latent factors for item and user, respectively.

---> The goal is to find these latent factors $q_{i}^{T}$ and $p_{u}$ that best approximate the observed ratings.

2. **Optimization Problem**

The challenge to the matrix factorization problem is to find $q_{i}^{T}$ and $p_{u}$. This is achieved by methods such as **matrix decomposition**. A learning approach is therefore developed to converge the decomposition results close to the observed ratings as much as possible. Furthermore, to avoid overfitting issue, the learning process is **regularized**. 

***The Goal is minimize the prediction error***:

$$\min\sum(r_{u,i} - q_{i}^{T}p_{u})^2 + \lambda(||q_{i}||^2 + ||p_{u}||^2)$$

where $\lambda$ is a the regularization parameter that controls the trade-off between fitting the data well and keeping the model simple. 

**Trường hợp phải tính toán rating dựa trên interaction của KH với items - Implicit Feedback**

In case **explict ratings** are not available, **implicit ratings** which are usually derived from users' historical interactions with the items (e.g., *clicks*, *views*, *purchases*, etc.). To account for such implicit ratings, the original matrix factorization algorithm can be formulated as 

$$\min\sum c_{u,i}(p_{u,i} - q_{i}^{T}p_{u})^2 + \lambda(||q_{i}||^2 + ||p_{u}||^2)$$

where:
- $c_{u,i}=1+\alpha r_{u,i}$: là trọng số cho từng tương tác. Nó phụ thuộc vào giá trị $r_{u,i}$, là một biểu thị của sự yêu thích của người dùng $u$ cho item $i$, giúp mô hình nhận biết rằng không phải tất cả các tương tác đều có giá trị như nhau. Một tương tác với số lần nhấp nhiều hơn sẽ được coi là quan trọng hơn so với một tương tác có ít lần nhấp.
- $p_{u,i}=1$ if $r_{u,i}>0$ else $p_{u,i}=0$ (if $r_{u,i}=0$): Điều này có nghĩa là chỉ khi có tương tác, chúng ta mới tính đến phản hồi đó
  > $r_{u,i}$ is a numerical representation of users' preferences (e.g., number of clicks, etc.). 

### Alternating Least Square (ALS)

Owing to the term of $q_{i}^{T}p_{u}$ the loss function is non-convex. Gradient descent method can be applied but this will incur expensive computations. An Alternating Least Square (ALS) algorithm was therefore developed to overcome this issue. 

<img src="https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTGkKzxWWEfGMc2qp2b4wQXdgkMNxt6ijs9tg&s">

The basic idea of ALS is to learn one of $q$ and $p$ at a time for optimization while keeping the other as constant. This makes the objective at each iteration convex and solvable. The alternating between $q$ and $p$ stops when there is convergence to the optimal. It is worth noting that this iterative computation can be parallelised and/or distributed, which makes the algorithm desirable for use cases where the dataset is large and thus the user-item rating matrix is super sparse (as is typical in recommendation scenarios). A comprehensive discussion of ALS and its distributed computation can be found [here](http://stanford.edu/~rezab/classes/cme323/S15/notes/lec14.pdf).

### Spark Mllib implementation

The matrix factorization algorithm is available as `ALS` module in [Spark `ml`](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) for DataFrame or [Spark `mllib`](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) for RDD. 

* The uniqueness of ALS implementation is that it distributes the matrix factorization model training by using "Alternating Least Square" method. 
* In the training method, there are parameters that can be selected to control the model performance.
* Both explicit and implicit ratings are supported by Spark ALS model.

## Setup enviroment and parameters

In [1]:
import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)

import sys

# from matplotlib import pyplot as plt
# import numpy as np
import pandas as pd
# import seaborn as sns

import pyspark

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# import pyspark.sql.functions as F
# from pyspark.sql.functions import col
# from pyspark.ml.tuning import CrossValidator
# from pyspark.sql.types import StructType, StructField
# from pyspark.sql.types import FloatType, IntegerType, LongType
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# from recommenders.datasets import movielens
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.evaluation.spark_evaluation import (  # noqa: E402
    # SparkRankingEvaluation,
    SparkRatingEvaluation,
)
# from recommenders.tuning.parameter_sweep import generate_param_grid
# from recommenders.datasets.spark_splitters import spark_random_split

print(f"System version: {sys.version}")
print(f"Pandas version: {pd.__version__}")
print(f"PySpark version: {pyspark.__version__}")

System version: 3.11.5 (main, Sep 11 2023, 08:31:25) [Clang 14.0.6 ]
Pandas version: 2.2.2
PySpark version: 3.5.2


**model parameters**

In [2]:
# model hyper-parameters
# rank of the factorization
RANK = 10
# Số lượng vòng lặp tối đa
MAX_ITER = 15
# Hệ số kiểm soát regularization
REG_PARAM = 0.05

# Number of recommended items
K = 15

# rating columns
USER_COL = "acnt_no"
ITEM_COL = "symbol"
RATING_COL = "final_score"

# Path to the CSV file
train_datapath = r"data/label/data_process_UserItemRating_min5items_train.csv"
test_datapath = r"data/label/data_process_UserItemRating_min5items_train.csv"

**Spark session**

In [5]:
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")
spark.conf.set("spark.sql.shuffle.partitions", "5")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.shuffle.partitions", "10")

In [4]:
# STOP SESSION
spark.stop()

## Train model

**Load data**

In [6]:
# Define the schema
schema = StructType(
    [
        StructField(USER_COL, StringType(), True),
        StructField(ITEM_COL, StringType(), True),
        StructField(RATING_COL, FloatType(), True),
    ]
)

# Read the CSV file into a DataFrame with the specified schema
train = spark.read.csv(train_datapath, sep="|", header=True, schema=schema)
test = spark.read.csv(test_datapath, sep="|", header=True, schema=schema)

# Show the DataFrame to verify
test.show(3)

+--------+------+-----------+
| acnt_no|symbol|final_score|
+--------+------+-----------+
|00018956|   BID|       0.02|
|00024968|   BID|       0.03|
|00025170|   VIX|      3.924|
+--------+------+-----------+
only showing top 3 rows



In [13]:
train.describe().show()

+-------+-----------------+------+-------------------+
|summary|          acnt_no|symbol|        final_score|
+-------+-----------------+------+-------------------+
|  count|           532012|532012|             532012|
|   mean|59160.27250137215|  NULL|0.22697969638126742|
| stddev|52375.70211050739|  NULL| 0.8522782496254795|
|    min|         00000002|   AAA|              0.002|
|    max|         00186799|   YTC|             12.869|
+-------+-----------------+------+-------------------+



**Preprocessing pipeline**

Do mô hình ALS spark yêu câu dữ liệu **user** và **item** ở dạng `numeric`, nên cần xây dựng pipeline chuyển đổi dữ liệu

In [7]:
# Chuyển đổi cột user
user_indexer = StringIndexer(inputCol=USER_COL, outputCol="userIndex")

# Chuyển đổi cột item
item_indexer = StringIndexer(inputCol=ITEM_COL, outputCol="itemIndex")

**Setup model**

It is worth noting that Spark ALS model allows dropping cold users to favor a robust evaluation with the testing data. In case there are cold users, Spark ALS implementation allows users to drop cold users in order to make sure evaluations on the prediction results are sound.

In [8]:
als = ALS(
    maxIter=MAX_ITER,
    rank=RANK,
    regParam=REG_PARAM,
    userCol="userIndex",
    itemCol="itemIndex",
    ratingCol=RATING_COL,
    nonnegative=True,
    coldStartStrategy="drop",
)

**Train model**

In [9]:
# make pipeline
model_pipeline = Pipeline(stages=[user_indexer, item_indexer, als])

# fit transform train data
model = model_pipeline.fit(train)

24/08/21 07:57:46 WARN DAGScheduler: Broadcasting large task binary with size 1108.0 KiB
24/08/21 07:57:46 WARN DAGScheduler: Broadcasting large task binary with size 1110.3 KiB
24/08/21 07:57:47 WARN DAGScheduler: Broadcasting large task binary with size 1111.8 KiB


CodeCache: size=131072Kb used=24798Kb max_used=24812Kb free=106273Kb
 bounds [0x00000001069e8000, 0x0000000108248000, 0x000000010e9e8000]
 total_blobs=9720 nmethods=8731 adapters=900
 compilation: disabled (not enough contiguous free space left)


24/08/21 07:57:47 WARN DAGScheduler: Broadcasting large task binary with size 1113.1 KiB
24/08/21 07:57:52 WARN DAGScheduler: Broadcasting large task binary with size 1112.1 KiB
24/08/21 07:57:52 WARN DAGScheduler: Broadcasting large task binary with size 1113.4 KiB
24/08/21 07:57:53 WARN DAGScheduler: Broadcasting large task binary with size 1114.2 KiB
24/08/21 07:57:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/08/21 07:57:54 WARN DAGScheduler: Broadcasting large task binary with size 1117.3 KiB
24/08/21 07:57:54 WARN DAGScheduler: Broadcasting large task binary with size 1118.7 KiB
24/08/21 07:57:54 WARN DAGScheduler: Broadcasting large task binary with size 1120.1 KiB
24/08/21 07:57:54 WARN DAGScheduler: Broadcasting large task binary with size 1121.5 KiB
24/08/21 07:57:54 WARN DAGScheduler: Broadcasting large task binary with size 1122.9 KiB
24/08/21 07:57:54 WARN DAGScheduler: Broadcasting large task binary with size 1124.3 KiB
24/

**Model Prediction**

In [10]:
test_predictions = model.transform(test)

## Model Evaluation

In [11]:
evaluations = SparkRatingEvaluation(
    test_predictions,
    test_predictions,
    col_user="userIndex",
    col_item="itemIndex",
    col_rating=RATING_COL,
    col_prediction="prediction",
)

print(
    "RMSE score = {}".format(evaluations.rmse()),
    "MAE score = {}".format(evaluations.mae()),
    "R2 score = {}".format(evaluations.rsquared()),
    "Explained variance score = {}".format(evaluations.exp_var()),
    sep="\n",
)

24/08/21 07:57:57 WARN DAGScheduler: Broadcasting large task binary with size 1088.5 KiB
24/08/21 07:57:57 WARN DAGScheduler: Broadcasting large task binary with size 1166.1 KiB
24/08/21 07:57:57 WARN DAGScheduler: Broadcasting large task binary with size 1164.8 KiB
24/08/21 07:57:58 WARN DAGScheduler: Broadcasting large task binary with size 1195.1 KiB
24/08/21 07:58:00 WARN DAGScheduler: Broadcasting large task binary with size 1223.4 KiB
24/08/21 07:58:01 WARN DAGScheduler: Broadcasting large task binary with size 1088.5 KiB
24/08/21 07:58:01 WARN DAGScheduler: Broadcasting large task binary with size 1166.1 KiB
24/08/21 07:58:01 WARN DAGScheduler: Broadcasting large task binary with size 1164.8 KiB
24/08/21 07:58:01 WARN DAGScheduler: Broadcasting large task binary with size 1195.1 KiB
24/08/21 07:58:02 WARN DAGScheduler: Broadcasting large task binary with size 1223.4 KiB
24/08/21 07:58:02 WARN DAGScheduler: Broadcasting large task binary with size 1089.0 KiB
24/08/21 07:58:02 WAR

RMSE score = 0.5255889928752164
MAE score = 0.146146664142994
R2 score = 0.6196963192818451
Explained variance score = 0.6204599566890703


                                                                                

Các ranking metrics should be apply the differenct from those that have been rated by the users

In [13]:
dfs_pred_exclude_train.show()

24/08/21 08:24:35 WARN DAGScheduler: Broadcasting large task binary with size 1166.1 KiB
24/08/21 08:24:35 WARN DAGScheduler: Broadcasting large task binary with size 1164.8 KiB
24/08/21 08:24:36 WARN DAGScheduler: Broadcasting large task binary with size 1126.8 KiB
24/08/21 08:25:14 WARN DAGScheduler: Broadcasting large task binary with size 1209.4 KiB
24/08/21 08:26:35 WARN DAGScheduler: Broadcasting large task binary with size 1234.7 KiB
24/08/21 08:28:37 WARN DAGScheduler: Broadcasting large task binary with size 1243.1 KiB
[Stage 1040:>                                                       (0 + 1) / 1]

+--------+------+---------+---------+-----------+-------+------+-----------+
| acnt_no|symbol|userIndex|itemIndex| prediction|acnt_no|symbol|final_score|
+--------+------+---------+---------+-----------+-------+------+-----------+
|00000002|   ACV|  17914.0|    167.0| 0.55084217|   NULL|  NULL|       NULL|
|00000002|   AFC|  17914.0|   1559.0| 0.03381136|   NULL|  NULL|       NULL|
|00000002|   AMD|  17914.0|    136.0|  1.4452658|   NULL|  NULL|       NULL|
|00000002|   APT|  17914.0|   1115.0| 0.07506817|   NULL|  NULL|       NULL|
|00000002|   AVF|  17914.0|    515.0| 0.31911272|   NULL|  NULL|       NULL|
|00000002|   BAL|  17914.0|    951.0| 0.26299208|   NULL|  NULL|       NULL|
|00000002|   BBT|  17914.0|    952.0|  0.0714774|   NULL|  NULL|       NULL|
|00000002|   BCF|  17914.0|    924.0| 0.31961817|   NULL|  NULL|       NULL|
|00000002|   BDG|  17914.0|    697.0| 0.13765448|   NULL|  NULL|       NULL|
|00000002|   BDT|  17914.0|    791.0| 0.62766224|   NULL|  NULL|       NULL|

                                                                                

24/08/21 08:39:09 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 259264 ms exceeds timeout 120000 ms
24/08/21 08:39:09 WARN SparkContext: Killing executors is not supported by current scheduler.
24/08/21 08:39:11 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [12]:
# Get the cross join of all user-item pairs and score them.
users = train.select(USER_COL).distinct()
items = train.select(ITEM_COL).distinct()
user_item = users.crossJoin(items)
dfs_pred = model.transform(user_item)

# Remove seen items.
dfs_pred_exclude_train = dfs_pred.alias("pred").join(
    train.alias("train"),
    (dfs_pred[USER_COL] == train[USER_COL])
    & (dfs_pred[ITEM_COL] == train[ITEM_COL]),
    how="outer",
)

dfs_pred_final = dfs_pred_exclude_train.filter(
    dfs_pred_exclude_train["train.Rating"].isNull()
).select("pred." + USER_COL, "pred." + ITEM_COL, "pred." + "prediction")

dfs_pred_final.show()


24/08/21 07:58:16 WARN Column: Constructing trivially true equals predicate, 'acnt_no#0 = acnt_no#0'. Perhaps you need to use aliases.


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `train`.`Rating` cannot be resolved. Did you mean one of the following? [`acnt_no`, `symbol`, `userIndex`, `itemIndex`, `prediction`, `acnt_no`, `symbol`, `final_score`].

## Appendix

### Transform label

In [7]:
import pandas as pd

train = pd.read_csv(
    r"data/label/data_process_UserItemRating_min5items_train.csv",
    sep="|",
    dtype={"acnt_no": str},
)
test = pd.read_csv(
    r"data/label/data_process_UserItemRating_min5items_test.csv",
    sep="|",
    dtype={"acnt_no": str},
)