In [1]:
import pandas as pd
import numpy as np
import lightgbm as lgb
from sklearn.preprocessing import LabelEncoder
import joblib
import json
import os
import polars as pl
import kaggle_evaluation.mcts_inference_server

### 步骤 1: 准备蓝图 - 从 G1, G2, G3 加载"知识"

此单元格**一次性**加载 G1、G2、G3 预先保存的"蓝图"文件（要删除的列、编码器、特征列表以及训练好的模型），而不是重新加载训练数据。实际的预测步骤只复用这些已加载的对象，**不会**再次读取这些文件。

In [2]:
# Folder containing the competition files (test.csv / sample_submission.csv
# are read from here by the local gateway at the end of the notebook).
KAGGLE_INPUT_DIR = "../input/um-game-playing-strength-of-mcts-variants/"

# Folder containing the saved "blueprint" artifacts produced by G1/G2/G3.
BLUEPRINT_DIR = "../input/um-utils/"

# G1 blueprint
G1_BLUEPRINT_PATH = os.path.join(BLUEPRINT_DIR, "g1_constant_cols.json")

# G2 blueprints
G2_ENCODERS_PATH = os.path.join(BLUEPRINT_DIR, "g2_encoders.pkl")
G2_FEATURES_PATH = os.path.join(BLUEPRINT_DIR, "g2_selected_features.json")

# G3 blueprints
G3_ENCODERS_PATH = os.path.join(BLUEPRINT_DIR, "g3_encoders.pkl")
G3_MODEL_PATH = os.path.join(BLUEPRINT_DIR, "lightgbm_groupkfold_baseline.txt")

print("文件路径定义完毕。")

文件路径定义完毕。


In [3]:
# --- G1 blueprint ---
# Column names stored by G1; process_test_batch drops them from every batch.
# Uses the G1_BLUEPRINT_PATH constant so the path is defined in one place only.
with open(G1_BLUEPRINT_PATH, 'r') as f:
    constant_cols_g1 = json.load(f)

# Columns removed up front from every batch (dropped with errors='ignore',
# so it is fine if some of them are absent from the test data).
cols_to_drop_g1_initial = [
    'Id', 'GameRulesetName', 'EnglishRules', 'LudRules',
    'num_wins_agent1', 'num_draws_agent1', 'num_losses_agent1'
]
print(f"G1: 成功加载 {len(constant_cols_g1)} 个常量列。")

# --- G2 blueprint ---
# SECURITY NOTE: joblib.load unpickles the file and can execute arbitrary code.
# Only load encoder files that you produced yourself.
encoders_g2 = joblib.load(G2_ENCODERS_PATH)
print("G2: 成功加载 G2 编码器。")
with open(G2_FEATURES_PATH, 'r') as f:
    selected_features_g2 = json.load(f)
print(f"G2: 成功加载 {len(selected_features_g2)} 个特征列表。")

# --- G3 blueprint ---
# Same pickle caveat as above applies to the G3 encoders.
encoders_g3 = joblib.load(G3_ENCODERS_PATH)
print("G3: 成功加载 G3 编码器。")
# The trained LightGBM model is restored from its text dump.
model_g3 = lgb.Booster(model_file=G3_MODEL_PATH)
print("G3: 成功加载模型。")

G1: 成功加载 198 个常量列。
G2: 成功加载 G2 编码器。
G2: 成功加载 100 个特征列表。
G3: 成功加载 G3 编码器。
G3: 成功加载模型。


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [4]:
def parse_agent_strings_g1(df: pd.DataFrame) -> pd.DataFrame:
    """Expand the ``agent1`` / ``agent2`` strings into separate feature columns.

    Each agent string is split on ``'-'``. For every agent the pieces become:

    * piece 1 -> ``<agent>_selection``
    * piece 2 -> ``<agent>_exploration`` (cast to float)
    * piece 3 -> ``<agent>_playout``
    * piece 4 -> ``<agent>_score_bounds`` (``'true'`` -> 1, ``'false'`` -> 0)

    Piece 0 is not used.

    Args:
        df: Frame containing string columns ``agent1`` and ``agent2``.

    Returns:
        A new frame with the eight parsed columns appended and the original
        ``agent1`` / ``agent2`` columns removed. ``df`` itself is not modified.
    """
    parsed = df.copy()
    flag_to_int = {'true': 1, 'false': 0}

    for agent in ('agent1', 'agent2'):
        pieces = parsed[agent].str.split('-', expand=True)
        parsed[f'{agent}_selection'] = pieces[1]
        parsed[f'{agent}_exploration'] = pieces[2].astype(float)
        parsed[f'{agent}_playout'] = pieces[3]
        parsed[f'{agent}_score_bounds'] = pieces[4].map(flag_to_int).astype(int)

    return parsed.drop(columns=['agent1', 'agent2'])

# Ported from G2 (G2_Feature_engineering.ipynb, Cell 19)
def create_interaction_features_g2(df: pd.DataFrame, encoders: dict) -> pd.DataFrame:
    """Add label-encoded agent columns and their products with top numeric features.

    For every ``(column, encoder)`` pair in ``encoders`` a ``<column>_encoded``
    column is added. Values that are not among the encoder's ``classes_`` are
    replaced by the label ``'unknown'`` before encoding. If the encoder does not
    know ``'unknown'``, the label is appended to the classes of a *local shallow
    copy* of the encoder, so the caller's encoder objects are left untouched.

    Then, for every non-strategy feature of the hard-coded top-10 list that is
    present in ``df``, four ``int_<feature>_x_<suffix>`` columns are created as
    the product of that feature with the four encoded agent strategy columns.

    Args:
        df: Frame that contains every column named by a key of ``encoders``.
        encoders: Mapping ``column name -> fitted encoder`` exposing ``classes_``
            and ``transform``. The interaction step reads the encoded versions
            of ``agent1_selection``, ``agent1_playout``, ``agent2_selection``
            and ``agent2_playout``, so those keys are presumably expected to be
            present (a ``KeyError`` is raised otherwise when an interaction is
            built).

    Returns:
        A new frame with the encoded and interaction columns appended; ``df`` is
        not modified.
    """
    import copy  # function-scope import: only needed for the local encoder copy

    df_copy = df.copy()

    top_10_features = [
        'agent2_selection', 'agent1_selection', 'AdvantageP1', 'agent1_playout',
        'agent2_playout', 'agent2_exploration', 'PlayoutsPerSecond',
        'DurationTurnsNotTimeouts', 'GameTreeComplexity', 'agent1_exploration'
    ]
    agent_strategy_features = ['agent1_selection', 'agent1_playout', 'agent2_selection', 'agent2_playout']

    # 1. Apply the already-fitted G2 encoders (transform only, never re-fit).
    for col, le in encoders.items():
        known_labels = set(le.classes_)
        # Anything the encoder has never seen is folded into 'unknown'.
        labels = df_copy[col].apply(lambda x: x if x in known_labels else 'unknown')
        if 'unknown' not in le.classes_:
            # Extend a copy, not the shared encoder: np.append builds a new
            # array, so the original object's classes_ stays as loaded.
            le = copy.copy(le)
            le.classes_ = np.append(le.classes_, 'unknown')
        df_copy[f'{col}_encoded'] = le.transform(labels)

    # 2. Build the G2 interaction features (element-wise products).
    interaction_features_to_create = [
        f for f in top_10_features
        if f not in agent_strategy_features and f in df_copy.columns
    ]
    encoded_partners = {
        'a1_select': 'agent1_selection_encoded',
        'a1_playout': 'agent1_playout_encoded',
        'a2_select': 'agent2_selection_encoded',
        'a2_playout': 'agent2_playout_encoded',
    }
    for top_feat in interaction_features_to_create:
        for suffix, encoded_col in encoded_partners.items():
            df_copy[f'int_{top_feat}_x_{suffix}'] = df_copy[top_feat] * df_copy[encoded_col]

    return df_copy

In [5]:
def process_test_batch(test_df_pandas: pd.DataFrame) -> pd.DataFrame:
    """Run one raw test batch through the G1 -> G2 -> G3 preprocessing steps.

    Steps:
        1. G1: drop ``cols_to_drop_g1_initial`` and ``constant_cols_g1``
           (columns that are absent are ignored), then parse the agent strings.
        2. G2: add encoded / interaction features and reindex to exactly
           ``selected_features_g2`` (features missing from the batch become
           all-NaN columns as a result of the reindex).
        3. G3: label-encode every column that has a G3 encoder, mapping values
           the encoder has not seen to ``'unknown'``.

    Neither the input frame nor the module-level encoders are modified: drops
    are not in-place, and ``'unknown'`` is appended to a local shallow copy of
    an encoder when it is missing from its classes.

    Args:
        test_df_pandas: Raw test rows as a pandas DataFrame.

    Returns:
        A new DataFrame whose columns are ``selected_features_g2``, in order.
    """
    import copy  # function-scope import: only needed for the local encoder copy

    # --- G1 (eda.ipynb) pipeline ---
    trimmed = (
        test_df_pandas
        .drop(columns=cols_to_drop_g1_initial, errors='ignore')
        .drop(columns=constant_cols_g1, errors='ignore')
    )
    test_df_processed = parse_agent_strings_g1(trimmed)

    # --- G2 (G2_Feature_engineering.ipynb) pipeline ---
    test_df_processed = create_interaction_features_g2(test_df_processed, encoders_g2)

    # Keep exactly the selected features, in blueprint order.
    test_df_final_features = test_df_processed.reindex(columns=selected_features_g2)

    # --- G3 (baseline_lightgbm.ipynb) pipeline ---
    for col, le in encoders_g3.items():
        if col not in test_df_final_features.columns:
            continue
        # Handle categories that may appear in test data but were never fitted.
        known_labels = set(le.classes_)
        labels = test_df_final_features[col].astype(str).apply(
            lambda x: x if x in known_labels else 'unknown'
        )
        if 'unknown' not in le.classes_:
            # Extend a copy so the shared, loaded encoder is never mutated.
            le = copy.copy(le)
            le.classes_ = np.append(le.classes_, 'unknown')
        test_df_final_features[col] = le.transform(labels)

    return test_df_final_features

In [6]:
def predict(test: pl.DataFrame, sample_sub: pl.DataFrame) -> pl.DataFrame:
    """Inference callback handed to the MCTS inference server.

    Converts the Polars batch to pandas, runs the G1/G2/G3 preprocessing,
    predicts with the loaded G3 LightGBM model and writes the predictions into
    the ``utility_agent1`` column of the submission template.

    Args:
        test: Batch of test rows.
        sample_sub: Submission template for the same batch.

    Returns:
        ``sample_sub`` with ``utility_agent1`` replaced by the predictions.
    """
    print(f"--- 接收到 {len(test)} 行测试数据 ---")

    # 1. Polars -> pandas (the G1/G2/G3 pipelines are written against pandas).
    test_df_pandas = test.to_pandas()

    # 2. Full G1, G2, G3 preprocessing.
    test_df_processed = process_test_batch(test_df_pandas)

    # 3. Sanity checks (diagnostic only: they print and do not abort).
    # NOTE: process_test_batch reindexes to selected_features_g2, so this
    # column check acts as a guard against future changes to that function.
    if list(test_df_processed.columns) != selected_features_g2:
        print("❌ 错误: 处理后的列与 G2 蓝图不匹配!")
    non_numeric_cols = test_df_processed.select_dtypes(exclude=np.number).columns
    if len(non_numeric_cols) > 0:
        print(f"❌ 错误: 发现非数字列: {list(non_numeric_cols)}")

    # 4. Predict with the G3 model.
    predictions = model_g3.predict(test_df_processed)

    # 5. Put the predictions into the submission template as an explicit
    #    Series instead of relying on pl.lit to coerce a NumPy array.
    sample_sub = sample_sub.with_columns(pl.Series('utility_agent1', predictions))

    print(f"--- 成功预测 {len(sample_sub)} 行 ---")
    return sample_sub

In [7]:
print("--- 正在初始化 MCTS 推理服务器 ---")
inference_server = kaggle_evaluation.mcts_inference_server.MCTSInferenceServer(predict)

# Decide between a local dry run and the real submission run based on the
# KAGGLE_IS_COMPETITION_RERUN environment variable.
if not os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    # Local environment: replay test.csv / sample_submission.csv through predict.
    print("--- 在本地环境中：运行 .run_local_gateway() ---")
    try:
        inference_server.run_local_gateway(
            (
                f'{KAGGLE_INPUT_DIR}test.csv',
                f'{KAGGLE_INPUT_DIR}sample_submission.csv'
            )
        )
        print("--- 本地模拟运行完毕。将生成 submission.parquet ---")
    except FileNotFoundError:
        # The competition files are not attached; report it and carry on.
        print("\n 警告: 未在 ../input/ 中找到 test.csv 或 sample_submission.csv。")
    except Exception as e:
        # Best-effort local run: report any other failure instead of raising.
        print(f"\n 本地模拟运行时出错: {e}")
else:
    # Submission environment: hand control to the inference server.
    print("--- 在提交环境中：启动服务器 .serve() ---")
    inference_server.serve()


--- 正在初始化 MCTS 推理服务器 ---
--- 在本地环境中：运行 .run_local_gateway() ---
--- 接收到 3 行测试数据 ---
--- 成功预测 3 行 ---
--- 本地模拟运行完毕。将生成 submission.parquet ---
