# 相关训练过程

1. 读取数据集

In [1]:
import pandas as pd

ratings = pd.read_csv(
    "../datasets/ml-25m/ratings.csv"
)
movies = pd.read_csv(
    "../datasets/ml-25m/movies.csv",
)
df = pd.merge(ratings, movies[['movieId', 'title']], on='movieId')


2. 使用留出法制作训练集和测试集

In [None]:
import pandas as pd
import numpy as np

def split_low_memory(df, test_ratio=0.2, min_items=5, seed=42):
    rng = np.random.default_rng(seed)
    test_indices = []

    for uid, group in df.groupby("userId"):
        if len(group) < min_items:
            continue

        test_size = max(1, int(len(group) * test_ratio))
        chosen = rng.choice(group.index, size=test_size, replace=False)
        test_indices.extend(chosen)

    test_indices = set(test_indices)
    mask = df.index.isin(test_indices)
    return df[~mask], df[mask]


train_df, test_df = split_low_memory(df)
print("train:", len(train_df), "test:", len(test_df))

train: 20062533 test: 4937562


回收df，减少内存占用

In [3]:
import gc
del df

gc.collect()

0

3. 建立训练集的稀疏矩阵，同时将测试集转换为（用户，电影，评分）元组的列表

In [4]:
from scipy.sparse import csr_matrix
import pandas as pd


def build_train_test(train_df, test_df):
    # category 创建映射
    user_cat = train_df["userId"].astype("category")
    movie_cat = train_df["movieId"].astype("category")

    u_train = user_cat.cat.codes.values
    m_train = movie_cat.cat.codes.values
    r_train = train_df["rating"].values

    user_categories = user_cat.cat.categories
    movie_categories = movie_cat.cat.categories

    # 测试集映射到训练集空间
    u_test = pd.Categorical(test_df["userId"], user_categories).codes
    m_test = pd.Categorical(test_df["movieId"], movie_categories).codes
    r_test = test_df["rating"].values

    # 过滤掉 -1（训练集没出现）
    mask = (u_test != -1) & (m_test != -1)
    test_tuples = list(zip(u_test[mask], m_test[mask], r_test[mask]))

    mat = csr_matrix((r_train, (u_train, m_train)), shape=(len(user_categories), len(movie_categories)))

    return mat, test_tuples
train_sparse, test_tuples = build_train_test(train_df, test_df)
print("train sparse shape:", train_sparse.shape)
print("test tuples:", len(test_tuples))

train sparse shape: (162541, 56654)
test tuples: 4934713


4. 将训练集进行svd分解防止在后续训练过程中爆内存

In [6]:
from sklearn.preprocessing import normalize
from sklearn.decomposition import TruncatedSVD


def build_user_vectors(train_mat,dim=128):
    # 每个用户是一行
    # 我们需要 dense 向量给 hnswlib，用 float32
    svd = TruncatedSVD(n_components=dim, random_state=42)
    X = svd.fit_transform(train_mat.astype("float32"))
    X = normalize(X, axis=1)
    return X

train_user_vectors = build_user_vectors(train_sparse)
print("train user vectors shape:", train_user_vectors.shape)

train user vectors shape: (162541, 128)


5. 建立hnsw进行查询时需要的索引

In [None]:
import hnswlib


def build_ann_index(X, ef=200, M=32):
    dim = X.shape[1]
    num_users = X.shape[0]
    index = hnswlib.Index(space="cosine", dim=dim)
    index.init_index(max_elements=num_users, ef_construction=ef, M=M)
    index.add_items(X)
    index.set_ef(ef)
    return index

ann_index = build_ann_index(train_user_vectors)
print("ANN index built.")

ANN index built.


6. 将每个用户的前50个最相似用户及相应的余弦距离进行缓存

In [None]:
import numpy as np


def cache_user_neighbors(index, user_emb, k):
    num_users = user_emb.shape[0]
    neighbors_cache = {}
    sims_cache = {}

    for u in range(num_users):
        neigh, sims = index.knn_query(user_emb[u], k=k + 1)
        neigh = neigh[0]
        sims = sims[0]

        mask = neigh != u
        neighbors_cache[u] = neigh[mask]
        sims_cache[u] = sims[mask]

    return neighbors_cache, sims_cache

neighbors_cache, sims_cache = cache_user_neighbors(ann_index, train_user_vectors, k=50)
print("Neighbors cache shape:", len(neighbors_cache))
print("Sims cache shape:", len(sims_cache))

Neighbors cache shape: (162541, 50)
Sims cache shape: (162541, 50)


7. 根据前面得到的缓存计算每个用户对电影列表内电影的评分

In [None]:
def predict_user_movies(train_mat, user, movie_list, neighbors_cache, sims_cache):
    neigh = neighbors_cache[user]
    sims = sims_cache[user] 

    neigh_ratings = train_mat[neigh].toarray()
    sims = sims.reshape(-1, 1)

    weighted_sum = (neigh_ratings * sims).sum(axis=0)
    weight_norm = sims.sum()

    user_pred = weighted_sum / (weight_norm + 1e-8)

    return user_pred[movie_list]


8. 计算rmse(均方根误差)

In [None]:
def rmse_user_batch(train_mat, test_tuples, neighbors_cache, sims_cache):
    # 把 test_tuples 按用户聚合
    user_to_movies = {}
    for u, m, r in test_tuples:
        user_to_movies.setdefault(u, []).append((m, r))

    se = 0.0
    n = 0

    for u, pairs in user_to_movies.items():
        movies = [m for m, _ in pairs]
        reals = np.array([r for _, r in pairs])

        preds = predict_user_movies(train_mat, u, movies, neighbors_cache, sims_cache)

        se += ((preds - reals) ** 2).sum()
        n += len(pairs)

    return (se / n) ** 0.5

rmse = rmse_user_batch(train_sparse, test_tuples, neighbors_cache, sims_cache)
print("Test RMSE:", rmse)

Test RMSE: 2.607566735338708


9. 计算用户对所有电影的评分，并得到其中的topk个电影

In [None]:
def predict_user_all_movies(train_mat, user, neighbors_cache, sims_cache):
    neigh = neighbors_cache[user]
    sims = sims_cache[user].reshape(-1, 1)

    neigh_ratings = train_mat[neigh].toarray()  # K × M

    weighted_sum = (neigh_ratings * sims).sum(axis=0)
    denom = sims.sum() + 1e-8

    preds = weighted_sum / denom  # shape = (M,)
    return preds

def recommend_topk(train_mat, user, Krec, watched_set, neighbors_cache, sims_cache):
    preds = predict_user_all_movies(train_mat, user, neighbors_cache, sims_cache)

    # 去掉用户训练集中已经看过的电影
    preds[list(watched_set)] = -1e9

    # 选取评分最高的 Krec
    topk = np.argpartition(preds, -Krec)[-Krec:]
    topk = topk[np.argsort(-preds[topk])]

    return topk

topk = recommend_topk(train_sparse, user=0, Krec=10, watched_set={1,2,3}, neighbors_cache=neighbors_cache, sims_cache=sims_cache)
print("Top-K recommendations for user 0:", topk)

Top-K recommendations for user 0: [4867 7236  292 5904 6588 4040 2600  302 7198 4742]


10. 进行召回率的计算

In [None]:
def recall_at_k(train_mat, test_tuples, neighbors_cache, sims_cache, Krec=50, like_threshold=4.0):
    # 组织测试集：user → [(movie, rating)]
    test_map = {}
    for u, m, r in test_tuples:
        test_map.setdefault(u, []).append((m, r))

    # 训练集已看电影（避免推荐重复）
    watched = {u: set(train_mat[u].indices) for u in range(train_mat.shape[0])}

    sum_recall = 0
    cnt_user = 0

    for u, pairs in test_map.items():
        # 用户在测试集中真正喜欢的电影
        liked = [m for m, r in pairs if r >= like_threshold]
        if not liked:
            continue  # 没有喜欢电影就跳过，不影响指标

        rec_list = recommend_topk(train_mat, u, Krec, watched[u], neighbors_cache, sims_cache)

        hit = sum(1 for m in liked if m in rec_list)
        recall = hit / len(liked)

        sum_recall += recall
        cnt_user += 1

    return sum_recall / cnt_user

topk_recall = recall_at_k(train_sparse, test_tuples, neighbors_cache, sims_cache, Krec=50, like_threshold=4.0)
print("Top-K Recall@50:", topk_recall)

Top-K Recall@50: 0.5043139056021698
