In [1]:
import pandas as pd
import numpy as np
import random
from tqdm import tqdm
from itertools import islice

In [2]:
# 训练集和测试集
def get_train_data(path='/Users/chao/workspace/d2l/data/ml-100k/u1.base', min_rating=4):
    """Load the positive training interactions from a tab-separated ratings file.

    Every non-blank line must contain four tab-separated fields:
    ``user``, ``item``, ``rating``, ``timestamp``. Rows whose rating is below
    ``min_rating`` are dropped; the timestamp is ignored.

    Args:
        path: Location of the ratings file. The default is the path that used
            to be hard-coded, so a bare ``get_train_data()`` call behaves as
            before.
        min_rating: Smallest rating that is kept (default 4).

    Returns:
        list[tuple[int, int, int]]: ``(user, item, rating)`` triples in file order.

    Raises:
        ValueError: If a non-blank line does not have exactly four fields or a
            field is not an integer.
    """
    train_data = []
    with open(path, 'r') as file:
        # Stream the file line by line instead of loading it with readlines().
        for line in file:
            if not line.strip():
                # A blank line (e.g. a trailing newline) would break the unpack below.
                continue
            user, item, rating, _timestamp = line.rstrip('\r\n').split('\t')
            if int(rating) >= min_rating:
                train_data.append((int(user), int(item), int(rating)))
    return train_data

def get_test_data(path='/Users/chao/workspace/d2l/data/ml-100k/u1.test', min_rating=4):
    """Load the positive test interactions from a tab-separated ratings file.

    Every non-blank line must contain four tab-separated fields:
    ``user``, ``item``, ``rating``, ``timestamp``. Rows whose rating is below
    ``min_rating`` are dropped; the timestamp is ignored.

    Args:
        path: Location of the ratings file. The default is the path that used
            to be hard-coded, so a bare ``get_test_data()`` call behaves as
            before.
        min_rating: Smallest rating that is kept (default 4).

    Returns:
        list[tuple[int, int, int]]: ``(user, item, rating)`` triples in file order.

    Raises:
        ValueError: If a non-blank line does not have exactly four fields or a
            field is not an integer.
    """
    test_data = []
    with open(path, 'r') as file:
        # Stream the file line by line instead of loading it with readlines().
        for line in file:
            if not line.strip():
                # A blank line (e.g. a trailing newline) would break the unpack below.
                continue
            user, item, rating, _timestamp = line.rstrip('\r\n').split('\t')
            if int(rating) >= min_rating:
                test_data.append((int(user), int(item), int(rating)))
    return test_data

In [3]:
# User-item matrices built from the training interactions.
user_num = 943
item_num = 1682
# Both matrices get one extra row and column so that ids index them directly;
# the loops in this notebook start at 1, leaving index 0 unused.
ratings = np.zeros((user_num + 1, item_num + 1), dtype=int)  # rating value per (user, item)
y_ui = np.zeros((user_num + 1, item_num + 1), dtype=int)     # 1 where a rating was observed

train_data = get_train_data()

for user, item, rating in train_data:
    ratings[int(user), int(item)] = int(rating)
    y_ui[int(user), int(item)] = 1

p = y_ui.sum()                        # number of observed (user, item) pairs
density = p / (user_num * item_num)   # fraction of the matrix that is observed
r_ = ratings.sum() / p                # mean over all observed ratings

In [4]:
# Per-user and per-item rating means and biases.
r_1 = ratings.sum(axis=0)  # sum of ratings per item (column sums)
r_2 = ratings.sum(axis=1)  # sum of ratings per user (row sums)
r_3 = y_ui.sum(axis=0)     # number of ratings per item
r_4 = y_ui.sum(axis=1)     # number of ratings per user

# Mean rating of each user; users without ratings get the global mean r_.
r_u = np.zeros(user_num + 1, float)
for uid in range(1, user_num + 1):
    if r_4[uid] == 0:
        r_u[uid] = r_
    else:
        r_u[uid] = r_2[uid] / r_4[uid]

# Mean rating of each item; items without ratings get the global mean r_.
r_i = np.zeros(item_num + 1, float)
for iid in range(1, item_num + 1):
    if r_3[iid] == 0:
        r_i[iid] = r_
    else:
        r_i[iid] = r_1[iid] / r_3[iid]

# User bias: average of (rating - item mean) over the items the user rated.
b_u = np.zeros(user_num + 1, float)
for uid in range(1, user_num + 1):
    if r_4[uid] == 0:
        b_u[uid] = 0
    else:
        # y_ui masks out unrated items before summing.
        b_u[uid] = (y_ui[uid] * (ratings[uid] - r_i)).sum() / r_4[uid]

# Item bias: average of (rating - user mean) over the users who rated the item.
b_i = np.zeros(item_num + 1, float)
for iid in range(1, item_num + 1):
    if r_3[iid] == 0:
        b_i[iid] = 0
    else:
        # y_ui masks out users who did not rate this item before summing.
        b_i[iid] = (y_ui[:, iid] * (ratings[:, iid] - r_u)).sum() / r_3[iid]

In [5]:
# Load the test interactions into matrices shaped like the training ones.
test_data = get_test_data()
test_ratings = np.zeros((user_num + 1, item_num + 1), dtype=int)  # rating value per (user, item)
y_test = np.zeros((user_num + 1, item_num + 1), dtype=int)        # 1 where a test rating exists

for user, item, rating in test_data:
    test_ratings[int(user), int(item)] = int(rating)
    y_test[int(user), int(item)] = 1

# Users that have at least one item in the test set.
test_users = [u for u in range(1, user_num + 1) if y_test[u].sum() != 0]

# Evaluation metrics (the accumulator variable is called `loss`, but these are scores, not losses)
def Pre(predict_rule, k):
    """Return Precision@k averaged over the users in ``test_users``.

    For every test user the ``k`` items returned by ``get_recommended_items``
    are intersected with the user's test-set items (``I_u_test``); the number
    of hits divided by ``k`` is that user's precision.

    Args:
        predict_rule: Callable passed through to ``get_recommended_items``.
        k: Length of the recommendation list.

    Returns:
        float: Mean per-user precision.
    """
    total = 0.
    for user in test_users:
        top_k = get_recommended_items(user, k, predict_rule)
        relevant = I_u_test(user)
        hits = np.intersect1d(top_k, relevant)
        total += len(hits) / k
    return total / len(test_users)

def Rec(predict_rule, k):
    """Return Recall@k averaged over the users in ``test_users``.

    A user's recall is the number of recommended items that also appear in the
    user's test-set items, divided by the number of test-set items.

    Args:
        predict_rule: Callable passed through to ``get_recommended_items``.
        k: Length of the recommendation list.

    Returns:
        float: Mean per-user recall.
    """
    def recall_for(user):
        # Recall of a single user's top-k list.
        top_k = get_recommended_items(user, k, predict_rule)
        relevant = I_u_test(user)
        return len(np.intersect1d(top_k, relevant)) / len(relevant)

    # Start from 0. so the accumulator is a float, as in a plain running sum.
    return sum((recall_for(user) for user in test_users), 0.) / len(test_users)

def F1(predict_rule, k):
    """Return F1@k averaged over the users in ``test_users``.

    Per user, precision is hits / ``k`` and recall is hits / number of test-set
    items; their harmonic mean is accumulated. A user with no hits contributes 0.

    Args:
        predict_rule: Callable passed through to ``get_recommended_items``.
        k: Length of the recommendation list.

    Returns:
        float: Mean per-user F1 score.
    """
    total = 0.
    for user in test_users:
        top_k = get_recommended_items(user, k, predict_rule)
        relevant = I_u_test(user)
        n_hits = len(np.intersect1d(top_k, relevant))
        precision = n_hits / k
        recall = n_hits / len(relevant)
        if precision + recall != 0:
            total += 2 * (precision * recall) / (precision + recall)
    return total / len(test_users)

def NDCG(predict_rule, k):
    """Return NDCG@k (binary relevance) averaged over the users in ``test_users``.

    DCG adds ``1 / log(position + 1)`` for every recommended item (1-based
    position) that is among the user's test-set items. IDCG is the DCG of an
    ideal list holding ``min(number of test-set items, k)`` hits at the top.
    The natural logarithm is used; the DCG / IDCG ratio does not depend on the
    logarithm base.

    Args:
        predict_rule: Callable passed through to ``get_recommended_items``.
        k: Length of the recommendation list.

    Returns:
        float: Mean per-user NDCG.
    """
    loss = 0.
    for user in test_users:
        recommended_items = get_recommended_items(user, k, predict_rule)
        # Items the user is interested in (test set).
        prefer_items = I_u_test(user)
        # A set gives constant-time membership instead of scanning the array per item.
        relevant = set(np.asarray(prefer_items).tolist())
        dcg = 0.0
        # Walk the list itself rather than range(k): the list can be shorter
        # than k when few candidates exist, and indexing would then fail.
        for rank, item in enumerate(recommended_items[:k]):
            if item in relevant:
                dcg += 1.0 / np.log(rank + 2)
        idcg = sum(1.0 / np.log(i + 2) for i in range(min(len(prefer_items), k)))
        loss += dcg / idcg if idcg > 0 else 0.0
    return loss / len(test_users)

def One_call(predict_rule, k):
    """Return the fraction of test users with at least one hit in their top-k list.

    A hit is a recommended item that also appears in the user's test-set items.

    Args:
        predict_rule: Callable passed through to ``get_recommended_items``.
        k: Length of the recommendation list.

    Returns:
        float: Share of users in ``test_users`` with one or more hits.
    """
    users_with_hit = 0.
    for user in test_users:
        top_k = get_recommended_items(user, k, predict_rule)
        relevant = I_u_test(user)
        if len(np.intersect1d(top_k, relevant)) > 0:
            users_with_hit += 1
    return users_with_hit / len(test_users)

def MRR(predict_rule, k):
    """Return the mean reciprocal rank over the users in ``test_users``.

    For each user the reciprocal of the 1-based position of the first
    recommended item that is among the user's test-set items is accumulated;
    a user with no such item in the top-k list contributes 0.

    Args:
        predict_rule: Callable passed through to ``get_recommended_items``.
        k: Length of the recommendation list.

    Returns:
        float: Mean per-user reciprocal rank.
    """
    loss = 0.
    for user in test_users:
        recommended_items = get_recommended_items(user, k, predict_rule)
        # Items the user is interested in (test set); a set gives
        # constant-time membership instead of scanning the array per item.
        relevant = set(np.asarray(I_u_test(user)).tolist())
        # Walk the list itself rather than range(k): the list can be shorter
        # than k when few candidates exist, and indexing would then fail.
        for rank, item in enumerate(recommended_items[:k], start=1):
            if item in relevant:
                loss += 1 / rank
                break
    return loss / len(test_users)

def MAP(predict_rule):
    """Placeholder metric: not implemented, always returns 0.

    ``predict_rule`` is accepted but ignored.
    NOTE(review): the name suggests Mean Average Precision -- confirm before implementing.
    """
    return 0

def ARP(predict_rule):
    """Placeholder metric: not implemented, always returns 0.

    ``predict_rule`` is accepted but ignored.
    NOTE(review): presumably Average Relative Position -- confirm before implementing.
    """
    return 0

def AUC(predict_rule):
    """Placeholder metric: not implemented, always returns 0.

    ``predict_rule`` is accepted but ignored.
    NOTE(review): presumably Area Under the ROC Curve -- confirm before implementing.
    """
    return 0

In [6]:
# 用户u评分过的所有物品
def I_u(u):
    """Return the indices of the items user ``u`` rated in the training data.

    These are the columns of row ``u`` of ``y_ui`` that equal 1, in ascending
    order, as a 1-D integer array.
    """
    rated_mask = y_ui[u] == 1
    return np.flatnonzero(rated_mask)

# 给物品j评分过的所有用户
def U_j(j):
    """Return the indices of the users who rated item ``j`` in the training data.

    These are the rows of column ``j`` of ``y_ui`` that equal 1, in ascending
    order, as a 1-D integer array.
    """
    rater_mask = y_ui[:, j] == 1
    return np.flatnonzero(rater_mask)

# 用户u评分过的所有分数
def R_u(u):
    """Return the distinct non-zero rating values user ``u`` gave, sorted ascending."""
    # Multiplying by y_ui zeroes every position that was not observed.
    observed = ratings[u] * y_ui[u]
    return np.unique(observed[observed != 0])

# 求用户u和w共同评分过的物品
def I_u_w(u, w):
    """Return the sorted indices of the items rated by both user ``u`` and user ``w``."""
    items_of_u = I_u(u)
    items_of_w = I_u(w)
    return np.intersect1d(items_of_u, items_of_w)

# 给物品k和j都评分过的用户
def U_k_j(k, j):
    """Return the sorted indices of the users who rated both item ``k`` and item ``j``."""
    raters_of_k = U_j(k)
    raters_of_j = U_j(j)
    return np.intersect1d(raters_of_k, raters_of_j)

# 用户u评分为r的所有物品
def I_u_r(u, r):
    """Return the indices of the items that user ``u`` rated with exactly ``r``.

    Only observed entries (``y_ui[u] == 1``) are considered. Without that
    mask, ``r == 0`` would match every unrated position, because unrated
    cells of ``ratings`` hold 0.

    Args:
        u: User index (row of ``ratings`` / ``y_ui``).
        r: Rating value to look for.

    Returns:
        numpy.ndarray: Ascending 1-D array of matching item indices.
    """
    return np.where((y_ui[u] == 1) & (ratings[u] == r))[0]

# 测试集中用户感兴趣的物品
def I_u_test(u):
    """Return the indices of the items user ``u`` has in the test set.

    These are the columns of row ``u`` of ``y_test`` that equal 1, in
    ascending order, as a 1-D integer array.
    """
    in_test_mask = y_test[u] == 1
    return np.flatnonzero(in_test_mask)

In [7]:
# 取用户u的推荐物品
def get_recommended_items(u, k, predict_rule):
    """Return the ``k`` highest-scoring items that user ``u`` has not rated yet.

    Every item id in ``1..item_num`` that is not in ``I_u(u)`` is scored with
    ``predict_rule(u, item)`` (called in ascending item order) and the items
    are ranked by descending score. The sort is stable, so items with equal
    scores stay in ascending id order.

    Args:
        u: User index.
        k: Maximum number of items to return.
        predict_rule: Callable ``(user, item) -> score``.

    Returns:
        list[int]: Up to ``k`` item ids, best first. The list is shorter than
        ``k`` when fewer unrated items exist.
    """
    # A set gives constant-time membership; testing `in` against the NumPy
    # array would rescan it for every candidate item.
    rated_items = set(np.asarray(I_u(u)).tolist())
    scored = [
        (item, predict_rule(u, item))
        for item in range(1, item_num + 1)
        if item not in rated_items
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [item for item, _score in islice(scored, k)]

In [8]:
# 预测
def predict(predict_rule, k):
    """Print top-``k`` evaluation metrics for ``predict_rule``, each formatted to four decimals.

    Every metric function is called with the same ``predict_rule`` and ``k``.
    """
    print(f"Pre: {Pre(predict_rule, k):.4f}")
    print(f"Rec: {Rec(predict_rule, k):.4f}")
    print(f"F1: {F1(predict_rule, k):.4f}")
    print(f"NDCG: {NDCG(predict_rule, k):.4f}")
    print(f"One_call: {One_call(predict_rule, k):.4f}")