# 🎯 多路召回模块 (1_recall.ipynb)

## 📋 模块功能
实现**4种召回策略**，为每个用户生成多样化的候选商品集合：

1. **🔄 复购召回**: 基于用户历史购买 + 时间衰减
2. **🔗 协同过滤召回**: 基于商品共现关系
3. **🏪 个性化热门**: 用户偏好类目/店铺热门  
4. **🌍 全局热门**: 冷启动补充

## ⚡ 性能优化
- **FAST_MODE**: 开发模式下收紧召回参数并只抽取部分用户，以加快调试
- **内存优化**: dtype压缩减少内存占用
- **预计算加速**: 邻接表、映射字典等
- **批处理**: 按批次处理用户，候选生成使用纯字典 + numpy 查表，避免频繁的 DataFrame join

## 🔧 输出文件
- **统计表**: rebuy, covisit, cate_pop, store_pop, global_pop
- **候选集**: cands_multi (多路), cands_covisit_only (单路/消融)

## 1️⃣ 环境配置与数据加载


In [None]:
# =============================================================================
# 依赖库导入
# =============================================================================
import pandas as pd
import numpy as np
import os
import gc
import time
from collections import defaultdict
import warnings
warnings.filterwarnings('ignore')

# Run-wide configuration shared by every cell below.
# OUTDIR: directory that holds the prep outputs (read) and the recall outputs (written).
OUTDIR = '../x'
# FAST_MODE: when True, later cells cap recall parameters and sample fewer users.
FAST_MODE = True

print("\n".join([
    "✅ 环境配置完成",
    f"📁 输出目录: {OUTDIR}",
    f"⚡ 快速模式: {'启用' if FAST_MODE else '禁用'}",
    f"⏰ 处理时间: {time.strftime('%Y-%m-%d %H:%M:%S')}",
]))


In [None]:
# =============================================================================
# Data loading
# =============================================================================
print("📂 开始加载数据...")

# Item attributes; later cells read its item_id / cate_id / store_id columns.
item_attr = pd.read_parquet(f'{OUTDIR}/item_attr.parquet')
print(f"✅ 商品属性数据: {len(item_attr):,} 条记录")

# Interactions + labels: the sampled versions written by 0_prep.ipynb take
# precedence; fall back to the full tables when no sample exists.
if not os.path.exists(f"{OUTDIR}/train_vis_sampled.parquet"):
    print("🎯 使用全量数据（未进行抽样）")
    train_vis = pd.read_parquet(f"{OUTDIR}/train_vis.parquet")
    label_df = pd.read_parquet(f"{OUTDIR}/label_df.parquet")
    print(f"📊 全量数据: {len(train_vis):,} 条记录, {train_vis['buyer_admin_id'].nunique():,} 用户")
else:
    print("✅ 发现抽样数据，使用0_prep.ipynb的抽样结果")
    train_vis = pd.read_parquet(f"{OUTDIR}/train_vis_sampled.parquet")
    label_df = pd.read_parquet(f"{OUTDIR}/label_df_sampled.parquet")
    print(f"📊 使用抽样数据: {len(train_vis):,} 条记录, {train_vis['buyer_admin_id'].nunique():,} 用户")

print("✅ 数据加载完成")


In [None]:
# =============================================================================
# Recall hyper-parameters (values noted as the Bayesian-optimisation optimum)
# =============================================================================
PARAMS = dict(
    covisit_window=4,        # max position gap (lag) between two purchases counted as co-visit
    covisit_top_per_a=317,   # neighbours kept per item_a in the co-visit table
    recent_k=4,              # most recent purchases per user used as co-visit seeds
    cand_per_recent=69,      # neighbours taken per seed item
    tau_days=11,             # time-decay constant (days) for rebuy scores
    user_top_cates=3,        # preferred categories expanded per user (kept at original value)
    user_top_stores=3,       # preferred stores expanded per user (kept at original value)
    per_cate_pool=38,        # hot items kept per category
    per_store_pool=96,       # hot items kept per store
    pop_pool=4863,           # size of the global hot-item pool
    recall_cap=866,          # max candidates kept per user
    batch_size=2000,         # users per candidate-generation batch
)

print("✅ 参数配置完成")
print("📊 贝叶斯优化后的最优参数:")
for key, value in PARAMS.items():
    print(f"  - {key}: {value}")

# FAST_MODE: cap the expensive parameters so development runs finish quickly.
if FAST_MODE:
    print("\n⚡ 快速模式参数调整:")
    fast_caps = {
        'covisit_top_per_a': 100,
        'per_cate_pool': 20,
        'per_store_pool': 20,
        'pop_pool': 1000,
        'recall_cap': 200,
        'batch_size': 1000,
    }
    for key, cap in fast_caps.items():
        PARAMS[key] = min(PARAMS[key], cap)

    for key, value in PARAMS.items():
        print(f"  - {key}: {value}")


In [None]:


# Report which dataset is in use. train_vis / label_df were already read by the
# data-loading cell above (same files, same sampled-first precedence), so they
# are not read from disk a second time here.
if os.path.exists(f"{OUTDIR}/train_vis_sampled.parquet"):
    print("✅ 发现抽样数据，使用0_prep.ipynb的抽样结果")
    print(f"📊 使用抽样数据: {len(train_vis):,} 条记录, {train_vis['buyer_admin_id'].nunique():,} 用户")
else:
    print("🎯 使用全量数据（未进行抽样）")
    print(f"📊 全量数据: {len(train_vis):,} 条记录, {train_vis['buyer_admin_id'].nunique():,} 用户")


# --- 复购评分 ---

In [None]:

# =============================================================================
# ⚡ Rebuy scoring with vectorised exponential time decay
# =============================================================================
def time_decay_vectorized(days, tau=14.0):
    """Return the time-decay weight ``exp(-days / tau)`` as float32.

    Negative day offsets are clipped to 0, so every finite weight lies in (0, 1].

    Args:
        days: numpy array or pandas Series of day offsets (NaN propagates to NaN).
        tau: decay constant in days; larger values decay more slowly.

    Returns:
        float32 weights with the same length as ``days``.
    """
    days = np.clip(days, 0, None)
    return np.exp(-days / tau, dtype=np.float32)


def build_rebuy_scores_optimized(df, tau_days=14):
    """Score (user, item) pairs by how recently and how often the user bought the item.

    Each purchase row gets weight ``exp(-d / tau_days)``, where ``d`` is the number
    of whole days between that purchase and the same user's latest purchase.
    Weights are summed per (buyer_admin_id, item_id), so repeat purchases accumulate.

    Args:
        df: purchase log with ``buyer_admin_id``, ``item_id`` and
            ``create_order_time`` (datetimes, or strings parseable by
            ``pd.to_datetime``). The caller's frame is not modified.
        tau_days: decay constant passed to ``time_decay_vectorized``.

    Returns:
        DataFrame with columns ``buyer_admin_id``, ``item_id``, ``score_rebuy``
        (float32): one row per distinct pair, ordered by those two keys.
        An empty frame with the same columns when ``df`` is empty.
    """
    print("🔄 计算复购评分 (优化版)...")
    start_time = time.time()

    if len(df) == 0:
        return pd.DataFrame(columns=['buyer_admin_id', 'item_id', 'score_rebuy'])

    # Narrow copy: lets us add columns without touching the caller's frame.
    work_df = df[['buyer_admin_id', 'item_id', 'create_order_time']].copy()
    # No-op for datetime columns; parses string timestamps so `.dt` below works.
    work_df['create_order_time'] = pd.to_datetime(work_df['create_order_time'])

    # Age (whole days) of every purchase relative to that user's latest purchase.
    user_max_time = work_df.groupby('buyer_admin_id')['create_order_time'].transform('max')
    days_ago = (user_max_time - work_df['create_order_time']).dt.days.to_numpy()

    # time_decay_vectorized clips negative ages itself.
    work_df['score_rebuy'] = time_decay_vectorized(days_ago, tau_days)

    # Sum (not mean): buying the same item repeatedly raises its score.
    result = work_df.groupby(['buyer_admin_id', 'item_id'], as_index=False)['score_rebuy'].sum()
    result['score_rebuy'] = result['score_rebuy'].astype('float32')

    end_time = time.time()
    print(f"  ✅ 复购评分完成: {len(result):,} 条记录, 耗时 {end_time - start_time:.2f}秒")

    return result
    

# --- 共现图 a->b ---

In [None]:

# =============================================================================
# ⚡ Item co-visitation graph (vectorised lag pairing + per-item TopK)
# =============================================================================
def _as_id_dtype(values):
    """Cast an integer id column to int32 when every value fits, else int64 (never wraps)."""
    bounds = np.iinfo(np.int32)
    if len(values) == 0 or (values.min() >= bounds.min and values.max() <= bounds.max):
        return values.astype('int32')
    return values.astype('int64')


def build_covisit_optimized(df, window=3, topk=200):
    """Build a directed item co-visitation table from per-user purchase sequences.

    Each user's rows are ordered by ``irank`` if that column exists, otherwise by
    ``create_order_time``, otherwise kept in input order. Every ordered pair
    (item at position i, item at position i + lag) with 1 <= lag <= ``window``
    adds weight ``1 / lag`` to the edge ``item_a -> item_b``; self pairs
    (a == b) are skipped. Weights are summed over all users and only the
    ``topk`` heaviest edges are kept for each ``item_a``.

    Args:
        df: purchase log with at least ``buyer_admin_id`` and ``item_id``.
        window: largest position gap that still counts as co-visitation.
        topk: maximum number of neighbours kept per ``item_a``.

    Returns:
        DataFrame with columns ``item_a``, ``item_b`` (int32 when the ids fit,
        otherwise int64) and ``w`` (float32), ordered by ``item_a`` then by
        descending ``w``. An empty frame with those columns when no pair exists.

    Raises:
        ValueError: if ``buyer_admin_id`` or ``item_id`` is missing.
    """
    print("🔄 计算商品共现关系 (增强优化版)...")
    start_time = time.time()
    empty = pd.DataFrame(columns=['item_a', 'item_b', 'w'])

    if len(df) == 0:
        print("  ⚠️  输入数据为空")
        return empty

    print(f"  📊 输入数据: {df.shape}, 列: {list(df.columns)}")

    required_cols = ['buyer_admin_id', 'item_id']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"缺少必要列: {missing_cols}")

    # Sequence order inside each user: irank first, then create_order_time.
    # NOTE(review): confirm ascending irank runs in the same chronological direction
    # as ascending create_order_time; otherwise edge direction flips between cases.
    sort_columns = ['buyer_admin_id']
    if 'irank' in df.columns:
        sort_columns.append('irank')
        print("  ✅ 使用irank进行时序排序")
    elif 'create_order_time' in df.columns:
        sort_columns.append('create_order_time')
        print("  ✅ 使用create_order_time进行时序排序")
    else:
        print("  ⚠️  未找到时间排序列，仅按用户ID排序")

    # mergesort is stable: rows tied on every sort key keep their input order
    # (matters when there is no time column and only the user id is sorted).
    base = df[sort_columns + ['item_id']].sort_values(sort_columns, kind='mergesort')
    print(f"  📊 排序后数据: {base.shape}")
    print(f"  👥 处理 {base['buyer_admin_id'].nunique():,} 个用户的共现关系...")

    users = base['buyer_admin_id'].to_numpy()
    items = base['item_id'].to_numpy()

    # Pair row i with row i + lag. Rows are grouped by user, so
    # users[i] == users[i + lag] holds exactly when both rows belong to one user
    # and sit `lag` positions apart in that user's sequence.
    pair_frames = []
    for lag in range(1, min(window, len(items) - 1) + 1):
        item_a, item_b = items[:-lag], items[lag:]
        keep = (users[:-lag] == users[lag:]) & (item_a != item_b)  # same user, no self loops
        if keep.any():
            pair_frames.append(pd.DataFrame({
                'item_a': item_a[keep],
                'item_b': item_b[keep],
                'w': 1.0 / lag,
            }))

    if not pair_frames:
        print("  ⚠️  未生成共现关系")
        return empty

    pairs = pd.concat(pair_frames, ignore_index=True)
    print(f"  📊 总共生成 {len(pairs):,} 条原始共现记录")

    print("  🔄 聚合共现权重...")
    covisit_agg = pairs.groupby(['item_a', 'item_b'], as_index=False)['w'].sum()
    print(f"  📊 聚合后 {len(covisit_agg):,} 条唯一共现关系")

    print("  🎯 选择TopK共现关系...")
    # Multi-key sort is stable, so ties on w stay in ascending item_b order
    # (the groupby output order) before head(topk) keeps the heaviest edges.
    result = (covisit_agg
              .sort_values(['item_a', 'w'], ascending=[True, False], kind='mergesort')
              .groupby('item_a', sort=False)
              .head(topk)
              .reset_index(drop=True))
    result['w'] = result['w'].astype('float32')
    result['item_a'] = _as_id_dtype(result['item_a'])
    result['item_b'] = _as_id_dtype(result['item_b'])

    end_time = time.time()
    print(f"  ✅ 共现关系完成: {len(result):,} 条边, 耗时 {end_time - start_time:.2f}秒")

    return result
    

# --- 热门池（全局/类目/店铺） ---

In [None]:


# =============================================================================
# ⚡ Popularity statistics (global / per-category / per-store)
# =============================================================================
def _rank_items_within(merged_df, group_col):
    """Count rows per (group_col, item_id) and rank items inside each group by count.

    Rank 1 is the most frequent item; ties are broken by order of appearance in the
    counts frame (``method='first'``). Rows whose ``group_col`` is NaN are dropped
    by groupby. The result is ordered by ``group_col`` then ``rank``.
    """
    counts = merged_df.groupby([group_col, 'item_id']).size().reset_index(name='pop')
    counts['rank'] = counts.groupby(group_col)['pop'].rank(ascending=False, method='first')
    return counts.sort_values([group_col, 'rank'])


def build_popularity_stats_optimized(df, item_attr_df, pop_pool=2000):
    """Compute global, per-category and per-store item popularity tables.

    Popularity is the number of rows an item has in ``df``.

    Args:
        df: interaction log with an ``item_id`` column.
        item_attr_df: item attributes with ``item_id``, ``cate_id``, ``store_id``;
            left-joined onto ``df``.
        pop_pool: number of top items kept in the global table (the category and
            store tables are not truncated).

    Returns:
        ``(cate_pop, store_pop, global_pop)``. Each has ``item_id``, ``pop`` (int32)
        and ``rank`` (int32, 1 = most popular); ``cate_pop`` / ``store_pop`` also
        carry ``cate_id`` / ``store_id`` and rank within that group.
    """
    print("🔄 计算热门统计 (优化版)...")
    start_time = time.time()

    # Single join shared by the category and store tables.
    merged_df = df.merge(item_attr_df[['item_id', 'cate_id', 'store_id']],
                         on='item_id', how='left')

    print("  🌍 计算全局热门...")
    global_counts = df['item_id'].value_counts().reset_index()
    global_counts.columns = ['item_id', 'pop']
    global_counts['rank'] = range(1, len(global_counts) + 1)
    # .copy(): head() is a slice; the dtype casts below assign columns on it.
    global_pop = global_counts.head(pop_pool).copy()

    print("  🏷️ 计算类目热门...")
    cate_pop = _rank_items_within(merged_df, 'cate_id')

    print("  🏪 计算店铺热门...")
    store_pop = _rank_items_within(merged_df, 'store_id')

    # int32 ranks: int16 wraps past 32767 items in one group, producing negative
    # ranks that would slip through downstream `rank <= k` filters.
    for df_pop in (global_pop, cate_pop, store_pop):
        df_pop['pop'] = df_pop['pop'].astype('int32')
        df_pop['rank'] = df_pop['rank'].astype('int32')

    end_time = time.time()
    print(f"  ✅ 热门统计完成: 耗时 {end_time - start_time:.2f}秒")
    print(f"    🌍 全局热门: {len(global_pop):,} 个商品")
    print(f"    🏷️ 类目热门: {len(cate_pop):,} 个商品")
    print(f"    🏪 店铺热门: {len(store_pop):,} 个商品")

    return cate_pop, store_pop, global_pop
    

# --- 构建统计 ---

In [None]:

# =============================================================================
# 🚀 Build every recall statistics table, then persist them
# =============================================================================
print("\n" + "=" * 60)
print("🔄 开始构建召回统计表...")
total_start = time.time()

# (user, item) rebuy scores, item->item co-visit edges, and the hot-item tables.
rebuy = build_rebuy_scores_optimized(train_vis, PARAMS['tau_days'])
covisit = build_covisit_optimized(train_vis,
                                  PARAMS['covisit_window'],
                                  PARAMS['covisit_top_per_a'])
cate_pop, store_pop, global_pop = build_popularity_stats_optimized(
    train_vis, item_attr, PARAMS['pop_pool'])

total_time = time.time() - total_start
print(f"\n✅ 所有统计表构建完成! 总耗时: {total_time:.2f}秒")

# Row counts per table.
print("\n📊 统计表摘要:")
for label, table, unit in (
    ("🔄 复购评分", rebuy, "条记录"),
    ("🔗 共现关系", covisit, "条边"),
    ("🌍 全局热门", global_pop, "个商品"),
    ("🏷️ 类目热门", cate_pop, "个商品"),
    ("🏪 店铺热门", store_pop, "个商品"),
):
    print(f"  {label}: {len(table):,} {unit}")

# =============================================================================
# 💾 Persist the tables as snappy-compressed parquet
# =============================================================================
print(f"\n💾 保存统计表到 {OUTDIR}...")
save_start = time.time()

compression_config = 'snappy'  # snappy trades a little size for fast read/write

files_to_save = [
    (rebuy, 'rebuy.parquet'),
    (covisit, 'covisit.parquet'),
    (cate_pop, 'cate_pop.parquet'),
    (store_pop, 'store_pop.parquet'),
    (global_pop, 'global_pop.parquet'),
]

for df, filename in files_to_save:
    file_path = f'{OUTDIR}/{filename}'
    df.to_parquet(file_path, index=False, compression=compression_config)
    file_size = os.path.getsize(file_path) / (1024 * 1024)  # bytes -> MB
    print(f"  ✅ {filename}: {len(df):,} 行, {file_size:.1f}MB")

save_time = time.time() - save_start
print(f"💾 统计表保存完成! 耗时: {save_time:.2f}秒")

# Free memory held by intermediate objects.
gc.collect()
print("🧹 内存清理完成")
    

## 📊 预计算映射优化 - 性能加速版

In [None]:

# =============================================================================
# 🚀 Precomputed lookup tables consumed by candidate generation
# =============================================================================
import time
from collections import defaultdict

print("🔄 开始构建高性能预计算映射...")
start_time = time.time()

P = PARAMS  # short alias


def _group_arrays(frame, key, value):
    """Return {key: int64 numpy array of `value`}, keeping row order inside each group."""
    return frame.groupby(key)[value].apply(lambda x: x.values.astype('int64')).to_dict()


def _top_values_per_user(frame, col, k):
    """Return {buyer_admin_id: int64 array of the user's k most frequent `col` values}.

    Ties in frequency are broken by order of appearance (``method='first'``).
    """
    counts = frame.groupby(['buyer_admin_id', col]).size().reset_index(name='count')
    counts['rank'] = counts.groupby('buyer_admin_id')['count'].rank(ascending=False, method='first')
    return _group_arrays(counts[counts['rank'] <= k], 'buyer_admin_id', col)


# 1) Co-visit adjacency: item_a -> (neighbour ids, weights), heaviest first.
print("  📊 构建共现邻接表...")
cov_neighbors = {}
if len(covisit) > 0:
    covisit_filtered = (covisit.sort_values(['item_a', 'w'], ascending=[True, False])
                               .groupby('item_a')
                               .head(P['cand_per_recent']))
    # One groupby pass instead of rescanning the whole table once per item.
    for item_a, grp in covisit_filtered.groupby('item_a'):
        cov_neighbors[int(item_a)] = (
            grp['item_b'].to_numpy().astype('int64'),
            grp['w'].to_numpy().astype('float32'),
        )

# 2) Each user's last `recent_k` purchased items, in ascending create_order_time order.
print("  👤 构建用户最近商品映射...")
recent_map = {}
if len(train_vis) > 0:
    train_sorted = train_vis.sort_values(['buyer_admin_id', 'create_order_time'])
    recent_rows = train_sorted.groupby('buyer_admin_id').tail(P['recent_k'])
    recent_map = _group_arrays(recent_rows, 'buyer_admin_id', 'item_id')

# 3) Each user's most frequently purchased categories and stores.
print("  🏷️ 构建用户偏好映射...")
user_topc, user_tops = {}, {}
if len(train_vis) > 0 and len(item_attr) > 0:
    ua = train_vis.merge(item_attr[['item_id', 'cate_id', 'store_id']], on='item_id', how='left')
    user_topc = _top_values_per_user(ua, 'cate_id', P['user_top_cates'])
    user_tops = _top_values_per_user(ua, 'store_id', P['user_top_stores'])

# 4) Hot-item pools per category / store, plus the global pool.
print("  🔥 构建热门商品池...")
cate_top = {}
if len(cate_pop) > 0:
    cate_top = _group_arrays(cate_pop[cate_pop['rank'] <= P['per_cate_pool']], 'cate_id', 'item_id')

store_top = {}
if len(store_pop) > 0:
    store_top = _group_arrays(store_pop[store_pop['rank'] <= P['per_store_pool']], 'store_id', 'item_id')

global_items = global_pop['item_id'].values.astype('int64') if len(global_pop) > 0 else np.array([], dtype='int64')

# 5) Rebuy scores: user -> (item ids, scores). Direct group iteration avoids
#    DataFrameGroupBy.apply returning tuples.
print("  🔄 构建复购评分映射...")
rebuy_map = {}
if len(rebuy) > 0:
    rebuy_map = {
        int(uid): (grp['item_id'].to_numpy().astype('int64'),
                   grp['score_rebuy'].to_numpy().astype('float32'))
        for uid, grp in rebuy.groupby('buyer_admin_id')
    }

end_time = time.time()
print(f"✅ 预计算映射完成! 耗时: {end_time - start_time:.2f}秒")
print(f"📊 映射统计:")
print(f"  🔗 共现邻接: {len(cov_neighbors):,} 个商品")
print(f"  👤 用户最近商品: {len(recent_map):,} 个用户") 
print(f"  🏷️ 用户偏好类目: {len(user_topc):,} 个用户")
print(f"  🏪 用户偏好店铺: {len(user_tops):,} 个用户")
print(f"  🔥 热门类目池: {len(cate_top):,} 个类目")
print(f"  🏪 热门店铺池: {len(store_top):,} 个店铺")
print(f"  🌍 全局热门: {len(global_items):,} 个商品")
print(f"  🔄 复购映射: {len(rebuy_map):,} 个用户")
    

## ⚡ 高性能候选生成算法

先定义候选生成函数（复购 / 协同过滤 / 类目与店铺热门 / 全局热门四路召回 + 预评分 + 每用户截断），再在下一节执行并保存结果。


## 🎯 执行候选生成与保存


In [None]:

# =============================================================================
# ⚡ Candidate generation: multi-source recall + pre-scoring + per-user cap
# =============================================================================
_CANDIDATE_COLUMNS = ['buyer_admin_id', 'item_id', 'score_rebuy', 'score_covisit',
                      'is_cate_hot', 'is_store_hot', 'is_global_pop', 'src_count', 'pre_score']


def _resolve_lookup(lookups, name):
    """Return ``lookups[name]`` when injected, else the notebook-level global ``name``."""
    if lookups is not None and name in lookups:
        return lookups[name]
    return globals()[name]


def _rank_user_candidates(uid, user_candidates, recall_cap):
    """Score one user's candidates and return the top ``recall_cap`` as row dicts.

    pre_score = rebuy + covisit + 0.3 * cate + 0.3 * store + 0.1 * global, and
    src_count is the number of sources with a positive value. The sort is stable,
    so equal scores keep discovery order.
    """
    scored = []
    for item_id, s in user_candidates.items():
        pre_score = (s['rebuy'] + s['covisit']
                     + 0.3 * s['cate'] + 0.3 * s['store'] + 0.1 * s['global'])
        src_count = sum(1 for v in (s['rebuy'], s['covisit'], s['cate'], s['store'], s['global'])
                        if v > 0)
        scored.append((item_id, pre_score, s, src_count))
    scored.sort(key=lambda row: row[1], reverse=True)
    return [
        {
            'buyer_admin_id': uid,
            'item_id': item_id,
            'score_rebuy': s['rebuy'],
            'score_covisit': s['covisit'],
            'is_cate_hot': s['cate'],
            'is_store_hot': s['store'],
            'is_global_pop': s['global'],
            'src_count': src_count,
            'pre_score': pre_score,
        }
        for item_id, pre_score, s, src_count in scored[:recall_cap]
    ]


def build_candidates_ultra_fast(user_ids,
                               use_rebuy=True, use_covisit=True,
                               use_cate_store=True, use_global=True,
                               *, lookups=None, recall_cap=None, batch_size=None):
    """Generate, score and truncate recall candidates for each user.

    Sources (each can be switched off):
      * rebuy      - items in ``rebuy_map[uid]`` with their rebuy score;
      * covisit    - ``cov_neighbors`` of every item in ``recent_map[uid]``;
      * cate/store - ``cate_top`` / ``store_top`` items of the user's preferred
                     categories / stores (``user_topc`` / ``user_tops``);
      * global     - every item in ``global_items``.
    When an item is reached several ways, rebuy/covisit keep the maximum weight and
    the cate/store/global flags are set to 1. Users are processed in batches; the
    candidate dictionaries are rebuilt for every batch. Within a batch, users and
    items appear in discovery order (rebuy, covisit, cate/store, global).

    Args:
        user_ids: sequence of user ids (list or numpy array).
        use_rebuy, use_covisit, use_cate_store, use_global: enable each source.
        lookups: optional dict overriding the notebook-level lookup tables by name
            ('rebuy_map', 'recent_map', 'cov_neighbors', 'user_topc', 'user_tops',
            'cate_top', 'store_top', 'global_items'). Only tables of enabled
            sources are resolved.
        recall_cap: per-user candidate limit; defaults to ``PARAMS['recall_cap']``.
        batch_size: users per batch; defaults to ``PARAMS.get('batch_size', 2000)``.

    Returns:
        DataFrame with columns buyer_admin_id, item_id, score_rebuy, score_covisit,
        is_cate_hot, is_store_hot, is_global_pop, src_count, pre_score; each user's
        rows sorted by descending pre_score.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """
    print(f"⚡ 超高性能候选生成: {len(user_ids):,} 个用户...")
    start_time = time.time()

    if len(user_ids) == 0:
        return pd.DataFrame(columns=_CANDIDATE_COLUMNS)

    if recall_cap is None:
        recall_cap = PARAMS['recall_cap']
    if batch_size is None:
        batch_size = PARAMS.get('batch_size', 2000)
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    # Resolve only the lookup tables of enabled sources.
    if use_rebuy:
        rebuy_tbl = _resolve_lookup(lookups, 'rebuy_map')
    if use_covisit:
        recent_tbl = _resolve_lookup(lookups, 'recent_map')
        neighbor_tbl = _resolve_lookup(lookups, 'cov_neighbors')
    if use_cate_store:
        topc_tbl = _resolve_lookup(lookups, 'user_topc')
        tops_tbl = _resolve_lookup(lookups, 'user_tops')
        cate_tbl = _resolve_lookup(lookups, 'cate_top')
        store_tbl = _resolve_lookup(lookups, 'store_top')
    if use_global:
        global_ids = [int(item) for item in _resolve_lookup(lookups, 'global_items')]

    n_users = len(user_ids)
    n_batches = (n_users - 1) // batch_size + 1
    all_results = []

    for batch_start in range(0, n_users, batch_size):
        batch_end = min(batch_start + batch_size, n_users)
        batch_no = batch_start // batch_size + 1
        batch_users = [int(uid) for uid in user_ids[batch_start:batch_end]]

        # {user_id: {item_id: per-source scores}}, filled in discovery order.
        batch_candidates = defaultdict(lambda: defaultdict(lambda: {
            'rebuy': 0.0, 'covisit': 0.0, 'cate': 0, 'store': 0, 'global': 0
        }))

        print(f"  🔄 处理批次 {batch_no}/{n_batches}")

        # 1. Rebuy recall.
        if use_rebuy:
            for uid in batch_users:
                if uid in rebuy_tbl:
                    items, weights = rebuy_tbl[uid]
                    for item, weight in zip(items, weights):
                        entry = batch_candidates[uid][int(item)]
                        entry['rebuy'] = max(entry['rebuy'], float(weight))

        # 2. Co-visit recall seeded by the user's recent items.
        if use_covisit:
            for uid in batch_users:
                for seed_item in recent_tbl.get(uid, ()):
                    neighbours = neighbor_tbl.get(int(seed_item))
                    if neighbours is None:
                        continue
                    items, weights = neighbours
                    for item, weight in zip(items, weights):
                        entry = batch_candidates[uid][int(item)]
                        entry['covisit'] = max(entry['covisit'], float(weight))

        # 3. Hot items of the user's preferred categories and stores.
        if use_cate_store:
            for uid in batch_users:
                for cate in topc_tbl.get(uid, ()):
                    for item in cate_tbl.get(int(cate), ()):
                        batch_candidates[uid][int(item)]['cate'] = 1
                for store in tops_tbl.get(uid, ()):
                    for item in store_tbl.get(int(store), ()):
                        batch_candidates[uid][int(item)]['store'] = 1

        # 4. Global hot items for everyone.
        if use_global:
            for uid in batch_users:
                for item in global_ids:
                    batch_candidates[uid][item]['global'] = 1

        # 5. Score, cap per user and collect rows.
        batch_rows = []
        for uid, user_candidates in batch_candidates.items():
            batch_rows.extend(_rank_user_candidates(uid, user_candidates, recall_cap))
        if batch_rows:
            all_results.append(pd.DataFrame(batch_rows))

        if batch_no % 5 == 0:
            elapsed = time.time() - start_time
            print(f"    📊 已处理 {batch_end:,}/{n_users:,} 用户, 耗时 {elapsed:.1f}秒")

    if all_results:
        result = pd.concat(all_results, ignore_index=True)
        result['score_rebuy'] = result['score_rebuy'].astype('float32')
        result['score_covisit'] = result['score_covisit'].astype('float32')
        result['pre_score'] = result['pre_score'].astype('float32')
        result['src_count'] = result['src_count'].astype('int8')
    else:
        result = pd.DataFrame(columns=_CANDIDATE_COLUMNS)

    elapsed = time.time() - start_time
    # time.time() can be coarse, so a tiny run may measure 0 seconds.
    speed = n_users / elapsed if elapsed > 0 else float('inf')
    print(f"✅ 超高性能候选生成完成! 耗时: {elapsed:.2f}秒")
    print(f"📊 生成 {len(result):,} 条候选记录")
    print(f"⚡ 平均速度: {speed:.0f} 用户/秒")

    return result

print("✅ 超高性能候选生成函数定义完成")
    

In [None]:

# =============================================================================
# 🚀 Run candidate generation for the labelled users and save the results
# =============================================================================
print("🎯 开始超高性能候选生成...")
total_start = time.time()

# Target users: every distinct user that appears in the labels.
val_users = label_df['buyer_admin_id'].unique()
print(f"📊 总用户数: {len(val_users):,}")

# FAST_MODE smoke test: keep only the first N_SMOKE users.
if FAST_MODE:
    N_SMOKE = 5000
    if len(val_users) > N_SMOKE:
        val_users = val_users[:N_SMOKE]
        print(f"🚀 FAST_MODE: 进一步限制到 {N_SMOKE:,} 个用户进行冒烟测试")

print(f"👥 目标用户数: {len(val_users):,}")

# Full multi-source recall.
print("\n" + "=" * 60)
print("📊 生成多路召回候选...")
cands_multi = build_candidates_ultra_fast(
    val_users, use_rebuy=True, use_covisit=True, use_cate_store=True, use_global=True)

# Co-visit source only, for the ablation study.
print("\n" + "=" * 60)
print("📊 生成协同过滤单路召回候选（消融实验用）...")
cands_covisit = build_candidates_ultra_fast(
    val_users, use_rebuy=False, use_covisit=True, use_cate_store=False, use_global=False)

total_time = time.time() - total_start
print("\n" + "=" * 60)
print(f"✅ 所有候选生成完成! 总耗时: {total_time:.2f}秒")

# Result sizes and per-source coverage of the multi-source candidates.
print("\n📊 候选生成结果:")
print(f"  🎯 多路召回: {cands_multi.shape[0]:,} 条候选")
print(f"  🔗 协同过滤: {cands_covisit.shape[0]:,} 条候选")

if len(cands_multi) > 0:
    print("  📈 多路召回统计:")
    print(f"    平均每用户候选数: {len(cands_multi) / len(val_users):.1f}")
    for label, count in (
        ("复购召回覆盖", (cands_multi['score_rebuy'] > 0).sum()),
        ("协同召回覆盖", (cands_multi['score_covisit'] > 0).sum()),
        ("类目热门覆盖", cands_multi['is_cate_hot'].sum()),
        ("店铺热门覆盖", cands_multi['is_store_hot'].sum()),
        ("全局热门覆盖", cands_multi['is_global_pop'].sum()),
    ):
        print(f"    {label}: {count:,} 条")

# =============================================================================
# 💾 Persist both candidate sets as snappy parquet
# =============================================================================
print(f"\n💾 保存候选结果到 {OUTDIR}...")
save_start = time.time()

files_to_save = [
    (cands_multi, 'cands_multi.parquet', '多路召回候选'),
    (cands_covisit, 'cands_covisit_only.parquet', '协同过滤候选'),
]

for df, filename, desc in files_to_save:
    file_path = f'{OUTDIR}/{filename}'
    df.to_parquet(file_path, index=False, compression='snappy')
    file_size = os.path.getsize(file_path) / (1024 * 1024)  # bytes -> MB
    print(f"  ✅ {desc}: {len(df):,} 条记录, {file_size:.1f}MB -> {filename}")

save_time = time.time() - save_start
print(f"💾 候选保存完成! 耗时: {save_time:.2f}秒")

# Throughput summary (timed over candidate generation only, not saving).
print("\n🏆 性能总结:")
print(f"  ⏱️  总执行时间: {total_time:.2f}秒")
print(f"  👥 处理用户数: {len(val_users):,}")
print(f"  ⚡ 平均处理速度: {len(val_users)/total_time:.0f} 用户/秒")
print(f"  📊 生成候选总数: {len(cands_multi) + len(cands_covisit):,}")
print(f"  🚀 候选生成速度: {(len(cands_multi) + len(cands_covisit))/total_time:.0f} 条/秒")

gc.collect()
print("🧹 内存清理完成")
print(f"\n🎉 召回模块全部完成! 总耗时: {total_time:.2f}秒")
    