In [None]:
class Config:
    """Static settings for the stacking pipeline (read as class attributes)."""

    # Input directory and output artifact paths.
    data_dir = "data"
    output_name = "submission/pipe_task1_predictions.csv"
    summary_name = "pipe_task1_summary.json"

    # Cross-validation layout and the seed shared by splitter and models.
    n_splits = 5
    random_state = 42

    # Optuna trial budgets: default per base model, and for the meta model.
    n_trials_base = 30
    n_trials_meta = 30
    # Per-model trial counts; models not listed here use n_trials_base.
    base_trials = dict(rf=10)

# Column names of the input CSVs that identify a row and carry the label.
ID_COLUMN = "id"
TARGET_COLUMN = "is_cheater"

# Beta passed to fbeta_score; 2.0 weights recall more heavily than precision.
BETA = 2.0

# Model input columns, in the order they are laid out in the feature matrix.
FEATURE_COLUMNS = [
    "kill_death_ratio",
    "headshot_percentage",
    "win_rate",
    "accuracy_score",
    "kill_consistency",
    "reaction_time_ms",
    "account_age_days",
    "level",
    "level_progression_speed",
    "friend_network_size",
    "reports_received",
    "night_play_ratio",
    "weapon_switch_speed",
    "aiming_smoothness",
    "spray_control_score",
    "game_sense_score",
    "communication_rate",
    "team_play_score",
    "buy_decision_score",
    "first_blood_rate",
    "survival_time_avg",
    "damage_per_round",
    "utility_usage_rate",
    "crosshair_placement",
]

In [None]:
def json_serialize(obj):
    """Fallback encoder for ``json.dump(..., default=json_serialize)``.

    Converts NumPy scalars (booleans, integers, floats) to the matching
    built-in Python value and NumPy arrays to nested lists. Anything else is
    written as its ``str()`` representation.

    Args:
        obj: The object the standard JSON encoder could not serialize.

    Returns:
        A JSON-serializable Python object.
    """
    # np.bool_ is neither an np.integer nor an np.floating; without listing it
    # here a NumPy boolean would be emitted as the string "True"/"False"
    # instead of a JSON boolean.
    if isinstance(obj, (np.bool_, np.integer, np.floating)):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return str(obj)

def prepare_features(train_df, test_df, features):
    """Turn the selected columns of both frames into float32 matrices.

    Columns whose dtype in ``train_df`` is ``object`` are ordinal-encoded,
    with the encoder fitted on train and test values together and missing
    entries replaced by the literal string ``"missing"``. Remaining NaNs in
    either frame are then filled with the per-column median of the training
    frame. The input frames are copied, not modified.

    Args:
        train_df: Training DataFrame containing every column in ``features``.
        test_df: Test DataFrame containing every column in ``features``.
        features: List of column names to use, in output column order.

    Returns:
        Tuple ``(train_matrix, test_matrix)`` of ``np.float32`` arrays.
    """
    train_frame = train_df.copy()
    test_frame = test_df.copy()

    object_cols = []
    for name in features:
        if train_frame[name].dtype == "object":
            object_cols.append(name)

    if object_cols:
        def as_text(frame):
            # Same cleaning is applied before fitting and before transforming.
            return frame.fillna("missing").astype(str)

        ordinal = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)
        stacked = pd.concat(
            [train_frame[object_cols], test_frame[object_cols]],
            axis=0,
            ignore_index=True,
        )
        ordinal.fit(as_text(stacked))
        train_frame[object_cols] = ordinal.transform(as_text(train_frame[object_cols]))
        test_frame[object_cols] = ordinal.transform(as_text(test_frame[object_cols]))

    # Medians come from the training frame only and are reused for the test frame.
    train_medians = train_frame[features].median()
    train_frame[features] = train_frame[features].fillna(train_medians)
    test_frame[features] = test_frame[features].fillna(train_medians)

    train_matrix = train_frame[features].to_numpy(np.float32)
    test_matrix = test_frame[features].to_numpy(np.float32)
    return train_matrix, test_matrix

def f2_metric(y_true, y_score):
    """Macro-averaged F-beta of scores binarized at a fixed 0.5 cutoff.

    Args:
        y_true: Ground-truth labels.
        y_score: Array of scores; flattened before thresholding.

    Returns:
        The ``fbeta_score`` with ``beta=BETA`` and ``average="macro"``.
    """
    flat_scores = y_score.ravel()
    hard_labels = (flat_scores >= 0.5).astype(int)
    return fbeta_score(
        y_true,
        hard_labels,
        beta=BETA,
        average="macro",
        zero_division=0,
    )

def get_predictions(model, X):
    """Return the model's scores for ``X`` as a single-column 2-D array.

    If the model exposes ``predict_proba``, the last column of a 2-D
    probability array is used (a non-2-D result is used as is). Otherwise the
    output of ``predict`` is used.

    Args:
        model: Fitted estimator with ``predict_proba`` or ``predict``.
        X: Input passed through to the estimator unchanged.

    Returns:
        Array of shape ``(n, 1)``.
    """
    if not hasattr(model, "predict_proba"):
        return np.asarray(model.predict(X)).reshape(-1, 1)

    probabilities = model.predict_proba(X)
    if probabilities.ndim == 2:
        # Keep only the last class column.
        probabilities = probabilities[:, -1]
    return probabilities.reshape(-1, 1)

In [None]:
def build_model(model_name, params, random_state):
    """Instantiate an unfitted base classifier by short name.

    Args:
        model_name: One of ``"rf"``, ``"lgbm"``, ``"xgb"``, ``"cat"``.
        params: Keyword arguments forwarded to the estimator constructor.
        random_state: Seed passed to the estimator.

    Returns:
        A new estimator instance.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    # Factories are lambdas so only the requested estimator is constructed.
    factories = {
        "rf": lambda: RandomForestClassifier(
            **params, n_jobs=-1, random_state=random_state
        ),
        "lgbm": lambda: lgb.LGBMClassifier(
            **params,
            objective="binary",
            n_jobs=-1,
            verbosity=-1,
            random_state=random_state,
        ),
        "xgb": lambda: xgb.XGBClassifier(
            **params,
            objective="binary:logistic",
            eval_metric="logloss",
            tree_method="hist",
            random_state=random_state,
            n_jobs=-1,
        ),
        "cat": lambda: CatBoostClassifier(
            **params,
            loss_function="Logloss",
            verbose=False,
            allow_writing_files=False,
            random_seed=random_state,
        ),
    }
    make = factories.get(model_name)
    if make is None:
        raise ValueError(f"Unknown model {model_name}")
    return make()

# Search space per base model: (parameter name, kind, low, high), in the order
# the parameters are suggested. Kind is "int" (suggest_int), "float"
# (suggest_float) or "log" (suggest_float with log=True).
_SEARCH_SPACES = {
    "rf": (
        ("n_estimators", "int", 200, 800),
        ("max_depth", "int", 4, 10),
        ("min_samples_split", "int", 2, 10),
        ("min_samples_leaf", "int", 1, 5),
        ("max_features", "float", 0.5, 1.0),
    ),
    "lgbm": (
        ("n_estimators", "int", 200, 800),
        ("num_leaves", "int", 16, 128),
        ("learning_rate", "log", 0.01, 0.2),
        ("max_depth", "int", 3, 10),
        ("min_child_samples", "int", 10, 60),
        ("subsample", "float", 0.6, 1.0),
        ("colsample_bytree", "float", 0.6, 1.0),
        ("reg_alpha", "log", 1e-4, 10.0),
        ("reg_lambda", "log", 1e-4, 10.0),
    ),
    "xgb": (
        ("n_estimators", "int", 200, 800),
        ("learning_rate", "log", 0.01, 0.3),
        ("max_depth", "int", 3, 10),
        ("subsample", "float", 0.6, 1.0),
        ("colsample_bytree", "float", 0.6, 1.0),
        ("min_child_weight", "log", 1e-2, 10.0),
        ("gamma", "log", 1e-3, 5.0),
        ("reg_alpha", "log", 1e-4, 10.0),
        ("reg_lambda", "log", 1e-4, 10.0),
    ),
    "cat": (
        ("iterations", "int", 200, 800),
        ("depth", "int", 4, 10),
        ("learning_rate", "log", 0.02, 0.3),
        ("l2_leaf_reg", "float", 1.0, 10.0),
    ),
}


def sample_params(model_name, trial):
    """Draw one hyperparameter set for a base model from an Optuna trial.

    Args:
        model_name: One of ``"rf"``, ``"lgbm"``, ``"xgb"``, ``"cat"``.
        trial: Object providing ``suggest_int`` and ``suggest_float``.

    Returns:
        Dict mapping parameter name to the suggested value.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    space = _SEARCH_SPACES.get(model_name)
    if space is None:
        raise ValueError(f"Unknown model {model_name}")

    sampled = {}
    for name, kind, low, high in space:
        if kind == "int":
            sampled[name] = trial.suggest_int(name, low, high)
        elif kind == "log":
            sampled[name] = trial.suggest_float(name, low, high, log=True)
        else:
            sampled[name] = trial.suggest_float(name, low, high)
    return sampled

def optimize_model(model_name, X, y, cv, n_trials, random_state):
    """Tune one base model with Optuna, maximizing mean cross-validated F2.

    Each trial samples hyperparameters via ``sample_params``, fits a fresh
    model on every training fold of ``cv`` and scores the held-out fold with
    ``f2_metric``.

    Args:
        model_name: Base model identifier understood by ``build_model``.
        X: Feature matrix, indexable by integer index arrays.
        y: Label array aligned with ``X``.
        cv: Splitter exposing ``split(X, y)``.
        n_trials: Number of Optuna trials to run.
        random_state: Seed used for every model and for the Optuna sampler.

    Returns:
        Tuple ``(best_params, best_value)`` of the finished study.
    """
    def objective(trial):
        params = sample_params(model_name, trial)
        scores = []
        for train_idx, valid_idx in cv.split(X, y):
            model = build_model(model_name, params, random_state)
            model.fit(X[train_idx], y[train_idx])
            preds = get_predictions(model, X[valid_idx])
            scores.append(f2_metric(y[valid_idx], preds))
        return float(np.mean(scores))

    # Seed the sampler so the sequence of suggested hyperparameters is
    # repeatable; an unseeded study draws different trials on every run even
    # though the models themselves receive random_state.
    sampler = optuna.samplers.TPESampler(seed=random_state)
    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
    return study.best_params, study.best_value

def generate_oof_predictions(model_name, params, X, y, X_test, cv, random_state):
    """Fit one model per fold and collect out-of-fold and averaged test scores.

    Args:
        model_name: Base model identifier understood by ``build_model``.
        params: Hyperparameters forwarded to ``build_model``.
        X: Training feature matrix, indexable by integer index arrays.
        y: Label array aligned with ``X``.
        X_test: Test feature matrix.
        cv: Splitter exposing ``split(X, y)`` and ``get_n_splits()``.
        random_state: Base seed; fold ``k`` (1-based) uses ``random_state + k``.

    Returns:
        Tuple ``(oof, test_preds)`` of float32 arrays shaped ``(len(X), 1)``
        and ``(len(X_test), 1)``; ``test_preds`` is the mean over folds.
    """
    n_folds = cv.get_n_splits()
    holdout_scores = np.zeros((len(X), 1), dtype=np.float32)
    averaged_test = np.zeros((len(X_test), 1), dtype=np.float32)

    fold_no = 0
    for fit_rows, holdout_rows in cv.split(X, y):
        fold_no += 1
        estimator = build_model(model_name, params, random_state + fold_no)
        estimator.fit(X[fit_rows], y[fit_rows])
        holdout_scores[holdout_rows] = get_predictions(estimator, X[holdout_rows])
        # Each fold contributes an equal share to the test-set average.
        fold_test_scores = get_predictions(estimator, X_test)
        averaged_test += fold_test_scores / n_folds

    return holdout_scores, averaged_test

In [None]:
def optimize_meta_model(meta_X, y, cv, n_trials, random_state):
    """Tune the LightGBM meta model with Optuna, maximizing mean CV F2.

    Each trial samples LightGBM hyperparameters, fits a fresh classifier on
    every training fold of ``cv`` and scores the held-out fold with
    ``f2_metric``.

    Args:
        meta_X: Meta-feature matrix, indexable by integer index arrays.
        y: Label array aligned with ``meta_X``.
        cv: Splitter exposing ``split(meta_X, y)``.
        n_trials: Number of Optuna trials to run.
        random_state: Seed used for every model and for the Optuna sampler.

    Returns:
        Tuple ``(best_params, best_value)`` of the finished study.
    """
    def objective(trial):
        params = {
            "n_estimators": trial.suggest_int("n_estimators", 200, 800),
            "num_leaves": trial.suggest_int("num_leaves", 8, 64),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
            "max_depth": trial.suggest_int("max_depth", 3, 10),
            "min_child_samples": trial.suggest_int("min_child_samples", 10, 60),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
            "reg_alpha": trial.suggest_float("reg_alpha", 1e-4, 10.0, log=True),
            "reg_lambda": trial.suggest_float("reg_lambda", 1e-4, 10.0, log=True),
        }
        scores = []
        for train_idx, valid_idx in cv.split(meta_X, y):
            model = lgb.LGBMClassifier(**params, objective="binary", n_jobs=-1, random_state=random_state, verbosity=-1)
            model.fit(meta_X[train_idx], y[train_idx])
            preds = get_predictions(model, meta_X[valid_idx])
            scores.append(f2_metric(y[valid_idx], preds))
        return float(np.mean(scores))

    # Seed the sampler so the sequence of suggested hyperparameters is
    # repeatable; an unseeded study draws different trials on every run even
    # though the models themselves receive random_state.
    sampler = optuna.samplers.TPESampler(seed=random_state)
    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
    return study.best_params, study.best_value

def train_meta(params, meta_X, y, meta_test, cv, random_state):
    """Fit the LightGBM meta model per fold; return OOF and averaged test scores.

    Args:
        params: LightGBM hyperparameters forwarded to ``LGBMClassifier``.
        meta_X: Meta-feature matrix, indexable by integer index arrays.
        y: Label array aligned with ``meta_X``.
        meta_test: Meta-feature matrix of the test set.
        cv: Splitter exposing ``split(meta_X, y)`` and ``get_n_splits()``.
        random_state: Base seed; fold ``k`` (1-based) uses ``random_state + k``.

    Returns:
        Tuple ``(oof, test_preds)`` of float32 arrays shaped
        ``(len(meta_X), 1)`` and ``(len(meta_test), 1)``; ``test_preds`` is
        the mean over folds.
    """
    n_folds = cv.get_n_splits()
    holdout_scores = np.zeros((len(meta_X), 1), dtype=np.float32)
    averaged_test = np.zeros((len(meta_test), 1), dtype=np.float32)

    fold_no = 0
    for fit_rows, holdout_rows in cv.split(meta_X, y):
        fold_no += 1
        stacker = lgb.LGBMClassifier(
            **params,
            objective="binary",
            n_jobs=-1,
            random_state=random_state + fold_no,
            verbosity=-1,
        )
        stacker.fit(meta_X[fit_rows], y[fit_rows])
        holdout_scores[holdout_rows] = get_predictions(stacker, meta_X[holdout_rows])
        # Each fold contributes an equal share to the test-set average.
        fold_test_scores = get_predictions(stacker, meta_test)
        averaged_test += fold_test_scores / n_folds

    return holdout_scores, averaged_test

def optimize_threshold(y_true, y_score):
    """Grid-search the decision cutoff that maximizes macro F-beta.

    Tries 181 evenly spaced cutoffs from 0.05 to 0.95 inclusive and keeps the
    first one that attains the highest score.

    Args:
        y_true: Ground-truth labels.
        y_score: Array of scores; flattened before thresholding.

    Returns:
        Tuple ``(best_threshold, best_score)``.
    """
    flat_scores = y_score.ravel()
    winner = (0.5, -1.0)  # (cutoff, score) fallback until a candidate beats it
    for cutoff in np.linspace(0.05, 0.95, 181):
        hard_labels = (flat_scores >= cutoff).astype(int)
        candidate = fbeta_score(
            y_true, hard_labels, beta=BETA, average="macro", zero_division=0
        )
        # Strict comparison keeps the earliest cutoff on ties.
        if candidate > winner[1]:
            winner = (float(cutoff), candidate)
    best_threshold, best_score = winner
    return best_threshold, best_score

In [None]:
# Driver cell: tune each base model, stack their out-of-fold scores with a
# LightGBM meta model, tune the decision threshold, and write the submission
# CSV plus a JSON summary.
from pathlib import Path

optuna.logging.set_verbosity(optuna.logging.WARNING)

# Create the output directories up front: DataFrame.to_csv and open() do not
# create missing parent folders, and failing here is far cheaper than failing
# after the whole optimization has run.
Path(Config.output_name).parent.mkdir(parents=True, exist_ok=True)
Path(Config.summary_name).parent.mkdir(parents=True, exist_ok=True)

train_df = pd.read_csv(f"{Config.data_dir}/train.csv")
test_df = pd.read_csv(f"{Config.data_dir}/test.csv")
# Rows without a label cannot be used for supervised training.
train_df = train_df[train_df[TARGET_COLUMN].notna()].reset_index(drop=True)

X, X_test = prepare_features(train_df, test_df, FEATURE_COLUMNS)
y = train_df[TARGET_COLUMN].to_numpy(np.float32)
cv = StratifiedKFold(n_splits=Config.n_splits, shuffle=True, random_state=Config.random_state)

base_models = ["rf", "lgbm", "xgb", "cat"]
meta_features, meta_test_features = [], []
base_scores, base_params = {}, {}

for model_name in tqdm(base_models, desc="Base models"):
    print(f"Optimizing {model_name}...")
    # Per-model trial budget, falling back to the shared default.
    n_trials = Config.base_trials.get(model_name, Config.n_trials_base)
    best_params, best_score = optimize_model(model_name, X, y, cv, n_trials, Config.random_state)
    base_scores[model_name] = best_score
    base_params[model_name] = best_params
    print(f"{model_name} Best CV F2: {best_score:.5f}")

    oof, test_preds = generate_oof_predictions(model_name, best_params, X, y, X_test, cv, Config.random_state)
    meta_features.append(oof)
    meta_test_features.append(test_preds)

# One column per base model, in base_models order.
meta_X = np.column_stack(meta_features)
meta_test = np.column_stack(meta_test_features)

print("Optimizing Meta Model...")
meta_params, meta_score = optimize_meta_model(meta_X, y, cv, Config.n_trials_meta, Config.random_state)
print(f"Meta Model Best CV F2: {meta_score:.5f}")

meta_oof, meta_test_preds = train_meta(meta_params, meta_X, y, meta_test, cv, Config.random_state)
# The cutoff is chosen on out-of-fold meta scores and then applied to the test scores.
best_threshold, tuned_score = optimize_threshold(y, meta_oof)
print(f"Best Threshold: {best_threshold:.3f} | Tuned CV F2: {tuned_score:.5f}")

final_preds = (meta_test_preds.ravel() >= best_threshold).astype(int)
output_df = pd.DataFrame({ID_COLUMN: test_df[ID_COLUMN], TARGET_COLUMN: final_preds})
output_df.to_csv(Config.output_name, index=False)
print(f"Submission saved to {Config.output_name}")

summary = {
    "base_scores": base_scores, "base_params": base_params,
    "meta_score": meta_score, "tuned_threshold": best_threshold,
    "tuned_meta_score": tuned_score, "meta_params": meta_params
}
with open(Config.summary_name, "w") as f:
    json.dump(summary, f, indent=2, default=json_serialize)
print(f"Summary saved to {Config.summary_name}")