In [1]:
import functools
import math
import random
import time

from tqdm import tqdm

In [2]:
def timmer(func):
    '''
    Decorator that prints the wall-clock duration of every call to ``func``.

    :params: func, the callable to be timed
    :return: wrapper, a callable with the same signature and return value as
             ``func`` that additionally prints
             ``Func <name>, run time: <seconds>`` after each call
    '''
    # functools.wraps keeps __name__, __doc__ and __wrapped__ of the decorated
    # function, so introspection and debugging still see the real function.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        res = func(*args, **kwargs)
        stop_time = time.time()
        print('Func %s, run time: %s' % (func.__name__, stop_time - start_time))
        return res
    return wrapper

In [3]:
class DataSet():
    '''Loads (user, item) interaction pairs and splits them into train/test.'''

    def __init__(self, fp):
        '''
        :params: fp, path of the ratings file, one ``user::item::...`` record
                 per line
        '''
        self.data = self.load(fp)

    def load(self, fp):
        '''
        Read the ratings file and keep the first two ``::``-separated fields.

        :params: fp, path of the ratings file
        :return: list of (user, item) tuples of ints, in file order
        '''
        data = []
        # The context manager guarantees the handle is closed, even on error.
        with open(fp) as f:
            for line in f:
                line = line.strip()
                # Blank lines (e.g. a trailing newline at the end of the file)
                # carry no record and would otherwise crash int('').
                if not line:
                    continue
                data.append(tuple(map(int, line.split('::')[0:2])))
        return data

    @staticmethod
    def _group_by_user(pairs):
        '''Turn a list of [user, item] pairs into {user: [item, ...]}.'''
        grouped = {}
        for user, item in pairs:
            grouped.setdefault(user, []).append(item)
        return grouped

    def split_data(self, M, k, seed=1):
        '''
        :params: M, number of folds; the final metric is the average over M runs
        :params: k, index of the fold used as test set, k in [0, M)
        :params: seed, seed for ``random``; must be identical for every k
        :return: train, test -- two dicts mapping user -> list of items

        Running the experiment M times with a different k (0 <= k <= M-1) and
        the same seed yields M different train/test splits; the average over
        the M runs is used as the final metric, mainly to guard against
        overfitting to one particular split.

        Note: this reseeds the module-level ``random`` generator.
        '''
        train, test = [], []
        random.seed(seed)
        for user, item in self.data:
            # random.randint is inclusive on both ends
            # (np.random.randint would exclude the upper bound).
            if random.randint(0, M - 1) == k:
                test.append([user, item])
            else:
                train.append([user, item])

        return self._group_by_user(train), self._group_by_user(test)
        
        

In [4]:
class Metric():
    '''Evaluates a top-N recommender on a train/test split.'''

    def __init__(self, train, test, GetRecommendation):
        '''
        :params: train, training data, dict user -> list of items
        :params: test, test data, dict user -> list of items
        :params: GetRecommendation, callable user -> list of (item, score)
                 pairs, the recommendation interface of the algorithm
        '''
        self.train = train
        self.test = test
        self.GetRecommendation = GetRecommendation
        self.recs = self.get_recommandation()

    def get_recommandation(self):
        '''
        Compute the recommendation list of every user in the test set.

        :return: dict user -> list of (item, score) pairs
        '''
        return {user: self.GetRecommendation(user) for user in self.test}

    def _count_hits(self):
        '''Number of recommended items that appear in the user's test items.'''
        hit = 0
        for user, test_items in self.test.items():
            # A set makes each membership test O(1) instead of scanning a list.
            wanted = set(test_items)
            hit += sum(1 for item, _ in self.recs[user] if item in wanted)
        return hit

    # Recommendations are evaluated for the users of the test set; see
    # https://blog.csdn.net/qiqi123i/article/details/104925774 for a discussion
    # of "recommend for train users vs. test users".
    def precision(self):
        '''
        :return: hits / number of recommended items, 0.0 if nothing was
                 recommended
        '''
        total = sum(len(self.recs[user]) for user in self.test)
        if total == 0:
            return 0.0
        return self._count_hits() / (total * 1.0)

    def recall(self):
        '''
        :return: hits / number of test items, 0.0 if the test set is empty
        '''
        total = sum(len(items) for items in self.test.values())
        if total == 0:
            return 0.0
        return self._count_hits() / (total * 1.0)

    def coverage(self):
        '''
        :return: number of distinct recommended items divided by the number of
                 distinct training items of the test users, 0.0 if those users
                 have no training items
        '''
        recommend_items = set()
        all_items = set()
        for user in self.test:
            # A test user may have no training history at all.
            all_items.update(self.train.get(user, ()))
            recommend_items.update(item for item, _ in self.recs[user])
        if not all_items:
            return 0.0
        return len(recommend_items) / (len(all_items) * 1.0)

    # Novelty is measured through the average popularity of recommended items.
    def popularity(self):
        '''
        :return: mean of log(1 + popularity) over all recommended items,
                 rounded to 6 decimals; 0.0 if nothing was recommended
        '''
        # Popularity of an item = number of training users who have it.
        item_pop = {}
        for items in self.train.values():
            for item in items:
                item_pop[item] = item_pop.get(item, 0) + 1

        num, pop = 0, 0
        for user in self.test:
            for item, _ in self.recs[user]:
                # The logarithm keeps the long tail from being dominated by
                # a few very popular items. Items unseen in training count
                # as popularity 0.
                pop += math.log(1 + item_pop.get(item, 0))
                num += 1
        if num == 0:
            return 0.0
        return round(pop / num, 6)

    def eval(self):
        '''
        Compute and print all metrics.

        :return: dict with keys 'Precision', 'Recall', 'Coverage', 'Popularity'
        '''
        metric = {'Precision': self.precision(),
                  'Recall': self.recall(),
                  'Coverage': self.coverage(),
                  'Popularity': self.popularity()}
        print('Metric:', metric)
        return metric
    
    

In [5]:
# 1. 基于物品余弦相似度的推荐
def ItemCF(train, K, N):
    '''
    Item-based collaborative filtering with cosine item similarity.

    :params: train, training data, dict user -> list of items
    :params: K, hyper-parameter, number of most similar items used per item
    :params: N, hyper-parameter, number of items to recommend
    :return: GetRecommendation, callable user -> list of (item, score) pairs
             sorted by descending score; users without training history get
             an empty list
    '''
    # Co-occurrence counts: sim[u][v] = number of users having both u and v,
    # num[u] = number of users having u.
    sim = {}
    num = {}
    for items in train.values():
        for i, u in enumerate(items):
            num[u] = num.get(u, 0) + 1
            row = sim.setdefault(u, {})
            for j, v in enumerate(items):
                if j == i:
                    continue
                row[v] = row.get(v, 0) + 1

    # Cosine normalisation of the co-occurrence counts.
    for u, row in sim.items():
        for v in row:
            row[v] /= math.sqrt(num[u] * num[v])

    # Keep only the K most similar items of each item, computed once here
    # instead of on every recommendation call.
    top_k_sim = {u: sorted(row.items(), key=lambda x: x[1], reverse=True)[:K]
                 for u, row in sim.items()}

    def GetRecommendation(user):
        # A user absent from the training data simply has no history.
        history = train.get(user, ())
        seen_items = set(history)
        items = {}
        for item in history:
            for u, score in top_k_sim[item]:
                # Never recommend what the user already has.
                if u not in seen_items:
                    items[u] = items.get(u, 0) + score
        return sorted(items.items(), key=lambda x: x[1], reverse=True)[:N]

    return GetRecommendation

In [6]:
# 2. 基于改进的物品余弦相似度的推荐
def ItemIUF(train, K, N):
    '''
    Item-based collaborative filtering with IUF-weighted cosine similarity.

    Each co-occurrence contributed by a user is weighted by
    1 / log(1 + number of items of that user), so very active users
    contribute less to the item similarity.

    :params: train, training data, dict user -> list of items
    :params: K, hyper-parameter, number of most similar items used per item
    :params: N, hyper-parameter, number of items to recommend
    :return: GetRecommendation, callable user -> list of (item, score) pairs
             sorted by descending score; users without training history get
             an empty list
    '''
    sim = {}
    num = {}
    for items in train.values():
        if not items:
            # Nothing to count, and log(1 + 0) == 0 would divide by zero.
            continue
        # The difference to ItemCF: the per-user weight replaces the plain 1.
        weight = 1 / math.log(1 + len(items))
        for i, u in enumerate(items):
            num[u] = num.get(u, 0) + 1
            row = sim.setdefault(u, {})
            for j, v in enumerate(items):
                if j == i:
                    continue
                row[v] = row.get(v, 0) + weight

    # Cosine normalisation of the weighted co-occurrence values.
    for u, row in sim.items():
        for v in row:
            row[v] /= math.sqrt(num[u] * num[v])

    # Keep only the K most similar items of each item, computed once here
    # instead of on every recommendation call.
    top_k_sim = {u: sorted(row.items(), key=lambda x: x[1], reverse=True)[:K]
                 for u, row in sim.items()}

    def GetRecommendation(user):
        # A user absent from the training data simply has no history.
        history = train.get(user, ())
        seen_items = set(history)
        items = {}
        for item in history:
            for u, score in top_k_sim[item]:
                # Never recommend what the user already has.
                if u not in seen_items:
                    items[u] = items.get(u, 0) + score
        return sorted(items.items(), key=lambda x: x[1], reverse=True)[:N]

    return GetRecommendation

In [7]:
# 3. 基于归一化的物品余弦相似度的推荐
def ItemCF_Norm(train, K, N):
    '''
    Item-based collaborative filtering with row-normalised cosine similarity.

    :params: train, training data, dict user -> list of items
    :params: K, hyper-parameter, number of most similar items used per item
    :params: N, hyper-parameter, number of items to recommend
    :return: GetRecommendation, callable user -> list of (item, score) pairs
             sorted by descending score; users without training history get
             an empty list
    '''
    # Co-occurrence counts: sim[u][v] = number of users having both u and v,
    # num[u] = number of users having u.
    sim = {}
    num = {}
    for items in train.values():
        for i, u in enumerate(items):
            num[u] = num.get(u, 0) + 1
            row = sim.setdefault(u, {})
            for j, v in enumerate(items):
                if j == i:
                    continue
                row[v] = row.get(v, 0) + 1

    # Cosine normalisation of the co-occurrence counts.
    for u, row in sim.items():
        for v in row:
            row[v] /= math.sqrt(num[u] * num[v])

    # Row normalisation: divide every row by the sum of its similarities.
    for row in sim.values():
        # Accumulated with a plain loop on purpose: the built-in sum() uses
        # compensated float summation on recent Python versions and could
        # change the scores in the last bits.
        s = 0
        for value in row.values():
            s += value
        if s > 0:
            for v in row:
                row[v] /= s

    # Keep only the K most similar items of each item, computed once here
    # instead of on every recommendation call.
    top_k_sim = {u: sorted(row.items(), key=lambda x: x[1], reverse=True)[:K]
                 for u, row in sim.items()}

    def GetRecommendation(user):
        # A user absent from the training data simply has no history.
        history = train.get(user, ())
        seen_items = set(history)
        items = {}
        for item in history:
            for u, score in top_k_sim[item]:
                # Never recommend what the user already has.
                if u not in seen_items:
                    items[u] = items.get(u, 0) + score
        return sorted(items.items(), key=lambda x: x[1], reverse=True)[:N]

    return GetRecommendation

In [8]:
class Experiment():
    '''Runs an M-fold evaluation of one of the item-based recommenders.'''

    def __init__(self, M, K, N, fp='/Users/felix/PycharmProjects/RecommendSystemPractice/dataSet/ml-1m/ratingsTest.dat', rt='ItemCF'):
        '''
        :params: M, number of experiments (folds) to run
        :params: K, number of TopK similar items
        :params: N, number of TopN recommended items
        :params: fp, path of the data file
        :params: rt, recommender type, one of 'ItemCF', 'ItemIUF',
                 'ItemCF-Norm'
        '''
        self.M = M
        self.K = K
        self.N = N
        self.fp = fp
        self.rt = rt
        self.alg = {'ItemCF': ItemCF, 'ItemIUF': ItemIUF, 'ItemCF-Norm': ItemCF_Norm}

    # A single experiment on one train/test split.
    @timmer
    def worker(self, train, test):
        '''
        :params: train, training data set
        :params: test, test data set
        :return: dict with the value of every metric
        '''
        getRecommendation = self.alg[self.rt](train, self.K, self.N)
        metric = Metric(train, test, getRecommendation)
        return metric.eval()

    # Run M experiments and average the metrics.
    @timmer
    def run(self):
        '''
        Load the data once, evaluate every fold and print the averaged metrics.

        :return: dict with the average of every metric over the M folds
        '''
        metrics = {'Precision': 0, 'Recall': 0,
                   'Coverage': 0, 'Popularity': 0}
        dataset = DataSet(self.fp)
        for ii in range(self.M):
            train, test = dataset.split_data(self.M, ii)
            print('Experiment {}:'.format(ii))
            metric = self.worker(train, test)
            metrics = {k: metrics[k] + metric[k] for k in metrics}
        metrics = {k: metrics[k] / self.M for k in metrics}
        print('Average Result (M={}, K={}, N={}): {}'.format(
            self.M, self.K, self.N, metrics))
        # Hand the averages back so callers can use them programmatically
        # instead of parsing the printed line.
        return metrics

In [9]:
# 1. ItemCF experiment: evaluate several neighbourhood sizes K
M = 8
N = 10
for K in (5, 10, 20, 40, 80, 160):
    cf_exp = Experiment(M, K, N, rt='ItemCF')
    cf_exp.run()

Experiment 0:
Metric: {'Precision': 0.0, 'Recall': 0.0, 'Coverage': 0.084070796460177, 'Popularity': 0.888371}
Func worker, run time: 0.016270875930786133
Experiment 1:
Metric: {'Precision': 0.05555555555555555, 'Recall': 0.025, 'Coverage': 0.06465517241379311, 'Popularity': 0.873354}
Func worker, run time: 0.0159451961517334
Experiment 2:
Metric: {'Precision': 0.0, 'Recall': 0.0, 'Coverage': 0.06306306306306306, 'Popularity': 0.936426}
Func worker, run time: 0.015272378921508789
Experiment 3:
Metric: {'Precision': 0.0, 'Recall': 0.0, 'Coverage': 0.10407239819004525, 'Popularity': 0.922911}
Func worker, run time: 0.01519012451171875
Experiment 4:
Metric: {'Precision': 0.07142857142857142, 'Recall': 0.05, 'Coverage': 0.09170305676855896, 'Popularity': 0.89588}
Func worker, run time: 0.014463186264038086
Experiment 5:
Metric: {'Precision': 0.034482758620689655, 'Recall': 0.02857142857142857, 'Coverage': 0.09012875536480687, 'Popularity': 0.898809}
Func worker, run time: 0.014521121978759

In [10]:
# 2. ItemIUF experiment

M = 8
N = 10
K = 10  # same value as used in the book
iuf_exp = Experiment(M, K, N, rt='ItemIUF')
iuf_exp.run()

Experiment 0:
Metric: {'Precision': 0.05128205128205128, 'Recall': 0.05, 'Coverage': 0.11504424778761062, 'Popularity': 0.860828}
Func worker, run time: 0.02000904083251953
Experiment 1:
Metric: {'Precision': 0.0, 'Recall': 0.0, 'Coverage': 0.06896551724137931, 'Popularity': 0.722109}
Func worker, run time: 0.021938323974609375
Experiment 2:
Metric: {'Precision': 0.027777777777777776, 'Recall': 0.03225806451612903, 'Coverage': 0.13963963963963963, 'Popularity': 0.889336}
Func worker, run time: 0.020570039749145508
Experiment 3:
Metric: {'Precision': 0.0, 'Recall': 0.0, 'Coverage': 0.16289592760180996, 'Popularity': 0.906778}
Func worker, run time: 0.01935291290283203
Experiment 4:
Metric: {'Precision': 0.05555555555555555, 'Recall': 0.05, 'Coverage': 0.11353711790393013, 'Popularity': 0.794513}
Func worker, run time: 0.019886016845703125
Experiment 5:
Metric: {'Precision': 0.046511627906976744, 'Recall': 0.05714285714285714, 'Coverage': 0.12446351931330472, 'Popularity': 0.860138}
Func

# 