# 多路召回

所谓的“多路召回”策略，就是指采用不同的策略、特征或简单模型，分别召回一部分候选集，然后把候选集混合在一起供后续排序模型使用，可以明显的看出，“多路召回策略”是在“计算速度”和“召回率”之间进行权衡的结果。

其中，各种简单策略保证候选集的快速召回，从不同角度设计的策略保证召回率接近理想的状态，不至于损伤排序效果。如下图是多路召回的一个示意图，在多路召回中，每个策略之间毫不相关，所以一般可以写并发多线程同时进行，这样可以更加高效。

<img src="../image/architecture.png">

上图只是一个多路召回的例子，也就是说可以使用多种不同的策略来获取用户排序的候选商品集合，而具体使用哪些召回策略其实是与业务强相关的 ，针对不同的任务就会有对于该业务真实场景下需要考虑的召回规则。例如新闻推荐，召回规则可以是“热门视频”、“导演召回”、“演员召回”、“最近上映“、”流行趋势“、”类型召回“等等。

## 1 导入相关包

In [None]:
import math
import faiss
import pickle
import random
import collections
import numpy as np
import pandas as pd
from tqdm import tqdm
from collections import defaultdict

from tensorflow.python.keras import backend as K
from tensorflow.python.keras.models import Model
from tensorflow.keras.utils import pad_sequences

from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from deepctr.feature_column import SparseFeat, VarLenSparseFeat

from deepmatch.models import YoutubeDNN
from deepmatch.utils import sampledsoftmaxloss

DeepMatch version 0.3.1 detected. Your version is 0.2.0.
Use `pip install -U deepmatch` to upgrade.Changelog: https://github.com/shenweichen/DeepMatch/releases/tag/v0.3.1
DeepCTR version 0.9.3 detected. Your version is 0.8.2.
Use `pip install -U deepctr` to upgrade.Changelog: https://github.com/shenweichen/DeepCTR/releases/tag/v0.9.3


In [8]:
data_path = '../data/'
save_path = '../tmp_results/'
# 做召回评估的一个标志，如果不进行评估就是直接使用全量数据进行召回
metric_recall = False

---

## 2 读取数据

在一般的推荐系统比赛中读取数据部分主要分为三种模式，不同的模式对应的不同的数据集：

1. Debug模式：这个的目的是帮助我们基于数据先搭建一个简易的 baseline 并跑通，保证写的 baseline 代码没有什么问题。由于推荐比赛的数据往往非常巨大，如果一上来直接采用全部的数据进行分析，搭建baseline框架，往往会带来时间和设备上的损耗，所以这时候我们往往需要从海量数据的训练集中随机抽取一部分样本来进行调试（train_click_log_sample），先跑通一个 baseline。
2. 线下验证模式：这个的目的是帮助我们在线下基于已有的训练集数据，来选择好合适的模型和一些超参数。所以我们这一块只需要加载整个训练集（train_click_log），然后把整个训练集再分成训练集和验证集。训练集是模型的训练数据，验证集部分帮助我们调整模型的参数和其他的一些超参数。
3. 线上模式：我们用 debug 模式搭建起一个推荐系统比赛的 baseline，用线下验证模式选择好了模型和一些超参数，这一部分就是真正的对于给定的测试集进行预测，提交到线上，所以这一块使用的训练数据集是全量的数据集（train_click_log+test_click_log）

下面就分别对这三种不同的数据读取模式先建立不同的代导入函数，方便后面针对不同的模式下导入数据。

debug 模式

In [None]:
def get_all_click_sample(data_path: str, sample_nums: int=10000) -> pd.DataFrame:
    """
    训练集中采样一部分数据调试。
    
    Args:
        data_path(`str`): 原数据的存储路径
        sample_nums(`int`): 采样数目（这里由于机器的内存限制，可以采样用户做）

    Returns:
        pd.DataFrame: 返回一个包含采样后的点击日志数据的 DataFrame；
                      该数据只包含随机选择的用户的点击记录，并且去除了重复的点击记录。
    """
    all_click = pd.read_csv(data_path + 'train_click_log.csv')
    all_user_ids = all_click.user_id.unique()

    sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False) 
    all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
    
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click

根据线上或线下获取对应大小的数据集

In [2]:
def get_all_click_df(data_path: str='../data/', offline: bool=True) -> pd.DataFrame:
    """
    读取点击数据，支持线上和线下数据的获取。
    根据  offline 参数的值决定读取训练集或训练集与测试集的合并数据。

    Args:
        data_path (`str`): 数据存储路径，默认为 './data_raw/'。
        offline (`bool`): 是否仅使用训练集数据。若为 True，则只读取训练集；若为 False，则合并训练集和测试集。

    Returns:
        pd.DataFrame: 返回一个包含用户点击日志的 DataFrame，数据中去除了重复的点击记录。
    """
    all_click = pd.read_csv(data_path + 'train_click_log.csv')
    if not offline:
        tst_click = pd.read_csv(data_path + 'testA_click_log.csv')
        all_click = pd.concat([all_click, tst_click], ignore_index=True)
    
    all_click = all_click.drop_duplicates(['user_id', 'click_article_id', 'click_timestamp'])
    return all_click

In [None]:
# 读取 articles.csv 数据集
def get_item_info_df(data_path):
    item_info_df = pd.read_csv(data_path + 'articles.csv')
    item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'})
    return item_info_df

In [None]:
# 读取文章的 Embedding 数据集，articles_emb.csv
def get_item_emb_dict(data_path):
    item_emb_df = pd.read_csv(data_path + 'articles_emb.csv')
    item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
    item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols])
    # 进行归一化
    item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)

    item_emb_dict = dict(zip(item_emb_df['article_id'], item_emb_np))
    pickle.dump(item_emb_dict, open('item_content_emb.pkl', 'wb'))
    return item_emb_dict

在Python中，pickle模块用于序列化和反序列化Python对象结构。pickle文件是一种二进制文件，可以保存几乎所有的Python对象，包括列表、字典、自定义类等。通过pickle，我们可以方便地将Python对象保存到文件中，并在需要时将其加载回来。

In [3]:
# 读取全量数据集
all_click_df = get_all_click_df(offline=False)
# 对时间戳进行线性函数归一化，用于在关联规则的时候计算权重
all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(lambda x: (x - np.min(x)) / (np.max(x) - np.min(x)))

In [8]:
all_click_df

Unnamed: 0,user_id,click_article_id,click_timestamp,click_environment,click_deviceGroup,click_os,click_country,click_region,click_referrer_type
0,199999,160417,0.019350,4,1,17,1,13,1
1,199999,5408,0.019351,4,1,17,1,13,1
2,199999,50823,0.019359,4,1,17,1,13,1
3,199998,157770,0.019340,4,1,17,1,25,5
4,199998,96613,0.019378,4,1,17,1,25,5
...,...,...,...,...,...,...,...,...,...
1630628,221924,70758,0.343615,4,3,2,1,25,2
1630629,207823,331116,0.343675,4,3,2,1,25,1
1630630,207823,234481,0.343760,4,3,2,1,25,1
1630631,207823,211442,0.343853,4,3,2,1,25,1


In [None]:
# 读取 articles.csv
item_info_df = get_item_info_df(data_path)
item_info_df

In [None]:
# 读取 articles_emb.csv
item_emb_dict = get_item_emb_dict(data_path)
len(item_emb_dict)

## 3 工具函数

### 获取用户-文章-时间函数

这个在基于关联规则的用户协同过滤的时候会用到。

In [None]:
# 根据点击时间获取用户的点击文章序列   {user1: {item1: time1, item2: time2..}...}
def get_user_item_time(click_df):
    
    click_df = click_df.sort_values('click_timestamp')
    
    def make_item_time_pair(df):
        return list(zip(df['click_article_id'], df['click_timestamp']))
    
    user_item_time_df = click_df.groupby('user_id')[['click_article_id', 'click_timestamp']].apply(lambda x: make_item_time_pair(x))\
                                                            .reset_index().rename(columns={0: 'item_time_list'})
    user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))
    
    return user_item_time_dict

### 获取文章-用户-时间函数

这个在基于关联规则的文章协同过滤的时候会用到。

In [None]:
# 根据时间获取商品被点击的用户序列  {item1: {user1: time1, user2: time2...}...}
# 这里的时间是用户点击当前商品的时间，好像没有直接的关系。
def get_item_user_time_dict(click_df):
    def make_user_time_pair(df):
        return list(zip(df['user_id'], df['click_timestamp']))
    
    click_df = click_df.sort_values('click_timestamp')
    item_user_time_df = click_df.groupby('click_article_id')[['user_id', 'click_timestamp']].apply(lambda x: make_user_time_pair(x))\
                                                            .reset_index().rename(columns={0: 'user_time_list'})
    
    item_user_time_dict = dict(zip(item_user_time_df['click_article_id'], item_user_time_df['user_time_list']))
    return item_user_time_dict

### 获取历史和最后一次点击

这个在评估召回结果， 特征工程和制作标签转成监督学习测试集的时候回用到。

In [None]:
def get_hist_and_last_click(all_click: pd.DataFrame) -> tuple:
    """
    获取每个用户的历史点击和最后一次点击记录。
    函数将用户的所有点击按时间排序，并返回历史点击和最后一次点击的两个 DataFrame。
    
    Args:
        all_click (`pd.DataFrame`): 包含用户点击数据的 DataFrame，必须包含 'user_id' 和 'click_timestamp' 列。

    Returns:
        tuple: 返回一个元组，其中第一个元素是包含历史点击的 DataFrame，第二个元素是包含最后一次点击的 DataFrame。
    """
    # 按用户 ID 和点击时间排序
    all_click = all_click.sort_values(by=['user_id', 'click_timestamp'])
    
    # 获取每个用户的最后一次点击记录
    click_last_df = all_click.groupby('user_id').tail(1)

    # 定义函数以获取用户的历史点击
    def hist_func(user_df: pd.DataFrame) -> pd.DataFrame:
        """
        获取用户的历史点击记录。
        如果用户只有一个点击记录，返回该记录；否则，返回除了最后一次点击以外的所有记录。
        
        Args:
            user_df (`pd.DataFrame`): 当前用户的点击数据 DataFrame。

        Returns:
            pd.DataFrame: 用户的历史点击记录。
        """
        if len(user_df) == 1:
            return user_df  # 只有一个点击，直接返回
        else:
            return user_df[:-1]  # 返回除了最后一次点击以外的所有记录

    # 获取每个用户的历史点击记录
    click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True)

    return click_hist_df, click_last_df  # 返回历史点击和最后一次点击

### 获取文章属性特征

In [None]:
# 获取文章id对应的基本属性，保存成字典的形式，方便后面召回阶段，冷启动阶段直接使用
def get_item_info_dict(item_info_df):
    item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(lambda x: (x - np.min(x)) / (np.max(x) - np.min(x)))
    item_category_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id']))
    item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count']))
    item_created_time_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts']))
    return item_category_dict, item_words_dict, item_created_time_dict

### 获取用户历史点击的文章信息

In [None]:
def get_user_hist_item_info_dict(all_click):
    # 获取user_id对应的用户历史点击文章类型的集合字典
    user_hist_item_typs = all_click.groupby('user_id')['category_id'].agg(set).reset_index()
    user_hist_item_typs_dict = dict(zip(user_hist_item_typs['user_id'], user_hist_item_typs['category_id']))
    
    # 获取user_id对应的用户点击文章的集合
    user_hist_item_ids_dict = all_click.groupby('user_id')['click_article_id'].agg(set).reset_index()
    user_hist_item_ids_dict = dict(zip(user_hist_item_ids_dict['user_id'], user_hist_item_ids_dict['click_article_id']))
    
    # 获取user_id对应的用户历史点击的文章的平均字数字典
    user_hist_item_words = all_click.groupby('user_id')['words_count'].agg('mean').reset_index()
    user_hist_item_words_dict = dict(zip(user_hist_item_words['user_id'], user_hist_item_words['words_count']))
    
    # 获取user_id对应的用户最后一次点击的文章的创建时间
    all_click_ = all_click.sort_values('click_timestamp')
    user_last_item_created_time = all_click_.groupby('user_id')['created_at_ts'].apply(lambda x: x.iloc[-1]).reset_index()
    
    max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
    user_last_item_created_time['created_at_ts'] = user_last_item_created_time[['created_at_ts']].apply(max_min_scaler)
    
    user_last_item_created_time_dict = dict(zip(user_last_item_created_time['user_id'],
                                                user_last_item_created_time['created_at_ts']))
    
    return user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict

### 获取点击最多的 TopK 个文章

In [None]:
def get_item_topk_click(click_df: pd.DataFrame, k: int) -> pd.Series:
    """
    获取点击次数最多的前 k 个文章 ID。

    Args:
        click_df (`pd.DataFrame`): 包含点击数据的 DataFrame，必须包含 'click_article_id' 列。
        k (`int`): 需要返回的前 k 个文章的数量。

    Returns:
        `pd.Series`: 返回一个包含前 k 个点击次数最多的文章 ID 的 Series。
    """
    return click_df['click_article_id'].value_counts().index[:k]

### 定义多路召回字典

In [None]:
# 获取文章的属性信息，保存成字典的形式方便查询
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
len(item_type_dict), len(item_words_dict), len(item_created_time_dict)

In [None]:
# 定义一个多路召回的字典，将各路召回的结果都保存在这个字典当中
user_multi_recall_dict =  {'itemcf_sim_itemcf_recall': {},
                           'embedding_sim_item_recall': {},
                           'youtubednn_recall': {},
                           'youtubednn_usercf_recall': {}, 
                           'cold_start_recall': {}}
user_multi_recall_dict

In [None]:
# 提取最后一次点击作为召回评估，如果不需要做召回评估直接使用全量的训练集进行召回（线下验证模型）
# 如果不是召回评估，直接使用全量数据进行召回，不用将最后一次提取出来
train_hist_click_df, train_last_click_df = get_hist_and_last_click(all_click_df)

In [None]:
len(train_hist_click_df), len(train_last_click_df)

### 召回效果评估

做完了召回有时候也需要对当前的召回方法或者参数进行调整以达到更好的召回效果，因为召回的结果决定了最终排序的上限，下面也会提供一个召回评估的方法。


In [12]:
def metrics_recall(user_recall_items_dict, trn_last_click_df, topk=50):
    """
    依次评估召回的前 10, 20, 30, 40, 50 个文章中的击中率。

    Args:
        user_recall_items_dict (`dict`): 用户到物品的推荐字典。
        trn_last_click_df (`DataFrame`): 包含用户最后一次点击的文章的数据框。
        topk (`int`): 评估的最大召回数量。

    Returns:
        None
    """
    # 创建用户最后点击的物品字典
    last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['click_article_id']))
    user_num = len(user_recall_items_dict)  # 用户数量
    
    for k in range(10, topk + 1, 10):  # 依次评估前 k 个召回结果
        hit_num = 0  # 记录击中数量
        for user, item_list in user_recall_items_dict.items():
            # 获取前 k 个召回的结果
            tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]]
            # 检查用户最后点击的文章是否在召回结果中
            if last_click_item_dict[user] in set(tmp_recall_items):
                hit_num += 1  # 如果击中，计数加一
        
        # 计算击中率
        hit_rate = round(hit_num * 1.0 / user_num, 5)
        print('topk:', k, ' : ', 'hit_num:', hit_num, 'hit_rate:', hit_rate, 'user_num:', user_num)

---

## 4 计算相似性矩阵

这一部分主要是通过协同过滤以及向量检索得到相似性矩阵，相似性矩阵主要分为user2user和item2item，下面依次获取基于itemCF的item2item的相似性矩阵。

### itemCF i2i_sim

借鉴 KDD2020 的去偏商品推荐，在计算 item2item 相似性矩阵时，使用关联规则，使得计算的文章的相似性还考虑到了:

1. 用户点击的时间权重
2. 用户点击的顺序权重
3. 文章创建的时间权重

In [None]:
def itemcf_sim(df, item_created_time_dict):
    """
    文章与文章之间的相似性矩阵计算。
    基于物品的协同过滤和关联规则来生成文章相似度矩阵。

    Args:
        df (`DataFrame`): 用户点击数据表。
        item_created_time_dict (`dict`): 文章创建时间的字典。

    Returns:
        dict: 文章与文章的相似性矩阵，键为文章ID，值为与之相似的文章及其相似度。
    """
    
    # 获取用户与文章的点击时间字典
    user_item_time_dict = get_user_item_time(df)
    
    # 初始化物品相似度字典和物品计数器
    i2i_sim = {}
    item_cnt = defaultdict(int)
    
    # 遍历每个用户及其点击的文章时间列表
    for user, item_time_list in tqdm(user_item_time_dict.items()):
        # 对于每个用户的点击记录，计算物品之间的相似度
        for loc1, (i, i_click_time) in enumerate(item_time_list):
            item_cnt[i] += 1  # 记录物品被点击的次数
            i2i_sim.setdefault(i, {})  # 初始化物品相似度字典
            
            # 遍历同一用户点击的其他物品
            for loc2, (j, j_click_time) in enumerate(item_time_list):
                if i == j:
                    continue  # 跳过自身比较
                
                # 考虑文章的正向顺序点击和反向顺序点击    
                loc_alpha = 1.0 if loc2 > loc1 else 0.7
                # 位置信息权重，参数可以调节
                loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1))
                # 点击时间权重，参数可以调节
                click_time_weight = np.exp(0.7 ** np.abs(i_click_time - j_click_time))
                # 文章创建时间的权重，参数可以调节
                created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
                
                i2i_sim[i].setdefault(j, 0)  # 初始化相似度值
                # 综合考虑多种因素计算文章之间的相似度
                i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len(item_time_list) + 1)
    
    # 归一化相似度矩阵
    i2i_sim_ = i2i_sim.copy()
    for i, related_items in i2i_sim.items():
        for j, wij in related_items.items():
            i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])  # 归一化处理

    # 将得到的相似性矩阵保存到本地
    pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))
    
    return i2i_sim_  # 返回相似性矩阵

In [None]:
i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)

### UserCF u2u_sim

在计算用户之间的相似度的时候，也可以使用一些简单的关联规则，比如用户活跃度权重，这里将用户的点击次数作为用户活跃度的指标。

In [None]:
def get_user_activate_degree_dict(all_click_df):
    # 得到每个用户点击了多少文章
    all_click_df_ = all_click_df.groupby('user_id', as_index=False)['click_article_id'].count()
    # 用户活跃度归一化
    mm = MinMaxScaler()
    all_click_df_['click_article_id'] = mm.fit_transform(all_click_df_[['click_article_id']])
    user_activate_degree_dict = dict(zip(all_click_df_['user_id'], all_click_df_['click_article_id']))
    return user_activate_degree_dict

In [None]:
def usercf_sim(all_click_df, user_activate_degree_dict):
    """
    用户相似性矩阵计算。
    
    Args:
        all_click_df: 数据表。
        user_activate_degree_dict (`dict`): 用户活跃度的字典。

    Returns:
        `dict`: 用户相似性矩阵。

    思路: 基于用户的协同过滤 + 关联规则。
    """
    # 获取物品与用户的点击时间字典
    item_user_time_dict = get_item_user_time_dict(all_click_df)
    
    u2u_sim = {}  # 用户相似性字典
    user_cnt = defaultdict(int)  # 用户点击计数

    # 遍历每个物品及其对应的用户点击时间列表
    for item, user_time_list in tqdm(item_user_time_dict.items()):
        for u, click_time in user_time_list:
            user_cnt[u] += 1  # 记录用户点击次数
            u2u_sim.setdefault(u, {})  # 初始化用户相似性字典

            # 计算与其他用户的相似性
            for v, click_time in user_time_list:
                u2u_sim[u].setdefault(v, 0)  # 初始化相似度值
                if u == v:
                    continue  # 跳过自身比较

                # 用户平均活跃度作为活跃度的权重
                activate_weight = 100 * 0.5 * (user_activate_degree_dict[u] + user_activate_degree_dict[v])
                u2u_sim[u][v] += activate_weight / math.log(len(user_time_list) + 1)  # 更新相似度

    # 归一化相似性矩阵
    u2u_sim_ = u2u_sim.copy()
    for u, related_users in u2u_sim.items():
        for v, wij in related_users.items():
            u2u_sim_[u][v] = wij / math.sqrt(user_cnt[u] * user_cnt[v])  # 归一化处理

    # 将得到的相似性矩阵保存到本地
    pickle.dump(u2u_sim_, open(save_path + 'usercf_u2u_sim.pkl', 'wb'))

    return u2u_sim_  # 返回用户相似性矩阵

In [None]:
# 由于 usercf 计算的时候太耗费内存了，这里就不直接运行了
# 如果是采样的话是可以运行的
user_activate_degree_dict = get_user_activate_degree_dict(all_click_df)
user_activate_degree_dict

In [None]:
u2u_sim = usercf_sim(all_click_df, user_activate_degree_dict)
u2u_sim

### item embedding sim

使用 Embedding 计算 item 之间的相似度是为了后续冷启动的时候可以获取未出现在点击数据中的文章，后面有对冷启动专门的介绍，这里简单的说一下 faiss。

aiss 是 Facebook 的 AI 团队开源的一套用于做聚类或者相似性搜索的软件库，底层是用 C++ 实现。Faiss 因为超级优越的性能，被广泛应用于推荐相关的业务当中.

faiss 工具包一般使用在推荐系统中的**向量召回**部分。在做向量召回的时候要么是 u2u、u2i 或者 i2i，这里的 u 和 i 指的是 user 和 item。我们知道在实际的场景中 user 和 item 的数量都是海量的，我们最容易想到的基于向量相似度的召回就是使用两层循环遍历 user 列表或者 item 列表计算两个向量的相似度，但是这样做在面对海量数据是不切实际的，faiss 就是用来加速计算某个查询向量最相似的 topk 个索引向量。

**faiss** 查询的原理：

faiss 使用了 PCA 和 PQ（Product quantization 乘积量化）两种技术进行向量压缩和编码，当然还使用了其他的技术进行优化，但是 PCA 和 PQ 是其中最核心部分。

1. PCA 降维算法细节参考下面这个链接进行学习：[主成分分析（PCA）原理总结](https://www.cnblogs.com/pinard/p/6239403.html)
2. PQ 编码的细节参考下面这个链接进行学习：[实例理解 product quantization 算法](http://www.fabwrite.com/productquantization)

faiss 使用：

In [None]:
# 向量检索相似度计算
# topk指的是每个 item，faiss 搜索后返回最相似的 topk 个 item
def embdding_sim(click_df, item_emb_df, save_path, topk):
    """
    基于内容的文章 embedding 相似性矩阵计算。

    Args:
        click_df: 数据表。
        item_emb_df: 文章的 embedding。
        save_path: 保存路径。
        topk (`int`): 找最相似的 topk 篇文章。

    Returns:
        `dict`: 文章相似性矩阵。

    思路: 对于每一篇文章，基于 embedding 的相似性返回 topk 个与其最相似的文章，
    由于文章数量太多，这里用了 faiss 进行加速。
    """
    
    # 文章索引与文章 id 的字典映射
    item_idx_2_rawid_dict = dict(zip(item_emb_df.index, item_emb_df['article_id']))
    
    item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
    item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32)
    # 向量进行单位化
    item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
    
    # 建立 faiss 索引
    item_index = faiss.IndexFlatIP(item_emb_np.shape[1])
    item_index.add(item_emb_np)
    # 相似度查询，给每个索引位置上的向量返回 topk 个 item 以及相似度
    sim, idx = item_index.search(item_emb_np, topk)  # 返回的是列表
    
    # 将向量检索的结果保存成原始 id 的对应关系
    item_sim_dict = collections.defaultdict(dict)
    for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(item_emb_np)), sim, idx)):
        target_raw_id = item_idx_2_rawid_dict[target_idx]
        # 从 1 开始是为了去掉商品本身，所以最终获得的相似商品只有 topk-1
        for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): 
            rele_raw_id = item_idx_2_rawid_dict[rele_idx]
            item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
     
    # 保存 i2i 相似度矩阵
    pickle.dump(item_sim_dict, open(save_path + 'emb_i2i_sim.pkl', 'wb'))   
    
    return item_sim_dict  # 返回文章相似性矩阵

In [None]:
item_emb_df = pd.read_csv(data_path + 'articles_emb.csv')
emb_i2i_sim = embdding_sim(all_click_df, item_emb_df, save_path, topk=10)

---

## 5 召回

这个就是我们开篇提到的那个问题，36 万篇文章，20 多万用户的推荐，我们又有哪些策略来缩减问题的规模？我们就可以再召回阶段筛选出用户对于点击文章的候选集合，从而降低问题的规模。召回常用的策略：

- Youtube DNN 召回
- 基于文章的召回
    - 文章的协同过滤
    - 基于文章 Embedding 的召回
- 基于用户的召回
    - 用于的协同过滤
    - 基于用户 Embedding 的召回

上面的各种召回方式一部分在基于用户已经看得文章的基础上去召回与这些文章相似的一些文章，而这个相似性的计算方式不同，就得到了不同的召回方式，比如文章的协同过滤，文章内容的embedding等。

还有一部分是根据用户的相似性进行推荐，对于某用户推荐与其相似的其他用户看过的文章，比如用户的协同过滤和用户embedding。还有一种思路是类似矩阵分解的思路，先计算出用户和文章的embedding之后，就可以直接算用户和文章的相似度，根据这个相似度进行推荐，比如 YouTube DNN。

我们下面详细来看一下每一个召回方法。

Youbute DNN 召回

这一步直接获取用户召回的候选文章列表：

<img src="../image/youtube_dnn.png">

关于 Youtube DNN 原理和应用推荐看王喆老师的两篇博客：

1. [重读Youtube深度学习推荐系统论文，字字珠玑，惊为神文](https://zhuanlan.zhihu.com/p/52169807)
2. [YouTube深度学习推荐系统的十大工程问题](https://zhuanlan.zhihu.com/p/52504407)

**参考文献：**

1. [YouTubeDNN原理](https://zhuanlan.zhihu.com/p/52169807)
2. [word2vec放到排序中的w2v的介绍部分](https://zhuanlan.zhihu.com/p/26306795)

In [12]:
# 获取双塔召回时的训练验证数据
# negsample 指的是通过滑窗构建样本时的负样本数量
def gen_data_set(data, negsample=0):
    """
    生成训练和测试数据集。

    Args:
        data (`DataFrame`): 输入数据，包含用户点击历史。
        negsample (`int`): 每个正样本生成的负样本数量。

    Returns:
        `tuple`: 训练集和测试集的元组。
    """
    data.sort_values("click_timestamp", inplace=True)  # 按点击时间戳排序
    item_ids = data['click_article_id'].unique()  # 获取所有文章 ID

    train_set = []  # 训练集
    test_set = []   # 测试集

    for reviewerID, hist in tqdm(data.groupby('user_id')):  # 按用户分组
        pos_list = hist['click_article_id'].tolist()  # 用户点击的正样本列表
        
        if negsample > 0:
            candidate_set = list(set(item_ids) - set(pos_list))  # 用户未点击的文章作为候选负样本
            neg_list = np.random.choice(candidate_set, size=len(pos_list) * negsample, replace=True)  # 随机选择负样本
            
        # 当正样本长度为1时，确保该样本被加入训练集和测试集
        if len(pos_list) == 1:
            train_set.append((reviewerID, [pos_list[0]], pos_list[0], 1, len(pos_list)))
            test_set.append((reviewerID, [pos_list[0]], pos_list[0], 1, len(pos_list)))
            
        # 滑窗构造正负样本
        for i in range(1, len(pos_list)):
            hist = pos_list[:i]  # 用户历史点击的正样本
            
            if i != len(pos_list) - 1:
                # 添加正样本到训练集中
                train_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1])))  # 正样本
                for negi in range(negsample):
                    # 添加负样本到训练集中
                    train_set.append((reviewerID, hist[::-1], neg_list[i * negsample + negi], 0, len(hist[::-1])))  # 负样本
            else:
                # 将最长的序列长度作为测试数据
                test_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1])))
                
    random.shuffle(train_set)  # 随机打乱训练集
    random.shuffle(test_set)    # 随机打乱测试集
    
    return train_set, test_set  # 返回训练集和测试集

In [10]:
# 将输入的数据进行padding，使得序列特征的长度都一致
def gen_model_input(train_set,user_profile,seq_max_len):

    train_uid = np.array([line[0] for line in train_set])
    train_seq = [line[1] for line in train_set]
    train_iid = np.array([line[2] for line in train_set])
    train_label = np.array([line[3] for line in train_set])
    train_hist_len = np.array([line[4] for line in train_set])

    train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post', truncating='post', value=0)
    train_model_input = {"user_id": train_uid, "click_article_id": train_iid, "hist_article_id": train_seq_pad,
                         "hist_len": train_hist_len}

    return train_model_input, train_label

In [6]:
def youtubednn_u2i_dict(data, topk=20):    
    """
    生成用户到物品的推荐字典。

    Args:
        data (`DataFrame`): 输入数据，包含用户点击历史。
        topk (`int`): 每个用户推荐的物品数量。

    Returns:
        `dict`: 用户到物品的推荐字典。
    """
    sparse_features = ["click_article_id", "user_id"]
    SEQ_LEN = 30  # 用户点击序列的长度，短的填充，长的截断
    
    user_profile_ = data[["user_id"]].drop_duplicates('user_id')  # 用户画像
    item_profile_ = data[["click_article_id"]].drop_duplicates('click_article_id')  # 物品画像
    
    # 类别编码
    features = ["click_article_id", "user_id"]
    feature_max_idx = {}
    
    for feature in features:
        lbe = LabelEncoder()
        data[feature] = lbe.fit_transform(data[feature])  # 将特征编码为数字
        feature_max_idx[feature] = data[feature].max() + 1  # 记录最大索引
    
    # 提取用户和物品的画像
    user_profile = data[["user_id"]].drop_duplicates('user_id')
    item_profile = data[["click_article_id"]].drop_duplicates('click_article_id')  
    
    # 创建用户和物品的索引与原始 ID 的映射
    user_index_2_rawid = dict(zip(user_profile['user_id'], user_profile_['user_id']))
    item_index_2_rawid = dict(zip(item_profile['click_article_id'], item_profile_['click_article_id']))
    
    # 划分训练和测试集
    train_set, test_set = gen_data_set(data, 0)  # 生成训练集和测试集
    train_model_input, train_label = gen_model_input(train_set, user_profile, SEQ_LEN)  # 整理训练数据
    test_model_input, test_label = gen_model_input(test_set, user_profile, SEQ_LEN)  # 整理测试数据
    
    # 确定 Embedding 的维度
    embedding_dim = 16
    
    # 准备模型输入特征
    user_feature_columns = [
        SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim),
        VarLenSparseFeat(SparseFeat('hist_article_id', feature_max_idx['click_article_id'], embedding_dim,
                                     embedding_name="click_article_id"), SEQ_LEN, 'mean', 'hist_len'),
    ]
    item_feature_columns = [SparseFeat('click_article_id', feature_max_idx['click_article_id'], embedding_dim)]
    
    # 定义模型
    model = YoutubeDNN(user_feature_columns, item_feature_columns, num_sampled=5, user_dnn_hidden_units=(64, embedding_dim))
    
    # 编译模型
    model.compile(optimizer="adam", loss=sampledsoftmaxloss)  
    
    # 训练模型
    history = model.fit(train_model_input, train_label, batch_size=256, epochs=1, verbose=1, validation_split=0.0)
    
    # 提取训练的 Embedding
    test_user_model_input = test_model_input
    all_item_model_input = {"click_article_id": item_profile['click_article_id'].values}

    user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding)
    item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding)
    
    # 保存用户和物品的 Embedding
    user_embs = user_embedding_model.predict(test_user_model_input, batch_size=2 ** 12)
    item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12)
    
    # Embedding 归一化
    user_embs = user_embs / np.linalg.norm(user_embs, axis=1, keepdims=True)
    item_embs = item_embs / np.linalg.norm(item_embs, axis=1, keepdims=True)
    
    # 将 Embedding 转换为字典形式
    raw_user_id_emb_dict = {user_index_2_rawid[k]: v for k, v in zip(user_profile['user_id'], user_embs)}
    raw_item_id_emb_dict = {item_index_2_rawid[k]: v for k, v in zip(item_profile['click_article_id'], item_embs)}
    
    # 保存 Embedding 到本地
    pickle.dump(raw_user_id_emb_dict, open(save_path + 'user_youtube_emb.pkl', 'wb'))
    pickle.dump(raw_item_id_emb_dict, open(save_path + 'item_youtube_emb.pkl', 'wb'))
    
    # 使用 FAISS 进行相似性搜索，找到与用户最相似的 topk 个物品
    index = faiss.IndexFlatIP(embedding_dim)
    index.add(item_embs)  # 将物品向量构建索引
    sim, idx = index.search(np.ascontiguousarray(user_embs), topk)  # 查询最相似的 topk 个物品
    
    user_recall_items_dict = collections.defaultdict(dict)
    for target_idx, sim_value_list, rele_idx_list in tqdm(zip(test_user_model_input['user_id'], sim, idx)):
        target_raw_id = user_index_2_rawid[target_idx]
        # 从1开始是为了去掉商品本身，最终获得的相似商品只有 topk-1
        for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): 
            rele_raw_id = item_index_2_rawid[rele_idx]
            user_recall_items_dict[target_raw_id][rele_raw_id] = user_recall_items_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
            
    # 将召回的结果进行排序
    user_recall_items_dict = {k: sorted(v.items(), key=lambda x: x[1], reverse=True) for k, v in user_recall_items_dict.items()}
    
    # 保存召回结果
    pickle.dump(user_recall_items_dict, open(save_path + 'youtube_u2i_dict.pkl', 'wb'))
    
    return user_recall_items_dict  # 返回用户到物品的推荐字典

In [13]:
# 由于这里需要做召回评估，所以将训练集中的最后一次点击都提取了出来
if not metric_recall:
    user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(all_click_df, topk=20)
else:
    train_hist_click_df, train_last_click_df = get_hist_and_last_click(all_click_df)
    user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(train_hist_click_df, topk=20)
    # 召回效果评估
    metrics_recall(user_multi_recall_dict['youtubednn_recall'], train_last_click_df, topk=20)

100%|██████████| 250000/250000 [00:12<00:00, 19404.19it/s]


TypeError: Could not build a TypeSpec for KerasTensor(type_spec=TensorSpec(shape=(None, 16), dtype=tf.float32, name=None), name='tf.compat.v1.squeeze/Squeeze:0', description="created by layer 'tf.compat.v1.squeeze'") of unsupported type <class 'tensorflow.python.keras.engine.keras_tensor.KerasTensor'>.