In [1]:
import pandas as pd
import numpy as np

train_df = pd.read_parquet('../data/train.parquet')
test_df = pd.read_parquet('../data/CV/valid_seqs.parquet')
seven_days = 7*24*60*60
train_cutoff = train_df.ts.max() - seven_days
train_df = train_df[train_df.ts <= train_cutoff]
test_df

Unnamed: 0,session,aid,ts,type
0,11098528,11830,1661119200,0
1,11098528,1679529,1661119417,0
2,11098528,92401,1661119474,0
3,11098528,1055218,1661119598,0
4,11098528,1561739,1661119644,0
...,...,...,...,...
7686120,12899774,33035,1661723968,0
7686121,12899775,1743151,1661723970,0
7686122,12899776,548599,1661723972,0
7686123,12899777,384045,1661723976,0


In [2]:
train_df.session.max()

11098527

In [3]:
from tqdm import tqdm

DO_LOCAL_VALIDATION = True
if DO_LOCAL_VALIDATION:
    seven_days = 7*24*60*60
    train_cutoff = train_df.ts.max() - seven_days
    test_df = train_df[train_df.ts > train_cutoff]
    train_df = train_df[train_df.ts <= train_cutoff]

    # 丢弃跨越train test的session
    overlapped_session = set(train_df.session).intersection(test_df.session)
    test_df = test_df[~test_df.session.isin(overlapped_session)]

    new_test = []
    data_to_cal_validation_score = []
    for grp in tqdm(test_df.groupby("session")):
        if grp[1].shape[0] < 2 : continue
        cutoff = np.random.randint(1,grp[1].shape[0])
        new_test.append(grp[1].iloc[:cutoff])
        data_to_cal_validation_score.append(grp[1].iloc[cutoff:])
    test_df = pd.concat(new_test).reset_index(drop=True)
    valid = pd.concat(data_to_cal_validation_score).reset_index(drop=True)

    test_df.to_parquet('../data/CV/local_train2.parquet')
    valid.to_parquet('../data/CV/local_train2_labels.parquet')
    train_df.to_parquet("../data/CV/local_train1.parquet")
    del new_test, data_to_cal_validation_score
else :
    seven_days = 7*24*60*60
    train_cutoff = train_df.ts.max() - seven_days
    train_df = train_df[train_df.ts <= train_cutoff]

100%|██████████| 2455308/2455308 [03:46<00:00, 10820.49it/s]


In [4]:
fraction_of_sessions_to_use = 1
if fraction_of_sessions_to_use != 1 :
    lucky_sessions_train = train_df.drop_duplicates(['session']).sample(frac=fraction_of_sessions_to_use,random_state=42)['session']
    subset_of_train = train_df[train_df.session.isin(lucky_sessions_train)]
    lucky_sessions_test = test_df.drop_duplicates(['session'])['session']
    subset_of_test = test_df[test_df.session.isin(lucky_sessions_test)]
else :
    subset_of_train = train_df
    subset_of_test = test_df

subset_of_train.head()

Unnamed: 0,session,aid,ts,type
0,0,1517085,1659304800,0
1,0,1563459,1659304904,0
2,0,1309446,1659367439,0
3,0,16246,1659367719,0
4,0,1781822,1659367871,0


In [10]:
subset_of_train.session.max()

8643219

In [12]:
subset_of_test.session.max()

11098509

In [13]:
from tqdm import tqdm
from collections import defaultdict,Counter

def recall_covisitation_matrix(train,test,recall_num):
    subset_of_train = train
    subset_of_test = test
    subset_of_train.index = pd.MultiIndex.from_frame(subset_of_train[['session']])
    subset_of_test.index = pd.MultiIndex.from_frame(subset_of_test[['session']])
    chunk_size = 3000000
    min_ts = subset_of_train.ts.min()
    max_ts = subset_of_train.ts.max()

    next_AIDs = defaultdict(Counter)
    # subsets = pd.concat([subset_of_train, subset_of_test])
    subsets = subset_of_train
    sessions = subsets.session.unique()

    for i in tqdm(range(0,sessions.shape[0],chunk_size)):
        current_chunk = subsets.loc[sessions[i]:sessions[min(sessions.shape[0]-1,i+chunk_size-1)]].reset_index(drop=True)
        current_chunk = current_chunk.groupby('session',as_index=False).nth(list(range(-30,0))).reset_index(drop=True)
        consecutive_AIDs = current_chunk.merge(current_chunk,on='session')
        consecutive_AIDs = consecutive_AIDs[consecutive_AIDs['aid_x'] != consecutive_AIDs['aid_y']]
        consecutive_AIDs['days_elapsed'] = (consecutive_AIDs.ts_y - consecutive_AIDs.ts_x) / (24*60*60)
        consecutive_AIDs = consecutive_AIDs[(consecutive_AIDs.days_elapsed >= 0) & (consecutive_AIDs.days_elapsed <= 1)]
        for aid_x , aid_y in zip(consecutive_AIDs['aid_x'], consecutive_AIDs['aid_y']):
            next_AIDs[aid_x][aid_y] += 1

    session_type = ['clicks','carts','orders']
    test_session_AIDs = test_df.reset_index(drop=True).groupby('session')['aid'].apply(list)
    test_session_types = test_df.reset_index(drop=True).groupby('session')['type'].apply(list)
    labels = []
    no_data = 0
    no_data_all_aids = 0
    type_weight_multipliers = {0:1,1:6,2:3}
    for AIDs , types in tqdm(zip(test_session_AIDs,test_session_types),total=len(test_session_types)):
        if len(AIDs) >= 30 :
            weights = np.logspace(0.1,1,len(AIDs),base=2,endpoint=True)
            aids_temp = defaultdict(lambda : 0)
            for aid,w,t in zip(AIDs,weights,types):
                aids_temp[aid] += w * type_weight_multipliers[t]
            sorted_aids = [k for k,v in sorted(aids_temp.items(),key=lambda item:-item[1])]
            labels.append(sorted_aids[:recall_num])
        else :
            AIDs = list(dict.fromkeys(AIDs[::-1]))
            AIDs_len_start = len(AIDs)

            candidates = []
            for AID in AIDs:
                if AID in next_AIDs: candidates += [aid for aid, count in next_AIDs[AID].most_common(20)]
            AIDs += [AID for AID, cnt in Counter(candidates).most_common(recall_num) if AID not in AIDs]

            labels.append(AIDs[:recall_num])
            if candidates == []: no_data += 1
            if AIDs_len_start == len(AIDs): no_data_all_aids += 1
    return pd.DataFrame(data={'session_type': test_session_AIDs.index, 'labels': labels})


In [None]:
recall1 = recall_covisitation_matrix(subset_of_train,subset_of_test,recall_num=150)

In [22]:
train_df = train_df.reset_index(drop=True)
train_df

Unnamed: 0,session,aid,ts,type
0,0,1517085,1659304800,0
1,0,1563459,1659304904,0
2,0,1309446,1659367439,0
3,0,16246,1659367719,0
4,0,1781822,1659367871,0
...,...,...,...,...
163955176,11098523,175715,1661119197,0
163955177,11098524,1088524,1661119198,0
163955178,11098525,182927,1661119199,0
163955179,11098526,510055,1661119199,0


In [23]:
from gensim.models import Word2Vec
# 训练 Word2Vec
recall_num=20
vector_size=64
window=5
sg=1
min_count=1
workers=4
sessions = train_df.groupby("session")['aid'].apply(list).tolist()
w2v_model = Word2Vec(
    sentences=[[str(a) for a in s] for s in sessions],
    vector_size=vector_size,
    window=window,
    sg=sg,
    min_count=min_count,
    workers=workers
)

In [24]:
from collections import defaultdict, Counter
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.preprocessing import normalize
from gensim.models import Word2Vec
import faiss

def recall_word2vec(train, test, recall_num=100, w2v_model=None):
    """
    Word2Vec 批量 Faiss 召回 (避免循环每个 aid)
    """
    # ===== 训练 Word2Vec 模型 =====
    if w2v_model is None:
        sessions = train.groupby("session")['aid'].apply(list).tolist()
        w2v_model = Word2Vec(
            sentences=[[str(a) for a in s] for s in sessions],
            vector_size=64, window=5, sg=1, min_count=1, workers=4
        )

    # ===== 构建 Faiss 索引 =====
    keys = w2v_model.wv.index_to_key
    emb_matrix = normalize(w2v_model.wv.vectors).astype('float32')
    aid2idx = {int(k): i for i, k in enumerate(keys)}

    d = emb_matrix.shape[1]
    index = faiss.IndexFlatIP(d)
    index.add(emb_matrix)

    # ===== 测试集 =====
    test = test.reset_index(drop=True)
    test_grouped = test.groupby('session').agg(list)
    test_session_AIDs = test_grouped['aid']
    test_session_types = test_grouped['type']

    labels = []
    type_weight_multipliers = {0:1, 1:6, 2:3}

    # --- 收集所有短 session 的 aid，一次性批量查询 ---
    short_sessions = []
    short_aids = set()
    for idx, AIDs in tqdm(enumerate(test_session_AIDs)):
        AIDs = list(dict.fromkeys(AIDs[::-1]))  # 最近交互优先
        short_sessions.append((idx, AIDs))
        short_aids.update([aid for aid in AIDs if aid in aid2idx])

    short_aids = list(short_aids)
    if short_aids:
        query_vecs = np.array([normalize(w2v_model.wv[str(aid)].reshape(1, -1))[0] for aid in tqdm(short_aids)], dtype='float32')
        D, I = index.search(query_vecs, 30)  # batch search
        print("查询完成")
        # 构建 aid -> topn candidates 映射
        aid2candidates = {aid: [int(keys[i]) for i in row if int(keys[i]) != aid][:30]
                          for aid, row in tqdm(zip(short_aids, I), total=len(short_aids))}
    else:
        aid2candidates = {}

    # --- 生成最终 labels ---
    for idx, AIDs in tqdm(enumerate(test_session_AIDs)):
        if len(AIDs) >= 30:
            AIDs = AIDs[:30]

        AIDs = list(dict.fromkeys(AIDs[::-1]))
        candidates = []
        for aid in AIDs:
            if aid in aid2candidates:
                candidates.extend(aid2candidates[aid])
        # 去重补全
        AIDs = []
        AIDs += [AID for AID, cnt in Counter(candidates).most_common(recall_num) if AID not in AIDs]
        labels.append(AIDs[:recall_num])

    return pd.DataFrame({'session_type': test_session_AIDs.index, 'labels': labels})


In [25]:
recall2 = recall_word2vec(subset_of_train,subset_of_test,recall_num=100,w2v_model=w2v_model)

1801251it [00:25, 71486.44it/s] 
100%|██████████| 856251/856251 [01:10<00:00, 12215.35it/s]


查询完成


100%|██████████| 856251/856251 [00:12<00:00, 66693.84it/s]
1801251it [01:08, 26446.05it/s]


In [None]:
recall2

In [None]:
recall1

In [None]:
import pandas as pd
from collections import defaultdict

def merge_recalls_with_rank(*recalls, topk=None):
    """
    合并多个召回 DataFrame，去重并保留每个 aid 在各个召回中的原始排名
    参数:
        *recalls: 任意数量的 DataFrame，每个包含 'session_type' 和 'labels'
        topk: 合并后取前 topk 个 aid
    返回:
        merged_df: 合并去重后的 DataFrame，包含 'session_type' 和 'labels'
        aid_rank_all: 每个 session 的 aid 在各个召回中的排名列表
    """
    if not recalls:
        return None, None

    # 确保 merged_df 有 labels 列
    merged_df = recalls[0][['session_type']].copy()
    merged_df['labels'] = [[] for _ in range(len(merged_df))]

    session_ids = merged_df['session_type'].tolist()
    aid_rank_all = {}

    # 遍历每个 session
    for session in tqdm(session_ids):
        combined = []
        rank_dict = defaultdict(lambda: [None]*len(recalls))

        for i, recall_df in enumerate(recalls):
            row = recall_df[recall_df['session_type']==session]
            if row.empty:
                continue
            labels = row['labels'].values[0]
            for rank, aid in enumerate(labels):
                if aid not in combined:
                    combined.append(aid)
                if rank_dict[aid][i] is None:
                    rank_dict[aid][i] = rank

        if topk:
            combined = combined[:topk]

        # 用 .at 安全赋值单元格，避免 loc 批量赋值报错
        idx = merged_df[merged_df['session_type']==session].index[0]
        merged_df.at[idx, 'labels'] = combined
        aid_rank_all[session] = rank_dict

    return merged_df, aid_rank_all

# 使用示例
recalled, rank_df = merge_recalls_with_rank(recall1, recall2, topk=100)


In [None]:
recalled

In [None]:
recalled.to_parquet("../output/recall.parquet")

In [None]:
import pandas as pd

def rank_dict_to_df(aid_rank_all):
    rows = []
    for session, rank_dict in tqdm(aid_rank_all.items()):
        for aid, ranks in rank_dict.items():
            rows.append({
                "session_type": session,
                "aid": aid,
                **{f"recall{i+1}_rank": r for i, r in enumerate(ranks)}
            })
    return pd.DataFrame(rows)

# 转换并保存
rank_df = rank_dict_to_df(rank_df)
rank_df.to_parquet("aid_ranks.parquet", index=False)  # 保存成 parquet


In [None]:
rank_df

In [None]:
recalled

In [None]:
valid

In [None]:
curvalid = valid[valid.type == 0]

In [None]:
merged_df = pd.merge(recalled,curvalid,left_on="session_type",right_on="session",how="left")
merged_df

In [None]:
merged_df = merged_df.groupby("session_type").head(1)
merged_df

In [None]:
def calculate_recall_at_k(pred_df, valid_df, k=20):
    """
    严格按照比赛规则计算Recall@K
    
    参数:
    - pred_df: 预测结果DataFrame，包含'session_type'和'labels'列
    - valid_df: 验证集DataFrame，包含'session', 'aid', 'type'列
    - k: topk的k值，默认20
    
    返回:
    - 各类型的recall值和加权平均
    """
    results = {}
    
    # 处理clicks: 只看下一个点击
    clicks_valid = valid_df[valid_df.type == 0].copy()
    clicks_valid = clicks_valid.sort_values(['session', 'ts'])
    # 只取每个session第一个click
    first_clicks = clicks_valid.groupby('session', as_index=False).first()
    
    if len(first_clicks) > 0:
        hits = 0
        for _, row in first_clicks.iterrows():
            session = row['session']
            next_click = row['aid']
            pred_row = pred_df[pred_df['session_type'] == session]
            if len(pred_row) > 0:
                pred_aids = pred_row.iloc[0]['labels'][:k]
                if next_click in pred_aids:
                    hits += 1
        clicks_recall = hits / len(first_clicks)
        results['clicks'] = clicks_recall
        print(f"Clicks Recall@{k}: {clicks_recall:.4f} ({hits}/{len(first_clicks)})")
    
    # 处理carts和orders
    for type_id, type_name in [(1, 'carts'), (2, 'orders')]:
        type_valid = valid_df[valid_df.type == type_id]
        if len(type_valid) > 0:
            recall_sum = 0
            valid_sessions = 0
            
            # 按session分组获取真实的aid列表
            for session, group in type_valid.groupby('session'):
                pred_row = pred_df[pred_df['session_type'] == session]
                if len(pred_row) > 0:
                    pred_aids = pred_row.iloc[0]['labels'][:k]
                    true_aids = set(group['aid'].unique())
                    
                    # 分母取真实aid数量和k的最小值
                    denominator = min(len(true_aids), k)
                    # 分子是预测集合与真实集合的交集大小
                    hits = len(set(pred_aids).intersection(true_aids))
                    
                    recall_sum += hits / denominator
                    valid_sessions += 1
            
            if valid_sessions > 0:
                type_recall = recall_sum / valid_sessions
                results[type_name] = type_recall
                print(f"{type_name.capitalize()} Recall@{k}: {type_recall:.4f} ({valid_sessions} sessions)")
    
    # 计算加权平均
    if len(results) == 3:  # 确保所有类型都有结果
        weighted_recall = 0.1 * results['clicks'] + 0.3 * results['carts'] + 0.6 * results['orders']
        results['weighted_avg'] = weighted_recall
        print(f"\nWeighted Average Recall@{k}: {weighted_recall:.4f}")
    
    return results

# 计算召回率
print("计算整体的召回率：")
metrics = calculate_recall_at_k(recalled, valid, k=20)