In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# ================================
# 1. 读取并准备数据
# ================================
user_features = pd.read_csv('user_features.csv')
movie_features = pd.read_csv('movie_features.csv')
ratings = pd.read_csv('ratings_cleaned.csv')  # 包含 userId, movieId, rating, ts(或timestamp)

# 建立 userId 和 movieId 到连续索引的映射
unique_users = user_features['userId'].unique()
unique_movies = movie_features['movieId'].unique()

user2index = {u: i for i, u in enumerate(unique_users)}
movie2index = {m: i for i, m in enumerate(unique_movies)}

# 在 user_features 和 movie_features 中新增一列 idx
user_features['user_idx'] = user_features['userId'].map(user2index)
movie_features['movie_idx'] = movie_features['movieId'].map(movie2index)

# 筛选我们需要的特征列（示例中直接保留除去 userId, user_idx 以外的列都当作特征）
# 你也可以根据需求手动指定特征列
user_feature_cols = [c for c in user_features.columns if c not in ['userId','user_idx']]
movie_feature_cols = [c for c in movie_features.columns if c not in ['movieId','movie_idx','title','genres','year']]

print("User feature columns:", user_feature_cols)
print("Movie feature columns:", movie_feature_cols)


User feature columns: ['user_avg_rating', 'user_rating_count', 'user_rating_std', 'user_rating_max', 'user_rating_min', 'user_last_timestamp', 'Action', 'Adventure', 'Animation', 'Children', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror', 'IMAX', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western']
Movie feature columns: ['(no genres listed)', 'Action', 'Adventure', 'Animation', 'Children', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror', 'IMAX', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western', 'movie_avg_rating', 'movie_rating_count', 'movie_rating_std']


In [2]:


# ================================
# 2. 多通道召回函数
# ================================
# 2.1 基于规则的召回
# -------------------------------
# 示例：热门召回 (movie_rating_count 排序) / 高分召回 (movie_avg_rating 排序)
def rule_based_recall(top_k=50, method='popularity'):
    if method == 'popularity':
        sorted_df = movie_features.sort_values('movie_rating_count', ascending=False)
    elif method == 'high_score':
        sorted_df = movie_features.sort_values('movie_avg_rating', ascending=False)
    else:
        sorted_df = movie_features.sort_values('movie_rating_count', ascending=False)
    return sorted_df.head(top_k)['movieId'].tolist()

# 2.2 基于协同过滤（Item-based CF）召回
# -------------------------------
# 构建物品相似度矩阵（余弦相似度）
ratings_pivot = ratings.pivot_table(index='userId', columns='movieId', values='rating', fill_value=0)
item_user_matrix = ratings_pivot.T  # 行=movieId, 列=userId
# 计算余弦相似度
from sklearn.metrics.pairwise import cosine_similarity
similarity_matrix = cosine_similarity(item_user_matrix.values)
similarity_df = pd.DataFrame(similarity_matrix, 
                             index=item_user_matrix.index, 
                             columns=item_user_matrix.index)

def cf_recall(user_id, top_n=50, rating_threshold=3.5):
    if user_id not in ratings_pivot.index:
        # 如果该用户在训练集中不存在，则用热门召回
        return rule_based_recall(top_k=top_n)
    user_ratings = ratings_pivot.loc[user_id]
    liked_movies = user_ratings[user_ratings >= rating_threshold].index
    
    score_series = pd.Series(dtype=float)
    for mid in liked_movies:
        score_series = score_series.add(similarity_df[mid], fill_value=0)
    
    # 去掉用户看过的电影
    score_series = score_series.drop(liked_movies, errors='ignore')
    score_series = score_series.sort_values(ascending=False)
    return score_series.head(top_n).index.tolist()



In [3]:

# ================================
# 3. 双塔模型 (PyTorch)
# ================================
# 3.1 定义SENet模块
class SENetLayer(nn.Module):
    """
    PyTorch 版 SENet (简化): Squeeze and Excitation
    """
    def __init__(self, channel_dim, reduction_ratio=4):
        super(SENetLayer, self).__init__()
        hidden_dim = channel_dim // reduction_ratio
        self.fc_squeeze = nn.Linear(channel_dim, hidden_dim)
        self.fc_excitation = nn.Linear(hidden_dim, channel_dim)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        """
        x: [batch_size, channel_dim]
        """
        # Squeeze: Global Average Pooling (对batch内每个样本自身的feature做平均)
        # 但这里 x 已经是 [batch, channel_dim]，相当于 seq_len=1 情况
        # 也可以对多维度做 pooling，这里做简化
        avg = x.mean(dim=0, keepdim=True)  # 也可以 mean(dim=1) 看具体需求
        # 如果要做真正的通道注意力，需要在CNN或多时序维度的场景更明显
        # 这里只是模拟流程
        
        # 全连接
        z = self.fc_squeeze(avg)      # [1, hidden_dim]
        z = self.relu(z)
        z = self.fc_excitation(z)     # [1, channel_dim]
        z = self.sigmoid(z)           # [1, channel_dim]
        
        # 通道注意力乘法
        # broadcast到 [batch_size, channel_dim]
        scale = x * z
        return scale

# 3.2 定义User Tower和Item Tower
class UserTower(nn.Module):
    def __init__(self, input_dim, emb_dim=16):
        super(UserTower, self).__init__()
        self.mlp = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            # SENet
            # 由于 PyTorch MLP 直接是 [batch, feature_dim], 我们这里用一个SENet对这维度做处理
            SENetLayer(channel_dim=64, reduction_ratio=4),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, emb_dim)
        )
    
    def forward(self, x):
        """
        x: [batch_size, input_dim]
        return: [batch_size, emb_dim]
        """
        out = self.mlp(x)  # [batch_size, emb_dim]
        # L2 normalize
        out = nn.functional.normalize(out, p=2, dim=1)
        return out

class ItemTower(nn.Module):
    def __init__(self, input_dim, emb_dim=16):
        super(ItemTower, self).__init__()
        self.mlp = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            SENetLayer(channel_dim=64, reduction_ratio=4),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, emb_dim)
        )
    
    def forward(self, x):
        out = self.mlp(x)
        out = nn.functional.normalize(out, p=2, dim=1)
        return out

# 3.3 定义双塔模型
class TwoTowerModel(nn.Module):
    def __init__(self, user_dim, item_dim, emb_dim=16):
        super(TwoTowerModel, self).__init__()
        self.user_tower = UserTower(user_dim, emb_dim=emb_dim)
        self.item_tower = ItemTower(item_dim, emb_dim=emb_dim)
    
    def forward(self, user_x, item_x):
        """
        user_x: [batch_size, user_dim]
        item_x: [batch_size, item_dim]
        return: [batch_size, 1] (sigmoid后的点击/喜好概率)
        """
        u_emb = self.user_tower(user_x)  # [batch, emb_dim]
        i_emb = self.item_tower(item_x)  # [batch, emb_dim]
        
        # dot => 余弦相似度 (因已做 L2 norm)
        logit = torch.sum(u_emb * i_emb, dim=1, keepdim=True)  # [batch,1]
        out = torch.sigmoid(logit)  # (0,1)
        return out

# 3.4 构建训练数据
# -------------------------------
# 示例: 将评分 >= 3.5 视为正样本, 否则负样本
ratings['label'] = (ratings['rating'] >= 3.5).astype(int)
# 采样一部分数据用于演示
ratings_sample = ratings.sample(frac=0.01, random_state=42)

# 准备 user_feature, item_feature
def get_user_feature(u_id):
    u_idx = user2index[u_id]
    row = user_features.loc[user_features['user_idx'] == u_idx, user_feature_cols]
    return row.values[0]

def get_item_feature(i_id):
    i_idx = movie2index[i_id]
    row = movie_features.loc[movie_features['movie_idx'] == i_idx, movie_feature_cols]
    return row.values[0]

class TwoTowerDataset(Dataset):
    def __init__(self, df):
        super(TwoTowerDataset, self).__init__()
        self.samples = df[['userId','movieId','label']].values
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        user_id, movie_id, label = self.samples[idx]
        user_feat = get_user_feature(user_id)
        item_feat = get_item_feature(movie_id)
        return torch.tensor(user_feat, dtype=torch.float32), \
               torch.tensor(item_feat, dtype=torch.float32), \
               torch.tensor(label, dtype=torch.float32)

dataset = TwoTowerDataset(ratings_sample)
dataloader = DataLoader(dataset, batch_size=256, shuffle=True)

# 3.5 训练模型
user_dim = len(user_feature_cols)
item_dim = len(movie_feature_cols)
emb_dim = 16

model = TwoTowerModel(user_dim, item_dim, emb_dim=emb_dim)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# 这里简单地跑几轮
epochs = 5
model.train()
for epoch in range(epochs):
    total_loss = 0.0
    for user_x, item_x, label in dataloader:
        optimizer.zero_grad()
        pred = model(user_x, item_x)
        loss = criterion(pred, label.unsqueeze(1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader):.4f}")

# 3.6 生成全量 embedding
# -------------------------------
model.eval()
with torch.no_grad():
    # 计算所有电影embedding
    all_item_idx = movie_features['movie_idx'].values
    item_feat_mat = movie_features[movie_feature_cols].values
    item_feat_tensor = torch.tensor(item_feat_mat, dtype=torch.float32)
    all_item_emb = model.item_tower(item_feat_tensor)  # [num_items, emb_dim]
    
    # 建立 idx2movie
    idx2movie = {v: k for k, v in movie2index.items()}

def tower_recall(user_id, top_n=50):
    """
    使用训练好的双塔模型做召回
    1. 计算该用户embedding
    2. 与所有item embedding做相似度(点积)
    3. 取 top_n
    """
    if user_id not in user2index:
        return rule_based_recall(top_k=top_n)
    
    model.eval()
    with torch.no_grad():
        # 获取用户特征
        u_feat = get_user_feature(user_id)
        u_feat_tensor = torch.tensor(u_feat, dtype=torch.float32).unsqueeze(0)  # [1, user_dim]
        user_emb = model.user_tower(u_feat_tensor)  # [1, emb_dim]
        
        # 计算相似度
        # 因为已 L2 normalize, dot 即 cosine
        sim = torch.matmul(all_item_emb, user_emb.squeeze(0))  # [num_items]
        sim = sim.numpy()  # 转成 numpy
        
        # 排序
        top_indices = np.argsort(sim)[::-1][:top_n]
        top_movie_ids = [idx2movie[all_item_idx[i]] for i in top_indices]
        return top_movie_ids



Epoch 1/5, Loss: 0.6844
Epoch 2/5, Loss: 0.6672
Epoch 3/5, Loss: 0.6621
Epoch 4/5, Loss: 0.6546
Epoch 5/5, Loss: 0.6527


In [4]:

# ================================
# 4. 多通道召回控制
# ================================
def multi_channel_recall(user_id, top_n=50, channels=['rule','cf','tower']):
    candidates = set()
    for ch in channels:
        if ch == 'rule':
            # 默认热门召回
            c = rule_based_recall(top_k=top_n, method='popularity')
            candidates.update(c)
        elif ch == 'cf':
            c = cf_recall(user_id, top_n=top_n)
            candidates.update(c)
        elif ch == 'tower':
            c = tower_recall(user_id, top_n=top_n)
            candidates.update(c)
        else:
            pass
    # 简单去重后截取
    candidates_list = list(candidates)
    if len(candidates_list) > top_n:
        candidates_list = candidates_list[:top_n]
    return candidates_list


In [5]:

# ================================
# 5. 测试
# ================================
test_user_id = user_features['userId'].sample(1).iloc[0]
print("测试用户ID:", test_user_id)

rule_candidates = multi_channel_recall(test_user_id, top_n=10, channels=['rule'])
print("规则通道召回:", rule_candidates)

cf_candidates = multi_channel_recall(test_user_id, top_n=10, channels=['cf'])
print("CF通道召回:", cf_candidates)

tower_candidates = multi_channel_recall(test_user_id, top_n=10, channels=['tower'])
print("双塔通道召回:", tower_candidates)

all_candidates = multi_channel_recall(test_user_id, top_n=10, channels=['rule','cf','tower'])
print("全部通道召回:", all_candidates)


测试用户ID: 275
规则通道召回: [480, 356, 260, 296, 2571, 589, 110, 527, 593, 318]
CF通道召回: [4384, 5884, 1574, 1641, 5258, 298, 2964, 1206, 5272, 3192]
双塔通道召回: [4384, 4386, 4387, 4388, 4389, 4390, 4392, 193609, 4394, 4393]
全部通道召回: [260, 5258, 2571, 527, 2964, 5272, 4384, 4386, 4387, 4388]
