# Alternating Least Squares (ALS) Model

ALS (Alternating Least Squares) is a collaborative filtering algorithm commonly used in recommendation systems. It excels at matrix factorization, aiming to predict missing entries in a user-item interaction matrix. The core idea is to decompose this matrix into two lower-dimensional matrices: one capturing user preferences and the other capturing item characteristics.

**How ALS Works**

- **Matrix Factorization:**  
  Given a user-item interaction matrix \( R \) (such as ratings or watch ratios), ALS approximates \( R \) as the product of two matrices:  
  - \( U \): Each row represents a user's latent preferences.  
  - \( V \): Each row represents an item's latent features.  
  The objective is to minimize the reconstruction error between \( R \) and \( U \times V^T \).

- **Alternating Optimization:**  
  ALS works by alternately fixing one matrix (e.g., \( U \)) and solving for the other (e.g., \( V \)), repeating this process until convergence or a set number of iterations.

- **Handling Sparsity:**  
  Since real-world interaction matrices are typically sparse (with many missing values), ALS is designed to efficiently optimize using only the observed interactions.

## Fetching dataset
(can ignore this part if everyhting already loaded)

In [72]:
%%bash
wget --no-check-certificate 'https://drive.usercontent.google.com/download?id=1qe5hOSBxzIuxBb1G_Ih5X-O65QElollE&export=download&confirm=t&uuid=b2002093-cc6e-4bd5-be47-9603f0b33470
' -O KuaiRec.zip
unzip KuaiRec.zip -d data_final_project

--2025-05-17 19:24:52--  https://drive.usercontent.google.com/download?id=1qe5hOSBxzIuxBb1G_Ih5X-O65QElollE&export=download&confirm=t&uuid=b2002093-cc6e-4bd5-be47-9603f0b33470%0A
Resolving drive.usercontent.google.com (drive.usercontent.google.com)... 216.58.214.161, 2a00:1450:4007:80c::2001
Connecting to drive.usercontent.google.com (drive.usercontent.google.com)|216.58.214.161|:443... connected.
HTTP request sent, awaiting response... 

Process was interrupted.


CalledProcessError: Command 'b"wget --no-check-certificate 'https://drive.usercontent.google.com/download?id=1qe5hOSBxzIuxBb1G_Ih5X-O65QElollE&export=download&confirm=t&uuid=b2002093-cc6e-4bd5-be47-9603f0b33470\n' -O KuaiRec.zip\nunzip KuaiRec.zip -d data_final_project\n"' died with <Signals.SIGINT: 2>.

## Libraries Imports

In [None]:
import os
import numpy as np
import pandas as pd
from utils import get_data_path, matrix_cleanup 

from sklearn.metrics import ndcg_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, MultiLabelBinarizer

import plotly.express as px

DATA_PATH = get_data_path()
TRAINED_MODEL = "trained/pyspark_als_model"

# Step 1: Load the dataframes

In [None]:
small_matrix = pd.read_csv(f"{DATA_PATH}/small_matrix.csv")

small_matrix = matrix_cleanup(small_matrix)

big_matrix = pd.read_csv(f"{DATA_PATH}/big_matrix.csv")

big_matrix = matrix_cleanup(big_matrix)


## Caption Category

In [None]:
caption_category = pd.read_csv(f"{DATA_PATH}/kuairec_caption_category.csv", lineterminator='\n')
caption_category.head(10)

Unnamed: 0,video_id,manual_cover_text,caption,topic_tag,first_level_category_id,first_level_category_name,second_level_category_id,second_level_category_name,third_level_category_id,third_level_category_name
0,0,UNKNOWN,精神小伙路难走 程哥你狗粮慢点撒,[],8,颜值,673,颜值随拍,-124,UNKNOWN
1,1,UNKNOWN,,[],27,高新数码,-124,UNKNOWN,-124,UNKNOWN
2,2,UNKNOWN,晚饭后，运动一下！,[],9,喜剧,727,搞笑互动,-124,UNKNOWN
3,3,UNKNOWN,我平淡无奇，惊艳不了时光，温柔不了岁月，我只想漫无目的的走走，努力发笔小财，给自己买花 自己长大.,[],26,摄影,686,主题摄影,2434,景物摄影
4,4,五爱街最美美女 一天1q,#搞笑 #感谢快手我要上热门 #五爱市场 这真是完美搭配啊！,"[五爱市场,感谢快手我要上热门,搞笑]",5,时尚,737,营销售卖,2596,女装
5,5,UNKNOWN,“你们吵的越狠 他们的手就握的越紧” #文轩 #刘耀文 #宋亚轩 #顾子璇...,"[刘耀文,宋亚轩,文轩,顾子璇是樱桃吖,顾子璇超级喜欢文轩]",6,明星娱乐,667,娱乐八卦,2375,饭制
6,6,UNKNOWN,记住我的名字还有我带给你的故事.,[],19,情感,-124,UNKNOWN,-124,UNKNOWN
7,7,最高级动物.,#灵魂属性大揭秘 我变的动物最高级，不接受反驳...,[灵魂属性大揭秘],8,颜值,-124,UNKNOWN,-124,UNKNOWN
8,8,76岁老汉吃白皮拉面,#感谢快手官大大送上热门 #感谢推广小助手 #感谢推广小助手助我上热门 #看得起农村人的点个...,"[感谢快手官大大送上热门,感谢推广小助手,感谢推广小助手助我上热门,看得起农村人的点个双击吧]",12,美食,292,美食日常,1461,美食分享
9,9,UNKNOWN,,[],33,自拍,-124,UNKNOWN,-124,UNKNOWN


## Item category

We have various video characteristics (such as author_id, video_type, etc.), but these require minimal preprocessing.
For content-based filtering, we will utilize video features like the list of tags, applying a straightforward one-hot encoding approach.

In [21]:
# No missing values for this data
item_categories = pd.read_csv(f"{DATA_PATH}/item_categories.csv")

## Item daily features and User features

This dataset is also valuable for content-based filtering.
Since it primarily consists of textual data, we will encode the video features using a TF-IDF vectorizer.

In [None]:
item_daily_features = pd.read_csv(f"{DATA_PATH}/item_daily_features.csv", lineterminator='\n')
item_daily_features.fillna(-1, inplace=True)

user_features = pd.read_csv(f"{DATA_PATH}/user_features.csv", lineterminator='\n')
user_features.fillna(-1, inplace=True)

# Step 2: Feature Engineering

- Extract relevant features from interactions and metadata (such as content tags and user activity history)
- Construct the user-item interaction matrix
- Optionally, derive features based on time or item popularity

#### Item categories

In [22]:
# Use MultiLabelBinarizer to manage efficiently the feat column
mlb = MultiLabelBinarizer()

# Transform the feat column to a list (evaluate with python)
item_categories["feat"] = item_categories["feat"].apply(eval)

item_categories = pd.DataFrame(mlb.fit_transform(item_categories["feat"]), 
                  columns=mlb.classes_,
                  index=item_categories["video_id"])


item_categories.reset_index(drop=True, inplace=True)
item_categories[item_categories.columns] = item_categories[item_categories.columns].astype("int16")

item_categories.head(10)


Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,21,22,23,24,25,26,27,28,29,30
0,0,0,0,0,0,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,1,...,0,0,0,0,0,0,1,0,0,0
2,0,0,0,0,0,0,0,0,0,1,...,0,0,0,0,0,0,0,0,0,0
3,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
4,0,0,0,0,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,0,0,0,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
6,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
7,0,0,0,0,0,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,0
8,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
9,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


#### Item daily features

We take the oldest data point for a given video_id.

Depending on the complexity, you can choose the number of features.

In [23]:
TEXT_FEATURES = ["video_type", "upload_type"] # "visible_status"
INT_FEATURES = ['video_duration','video_width', 'video_height', 'music_id', 'video_tag_id','show_cnt', 'show_user_num', 'play_cnt', 'play_user_num',
                'play_duration', 'complete_play_cnt', 'complete_play_user_num', 'valid_play_cnt', 'valid_play_user_num', 'long_time_play_cnt',
                'long_time_play_user_num', 'short_time_play_cnt', 'short_time_play_user_num', 'play_progress']

In [24]:
# Keep the latest date for each video_id
item_daily_features = item_daily_features.loc[item_daily_features.groupby("video_id")["date"].idxmax()].reset_index(drop=True)

# One-hot str features
text_daily_features = item_daily_features[TEXT_FEATURES]
onehotter = OneHotEncoder(handle_unknown="ignore")
onehot_array = onehotter.fit_transform(text_daily_features).toarray()

# Convert to DataFrame
text_daily_features = pd.DataFrame(
    onehot_array,
    columns=onehotter.get_feature_names_out(TEXT_FEATURES),
    index=item_daily_features.index)

# Merge the one-hot encoded features back into the original DataFrame
item_daily_features = pd.concat([item_daily_features[INT_FEATURES], text_daily_features], axis=1)

In [25]:
# No IDs, because it is the index
item_features_map = pd.concat([item_daily_features, item_categories], axis=1)

# Column names should be str
item_features_map.columns = item_features_map.columns.map(str)

# We can keep all the columns
item_features_columns = item_features_map.columns.tolist()

### User features

In [26]:
user_features_columns = [
    "is_lowactive_period","is_live_streamer", "is_video_author",
    "onehot_feat0", "onehot_feat1", "onehot_feat2", "onehot_feat3",
    "onehot_feat4", "onehot_feat5", "onehot_feat6", "onehot_feat7",
    "onehot_feat8", "onehot_feat9", "onehot_feat10", "onehot_feat11", 
    "onehot_feat12", "onehot_feat13", "onehot_feat14", "onehot_feat15",
    "onehot_feat16", "onehot_feat17"
]
user_features_map = user_features[user_features_columns].copy()

user_features_map[user_features_map.columns] = user_features_map[user_features_map.columns].astype("int16")

In [27]:
# Index is the associated IDs for quick creation
display(user_features_map.head(3))
display(item_features_map.head(3))

Unnamed: 0,is_lowactive_period,is_live_streamer,is_video_author,onehot_feat0,onehot_feat1,onehot_feat2,onehot_feat3,onehot_feat4,onehot_feat5,onehot_feat6,...,onehot_feat8,onehot_feat9,onehot_feat10,onehot_feat11,onehot_feat12,onehot_feat13,onehot_feat14,onehot_feat15,onehot_feat16,onehot_feat17
0,0,0,0,0,1,17,638,2,0,1,...,184,6,3,0,0,0,0,0,0,0
1,0,0,0,0,3,25,1021,0,0,1,...,186,6,2,0,0,0,0,0,0,0
2,0,0,0,0,6,8,402,0,0,0,...,51,2,3,0,0,0,0,0,0,0


Unnamed: 0,video_duration,video_width,video_height,music_id,video_tag_id,show_cnt,show_user_num,play_cnt,play_user_num,play_duration,...,21,22,23,24,25,26,27,28,29,30
0,5966.0,720,1280,3350323409,8,3710,2649,2213,1635,19547072,...,0,0,0,0,0,0,0,0,0,0
1,-1.0,886,1015,1812462382,27,30,25,8,7,94830,...,0,0,0,0,0,0,1,0,0,0
2,8000.0,720,1280,0,9,72,59,17,16,212893,...,0,0,0,0,0,0,0,0,0,0


## Dataset preparation

Environment variables

In [28]:
INTERACTION_N = 20_000_000
# Either small_matrix or big_matrix
DATASET = big_matrix
# Number of relevant items to get
K = 10
# Tolerance for considering a hit
WATCH_RATIO_THRESHOLD = 0.7

## Misc

In [29]:
print(f"Proportion of small_matrix relative to big_matrix: {small_matrix.shape[0] * 100 / big_matrix.shape[0]:.2f}%")

Proportion of small_matrix relative to big_matrix: 38.89%


# Step 3: Model architecture

We will use the ALS algorithm from pyspark.ml.recommendation with hyperparameters tuning and cross-validation.

The model is cut into 4 parts:
- Data preparation and tuning
- Model training
- Model evaluation
- Model saving


### Pyspark imports and setup

In [None]:
import pyspark
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, col, expr
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel, Model

print(f"Spark version: {pyspark.__version__}")
print(f"Pandas version: {pd.__version__}")

spark = SparkSession.builder \
    .appName("KuaiRec ALS") \
    .getOrCreate()

Spark version: 3.5.5
Pandas version: 2.2.3


your 131072x1 screen size is bogus. expect trouble
25/05/17 18:50:21 WARN Utils: Your hostname, TIDJs-Warmachine resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/17 18:50:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/17 18:50:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Data preparation for Pyspark

In [32]:
# We load directly from the CSV to avoid memory issues
small_matrix = spark.read.csv(
    f"{DATA_PATH}/small_matrix.csv",
    header=True,
    sep=",",
    nullValue="",
    # We have to infer for correct types
    inferSchema=True,
).select("user_id", "video_id", "watch_ratio").na.drop(subset=["user_id", "video_id", "watch_ratio"])

                                                                                

In [None]:
# Makes a lot of problems, we will not use it for now
big_matrix = spark.read.csv(
    f"{DATA_PATH}/big_matrix.csv",
    header=True,
    sep=",",
    inferSchema=True,
    nullValue="",
).select("user_id", "video_id", "watch_ratio").na.drop(subset=["user_id", "video_id", "watch_ratio"])

### Hyperparameter tuning and Cross Validation

In [33]:
# ALS model configuration
als = ALS(
    maxIter=10,
    rank=10,
    userCol="user_id",
    itemCol="video_id",
    ratingCol="watch_ratio",
    implicitPrefs=True,
)

# For CrossValidator
params = ParamGridBuilder() \
    .addGrid(als.maxIter, [10, 15]) \
    .addGrid(als.regParam, [0.1]) \
    .build()


# RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="watch_ratio", predictionCol="prediction")


# CrossValidator
cvs = CrossValidator(
    estimator=als,
    estimatorParamMaps=params,
    evaluator=evaluator,
    # Between 2 and 5
    numFolds=3,
)

### Training
Now with the training, we should have:

R ≈ U x V

Where:
- R is the user-item interaction matrix
- U is the user feature matrix
- V is the item feature matrix

In [34]:
(training, test) = small_matrix.randomSplit([0.8, 0.2], seed=42)

In [35]:
# Fit the ALS model on the train data
models : CrossValidatorModel = cvs.fit(training)

25/05/17 18:51:33 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

### Metrics evaluation

In [36]:
# Take the best model from the CrossValidator
pyspark_als_model : Model = models.bestModel
predictions = pyspark_als_model.transform(test)
rmse = evaluator.evaluate(predictions.na.drop())

                                                                                

In [37]:
print(f"RMSE: {rmse}")
print(f"Rank: {pyspark_als_model.rank}")
print(f"MaxIter: {pyspark_als_model._java_obj.parent().getMaxIter()}")
print(f"RegParam: {pyspark_als_model._java_obj.parent().getRegParam()}")

RMSE: 1.2658234339405554
Rank: 10
MaxIter: 15
RegParam: 0.1


### Saving

In [38]:
pyspark_als_model.save(TRAINED_MODEL)

25/05/17 18:53:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/05/17 18:53:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/05/17 18:53:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/05/17 18:54:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/05/17 18:54:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/05/17 18:54:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/05/17 18:54:00 WARN MemoryManager: Total allocation exceeds 95.00%

# Step 4: Recommendation

- Predict which videos are likely to be enjoyed by each user in the test set
- Generate a top-N ranked list of recommendations for each user

### Loading model

In [39]:
try:
    pyspark_als_model
except NameError:
    print("Model not found. Trying to load it.")
    if os.path.exists(TRAINED_MODEL):
        print("Model found. Loading it.")
        pyspark_als_model = ALSModel.load(TRAINED_MODEL)
        print("Model loaded.")
    else:
        print("Model not found. Please train the model first.")
        raise FileNotFoundError("Model not found. Please train the model first.")


### Recommendation

In [40]:
# If you have not already used Caption Category
caption_category = pd.read_csv(f"{DATA_PATH}/kuairec_caption_category.csv", lineterminator='\n')

def video_id_to_caption(video_id: int) -> str:
    """
    Get the caption of a video from its id

    Args:
        video_id (int): The id of the video

    Returns:
        str: The caption of the video
    """

    # Get the caption from the video_id
    match = caption_category[caption_category["video_id"] == video_id]["caption"]
    # Check not NaN
    if not match.empty and match.values[0] == match.values[0]:
        return str(match.values[0])
    else:
        return "Unknown"

In [50]:
# All user recommendations
recommends = pyspark_als_model.recommendForAllUsers(10)
recommends_df = recommends.toPandas()

# Explode to have each line as a recommendation
recommends_df = recommends_df.explode("recommendations")
recommends_df["recommendations"] = recommends_df["recommendations"].apply(
    lambda x: f"{video_id_to_caption(x[0])}")
recommends_df.set_index("user_id", inplace=True)
recommends_df

                                                                                

Unnamed: 0_level_0,recommendations
user_id,Unnamed: 1_level_1
120,#2020高考季 #快手助推 #作品推广 #申请热门 #感谢快手我要上热门
120,初学，不知道这是啥动作 @玩轮滑e段子手☞王☜能能(O7469221)
120,Unknown
120,#感谢快手平台和一路陪伴与支持的家人们 #感谢官方大大给的每一次热门 #跪求官方大大给个热门...
120,完蛋了 今天又是对你的大型心动现场
...,...
7162,这才叫过弯！！！#快说车
7162,Unknown
7162,#排球 4号位打一个😉
7162,#成龙 七夕节快乐


In [67]:
# Top 5 users recommendations

# Get the top 5 users

print("Top 5 users:")
# Get the top 5 users
top_5_users = recommends_df.index.unique()[:5].tolist()
print(top_5_users)
top_users = top_5_users
top_users_recommends_df = recommends_df[recommends_df.index.isin(top_users)]
top_users_recommends_df

Top 5 users:
[120, 137, 140, 155, 157]


Unnamed: 0_level_0,recommendations
user_id,Unnamed: 1_level_1
120,#2020高考季 #快手助推 #作品推广 #申请热门 #感谢快手我要上热门
120,初学，不知道这是啥动作 @玩轮滑e段子手☞王☜能能(O7469221)
120,Unknown
120,#感谢快手平台和一路陪伴与支持的家人们 #感谢官方大大给的每一次热门 #跪求官方大大给个热门...
120,完蛋了 今天又是对你的大型心动现场
120,这才叫过弯！！！#快说车
120,Unknown
120,#排球 4号位打一个😉
120,#成龙 七夕节快乐
120,Unknown


# Step 5: Model Evaluation

In [68]:
ground_truth = test.groupBy("user_id") \
    .agg(collect_list("video_id").alias("true_items"))

predicted = recommends.select(
    col("user_id"),
    expr("transform(recommendations, x -> x.video_id)").alias("pred_items")
)
ranking_df = predicted.join(ground_truth, on="user_id", how="inner")

ranking_pd = ranking_df.toPandas()

                                                                                

In [69]:
def recall_at_k(y_true, y_pred, k):
    return len(set(y_true) & set(y_pred[:k])) / len(set(y_true)) if y_true else 0

def map_at_k(y_true, y_pred, k):
    score = 0.0
    hit_count = 0.0
    for i, p in enumerate(y_pred[:k]):
        if p in y_true:
            hit_count += 1.0
            score += hit_count / (i + 1.0)
    return score / min(len(y_true), k) if y_true else 0.0

def ndcg_at_k(y_true, y_pred, k):
    relevance = [1 if item in y_true else 0 for item in y_pred[:k]]
    return ndcg_score([relevance], [relevance])
recalls, maps, ndcgs = [], [], []

In [70]:
for row in ranking_pd.itertuples():
    true_items = row.true_items
    pred_items = row.pred_items
    
    recalls.append(recall_at_k(true_items, pred_items, 10))
    maps.append(map_at_k(true_items, pred_items, 10))
    ndcgs.append(ndcg_at_k(true_items, pred_items, 10))

print(f"Recall@10: {np.mean(recalls):.4f}")
print(f"MAP@10:    {np.mean(maps):.4f}")
print(f"NDCG@10:   {np.mean(ndcgs):.4f}")


Recall@10: 0.0029
MAP@10:    0.0805
NDCG@10:   0.8760
