In [2]:
import pandas as pd
import numpy as np

In [3]:

R = np.array([
    [5, 3, 0, 1],
    [4, 0, 0, 1],
    [1, 1, 0, 5],
    [1, 0, 0, 4],
    [0, 1, 5, 4],
])

num_users, num_items = R.shape

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim

In [5]:
class MatrixFactorization(nn.Module):
    def __init__(self, num_users, num_items, latent_dim=10):
        super(MatrixFactorization, self).__init__()
        self.user_factors = nn.Embedding(num_users, latent_dim)
        self.item_factors = nn.Embedding(num_items, latent_dim)

    def forward(self, user, item):
        user_embedding = self.user_factors(user)
        item_embedding = self.item_factors(item)
        return (user_embedding * item_embedding).sum(1)

In [13]:
model = MatrixFactorization(num_users, num_items)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [14]:
# 训练矩阵分解模型
epochs = 10  # 训练次数可根据需求调整
for epoch in range(epochs):
    total_loss = 0
    for user in range(num_users):
        for item in range(num_items):
            if R[user, item] > 0:
                user_tensor = torch.tensor([user], dtype=torch.long)
                item_tensor = torch.tensor([item], dtype=torch.long)
                rating_tensor = torch.tensor([R[user, item]], dtype=torch.float)
                prediction = model(user_tensor, item_tensor)
                loss = criterion(prediction, rating_tensor)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
    if (epoch + 1) % 1 == 0:
        print(f'Epoch {epoch + 1}/{epochs}, Loss: {total_loss:.4f}')

Epoch 1/10, Loss: 352.2582
Epoch 2/10, Loss: 299.1411
Epoch 3/10, Loss: 259.4073
Epoch 4/10, Loss: 225.3027
Epoch 5/10, Loss: 195.7170
Epoch 6/10, Loss: 169.8949
Epoch 7/10, Loss: 147.2373
Epoch 8/10, Loss: 127.2727
Epoch 9/10, Loss: 109.6356
Epoch 10/10, Loss: 94.0438


In [15]:
# 生成用户对电影的预测评分
user_id = 0  # 假设我们要为第一个用户生成推荐
user_tensor = torch.tensor([user_id], dtype=torch.long)
predicted_scores = (model.user_factors(user_tensor) @ model.item_factors.weight.t()).detach().numpy().flatten()

In [16]:
# 获取Top-N推荐项目
N = 2  # Top-N推荐列表中的项目数
top_N_items = np.argsort(predicted_scores)[-N:]
print(f"Top-{N} 推荐项目: {top_N_items}")

Top-2 推荐项目: [0 2]


In [17]:
def influence_function(user_embedding, item_embedding, target_item_embedding, loss_fn):
    return torch.autograd.grad(loss_fn(user_embedding * target_item_embedding, item_embedding * target_item_embedding), user_embedding, retain_graph=True)[0]

target_item = 1  # 假设我们要攻击的目标项目（此处选择第一个项目）

user_to_attack = 0  # 假设我们选择第一个用户进行攻击

user_embedding = model.user_factors(torch.tensor([user_to_attack], dtype=torch.long))
item_embedding = model.item_factors(torch.tensor([target_item], dtype=torch.long))
target_item_embedding = model.item_factors(torch.tensor([target_item], dtype=torch.long))

influential_user_embedding = influence_function(user_embedding, item_embedding, target_item_embedding, criterion)

fake_user_embedding = user_embedding + influential_user_embedding

fake_ratings = (fake_user_embedding @ model.item_factors.weight.t()).detach().numpy()

print("伪造用户的评分: ", fake_ratings)

伪造用户的评分:  [[ 0.18888462 -0.7721038   2.0993068   0.05643463]]


In [18]:
N = 2  # Top-N推荐列表中的项目数

original_recommendations = []
for user in range(num_users):
    user_tensor = torch.tensor([user], dtype=torch.long)
    scores = (model.user_factors(user_tensor) @ model.item_factors.weight.t()).detach().numpy().flatten()
    top_N_items = np.argsort(scores)[-N:]
    original_recommendations.append(top_N_items)

model.user_factors = nn.Embedding(num_users + 1, model.user_factors.embedding_dim)  # 增加一个伪造用户
model.user_factors.weight.data[:num_users] = model.user_factors.weight.data[:num_users]  # 保持原始用户嵌入不变
model.user_factors.weight.data[num_users] = fake_user_embedding  # 添加伪造用户嵌入

updated_recommendations = []
for user in range(num_users):
    user_tensor = torch.tensor([user], dtype=torch.long)
    scores = (model.user_factors(user_tensor) @ model.item_factors.weight.t()).detach().numpy().flatten()
    top_N_items = np.argsort(scores)[-N:]
    updated_recommendations.append(top_N_items)

original_hit_ratio = sum([target_item in rec for rec in original_recommendations]) / num_users
updated_hit_ratio = sum([target_item in rec for rec in updated_recommendations]) / num_users

print(f"原始命中率: {original_hit_ratio:.2f}")
print(f"更新后命中率: {updated_hit_ratio:.2f}")

原始命中率: 0.20
更新后命中率: 0.60
