In [1]:
# 导入相关的包
import random
import math 
import numpy as np 
import time 
from tqdm.autonotebook import tqdm, trange

from collections import defaultdict

from scipy.sparse import csc_matrix, linalg, eye
from copy import deepcopy

  return f(*args, **kwds)
  return f(*args, **kwds)


# 通用装饰器函数

In [2]:
# 定义时间装饰器，监控运行时间
def timmer(func):
    """Decorator that prints the wall-clock run time of each call to ``func``.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func``, prints ``"Func <name>, run time: <seconds>"`` once the call
        has returned, and passes the return value of ``func`` through
        unchanged. If ``func`` raises, nothing is printed and the exception
        propagates.
    """
    # Local import keeps this cell self-contained without editing the import cell.
    from functools import wraps

    # wraps() copies __name__/__doc__ onto the wrapper so introspection and
    # stacked decorators still see the wrapped function's metadata.
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        res = func(*args, **kwargs)
        stop_time = time.time()
        print("Func {:s}, run time: {:f}".format(func.__name__, stop_time - start_time))
        return res

    return wrapper

# 数据处理相关函数

In [3]:
# 数据集的读取
class Dataset:
    """Loads (user_id, movie_id) pairs from a ratings file and splits them into train/test sets."""

    def __init__(self, fp):
        """Read the ratings file at path ``fp`` and keep the pairs in ``self.data``."""
        self.data = self.loadData(fp)

    @timmer
    def loadData(self, fp):
        """Parse the ratings file into a list of ``(user_id, movie_id)`` tuples.

        Each non-empty line is split on ``"::"`` and only the first two fields
        are kept, converted to ``int``. Blank lines are skipped.

        Args:
            fp: Path of the ratings file.

        Returns:
            A list of ``(user_id, movie_id)`` integer tuples in file order.
        """
        data = []
        # The context manager guarantees the file handle is closed, even when
        # a malformed line raises while parsing.
        with open(fp) as ratings_file:
            for line in ratings_file:
                line = line.strip()
                if not line:
                    # A blank/trailing empty line would otherwise hit int("").
                    continue
                # Keep only the user id and the movie id.
                data.append(tuple(map(int, line.split("::")[:2])))
        return data

    @timmer
    def splitData(self, M, k, seed=1):
        '''Split ``self.data`` into a train and a test set.

        Every ``(user, item)`` pair draws ``random.randint(0, M-1)`` and goes
        to the test set when the draw equals ``k``, otherwise to the train set.

        M: number of folds; results are meant to be averaged over the M folds
        k: index of the current fold, k in [0, M)
        seed: random seed; use the same value for every k so that the folds
              come from the same sequence of draws
        return: (train, test), each a dict mapping user -> list of distinct items

        Note: this re-seeds the module-level ``random`` generator.
        '''
        train, test = [], []
        random.seed(seed)
        for user, item in self.data:
            if random.randint(0, M-1) == k:
                test.append((user, item))
            else:
                train.append((user, item))

        # Convert a list of pairs into {user: [items]} with duplicates removed.
        def convert_dict(pairs):
            grouped = defaultdict(set)
            for user, item in pairs:
                grouped[user].add(item)
            return {user: list(user_items) for user, user_items in grouped.items()}

        return convert_dict(train), convert_dict(test)

# 评价指标函数

In [4]:
class Metric:
    """Top-N recommendation metrics computed for the users of a test set.

    Recommendations are generated once, at construction time, for every user
    in ``test`` and cached in ``self.recs``.
    """

    def __init__(self, train, test, GetRecommendation):
        '''
        train: dict mapping user -> list of items in the training set
        test: dict mapping user -> list of items in the test set
        GetRecommendation: callable taking a user and returning that user's
            recommendations as a list of (item, score) pairs
        '''
        self.train = train
        self.test = test
        self.GetRecommendation = GetRecommendation
        # Cache the recommendation list of every test user.
        self.recs = self.getRec()

    def getRec(self):
        """Return {user: recommendation list} for every user in the test set."""
        return {user: self.GetRecommendation(user) for user in self.test}

    def _hits(self):
        """Count recommended items that appear in the same user's test items."""
        hit = 0
        for user in self.test:
            test_items = set(self.test[user])
            hit += sum(1 for item, _score in self.recs[user] if item in test_items)
        return hit

    @staticmethod
    def _percent(numerator, denominator):
        """Return numerator/denominator as a percentage rounded to 2 places (0.0 if denominator is 0)."""
        if denominator == 0:
            return 0.0
        return round(numerator / denominator * 100, 2)

    def precision(self):
        """Percentage of recommended items that are test items of the same user."""
        total = sum(len(self.recs[user]) for user in self.test)
        return self._percent(self._hits(), total)

    def recall(self):
        """Percentage of (distinct) test items that were recommended to their user."""
        total = sum(len(set(self.test[user])) for user in self.test)
        return self._percent(self._hits(), total)

    def coverage(self):
        """Distinct recommended items as a percentage of the reference item set.

        The reference set is the union of the training items of the test
        users (not of every training user).
        """
        all_item, recom_item = set(), set()
        for user in self.test:
            # A test user may have no training data at all; treat it as empty
            # instead of raising KeyError.
            all_item.update(self.train.get(user, ()))
            recom_item.update(item for item, _score in self.recs[user])
        return self._percent(len(recom_item), len(all_item))

    def popularity(self):
        """Mean of log(1 + popularity) over all recommended items.

        An item's popularity is the number of training users who have it.
        """
        item_pop = {}
        for user in self.train:
            for item in self.train[user]:
                item_pop[item] = item_pop.get(item, 0) + 1
        num, pop = 0, 0
        for user in self.test:
            for item, _score in self.recs[user]:
                # The log damps the long tail so popular items do not dominate.
                # Items unseen in training count as popularity 0.
                pop += math.log(1 + item_pop.get(item, 0))
                num += 1
        if num == 0:
            return 0.0
        return round(pop / num, 6)

    def eval(self):
        """Compute all four metrics, print them, and return them as a dict."""
        metric = {
            "Precision": self.precision(),
            "Recall": self.recall(),
            "Coverage": self.coverage(),
            "Popularity": self.popularity()
        }
        print("Metric:  ", metric)
        return metric

# PersonalRank算法实现

In [6]:
def PersonalRank(train, alpha, N):
    '''Build a PersonalRank recommender on the user-item bipartite graph.

    The stationary visiting probabilities r of a random walk with restart at
    the target user satisfy r = (1 - alpha) * r0 + alpha * M^T r, i.e.
    r = (1 - alpha) * (I - alpha * M^T)^-1 * r0, where M is the row-normalised
    transition matrix and r0 is 1 at the user's node and 0 elsewhere.

    train: dict mapping user -> list of items
    alpha: probability of continuing the random walk
    N: number of items to recommend (Top-N)
    return: GetRecommendation(user) -> list of up to N (item, score) pairs,
        sorted by descending score, excluding items the user already has in
        train; an empty list for users that are not in train
    '''

    # Build the node index: users take [0, n_users), items follow.
    items = []
    for user in train:
        items.extend(train[user])

    id2item = list(set(items))
    users = {u: i for i, u in enumerate(train.keys())}
    items = {v: i + len(users) for i, v in enumerate(id2item)}
    # The graph has one node per user and per item; sizing the matrix by the
    # number of edges instead would inflate it with empty rows and columns.
    n_nodes = len(users) + len(id2item)

    # Inverted table: item -> users that have it.
    item_user = defaultdict(list)
    for user in train:
        for item in train[user]:
            item_user[item].append(user)

    # Transition probabilities in COO form (M[i, j] = P(i -> j)).
    data, row, col = [], [], []
    # User -> item edges, uniform over the user's items.
    for u in train:
        for v in train[u]:
            data.append(1 / len(train[u]))
            row.append(users[u])
            col.append(items[v])
    # Item -> user edges, uniform over the item's users.
    for v in item_user:
        for u in item_user[v]:
            data.append(1 / len(item_user[v]))
            row.append(items[v])
            col.append(users[u])

    M = csc_matrix((data, (row, col)), shape=(n_nodes, n_nodes))

    # (I - alpha * M^T) does not depend on the user, so factorise it once and
    # reuse the factorisation for every query instead of inverting per call.
    solver = None
    if n_nodes > 0:
        system = (eye(n_nodes, format="csc") - alpha * M.T).tocsc()
        solver = linalg.splu(system)

    def GetRecommendation(user):
        """Return up to N unseen (item, score) pairs for ``user``, best first."""
        if user not in users:
            # No node to restart the walk from.
            return []
        seen_items = set(train[user])
        # Restart vector: all probability mass on the target user's node.
        r0 = np.zeros(n_nodes)
        r0[users[user]] = 1
        r = (1 - alpha) * solver.solve(r0)
        # Keep only the visiting probabilities of the item nodes.
        r = r[len(users):]
        recs = []
        # argsort is ascending, so negate the scores to rank best first.
        for ii in np.argsort(-r, kind="stable"):
            if len(recs) >= N:
                break
            item = id2item[ii]
            if item in seen_items:
                continue
            recs.append((item, r[ii]))
        return recs

    return GetRecommendation

# PersonalRank实验

In [7]:
class Experiment:
    """Runs the PersonalRank evaluation over M train/test splits and averages the metrics."""

    def __init__(self, M, N, alpha, fp="data/ml-1m/ratings.dat"):
        '''
        M: number of folds, i.e. number of experiments to average over
        N: number of items to recommend (Top-N)
        alpha: probability of continuing the random walk
        fp: path of the ratings file
        '''
        self.M = M
        self.N = N
        self.alpha = alpha
        self.fp = fp
        self.alg = PersonalRank

    # A single experiment on one train/test split.
    @timmer
    def worker(self, train, test):
        """Build the recommender on ``train`` and return its metrics dict on ``test``."""
        getRecommendation = self.alg(train, self.alpha, self.N)
        metric = Metric(train, test, getRecommendation)

        return metric.eval()

    # Repeat the experiment over all folds and average.
    @timmer
    def run(self):
        """Run all M folds, print the averaged metrics, and return them as a dict."""
        metrics = {"Precision": 0, "Recall": 0, "Coverage": 0, "Popularity": 0}
        dataset = Dataset(self.fp)
        for ii in range(self.M):
            train, test = dataset.splitData(self.M, ii)
            print(f"Experiment {ii}: ")
            metric = self.worker(train, test)
            metrics = {k: metrics[k] + metric[k] for k in metrics}

        metrics = {k: metrics[k] / self.M for k in metrics}
        # The hyper-parameter of this experiment is alpha; there is no
        # ``ratio`` attribute on this class.
        print("Average Result (M={}, N={}, alpha={}): {}".format(self.M, self.N, self.alpha, metrics))
        return metrics

## 实验过程


In [None]:
# Experiment settings: number of folds, length of the Top-N list, and the
# probability of continuing the random walk.
M = 8
N = 10
alpha = 0.8

exp = Experiment(M, N, alpha)
exp.run()

Func loadData, run time: 0.791828
Func splitData, run time: 1.112444
Experiment 0: 


