In [None]:
import os, re, numpy as np, pandas as pd
import tensorflow_decision_forests as tfdf
from sklearn.model_selection import GroupKFold
from sklearn.metrics import accuracy_score

# Kaggle input/output locations and the seed shared by every model below.
DATA_DIR = "/kaggle/input/spaceship-titanic-ut-komaba-2025"
WORK_DIR = "/kaggle/working"
RANDOM_STATE = 42

# Load the three competition CSVs (train, test, sample submission) in order.
train, test, sample_sub = (
    pd.read_csv(f"{DATA_DIR}/{csv_name}")
    for csv_name in ("train.csv", "test.csv", "sample_submission.csv")
)

# Per-amenity spending columns; a missing amount is treated as "spent nothing".
SPEND_COLS = ["RoomService","FoodCourt","ShoppingMall","Spa","VRDeck"]

# ---------- Preprocessing (same code path for training and inference) ----------
def preprocess(df: pd.DataFrame, *, training: bool):
    """Turn a raw passenger frame into a feature frame (plus labels when training).

    The input is copied, never modified. The returned frame contains no
    missing values in any column this function creates or imputes.

    Args:
        df: Raw frame. Must contain "PassengerId" and "Cabin"; "HomePlanet"
            is required whenever "Age" is present. "Transported" is required
            when ``training`` is true. Other known columns are processed
            only if present.
        training: When true, "Transported" is split off as the label.

    Returns:
        ``(X, y)`` where ``X`` is the feature frame (it still carries the
        helper column "__Group__", which is meant for grouped validation and
        must be excluded from the model features by the caller) and ``y`` is
        an integer NumPy array of labels, or ``None`` when not training.

    Note:
        Age medians are computed from ``df`` itself, so separate calls for
        train and test impute with their own statistics.
    """
    X = df.copy()

    # Group id = part of PassengerId before "_"; used for validation splits only.
    X["__Group__"] = X["PassengerId"].astype(str).str.split("_").str[0]

    # Split Cabin into Deck / number / Side. reindex guarantees columns 0..2
    # exist even if no cabin string in this frame contains a "/".
    has_cabin = X["Cabin"].notna()
    parts = (
        X["Cabin"].astype(str).str.split("/", expand=True).reindex(columns=range(3))
    )
    # Mask on the original missingness instead of matching the string "nan",
    # so both NaN and None are treated as "no cabin".
    X["Deck"] = parts[0].where(has_cabin)
    # Missing / unparsable cabin numbers get the sentinel -1 so that no NA is
    # left behind (assumes real cabin numbers are non-negative).
    X["Cabin_num"] = pd.to_numeric(parts[1], errors="coerce").fillna(-1)
    X["Side"] = parts[2].where(has_cabin)
    X = X.drop(columns=["Cabin"])

    # Booleans -> 0/1, with missing treated as False.
    for c in ["CryoSleep", "VIP"]:
        if c in X.columns:
            X[c] = X[c].fillna(False).astype(int)

    # Spending: every missing amount becomes 0 (this covers passengers in
    # cryo-sleep as well as everyone else, so a single pass is enough).
    spend_cols = [c for c in SPEND_COLS if c in X.columns]
    for c in spend_cols:
        X[c] = X[c].fillna(0)

    # Totals and derived spending features.
    X["TotalSpend"] = X[spend_cols].sum(axis=1)
    X["NoSpend"] = (X["TotalSpend"] == 0).astype(int)
    X["TotalSpend_log1p"] = np.log1p(X["TotalSpend"])

    # Age imputation: median within (HomePlanet, Deck), then the overall median
    # for rows whose group median is unavailable.
    if "Age" in X.columns:
        grp_med = X.groupby(["HomePlanet", "Deck"])["Age"].transform("median")
        X["Age"] = X["Age"].fillna(grp_med)
        X["Age"] = X["Age"].fillna(X["Age"].median())
        X["IsChild"] = (X["Age"] < 13).astype(int)
        X["IsTeen"] = ((X["Age"] >= 13) & (X["Age"] < 20)).astype(int)

    # Categorical NA -> "Unknown" (done after the Age step so that rows with a
    # missing key fall through to the overall median above).
    for c in ["HomePlanet", "Destination", "Deck", "Side"]:
        if c in X.columns:
            X[c] = X[c].fillna("Unknown")

    # Labels are only extracted in training mode.
    y = None
    if training:
        y = X["Transported"].astype(int).values
        X = X.drop(columns=["Transported"])

    # Drop row identifiers (the group id is kept, but is not a feature).
    X = X.drop(columns=[c for c in ("PassengerId", "Name") if c in X.columns])

    return X, y

# Run the same preprocessing on both splits; only the training split yields labels.
(X_train, y), (X_test, _) = (
    preprocess(train, training=True),
    preprocess(test, training=False),
)

# ---------- 1) Smoke tests ----------
def assert_no_mismatch_columns():
    """Fail unless X_train and X_test expose the same columns.

    The helper column "__Group__" is ignored on both sides.

    Raises:
        AssertionError: listing the columns present on only one side.
    """
    helper_cols = {"__Group__"}
    train_features = set(X_train.columns).difference(helper_cols)
    test_features = set(X_test.columns).difference(helper_cols)
    assert train_features == test_features, (
        f"列ずれ検出: train={len(train_features)} cols, test={len(test_features)} cols, "
        f"差分={sorted(train_features.symmetric_difference(test_features))}"
    )

def assert_spend_symmetry():
    """Fail if any spending column still holds a missing value in X_train or X_test.

    Raises:
        AssertionError: naming the first offending column and split.
    """
    for spend_col in SPEND_COLS:
        train_has_na = X_train[spend_col].isna().any()
        assert not train_has_na, f"trainの {spend_col} にNAが残っています"
        test_has_na = X_test[spend_col].isna().any()
        assert not test_has_na, f"test の {spend_col} にNAが残っています"

def assert_basic_dtypes():
    """Check basic value/dtype expectations on X_train and X_test.

    * "CryoSleep" and "VIP" may only contain 0 and 1.
    * "Age", "Cabin_num", "TotalSpend" and "TotalSpend_log1p" must exist in
      X_train and be numeric in both frames.

    Raises:
        AssertionError: on the first expectation that does not hold.
    """
    binary_values = {0, 1}
    for flag_col in ("CryoSleep", "VIP"):
        train_values = set(X_train[flag_col].unique())
        assert train_values.issubset(binary_values), f"{flag_col} が 0/1 ではありません（train）"
        test_values = set(X_test[flag_col].unique())
        assert test_values.issubset(binary_values), f"{flag_col} が 0/1 ではありません（test）"

    is_numeric = pd.api.types.is_numeric_dtype
    for num_col in ("Age", "Cabin_num", "TotalSpend", "TotalSpend_log1p"):
        assert num_col in X_train.columns, f"列が見つかりません: {num_col}"
        assert is_numeric(X_train[num_col]), f"{num_col} が数値型ではありません（train）"
        assert is_numeric(X_test[num_col]),  f"{num_col} が数値型ではありません（test）"

def assert_no_missing_after_preprocess():
    """Fail if any feature column (everything except "__Group__") contains NA.

    Tree models can cope with missing values, but an NA surviving the
    preprocessing is unintended and should be surfaced.

    Raises:
        AssertionError: if X_train or X_test still has a missing feature value.
    """
    feature_names = X_train.columns[X_train.columns != "__Group__"].tolist()
    train_na_count = X_train[feature_names].isna().to_numpy().sum()
    assert train_na_count == 0, "trainの特徴にNAが残っています"
    test_na_count = X_test[feature_names].isna().to_numpy().sum()
    assert test_na_count == 0, "test の特徴にNAが残っています"

# Run every smoke test in order; the first failing one aborts the script.
for smoke_check in (
    assert_no_mismatch_columns,
    assert_spend_symmetry,
    assert_basic_dtypes,
    assert_no_missing_after_preprocess,
):
    smoke_check()

print("✓ スモークテスト: 前処理の一致 / 欠損 / 型 → OK")

# ---------- 2) Leak check (no passenger group may appear on both sides of a fold) ----------
groups = X_train["__Group__"].values
gkf = GroupKFold(n_splits=5)
for i, fold in enumerate(gkf.split(X_train, y, groups), start=1):
    tr_idx, va_idx = fold
    train_groups = set(groups[tr_idx])
    valid_groups = set(groups[va_idx])
    overlap = train_groups & valid_groups
    assert not overlap, f"fold{i}: GroupKFoldでグループが重複（リーク）: {list(overlap)[:5]}"
print("✓ リーク検査: GroupKFold のグループ重複なし → OK")

# ---------- 3) Quick CV (a single fold, small trees) ----------
#   Goal: confirm the pipeline beats a random baseline (safety line set at 0.70)
def quick_cv_one_fold_accuracy_threshold(threshold=0.70):
    """Train a small gradient-boosted model on one GroupKFold split and check accuracy.

    Only the first split produced by the file-level ``gkf`` is used, to keep
    the check fast. "__Group__" is excluded from the features.

    Args:
        threshold: Minimum acceptable validation accuracy.

    Returns:
        The validation accuracy as a float.

    Raises:
        AssertionError: if the accuracy is below ``threshold``.
    """
    label_col = "__label__"
    feature_mask = X_train.columns != "__Group__"

    def _to_dataset(row_idx):
        # TF-DF trains from a tf.data dataset built out of a pandas frame
        # that carries the label as an extra column.
        frame = (
            X_train.loc[:, feature_mask]
            .iloc[row_idx]
            .assign(**{label_col: y[row_idx]})
        )
        return tfdf.keras.pd_dataframe_to_tf_dataset(frame, label=label_col)

    # Take the first split only.
    fit_idx, holdout_idx = next(iter(gkf.split(X_train, y, groups)))
    ds_fit = _to_dataset(fit_idx)
    ds_holdout = _to_dataset(holdout_idx)

    # Deliberately small configuration.
    gbt_params = dict(
        num_trees=300,
        max_depth=6,
        subsample=0.9,
        verbose=0,
        random_seed=RANDOM_STATE,
    )
    model = tfdf.keras.GradientBoostedTreesModel(**gbt_params)
    model.compile(metrics=["accuracy"])
    model.fit(ds_fit)

    metrics = model.evaluate(ds_holdout, return_dict=True, verbose=0)
    acc = float(metrics["accuracy"])
    print(f"Quick CV (1-fold) Accuracy: {acc:.4f}")
    assert acc >= threshold, f"QuickCV精度が閾値未満: acc={acc:.4f} < {threshold:.2f}"
    return acc

_ = quick_cv_one_fold_accuracy_threshold(threshold=0.70)
print("✓ クイックCV: 1-fold精度が最低ラインをクリア → OK")

# ---------- 4) Dry-run submission (format check only; nothing is submitted) ----------
# Confirms the final inference path is intact: row count, column names and dtypes.
def pseudo_submit_shape_check():
    """Fit on all training rows, predict the test set and validate the CSV format.

    Writes ``submission_TEST.csv`` into ``WORK_DIR``, reads it back and checks
    the column names and dtypes against ``sample_sub``.

    Predictions are attached to the submission by ``PassengerId`` rather than
    by row position, so the result stays correct even if ``sample_sub`` lists
    the passengers in a different order than ``test``.

    Raises:
        AssertionError: if the prediction count, finiteness, id matching or
            the written file's layout is not as expected.
    """
    feat_cols = [c for c in X_train.columns if c != "__Group__"]

    # Full-train with a small configuration.
    df_full = X_train[feat_cols].copy()
    df_full["__label__"] = y
    ds_full = tfdf.keras.pd_dataframe_to_tf_dataset(df_full, label="__label__")
    model = tfdf.keras.GradientBoostedTreesModel(
        num_trees=350, max_depth=6, subsample=0.9, verbose=0, random_seed=RANDOM_STATE
    )
    model.compile(metrics=["accuracy"])
    model.fit(ds_full, verbose=0)

    ds_test = tfdf.keras.pd_dataframe_to_tf_dataset(X_test[feat_cols])
    # Flatten instead of squeeze(): squeeze() would turn a single-row
    # prediction into a 0-d array and break the length check below.
    # Assumes the model emits one positive-class probability per row.
    preds = np.asarray(model.predict(ds_test, verbose=0)).reshape(-1)
    assert preds.shape[0] == len(test), "予測数がtest行数と一致しません"
    assert np.isfinite(preds).all(), "予測に非有限値が含まれています"

    # Predictions follow the row order of `test`; key them by PassengerId so
    # they can be aligned with whatever order `sample_sub` uses.
    assert test["PassengerId"].is_unique, "testのPassengerIdが重複しています"
    pred_by_id = pd.Series(preds >= 0.5, index=test["PassengerId"].to_numpy())
    out = sample_sub.copy()
    out["Transported"] = out["PassengerId"].map(pred_by_id)
    assert out["Transported"].notna().all(), "sample_submissionにtestに存在しないPassengerIdがあります"
    out["Transported"] = out["Transported"].astype(bool)

    out_path = os.path.join(WORK_DIR, "submission_TEST.csv")
    out.to_csv(out_path, index=False)

    # Read the file back and verify its layout.
    chk = pd.read_csv(out_path)
    assert list(chk.columns)==["PassengerId","Transported"], "提出列名が不正です"
    assert chk["PassengerId"].dtype == sample_sub["PassengerId"].dtype, "PassengerIdのdtypeが想定と異なります"
    assert chk["Transported"].dtype == bool, "Transportedはboolであるべきです"
    print(f"✓ 擬似提出: 形状チェックOK → {out_path}")

pseudo_submit_shape_check()

print("\n=== All tests passed ===")
