### 合并多路召回的结果

In [1]:
import os
import numpy as np
from typing import Dict, Tuple, List, Set
from tqdm import tqdm
from collections import defaultdict
import copy
import faiss
# GPU selection is configured before TensorFlow is imported on the next lines:
# devices are ordered by PCI bus id and only device "7" is made visible.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "7"
import tensorflow as tf
logger = tf.get_logger()
logger.setLevel('ERROR')  # set the log level to ERROR
import polars as pl
import pickle
from utils.exposure_data import get_all_expose_df, get_user_item_time
from data.dataset import NewsDataset
from data.preprocessing import build_feature_columns
from utils.config import load_configs
# Configure data directories; `mode` is also interpolated into the names of
# the pickled inverted-index files loaded in later cells.
offline = False
mode = "offline" if offline else "online"
data_path = f"/data3/zxh/news_rec/{mode}_data"
public_path = "/data3/zxh/news_rec/public_data"

#### 1. ItemCF召回

In [2]:
def itemcf_recall(user_id, user_item_time_dict, itemcf_inverted_index, sim_item_topk=10, recall_item_num=50):
    """
    Item-based collaborative-filtering recall channel.

    Every distinct item in the user's click history contributes the
    similarity scores of its ``sim_item_topk`` nearest neighbours; the scores
    of a candidate reached from several history items are summed.

    :param user_id: user ID
    :param user_item_time_dict: click history {user_id: [(item_id, timestamp), ...]}
    :param itemcf_inverted_index: inverted index {item_id: [(similar_item_id, similarity_score), ...]}
    :param sim_item_topk: number of most-similar neighbours taken per history item
    :param recall_item_num: maximum number of items to return
    :return: list of recalled item ids, highest accumulated score first;
             empty when the user has no history or no candidate survives
    """
    # A dict keeps first-seen order (which fixes the tie order of the final
    # ranking) and gives O(1) membership tests for the "already clicked" filter.
    clicked_items = dict.fromkeys(
        click_article_id for click_article_id, _ in user_item_time_dict.get(user_id, [])
    )
    item_rank = defaultdict(float)

    for item in clicked_items:
        # A clicked item that is absent from the index contributes nothing
        # instead of raising KeyError (same behaviour as swing_recall).
        for similar_item, similarity in itemcf_inverted_index.get(item, [])[:sim_item_topk]:
            # Never recommend something the user has already clicked.
            if similar_item in clicked_items:
                continue
            item_rank[similar_item] += similarity

    # sorted() is stable, so equal scores keep their insertion order.
    ranked = sorted(item_rank.items(), key=lambda pair: pair[1], reverse=True)
    return [item for item, _ in ranked[:recall_item_num]]

In [3]:
# Load the pre-computed ItemCF inverted index for the current mode.
# NOTE(review): pickle.load can execute arbitrary code; only load files from a trusted location.
with open(f"/data3/zxh/news_rec/temp_results/itemcf_top_100_inverted_index_{mode}.pkl", "rb") as f:
    itemcf_inverted_index = pickle.load(f)
# Fetch the training exposure data (the second return value is discarded).
train_df, _ = get_all_expose_df(offline=offline)
# Build the per-user click history (the users' last-N clicks).
user_item_time_dict = get_user_item_time(train_df)

#### 2. Swing召回

In [4]:
# Load the pre-computed Swing inverted index for the current mode.
# NOTE(review): pickle.load can execute arbitrary code; only load files from a trusted location.
with open(f"/data3/zxh/news_rec/temp_results/swing_top_100_inverted_index_{mode}.pkl", "rb") as f:
    swing_inverted_index = pickle.load(f)

In [5]:
def swing_recall(user_id, user_item_time_dict, swing_inverted_index, sim_item_topk=10, recall_item_num=50):
    """
    Swing-similarity recall channel.

    Every distinct item in the user's click history contributes the Swing
    scores of its ``sim_item_topk`` nearest neighbours; the scores of a
    candidate reached from several history items are summed.

    :param user_id: user ID
    :param user_item_time_dict: click history {user_id: [(item_id, timestamp), ...]}
    :param swing_inverted_index: Swing inverted index {item_id: [(similar_item_id, similarity_score), ...]}
    :param sim_item_topk: number of most-similar neighbours taken per history item
    :param recall_item_num: maximum number of items to return
    :return: list of recalled item ids, highest accumulated score first;
             empty when the user has no history or no candidate survives
    """
    # De-duplicate while preserving order. Keeping the dict (rather than a
    # list of its keys) makes the "already clicked" test below O(1).
    clicked_items = dict.fromkeys(
        click_article_id for click_article_id, _ in user_item_time_dict.get(user_id, [])
    )
    item_rank = defaultdict(float)

    for item in clicked_items:
        # Items missing from the index simply contribute no candidates.
        for similar_item, similarity in swing_inverted_index.get(item, [])[:sim_item_topk]:
            # Never recommend something the user has already clicked.
            if similar_item in clicked_items:
                continue
            item_rank[similar_item] += similarity

    # sorted() is stable, so equal scores keep their insertion order.
    ranked = sorted(item_rank.items(), key=lambda pair: pair[1], reverse=True)
    return [item for item, _ in ranked[:recall_item_num]]

#### 3. 双塔召回

##### 3.1 准备模型和数据

In [6]:
# 加载用户塔模型
# Load the saved user-tower model.
user_embedding_model = tf.keras.models.load_model("/data3/zxh/news_rec/temp_results/dssm_user/003_online/")

# Load the saved item-tower model.
item_embedding_model = tf.keras.models.load_model("/data3/zxh/news_rec/temp_results/dssm_item/003_online/")

2025-04-07 17:10:10.276767: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-04-07 17:10:10.680369: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1510] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 685 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3090, pci bus id: 0000:b2:00.0, compute capability: 8.6
2025-04-07 17:10:11.330550: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
2025-04-07 17:10:11.389953: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation of 230400288 exceeds 10% of free system memory.
2025-04-07 17:10:13.222133: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation

In [7]:
# 加载配置
# Load the base configuration.
base_config = load_configs()
# Build the feature columns and feature groups from the config's 'feature' section.
feature_columns, feature_groups = build_feature_columns(base_config['feature'])

In [8]:
# Each tower gets its own deep copy of the base config so that overriding
# the CSV schema for one tower does not affect the other (or base_config).
item_config, user_config = copy.deepcopy(base_config), copy.deepcopy(base_config)
# Columns listed for the item-feature CSV.
item_config["feature"]["csv_schema"] = ["article_id","keywords","image_count","category_level1","category_level2",
                                        "docid_history_count","docid_expose_count","docid_ctr","docid_history_duration_mean",
                                        "category1_ctr","category1_popularity","category1_history_duration_mean",
                                        "category2_ctr","category2_popularity","category2_history_duration_mean"]
# Columns listed for the user-feature CSV.
user_config["feature"]["csv_schema"] = ["user_id", "network_env", "refresh_count","device_name","os","province",
                                        "city","age","gender","userid_history_duration_mean","userid_history_count",
                                        "userid_expose_count","userid_ctr", "expose_hour"]

In [9]:
# Read the tab-separated test exposure log into a polars DataFrame and display it.
test_data = pl.read_csv("/data3/zxh/news_rec/recall_csv_data/test_data/test_data.csv", separator="\t")
test_data

user_id,article_id,expose_hour,is_clicked
i64,i64,i64,i64
2413368274,465760067,4,0
2231512322,466772262,13,0
2240894400,466655257,10,0
1486216632,466651314,12,0
2439239452,466398247,0,0
…,…,…,…
1406769714,466077472,13,0
2430514764,466449125,4,0
2092924226,466636411,10,0
1468517980,466336113,2,0


In [10]:
# 初始化数据集处理器
# Initialise one dataset builder per tower, each with its own config and feature group.
item_dataset_creator = NewsDataset(item_config, feature_groups['item'])
user_dataset_creator = NewsDataset(user_config, feature_groups['user'])

# The trailing .map keeps only the first element of every (x, y) pair the
# dataset yields -- presumably (features, label); confirm against NewsDataset.
test_dataset = {
    "item" : item_dataset_creator.create_dataset(data_path="/data3/zxh/news_rec/recall_csv_data/test_data/test_item_features.csv").map(lambda x, y: x),
    "user" : user_dataset_creator.create_dataset(data_path="/data3/zxh/news_rec/recall_csv_data/test_data/test_user_features.csv").map(lambda x, y: x),
}

2025-04-07 17:10:28.174088: W tensorflow/core/common_runtime/bfc_allocator.cc:457] Allocator (GPU_0_bfc) ran out of memory trying to allocate 8B (rounded to 256)requested by op StridedSlice
If the cause is memory fragmentation maybe the environment variable 'TF_GPU_ALLOCATOR=cuda_malloc_async' will improve the situation. 
Current allocation summary follows.
Current allocation summary follows.
2025-04-07 17:10:28.174163: I tensorflow/core/common_runtime/bfc_allocator.cc:1004] BFCAllocator dump for GPU_0_bfc
2025-04-07 17:10:28.174189: I tensorflow/core/common_runtime/bfc_allocator.cc:1011] Bin (256): 	Total Chunks: 8, Chunks in use: 8. 2.0KiB allocated for chunks. 2.0KiB in use in bin. 484B client-requested in use in bin.
2025-04-07 17:10:28.174203: I tensorflow/core/common_runtime/bfc_allocator.cc:1011] Bin (512): 	Total Chunks: 2, Chunks in use: 2. 1.0KiB allocated for chunks. 1.0KiB in use in bin. 640B client-requested in use in bin.
2025-04-07 17:10:28.174217: I tensorflow/core/comm

InternalError: Failed copying input tensor from /job:localhost/replica:0/task:0/device:CPU:0 to /job:localhost/replica:0/task:0/device:GPU:0 in order to run StridedSlice: Dst tensor is not initialized. [Op:StridedSlice] name: strided_slice/

##### 3.2 将物料item 存入Faiss

In [None]:
# 初始化全局索引（确保只创建一次）
# Initialise the global index (it is created only once, on the first batch).
embedding_dim = None  # determined from the first batch of embeddings
index = None

# Iterate over every batch of item features.
for batch in test_dataset['item']:
    # Extract this batch's article_id values (bytes) and convert them to int64.
    article_ids = batch['article_id'].numpy()
    article_ids_int = np.array([int(id.decode('utf-8')) for id in article_ids], dtype=np.int64)
    
    # Compute this batch's item embeddings as a contiguous float32 array.
    output = item_embedding_model.predict(batch)
    embeddings = np.ascontiguousarray(output.astype('float32'))
    
    # On the first batch, create an inner-product flat index wrapped in an
    # ID map so that vectors are stored under their article ids.
    if index is None:
        embedding_dim = embeddings.shape[1]
        base_index = faiss.IndexFlatIP(embedding_dim)
        index = faiss.IndexIDMap(base_index)
    
    # Add the current batch to the index, keyed by article id.
    index.add_with_ids(embeddings, article_ids_int)

2025-04-07 15:01:52.372549: I tensorflow/stream_executor/cuda/cuda_blas.cc:1760] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


In [None]:
def dssm_recall(
    user_embedding_map: Dict[Tuple[int, int], np.ndarray],
    user_keys: List[Tuple[int, int]],
    faiss_index: faiss.Index,
    k: int = 50
) -> np.ndarray:
    """
    Run a top-k FAISS search for the embedding of every key in ``user_keys``.

    Args:
        user_embedding_map (dict): user embeddings {(user_id, expose_hour): embedding}.
        user_keys (List[Tuple[int, int]]): query keys; each must be present in
            ``user_embedding_map``.
        faiss_index (faiss.Index): item index to search.
        k (int): number of neighbours to retrieve per query.

    Returns:
        np.ndarray: shape ``(len(user_keys), k)``; row ``i`` holds the labels
        FAISS returns for ``user_keys[i]``. When the index was filled with
        ``add_with_ids`` these labels are the ids supplied there, not row
        positions. An empty ``user_keys`` yields an empty ``(0, k)`` int64 array.

    Raises:
        KeyError: if a key in ``user_keys`` has no embedding.
    """
    if not user_keys:
        # Keep the documented 2-D integer layout so callers can index or
        # iterate the result the same way as a non-empty one.
        return np.empty((0, k), dtype=np.int64)

    query_embeddings = [
        user_embedding_map[key]
        for key in tqdm(user_keys, desc="Preparing user embeddings")
    ]

    # Stack the queries into one float32 matrix for the search.
    query_matrix = np.stack(query_embeddings).astype(np.float32)
    # Only the labels are needed; the similarity scores are discarded.
    _, indices = faiss_index.search(query_matrix, k)
    return indices

#### 4. 多路召回合并

##### 4.1 存储user_embedding（按照user_id和expose_hour为key）

In [None]:
# Maps (user_id, expose_hour) -> user embedding vector.
user_embedding_map = {}

# Iterate over every batch of user features.
for batch in tqdm(test_dataset['user'],desc="Genrating user embbeding map"):
    # user_id arrives as bytes; decode and convert to int64.
    user_ids = np.array([int(id.decode('utf-8')) for id in batch['user_id'].numpy()], dtype=np.int64)
    expose_hours = batch['expose_hour'].numpy()

    # Compute this batch's user embeddings as a contiguous float32 array.
    output = user_embedding_model.predict(batch)
    embeddings = np.ascontiguousarray(output.astype('float32'))

    # Store each embedding under its (user_id, expose_hour) key; a repeated
    # key overwrites the earlier embedding.
    for uid, hour, emb in zip(user_ids, expose_hours, embeddings):
        user_embedding_map[(uid, hour)] = emb

Genrating user embbeding map: 145it [06:51,  2.84s/it]


In [None]:
# Display the test exposure log again for reference.
test_data

user_id,article_id,expose_hour,is_clicked
i64,i64,i64,i64
2413368274,465760067,4,0
2231512322,466772262,13,0
2240894400,466655257,10,0
1486216632,466651314,12,0
2439239452,466398247,0,0
…,…,…,…
1406769714,466077472,13,0
2430514764,466449125,4,0
2092924226,466636411,10,0
1468517980,466336113,2,0


##### 4.2 准备数据

In [None]:
def prepare_user_query_data(
    test_data: pl.DataFrame,
    group_cols: List[str] = ["user_id", "expose_hour"]
) -> Tuple[List[Tuple[int, int]], List[Set[int]], List[Set[int]]]:
    """
    Group the exposure log and collect, per group, its key, the set of
    exposed articles and the set of clicked articles.

    Args:
        test_data: exposure log with at least the columns ``user_id``,
            ``expose_hour``, ``article_id`` and ``is_clicked``.
        group_cols: columns to group by.
            NOTE(review): the returned keys are always read from the
            ``user_id`` and ``expose_hour`` columns, so ``group_cols`` must
            contain both; any other grouping column is not reflected in the keys.

    Returns:
        ``(user_keys, expose_lists, click_lists)`` -- three lists aligned by
        position: ``(user_id, expose_hour)`` keys, sets of exposed article
        ids, and sets of clicked article ids (rows with ``is_clicked == 1``).
    """

    # Aggregate the exposed set and the clicked set of every group.
    grouped = test_data.group_by(group_cols).agg([
        pl.col("article_id").unique().alias("expose_set"),
        pl.col("article_id").filter(pl.col("is_clicked") == 1).unique().alias("click_set")
    ])

    # Extract the group keys; user_id comes from a NumPy array, expose_hour
    # from a Python list.
    user_keys = list(zip(
        grouped["user_id"].to_numpy(),
        grouped["expose_hour"].to_list()
    ))

    # Convert the aggregated list columns to Python sets.
    expose_lists = [set(x) for x in grouped["expose_set"].to_list()]
    click_lists = [set(x) for x in grouped["click_set"].to_list()]

    return user_keys, expose_lists, click_lists


In [None]:
# Build the (user_id, expose_hour) keys and the position-aligned exposure / click sets.
user_keys, expose_lists, click_lists = prepare_user_query_data(test_data)

##### 4.3 召回合并，测试Recall 和 Precision

In [None]:
# Two-tower recall: top-100 article ids per (user_id, expose_hour) key,
# row-aligned with user_keys.
dssm_recall_lists = dssm_recall(user_embedding_map, user_keys, index, k=100)

Preparing user embeddings:   3%|▎         | 31335/910176 [00:00<00:02, 313313.70it/s]

Preparing user embeddings: 100%|██████████| 910176/910176 [00:02<00:00, 370272.84it/s]


In [None]:
# ItemCF and Swing recall results, one entry per key in user_keys (same order).
# NOTE(review): both calls receive only user_id, so a user that appears under
# several expose_hour values is recomputed once per key; caching per user_id
# would avoid the repeated work.
itemcf_recall_lists = []
swing_recall_lists = []
for user_id, expose_hour in tqdm(user_keys, desc="Genearting itemcf & swing recall results"):
    itemcf_recall_lists.append(itemcf_recall(int(user_id), user_item_time_dict, itemcf_inverted_index))
    swing_recall_lists.append(swing_recall(int(user_id), user_item_time_dict, swing_inverted_index))

Genearting itemcf & swing recall results:   1%|          | 10181/910176 [00:27<34:03, 440.31it/s]

Genearting itemcf & swing recall results:  38%|███▊      | 341758/910176 [15:38<24:46, 382.34it/s]  

In [None]:
def evaluate_recall(
    recall_results: Dict[str, List[List[int]]],
    click_lists: List[Set[int]],
    expose_lists: List[Set[int]],
    recall_k_config: Dict[str, int]
) -> Tuple[float, float, float]:
    """
    Merge several recall channels per request group and score the union.

    For group ``i`` the prediction set is the union, over all channels, of
    the first ``recall_k_config[channel]`` entries of that channel's ``i``-th
    result. Groups with no clicks, no predictions, or an empty exposure set
    are skipped.

    Args:
        recall_results (dict): per-channel results, each aligned by position
            with ``click_lists``.
        click_lists (List[set]): clicked article ids of every group.
        expose_lists (List[set]): exposed article ids of every group.
        recall_k_config (dict): how many entries to keep per channel, e.g.
            {'itemcf': 20, 'swing': 10, 'dssm': 30}. Channels missing from
            this dict contribute nothing.

    Returns:
        Tuple[float, float, float]: ``(precision, recall, hitrate)`` where
        precision and recall are per-group values averaged with the group's
        exposure-set size as weight, and hitrate is the number of correctly
        recalled clicks in scored groups divided by the number of clicks in
        *all* groups. Each value is 0.0 when its denominator is zero.
    """
    weighted_precision = 0.0
    weighted_recall = 0.0
    total_hits = 0.0
    total_weight = 0

    for i in tqdm(range(len(click_lists)), desc="Evaluating Merged Recall"):
        merged_predicted_ids = set()

        for method, recall_list in recall_results.items():
            topk = recall_k_config.get(method, 0)
            # A channel with fewer rows than click_lists just adds nothing here.
            if i < len(recall_list):
                merged_predicted_ids.update(recall_list[i][:topk])

        actual_article_ids = click_lists[i]
        group_size = len(expose_lists[i])

        if not actual_article_ids or not merged_predicted_ids or group_size == 0:
            continue

        num_correct = len(merged_predicted_ids & actual_article_ids)

        # Precision and recall are weighted by the size of the exposure set.
        weighted_precision += num_correct / len(merged_predicted_ids) * group_size
        weighted_recall += num_correct / len(actual_article_ids) * group_size
        total_hits += num_correct
        total_weight += group_size

    # Guard on the actual click total: a non-empty click_lists made only of
    # empty sets would otherwise divide by zero.
    total_clicks = sum(len(clicks) for clicks in click_lists)

    final_precision = weighted_precision / total_weight if total_weight else 0.0
    final_recall = weighted_recall / total_weight if total_weight else 0.0
    final_hitrate = total_hits / total_clicks if total_clicks else 0.0

    return final_precision, final_recall, final_hitrate

In [None]:
# Channels to merge; the commented-out entries are toggles for the other channels.
recall_results = {
    #"itemcf": itemcf_recall_lists,
    # "swing": swing_recall_lists,
    "dssm": dssm_recall_lists
}

# Number of top results kept from each enabled channel.
recall_k_config = {
    # "itemcf": 0,
    # "swing": 0,
    "dssm": 50
}

precision, recall, hr = evaluate_recall(
    recall_results=recall_results,
    click_lists=click_lists,
    expose_lists=expose_lists,
    recall_k_config=recall_k_config
)

# The "@ K" shown is the sum of the per-channel top-k settings.
# NOTE(review): the backslash continuations sit inside the string literal, so
# the leading spaces of the continued lines are printed as part of the message.
print(f"Merged Recall -> Precision @ {sum(recall_k_config.values())}: \
      {precision:.4f}, Recall @ {sum(recall_k_config.values())}: {recall:.4f}, \
      HitRate @ {sum(recall_k_config.values())}: {hr:.4f}")

Evaluating Merged Recall:   0%|          | 0/910176 [00:00<?, ?it/s]

Evaluating Merged Recall: 100%|██████████| 910176/910176 [00:07<00:00, 114151.22it/s]


Merged Recall -> Precision @ 50:       0.0068, Recall @ 50: 0.0877,       HitRate @ 50: 0.1110
