In [1]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import xgboost as xgb
import gc

from sklearn.utils import shuffle
from google.colab import drive
import os, sys, pickle, glob, gc
drive.mount('/content/drive')

VER = 1
USE = 'clicks'

# EARLY STOPPING
VALIDATE = True

CT = 50

# 读取 Parquet 文件
ddf = dd.read_parquet('/content/drive/MyDrive/OTTO/merge_features_labels.parquet')

# 替换 inf 为 NaN
ddf = ddf.replace([np.inf, -np.inf], np.nan)

# 提取所有 session（唯一值）到本地
unique_sessions = ddf['session'].drop_duplicates().compute()
unique_sessions = unique_sessions.sample(frac=1, random_state=42)  # 打乱顺序

FOLDS = 5
fold_size = len(unique_sessions) // FOLDS
folds = []
for i in range(FOLDS):
    start = i * fold_size
    end = (i+1) * fold_size if i < FOLDS-1 else len(unique_sessions)
    valid_sessions = unique_sessions.iloc[start:end]
    train_sessions = pd.concat([unique_sessions.iloc[:start], unique_sessions.iloc[end:]])
    folds.append((train_sessions, valid_sessions))

#########################
# 0. 超参数与初始设置
#########################
FOLDS = 5
SEED = 42
VER = 412
CT = 50
VALIDATE = True
USE_LOADER = True

# XGBoost 参数
xgb_parms = {
    'objective':'rank:pairwise',
    'eval_metric':'map',
    'tree_method':'hist',
    'device': 'cuda',
    'learning_rate':0.1,
    'max_depth':4,
    'subsample':0.7,
    'colsample_bytree':0.5,
    'random_state': SEED
}

#########################
# 1. 用 Dask 读入数据
#########################
file_path = "/content/drive/MyDrive/OTTO/merge_features_labels.parquet"

# 用 Dask DataFrame 读取
ddf = dd.read_parquet(file_path)

print("Dask DataFrame Structure:")
print(ddf)

# 将 inf 替换为 NaN
ddf = ddf.replace([np.inf, -np.inf], np.nan)

# 如果需要减少内存，可转换类型（示例：float64 -> float32）
for col, dtype_ in ddf.dtypes.items():
    if dtype_ == 'float64':
        ddf[col] = ddf[col].astype('float32')

# 选择特征列：假设你和之前一样，取 columns[2:-3] 并排除目标
all_cols = ddf.columns
FEATURES = all_cols[2:-3]
FEATURES = [c for c in FEATURES if c not in [USE, 'session']]  # 确保不含目标/分组列


#########################
# 2. 构造分组（Group K-Fold）
#########################
# 先获取所有 session 的唯一值到本地
unique_sessions = ddf['session'].drop_duplicates().compute()
print("Total unique sessions:", len(unique_sessions))

# 打乱 session 顺序
unique_sessions = shuffle(unique_sessions, random_state=SEED).reset_index(drop=True)

fold_size = len(unique_sessions) // FOLDS
folds = []
for i in range(FOLDS):
    start = i * fold_size
    end = (i+1) * fold_size if i < FOLDS-1 else len(unique_sessions)
    valid_sessions = unique_sessions.iloc[start:end]
    train_sessions = pd.concat([unique_sessions.iloc[:start], unique_sessions.iloc[end:]])
    folds.append((train_sessions, valid_sessions))

# 预先分配 OOF 数组：需要先拿到全局行索引
row_count = ddf.shape[0].compute()
oof = np.zeros(row_count, dtype=np.float32)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Dask DataFrame Structure:
Dask DataFrame Structure:
               session    aid session_length session_click_ratio session_cart_ratio session_order_ratio session_last_click_aid session_last_atc_aid session_last_order_aid session_last_click_timestamp session_last_atc_timestamp session_last_order_timestamp item_popularity item_cart_count item_order_count item_conversion_rate item_atc_rate item_atc_conversion_rate item_recent_clicks item_recent_carts item_recent_orders item_type_mean session_item_click_count session_item_cart_count session_item_order_count session_item_last_interaction_type_clicks session_item_last_interaction_type_orders session_item_abs_click_hots session_item_abs_atc_hots session_item_abs_order_hots session_item_abs_click_time session_item_abs_atc_time session_item_abs_order_time clicks  carts orders
npartitions=10                          

In [2]:
#########################
# 0. 手动设置要跑的 fold
#########################
FOLD_TO_RUN = 0  # 例如只跑第 0 折

#########################
# 1. 初始化一些全局对象
#########################

row_count = ddf.shape[0].compute()
oof = np.zeros(row_count, dtype=np.float32)

#########################
# 2. 只执行指定的 fold
#########################
train_sessions, valid_sessions = folds[FOLD_TO_RUN]
fold = FOLD_TO_RUN

print("\n", "#"*25)
print(f"### Fold {fold+1}")
print("Train sessions:", len(train_sessions), "Valid sessions:", len(valid_sessions))
print("#"*25)

#########################
# 2.1 过滤当前折的训练/验证数据
#########################
dtrain_dd = ddf[ ddf.session.isin(train_sessions) ]
dvalid_dd = ddf[ ddf.session.isin(valid_sessions) ]

# 将 Dask DataFrame 转成 pandas
train_df = dtrain_dd.compute()
valid_df = dvalid_dd.compute()

print("Train size:", len(train_df), "Valid size:", len(valid_df))

#########################
# 2.2 构建 XGBoost DMatrix
#########################
group_train = train_df.groupby('session').size().tolist()
group_valid = valid_df.groupby('session').size().tolist()

dtrain = xgb.DMatrix(data=train_df[FEATURES], label=train_df[USE])
dtrain.set_group(group_train)

if VALIDATE:
    dvalid = xgb.DMatrix(data=valid_df[FEATURES], label=valid_df[USE])
    dvalid.set_group(group_valid)
    evals_list = [(dtrain, 'train'), (dvalid, 'valid')]
else:
    evals_list = [(dtrain, 'train')]

#########################
# 2.3 训练 XGBoost 模型
#########################
model = xgb.train(
    params=xgb_parms,
    dtrain=dtrain,
    evals=evals_list,
    num_boost_round=10000,
    early_stopping_rounds=100,
    verbose_eval=100
)

# 保存模型
model.save_model(f'XGB_fold{fold}_{USE}_v{VER}.json')

#########################

#########################
# 2.5 分批推断 (OOF)
#########################
valid_df['global_idx'] = valid_df.index
valid_df.reset_index(drop=True, inplace=True)

local_oof = np.zeros(len(valid_df), dtype=np.float32)
print('Inferring in chunks...')
batch_size = 2_000_000
start_pos = 0
while start_pos < len(valid_df):
    end_pos = min(start_pos + batch_size, len(valid_df))
    X_batch = valid_df.iloc[start_pos:end_pos][FEATURES]
    dval_batch = xgb.DMatrix(X_batch)
    preds = model.predict(dval_batch)

    local_oof[start_pos:end_pos] = preds

    start_pos = end_pos
    del X_batch, dval_batch, preds
    gc.collect()

# 映射回全局 OOF 数组
global_idx_for_valid = valid_df['global_idx'].values
oof[global_idx_for_valid] = local_oof

np.save(f"valid_idx_fold{fold}.npy", global_idx_for_valid)
np.save(f"oof_fold{fold}.npy", local_oof)

#########################
# 2.6 释放内存
#########################
del dtrain, train_df, group_train
if VALIDATE:
    del dvalid, group_valid
del valid_df, local_oof, global_idx_for_valid
del model
gc.collect()

#########################
# 3. 单折训练完成
#########################
print(f"Fold {fold} finished.")
print("You can now save or export `oof`, `importances`, etc.")



 #########################
### Fold 1
Train sessions: 1441001 Valid sessions: 360250
#########################
Train size: 72050050 Valid size: 18012500
[0]	train-map:0.76870	valid-map:0.76919
[100]	train-map:0.75948	valid-map:0.76033
[102]	train-map:0.75954	valid-map:0.76038
Inferring in chunks...
Fold 0 finished.
You can now save or export `oof`, `importances`, etc.


In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import numpy as np


row_count = 90062550
final_oof = np.zeros(row_count, dtype=np.float32)

# 依次加载每折的索引和预测
for fold in range(5):
    valid_idx = np.load(f"/content/drive/MyDrive/OTTO/clicks/idx/valid_idx_fold{fold}.npy")  # 本折验证集行索引
    local_oof = np.load(f"/content/drive/MyDrive/OTTO/clicks/oof/oof_fold{fold}.npy")        # 本折 OOF 预测

    # 放回到全局 OOF 数组对应的位置
    final_oof[valid_idx] = local_oof

# 现在 final_oof 就包含了所有行的 OOF 预测结果


In [4]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import xgboost as xgb
import gc

from sklearn.utils import shuffle
from google.colab import drive
import os, sys, pickle, glob, gc
USE = 'clicks'
ddf = dd.read_parquet('/content/drive/MyDrive/OTTO/merge_features_labels.parquet')
sub = ddf[['session','aid']].compute().copy()

In [5]:
sub['p'] = final_oof
#sub = cudf.DataFrame(sub)
sub = sub.sort_values(['session','p'],ascending=[True,False])
print( sub.shape )
sub.head()

sub = sub.reset_index(drop=True)
sub['n'] = sub.groupby('session').aid.cumcount().astype('int8')
sub = sub.loc[sub.n<20]

sub2 = sub.groupby('session').aid.apply(list)
sub2 = sub2.to_frame().reset_index()
sub2.aid = sub2.aid.apply(lambda x: " ".join(map(str,x)))
sub2.columns = ['session_type','labels']
sub2.session_type = sub2.session_type.astype('str')+ f'_{USE}'

(90062550, 3)


In [6]:
sub.head()

Unnamed: 0,session,aid,p,n
0,11098528,1633746,1.738267,0
1,11098528,855613,1.439878,1
2,11098528,1586171,-0.510245,2
3,11098528,986164,-0.605452,3
4,11098528,523174,-0.642584,4


In [7]:
sub2.head()

Unnamed: 0,session_type,labels
0,11098528_clicks,1633746 855613 1586171 986164 523174 876493 16...
1,11098529_clicks,612829 1307369 1632356 132016 982457 1450681 2...
2,11098530_clicks,801774 1083665 1257293 77440 544144 332654 297...
3,11098531_clicks,77440 698990 653835 1415171 71884 1136142 1239...
4,11098532_clicks,1767768 876469 7651 970284 1629847 1704066 315...


In [8]:
submission = sub2.copy()

In [9]:
submission.head()

Unnamed: 0,session_type,labels
0,11098528_clicks,1633746 855613 1586171 986164 523174 876493 16...
1,11098529_clicks,612829 1307369 1632356 132016 982457 1450681 2...
2,11098530_clicks,801774 1083665 1257293 77440 544144 332654 297...
3,11098531_clicks,77440 698990 653835 1415171 71884 1136142 1239...
4,11098532_clicks,1767768 876469 7651 970284 1629847 1704066 315...


In [10]:
def fill_up(x):
    return []

In [11]:
%%time
submission['session'] = submission.session_type.apply(lambda x: int(x.split('_')[0]))
submission['type'] = submission.session_type.apply(lambda x: x.split('_')[1])
submission.labels = submission.labels.apply(lambda x: [int(i) for i in x.split(' ')[:20]])

test_labels = pd.read_parquet('/content/drive/MyDrive/OTTO/val_labels.parquet')
test_labels = test_labels.loc[test_labels['type']==USE]

test_labels = test_labels.merge(submission, how='left', on=['session', 'type'])
test_labels.loc[test_labels.labels.isna(),'labels'] =\
    test_labels.loc[test_labels.labels.isna(),'labels'].map(fill_up)

test_labels['hits'] = test_labels.apply(lambda df: len(set(df.ground_truth).intersection(set(df.labels))), axis=1)
test_labels['gt_count'] = test_labels.ground_truth.str.len().clip(0,20)

recall_per_type = test_labels.groupby(['type'])['hits'].sum() / test_labels.groupby(['type'])['gt_count'].sum()

score = (recall_per_type * pd.Series({'clicks': 0.10, 'carts': 0.30, 'orders': 0.60})).sum()
print( score , recall_per_type)


0.04981133945568699 type
clicks    0.498113
dtype: float64
CPU times: user 29.1 s, sys: 855 ms, total: 30 s
Wall time: 31.6 s
