In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/mallorn-dataset/sample_submission.csv
/kaggle/input/mallorn-dataset/test_log.csv
/kaggle/input/mallorn-dataset/train_log.csv
/kaggle/input/mallorn-dataset/split_17/train_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_17/test_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_01/train_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_01/test_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_02/train_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_02/test_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_08/train_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_08/test_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_04/train_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_04/test_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_07/train_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_07/test_full_lightcurves.csv
/kaggle/input/mallorn-dataset/split_15/train_full_lightcurves.csv
/kaggle/i

 # Kaggle CPU Environment Setup

In [2]:
# ============================================================
# STAGE 0 — Kaggle CPU Environment Setup (ONE CELL, SAFE + COHESIVE)
# - Kaggle Web Notebook (CPU only)
# - No heavy loading (no full lightcurve concat)
# - Hard guards: paths exist, splits exist, required files exist
# - Safe thread limits to avoid freeze/oversubscription
# ============================================================

import os, sys, gc, random, warnings
from pathlib import Path

import numpy as np
import pandas as pd

# ----------------------------
# 0) Quiet + deterministic (reduce noisy warnings, keep critical)
# ----------------------------
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=pd.errors.DtypeWarning)

SEED = 2025
os.environ["PYTHONHASHSEED"] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)

# ----------------------------
# 1) CPU thread limits (anti-freeze on Kaggle CPU)
# ----------------------------
# Prevent BLAS/OMP oversubscription which can make Kaggle CPU notebooks crawl/hang.
os.environ.setdefault("OMP_NUM_THREADS", "2")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "2")
os.environ.setdefault("MKL_NUM_THREADS", "2")
os.environ.setdefault("VECLIB_MAXIMUM_THREADS", "2")
os.environ.setdefault("NUMEXPR_NUM_THREADS", "2")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

try:
    import torch
    torch.manual_seed(SEED)
    torch.set_num_threads(2)
    torch.set_num_interop_threads(1)
except Exception:
    torch = None  # torch may be unavailable in some environments

# ----------------------------
# 2) Paths (as you listed)
# ----------------------------
DATA_ROOT = Path("/kaggle/input/mallorn-dataset")

PATHS = {
    "DATA_ROOT": DATA_ROOT,
    "SAMPLE_SUB": DATA_ROOT / "sample_submission.csv",
    "TRAIN_LOG":  DATA_ROOT / "train_log.csv",
    "TEST_LOG":   DATA_ROOT / "test_log.csv",
    "SPLITS":     [DATA_ROOT / f"split_{i:02d}" for i in range(1, 21)],
}

# ----------------------------
# 3) Working directories (writeable on Kaggle)
# ----------------------------
WORKDIR = Path("/kaggle/working")
RUN_DIR = WORKDIR / "mallorn_run"
ART_DIR = RUN_DIR / "artifacts"
CKPT_DIR = RUN_DIR / "checkpoints"
OOF_DIR = RUN_DIR / "oof"
SUB_DIR = RUN_DIR / "submissions"
LOG_DIR = RUN_DIR / "logs"

for d in [RUN_DIR, ART_DIR, CKPT_DIR, OOF_DIR, SUB_DIR, LOG_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# ----------------------------
# 4) Hard guards: files must exist
# ----------------------------
def _must_exist(p: Path, what: str):
    if not p.exists():
        raise FileNotFoundError(f"[MISSING] {what}: {p}")

_must_exist(PATHS["SAMPLE_SUB"], "sample_submission.csv")
_must_exist(PATHS["TRAIN_LOG"], "train_log.csv")
_must_exist(PATHS["TEST_LOG"],  "test_log.csv")

# Validate split folders + key files inside
missing_splits = [s for s in PATHS["SPLITS"] if not s.exists()]
if missing_splits:
    # Show first few to help debug, then fail hard
    sample = "\n".join(str(x) for x in missing_splits[:5])
    raise FileNotFoundError(f"Some split folders are missing (showing up to 5):\n{sample}")

# Verify presence of lightcurve csvs per split (train/test)
bad = []
for sd in PATHS["SPLITS"]:
    tr = sd / "train_full_lightcurves.csv"
    te = sd / "test_full_lightcurves.csv"
    if (not tr.exists()) or (not te.exists()):
        bad.append((sd.name, tr.exists(), te.exists()))
if bad:
    msg = "\n".join([f"- {name}: train={tr_ok}, test={te_ok}" for name, tr_ok, te_ok in bad[:10]])
    raise FileNotFoundError(
        "Some split lightcurve files are missing (showing up to 10):\n"
        f"{msg}"
    )

# ----------------------------
# 5) Load small metadata only (safe on CPU)
# ----------------------------
# Use dtype hints to reduce parsing warnings and memory.
df_sub = pd.read_csv(PATHS["SAMPLE_SUB"])
if not {"object_id", "prediction"}.issubset(df_sub.columns):
    raise ValueError(f"sample_submission columns must include object_id,prediction. Found: {list(df_sub.columns)}")

df_train_log = pd.read_csv(PATHS["TRAIN_LOG"])
df_test_log  = pd.read_csv(PATHS["TEST_LOG"])

# Basic column sanity (do not assume perfect casing)
def _norm_cols(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [c.strip() for c in df.columns]
    return df

df_train_log = _norm_cols(df_train_log)
df_test_log  = _norm_cols(df_test_log)

need_train = {"object_id", "EBV", "Z", "split", "target"}
need_test  = {"object_id", "EBV", "Z", "split"}  # Z_err may or may not exist; handle later
missing_train = sorted(list(need_train - set(df_train_log.columns)))
missing_test  = sorted(list(need_test - set(df_test_log.columns)))

if missing_train:
    raise ValueError(f"train_log.csv missing required columns: {missing_train}")
if missing_test:
    raise ValueError(f"test_log.csv missing required columns: {missing_test}")

# Ensure split names align with folders you have (split_01..split_20)
# (Keep as string; later we use it for routing.)
df_train_log["split"] = df_train_log["split"].astype(str).str.strip()
df_test_log["split"]  = df_test_log["split"].astype(str).str.strip()

# Object_id uniqueness check
if df_train_log["object_id"].duplicated().any():
    dup_n = int(df_train_log["object_id"].duplicated().sum())
    raise ValueError(f"train_log.csv has duplicated object_id rows: {dup_n} duplicates found.")
if df_test_log["object_id"].duplicated().any():
    dup_n = int(df_test_log["object_id"].duplicated().sum())
    raise ValueError(f"test_log.csv has duplicated object_id rows: {dup_n} duplicates found.")

# Quick target sanity
if not set(pd.unique(df_train_log["target"])).issubset({0, 1}):
    raise ValueError(f"train_log target must be binary 0/1. Found: {sorted(pd.unique(df_train_log['target']).tolist())}")

pos = int((df_train_log["target"] == 1).sum())
neg = int((df_train_log["target"] == 0).sum())
tot = int(len(df_train_log))

# Verify all submission object_id exist in test_log (or warn)
sub_missing = set(df_sub["object_id"]) - set(df_test_log["object_id"])
if sub_missing:
    # do not fail hard; Kaggle sample_submission sometimes includes all test ids, so this is serious
    # We'll fail hard to avoid silent mismatch.
    sample = list(sub_missing)[:5]
    raise ValueError(f"sample_submission has object_id not found in test_log (showing up to 5): {sample}")

# ----------------------------
# 6) Summarize environment (minimal, helpful)
# ----------------------------
print("ENV OK (Kaggle CPU)")
print(f"- Python: {sys.version.split()[0]}")
print(f"- Numpy:  {np.__version__}")
print(f"- Pandas: {pd.__version__}")
if torch is not None:
    print(f"- Torch:  {torch.__version__} | CUDA available: {torch.cuda.is_available()}")
else:
    print("- Torch:  not available")

print("\nDATA OK")
print(f"- train_log: {len(df_train_log):,} objects | pos(TDE)={pos:,} | neg={neg:,} | pos%={(pos/max(tot,1))*100:.2f}%")
print(f"- test_log:  {len(df_test_log):,} objects")
print(f"- submission template rows: {len(df_sub):,}")
print(f"- splits detected: {len(PATHS['SPLITS'])} folders (split_01..split_20)")

# Optional: save a tiny config snapshot for reproducibility
cfg_path = RUN_DIR / "env_config.txt"
with open(cfg_path, "w", encoding="utf-8") as f:
    f.write(f"SEED={SEED}\n")
    f.write(f"DATA_ROOT={DATA_ROOT}\n")
    f.write(f"WORKDIR={WORKDIR}\n")
    f.write("THREADS:\n")
    for k in ["OMP_NUM_THREADS","OPENBLAS_NUM_THREADS","MKL_NUM_THREADS","NUMEXPR_NUM_THREADS"]:
        f.write(f"  {k}={os.environ.get(k,'')}\n")

# Keep objects in globals for next stages
globals().update({
    "SEED": SEED,
    "PATHS": PATHS,
    "RUN_DIR": RUN_DIR,
    "ART_DIR": ART_DIR,
    "CKPT_DIR": CKPT_DIR,
    "OOF_DIR": OOF_DIR,
    "SUB_DIR": SUB_DIR,
    "LOG_DIR": LOG_DIR,
    "df_sub": df_sub,
    "df_train_log": df_train_log,
    "df_test_log": df_test_log,
})

gc.collect()
print(f"\nSaved env snapshot: {cfg_path}")


ENV OK (Kaggle CPU)
- Python: 3.12.12
- Numpy:  2.0.2
- Pandas: 2.2.2
- Torch:  2.8.0+cu126 | CUDA available: False

DATA OK
- train_log: 3,043 objects | pos(TDE)=148 | neg=2,895 | pos%=4.86%
- test_log:  7,135 objects
- submission template rows: 7,135
- splits detected: 20 folders (split_01..split_20)

Saved env snapshot: /kaggle/working/mallorn_run/env_config.txt


# Verify Dataset Paths & Split Discovery

In [3]:
# ============================================================
# STAGE 1 — Verify Dataset Paths & Split Discovery (ONE CELL, CPU-SAFE)
# - Uses globals from STAGE 0: PATHS, df_train_log, df_test_log
# - Normalizes split names -> "split_XX"
# - Verifies: split folders + required files + lightcurve column sanity (nrows only)
# - Summarizes: object counts per split + file sizes
# ============================================================

import os, re, gc
from pathlib import Path
import numpy as np
import pandas as pd

# ----------------------------
# 0) Require STAGE 0 globals
# ----------------------------
for need in ["PATHS", "df_train_log", "df_test_log"]:
    if need not in globals():
        raise RuntimeError(f"Missing `{need}`. Jalankan STAGE 0 dulu (Kaggle CPU Environment Setup).")

DATA_ROOT = PATHS["DATA_ROOT"]
SPLIT_DIRS = {p.name: p for p in PATHS["SPLITS"]}  # split_01..split_20 -> Path

# ----------------------------
# 1) Helpers
# ----------------------------
def normalize_split_name(x: str) -> str:
    """
    Convert various split formats to canonical 'split_XX'.
    Accepts: 'split_01', '01', '1', 'split_1', 'SPLIT_01', etc.
    """
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return ""
    s = str(x).strip().lower()
    # already canonical?
    m = re.fullmatch(r"split_(\d{1,2})", s)
    if m:
        k = int(m.group(1))
        return f"split_{k:02d}"
    # maybe just digits
    m = re.fullmatch(r"(\d{1,2})", s)
    if m:
        k = int(m.group(1))
        return f"split_{k:02d}"
    # maybe splitXX without underscore
    m = re.fullmatch(r"split(\d{1,2})", s)
    if m:
        k = int(m.group(1))
        return f"split_{k:02d}"
    # fallback: keep raw, but mark as unknown
    return s

def must_exist(p: Path, what: str):
    if not p.exists():
        raise FileNotFoundError(f"[MISSING] {what}: {p}")

def sizeof_mb(p: Path) -> float:
    try:
        return p.stat().st_size / (1024**2)
    except Exception:
        return float("nan")

def quick_check_lightcurve_csv(p: Path, nrows: int = 5):
    """
    Read tiny head to validate columns without heavy IO.
    """
    df = pd.read_csv(p, nrows=nrows)
    df.columns = [c.strip() for c in df.columns]
    return df

REQ_LC_COLS = {"object_id", "Time (MJD)", "Flux", "Flux_err", "Filter"}

# ----------------------------
# 2) Normalize split columns in logs (in-place for later stages)
# ----------------------------
for df, name in [(df_train_log, "train_log"), (df_test_log, "test_log")]:
    if "split" not in df.columns:
        raise ValueError(f"{name} missing 'split' column.")
    df["split"] = df["split"].astype(str).str.strip()
    df["split"] = df["split"].map(normalize_split_name)

# ----------------------------
# 3) Verify split values referenced by logs exist on disk
# ----------------------------
train_splits = set(df_train_log["split"].unique())
test_splits  = set(df_test_log["split"].unique())
disk_splits  = set(SPLIT_DIRS.keys())

bad_train = sorted([s for s in train_splits if s not in disk_splits])
bad_test  = sorted([s for s in test_splits  if s not in disk_splits])

if bad_train:
    raise FileNotFoundError(f"train_log references split(s) not found on disk: {bad_train[:10]}")
if bad_test:
    raise FileNotFoundError(f"test_log references split(s) not found on disk: {bad_test[:10]}")

# Also verify we actually have 20 splits (as expected)
if len(disk_splits) != 20:
    # not always fatal, but better to fail early for this competition
    raise RuntimeError(f"Expected 20 split folders, found {len(disk_splits)}: {sorted(list(disk_splits))}")

# ----------------------------
# 4) Verify required files per split exist
# ----------------------------
missing_files = []
split_file_info = []

for split_name in sorted(disk_splits):
    sd = SPLIT_DIRS[split_name]
    tr = sd / "train_full_lightcurves.csv"
    te = sd / "test_full_lightcurves.csv"
    if not tr.exists():
        missing_files.append(str(tr))
    if not te.exists():
        missing_files.append(str(te))
    split_file_info.append((split_name, sizeof_mb(tr), sizeof_mb(te)))

if missing_files:
    sample = "\n".join(missing_files[:10])
    raise FileNotFoundError(f"Some lightcurve files missing (showing up to 10):\n{sample}")

# ----------------------------
# 5) Lightweight column sanity check (read only a few rows per split)
# ----------------------------
col_issues = []
filter_issues = []

for split_name in sorted(disk_splits):
    sd = SPLIT_DIRS[split_name]
    tr = sd / "train_full_lightcurves.csv"
    te = sd / "test_full_lightcurves.csv"

    dtr = quick_check_lightcurve_csv(tr, nrows=5)
    dte = quick_check_lightcurve_csv(te, nrows=5)

    # Columns present?
    miss_tr = sorted(list(REQ_LC_COLS - set(dtr.columns)))
    miss_te = sorted(list(REQ_LC_COLS - set(dte.columns)))
    if miss_tr or miss_te:
        col_issues.append((split_name, miss_tr, miss_te, list(dtr.columns), list(dte.columns)))

    # Filter values sanity (tiny sample)
    if "Filter" in dtr.columns:
        vals = set(dtr["Filter"].astype(str).str.strip().str.lower().unique())
        badf = sorted([v for v in vals if v not in {"u","g","r","i","z","y"}])
        if badf:
            filter_issues.append((split_name, "train", badf, sorted(list(vals))))
    if "Filter" in dte.columns:
        vals = set(dte["Filter"].astype(str).str.strip().str.lower().unique())
        badf = sorted([v for v in vals if v not in {"u","g","r","i","z","y"}])
        if badf:
            filter_issues.append((split_name, "test", badf, sorted(list(vals))))

if col_issues:
    # Print one detailed example then fail hard (structure mismatch)
    s, miss_tr, miss_te, cols_tr, cols_te = col_issues[0]
    raise ValueError(
        "Lightcurve column mismatch detected.\n"
        f"Example split: {s}\n"
        f"Missing in train_full_lightcurves.csv: {miss_tr}\n"
        f"Missing in test_full_lightcurves.csv : {miss_te}\n"
        f"Train columns: {cols_tr}\n"
        f"Test columns : {cols_te}\n"
    )

if filter_issues:
    # Not always fatal, but usually indicates whitespace/format issues. Fail early.
    ex = filter_issues[0]
    raise ValueError(
        "Unexpected Filter values detected (example):\n"
        f"split={ex[0]} file={ex[1]} bad={ex[2]} all_sampled={ex[3]}\n"
        "Fix by stripping/lowercasing Filter during preprocessing."
    )

# ----------------------------
# 6) Summaries (counts per split, file sizes)
# ----------------------------
train_counts = df_train_log["split"].value_counts().to_dict()
test_counts  = df_test_log["split"].value_counts().to_dict()

print("SPLIT DISCOVERY OK")
print(f"- DATA_ROOT: {DATA_ROOT}")
print(f"- Splits on disk: {len(disk_splits)} (split_01..split_20)")

print("\nOBJECT COUNTS PER SPLIT (from logs)")
for s in sorted(disk_splits):
    print(f"- {s}: train_objects={train_counts.get(s,0):,} | test_objects={test_counts.get(s,0):,}")

print("\nLIGHTCURVE FILE SIZES (MB)")
for s, mb_tr, mb_te in split_file_info:
    print(f"- {s}: train_full={mb_tr:8.1f} MB | test_full={mb_te:8.1f} MB")

# ----------------------------
# 7) Export split index for later stages (routing + loops)
# ----------------------------
# Useful for downstream: stable split list + mapping
SPLIT_LIST = [f"split_{i:02d}" for i in range(1, 21)]
globals().update({
    "DATA_ROOT": DATA_ROOT,
    "SPLIT_DIRS": SPLIT_DIRS,
    "SPLIT_LIST": SPLIT_LIST,
})

gc.collect()
print("\nStage 1 complete: splits ready for split-wise preprocessing.")


SPLIT DISCOVERY OK
- DATA_ROOT: /kaggle/input/mallorn-dataset
- Splits on disk: 20 (split_01..split_20)

OBJECT COUNTS PER SPLIT (from logs)
- split_01: train_objects=155 | test_objects=364
- split_02: train_objects=170 | test_objects=414
- split_03: train_objects=138 | test_objects=338
- split_04: train_objects=145 | test_objects=332
- split_05: train_objects=165 | test_objects=375
- split_06: train_objects=155 | test_objects=374
- split_07: train_objects=165 | test_objects=398
- split_08: train_objects=162 | test_objects=387
- split_09: train_objects=128 | test_objects=289
- split_10: train_objects=144 | test_objects=331
- split_11: train_objects=146 | test_objects=325
- split_12: train_objects=155 | test_objects=353
- split_13: train_objects=143 | test_objects=379
- split_14: train_objects=154 | test_objects=351
- split_15: train_objects=158 | test_objects=342
- split_16: train_objects=155 | test_objects=354
- split_17: train_objects=153 | test_objects=351
- split_18: train_objects=

# Load and Validate Train/Test Logs

In [4]:
# ============================================================
# STAGE 2 — Load and Validate Train/Test Logs (ONE CELL, CPU-SAFE)
# - Kaggle CPU: ringan, tanpa load full lightcurves
# - Output:
#   * df_train_meta, df_test_meta  (index=object_id, bersih & siap dipakai)
#   * id2split_train, id2split_test (routing cepat ke split folder)
#   * artifacts/train_log_clean.parquet, artifacts/test_log_clean.parquet
# ============================================================

import os, re, gc, warnings
from pathlib import Path

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=pd.errors.DtypeWarning)

# ----------------------------
# 0) Require STAGE 0/1 globals
# ----------------------------
for need in ["PATHS", "ART_DIR"]:
    if need not in globals():
        raise RuntimeError(f"Missing `{need}`. Jalankan STAGE 0 dulu.")
if "SPLIT_DIRS" not in globals():
    raise RuntimeError("Missing `SPLIT_DIRS`. Jalankan STAGE 1 (Verify Dataset Paths & Split Discovery) dulu.")

TRAIN_LOG_PATH = Path(PATHS["TRAIN_LOG"])
TEST_LOG_PATH  = Path(PATHS["TEST_LOG"])

def _normalize_split_name(x: str) -> str:
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return ""
    s = str(x).strip().lower()
    m = re.fullmatch(r"split_(\d{1,2})", s)
    if m:
        k = int(m.group(1))
        return f"split_{k:02d}"
    m = re.fullmatch(r"(\d{1,2})", s)
    if m:
        k = int(m.group(1))
        return f"split_{k:02d}"
    m = re.fullmatch(r"split(\d{1,2})", s)
    if m:
        k = int(m.group(1))
        return f"split_{k:02d}"
    return s

def _norm_cols(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [c.strip() for c in df.columns]
    return df

def _coerce_numeric(df: pd.DataFrame, col: str) -> None:
    if col not in df.columns:
        return
    df[col] = pd.to_numeric(df[col], errors="coerce")

# ----------------------------
# 1) Load logs (fresh read untuk konsistensi)
# ----------------------------
df_train = pd.read_csv(TRAIN_LOG_PATH)
df_test  = pd.read_csv(TEST_LOG_PATH)

df_train = _norm_cols(df_train)
df_test  = _norm_cols(df_test)

# ----------------------------
# 2) Required columns check
# ----------------------------
req_common = {"object_id", "split", "EBV", "Z"}
req_train  = req_common | {"target"}
req_test   = req_common

miss_train = sorted(list(req_train - set(df_train.columns)))
miss_test  = sorted(list(req_test  - set(df_test.columns)))

if miss_train:
    raise ValueError(f"train_log.csv missing required columns: {miss_train} | found={list(df_train.columns)}")
if miss_test:
    raise ValueError(f"test_log.csv missing required columns: {miss_test} | found={list(df_test.columns)}")

# ----------------------------
# 3) Basic cleaning
# ----------------------------
# object_id string
df_train["object_id"] = df_train["object_id"].astype(str).str.strip()
df_test["object_id"]  = df_test["object_id"].astype(str).str.strip()

# split canonical
df_train["split"] = df_train["split"].astype(str).str.strip().map(_normalize_split_name)
df_test["split"]  = df_test["split"].astype(str).str.strip().map(_normalize_split_name)

# numeric coercion
for c in ["EBV", "Z", "Z_err"]:
    _coerce_numeric(df_train, c)
    _coerce_numeric(df_test, c)

# target binary int
df_train["target"] = pd.to_numeric(df_train["target"], errors="coerce")
if df_train["target"].isna().any():
    bad_n = int(df_train["target"].isna().sum())
    raise ValueError(f"train_log target has NaN after coercion: {bad_n} rows. Cek isi target di train_log.csv.")
df_train["target"] = df_train["target"].astype(int)
uniq_t = set(df_train["target"].unique().tolist())
if not uniq_t.issubset({0, 1}):
    raise ValueError(f"train_log target must be binary 0/1. Found: {sorted(list(uniq_t))}")

# ----------------------------
# 4) Duplicates check (hard fail to avoid silent leakage)
# ----------------------------
if df_train["object_id"].duplicated().any():
    dup = df_train.loc[df_train["object_id"].duplicated(), "object_id"].iloc[:5].tolist()
    raise ValueError(f"Duplicated object_id in train_log (examples): {dup}")
if df_test["object_id"].duplicated().any():
    dup = df_test.loc[df_test["object_id"].duplicated(), "object_id"].iloc[:5].tolist()
    raise ValueError(f"Duplicated object_id in test_log (examples): {dup}")

# ----------------------------
# 5) Missing values handling (anti-error downstream)
# ----------------------------
# Buat flag missing agar tidak hilang informasi
for df in [df_train, df_test]:
    for c in ["EBV", "Z"]:
        df[f"{c}_missing"] = df[c].isna().astype(np.int8)

# Isi EBV NaN -> 0.0 (fisiknya bisa dianggap minimal dust bila tidak ada)
if df_train["EBV"].isna().any():
    df_train["EBV"] = df_train["EBV"].fillna(0.0)
if df_test["EBV"].isna().any():
    df_test["EBV"] = df_test["EBV"].fillna(0.0)

# Isi Z NaN -> median per split (lebih aman daripada global median)
# (Kalau tidak ada NaN, ini tidak mengubah apa-apa)
def _fill_z_by_split(df: pd.DataFrame):
    if not df["Z"].isna().any():
        return df
    df = df.copy()
    med = df.groupby("split")["Z"].median()
    global_med = float(df["Z"].median()) if df["Z"].notna().any() else 0.0
    def _fill_row(row):
        if pd.isna(row["Z"]):
            m = med.get(row["split"], np.nan)
            return float(m) if not pd.isna(m) else global_med
        return row["Z"]
    df["Z"] = df.apply(_fill_row, axis=1)
    return df

df_train = _fill_z_by_split(df_train)
df_test  = _fill_z_by_split(df_test)

# Untuk train: kalau Z_err tidak ada, buat kolomnya (konsisten untuk model)
if "Z_err" not in df_train.columns:
    df_train["Z_err"] = np.nan
# Untuk test: kalau tidak ada Z_err, tetap buat agar pipeline tidak error
if "Z_err" not in df_test.columns:
    df_test["Z_err"] = np.nan

# Flag photometric redshift: train spec-z (anggap 0), test photo-z (anggap 1)
df_train["is_photoz"] = np.int8(0)
df_test["is_photoz"]  = np.int8(1)

# Z_err_missing flag + fill NaN Z_err -> 0.0 agar numeric stabil
for df in [df_train, df_test]:
    df["Z_err_missing"] = df["Z_err"].isna().astype(np.int8)
    df["Z_err"] = df["Z_err"].fillna(0.0)

# ----------------------------
# 6) Split validity check against disk splits
# ----------------------------
disk_splits = set(SPLIT_DIRS.keys())
bad_train_s = sorted([s for s in set(df_train["split"].unique()) if s not in disk_splits])
bad_test_s  = sorted([s for s in set(df_test["split"].unique())  if s not in disk_splits])
if bad_train_s:
    raise FileNotFoundError(f"train_log references unknown split(s): {bad_train_s[:10]}")
if bad_test_s:
    raise FileNotFoundError(f"test_log references unknown split(s): {bad_test_s[:10]}")

# ----------------------------
# 7) Build meta tables (index=object_id) + routing dicts
# ----------------------------
# Simpan hanya kolom yang relevan untuk tahap berikutnya (lebih ringan)
keep_train = ["object_id","split","EBV","Z","Z_err","EBV_missing","Z_missing","Z_err_missing","is_photoz","target"]
keep_test  = ["object_id","split","EBV","Z","Z_err","EBV_missing","Z_missing","Z_err_missing","is_photoz"]

# Jika SpecType ada, simpan juga untuk analisis (tidak wajib untuk model biner)
if "SpecType" in df_train.columns:
    keep_train.append("SpecType")

df_train_meta = df_train[keep_train].copy()
df_test_meta  = df_test[keep_test].copy()

df_train_meta = df_train_meta.set_index("object_id", drop=True).sort_index()
df_test_meta  = df_test_meta.set_index("object_id", drop=True).sort_index()

id2split_train = df_train_meta["split"].to_dict()
id2split_test  = df_test_meta["split"].to_dict()

# ----------------------------
# 8) Save cleaned logs (fast reuse)
# ----------------------------
train_out = Path(ART_DIR) / "train_log_clean.parquet"
test_out  = Path(ART_DIR) / "test_log_clean.parquet"
df_train_meta.to_parquet(train_out, index=True)
df_test_meta.to_parquet(test_out, index=True)

# Save split stats (useful debugging)
split_stats = pd.DataFrame({
    "train_objects": df_train_meta["split"].value_counts().reindex(sorted(disk_splits)).fillna(0).astype(int),
    "test_objects":  df_test_meta["split"].value_counts().reindex(sorted(disk_splits)).fillna(0).astype(int),
})
split_stats.index.name = "split"
split_stats_path = Path(ART_DIR) / "split_stats.csv"
split_stats.to_csv(split_stats_path)

# ----------------------------
# 9) Print summary (minimal but informative)
# ----------------------------
pos = int((df_train_meta["target"] == 1).sum())
neg = int((df_train_meta["target"] == 0).sum())
tot = int(len(df_train_meta))
print("LOGS OK (clean + validated)")
print(f"- train objects: {tot:,} | pos(TDE)={pos:,} | neg={neg:,} | pos%={(pos/max(tot,1))*100:.3f}%")
print(f"- test objects : {len(df_test_meta):,}")
print(f"- saved: {train_out}")
print(f"- saved: {test_out}")
print(f"- saved: {split_stats_path}")

# Keep in globals for next stages
globals().update({
    "df_train_meta": df_train_meta,
    "df_test_meta": df_test_meta,
    "id2split_train": id2split_train,
    "id2split_test": id2split_test,
    "split_stats": split_stats
})

gc.collect()


LOGS OK (clean + validated)
- train objects: 3,043 | pos(TDE)=148 | neg=2,895 | pos%=4.864%
- test objects : 7,135
- saved: /kaggle/working/mallorn_run/artifacts/train_log_clean.parquet
- saved: /kaggle/working/mallorn_run/artifacts/test_log_clean.parquet
- saved: /kaggle/working/mallorn_run/artifacts/split_stats.csv


0

# Lightcurve Loading Strategy

In [5]:
# ============================================================
# STAGE 3 — Lightcurve Loading Strategy (ONE CELL, Kaggle CPU-SAFE)
# - Split-wise file mapping + chunked reader utilities (no full concat)
# - Builds:
#   * SPLIT_FILES: {split_XX: {"train": Path, "test": Path}}
#   * train_ids_by_split / test_ids_by_split: routing object_ids per split
#   * iter_lightcurve_chunks(): generator read_csv(chunksize=...)
#   * load_object_lightcurve(): debug-safe per-object extraction (streaming)
# - Saves:
#   * artifacts/split_file_manifest.csv
#   * artifacts/object_counts_by_split.csv
# ============================================================

import gc, re
from pathlib import Path
import numpy as np
import pandas as pd

# ----------------------------
# 0) Require previous stages
# ----------------------------
for need in ["SPLIT_DIRS", "SPLIT_LIST", "df_train_meta", "df_test_meta", "ART_DIR"]:
    if need not in globals():
        raise RuntimeError(f"Missing `{need}`. Jalankan STAGE 0 -> STAGE 1 -> STAGE 2 dulu.")

# ----------------------------
# 1) Build split file mapping (train/test lightcurves)
# ----------------------------
SPLIT_FILES = {}
for s in SPLIT_LIST:
    sd = SPLIT_DIRS[s]
    tr = sd / "train_full_lightcurves.csv"
    te = sd / "test_full_lightcurves.csv"
    if (not tr.exists()) or (not te.exists()):
        raise FileNotFoundError(f"Missing lightcurve file(s) in {sd}: train={tr.exists()} test={te.exists()}")
    SPLIT_FILES[s] = {"train": tr, "test": te}

# Save split file manifest (helps debug path issues)
manifest = []
for s in SPLIT_LIST:
    manifest.append({
        "split": s,
        "train_path": str(SPLIT_FILES[s]["train"]),
        "test_path": str(SPLIT_FILES[s]["test"]),
        "train_mb": SPLIT_FILES[s]["train"].stat().st_size / (1024**2),
        "test_mb":  SPLIT_FILES[s]["test"].stat().st_size / (1024**2),
    })
df_manifest = pd.DataFrame(manifest).sort_values("split")
manifest_path = Path(ART_DIR) / "split_file_manifest.csv"
df_manifest.to_csv(manifest_path, index=False)

# ----------------------------
# 2) Build object routing by split (VERY important for split-wise processing)
# ----------------------------
train_ids_by_split = {s: [] for s in SPLIT_LIST}
test_ids_by_split  = {s: [] for s in SPLIT_LIST}

# df_train_meta/df_test_meta index is object_id (from STAGE 2)
for oid, row in df_train_meta[["split"]].itertuples():
    train_ids_by_split[row].append(oid)
for oid, row in df_test_meta[["split"]].itertuples():
    test_ids_by_split[row].append(oid)

# Object counts per split (save)
df_counts = pd.DataFrame({
    "split": SPLIT_LIST,
    "train_objects": [len(train_ids_by_split[s]) for s in SPLIT_LIST],
    "test_objects":  [len(test_ids_by_split[s]) for s in SPLIT_LIST],
})
counts_path = Path(ART_DIR) / "object_counts_by_split.csv"
df_counts.to_csv(counts_path, index=False)

# ----------------------------
# 3) Column normalization & dtypes (memory safe)
# ----------------------------
# We canonicalize to: object_id, mjd, flux, flux_err, filter
LC_RENAME = {
    "Time (MJD)": "mjd",
    "Time(MJD)": "mjd",
    "Time": "mjd",
    "Flux": "flux",
    "Flux_err": "flux_err",
    "FluxErr": "flux_err",
    "Filter": "filter",
    "object_id": "object_id",
    "ObjectID": "object_id",
}
# Read only columns we need (safe). If dataset has extra columns, we ignore them.
USECOLS_RAW = ["object_id", "Time (MJD)", "Flux", "Flux_err", "Filter"]
# Dtypes (object_id str, filter category-like but keep as string to avoid pandas category pitfalls in chunks)
DTYPES = {
    "object_id": "string",
    "Flux": "float32",
    "Flux_err": "float32",
    "Filter": "string",
    # Time (MJD) sometimes float64; float32 is usually ok for ML. Keep float32 for memory.
    "Time (MJD)": "float32",
}

def _normalize_lc_chunk(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [c.strip() for c in df.columns]
    # rename known columns
    df = df.rename(columns={c: LC_RENAME.get(c, c) for c in df.columns})
    # enforce required canonical columns exist
    need = {"object_id", "mjd", "flux", "flux_err", "filter"}
    missing = sorted(list(need - set(df.columns)))
    if missing:
        raise ValueError(f"Lightcurve chunk missing required columns after rename: {missing}. Found: {list(df.columns)}")
    # trim/normalize filter values
    df["filter"] = df["filter"].astype("string").str.strip().str.lower()
    # object_id cleanup
    df["object_id"] = df["object_id"].astype("string").str.strip()
    return df[["object_id", "mjd", "flux", "flux_err", "filter"]]

# ----------------------------
# 4) Chunked readers (core strategy on Kaggle CPU)
# ----------------------------
def iter_lightcurve_chunks(split_name: str, which: str, chunksize: int = 400_000):
    """
    Stream read a split lightcurve CSV in chunks.
    which: 'train' or 'test'
    yields normalized chunks with canonical columns:
      object_id, mjd, flux, flux_err, filter
    """
    if split_name not in SPLIT_FILES:
        raise KeyError(f"Unknown split_name={split_name}.")
    if which not in ("train", "test"):
        raise ValueError("which must be 'train' or 'test'")
    p = SPLIT_FILES[split_name][which]

    # Use usecols only if they exist; handle header differences robustly:
    # Read header once (cheap), then decide usecols/dtypes mapping.
    header = pd.read_csv(p, nrows=0)
    cols = [c.strip() for c in header.columns]
    # map raw names we expect
    raw_usecols = [c for c in USECOLS_RAW if c in cols]
    # also accept alternative time column names
    if "Time (MJD)" not in raw_usecols:
        for alt in ["Time(MJD)", "Time"]:
            if alt in cols:
                raw_usecols = [c if c != "Time (MJD)" else alt for c in raw_usecols]
                break

    if set(raw_usecols) != set([c for c in raw_usecols]):  # no-op guard
        pass

    # dtypes must match chosen time column key
    dtypes = {}
    for k, v in DTYPES.items():
        if k in cols:
            dtypes[k] = v
    # if time column is alt, set dtype
    for tcol in ["Time(MJD)", "Time"]:
        if tcol in cols and "Time (MJD)" not in cols:
            dtypes[tcol] = "float32"

    reader = pd.read_csv(
        p,
        usecols=raw_usecols if raw_usecols else None,
        dtype=dtypes if dtypes else None,
        chunksize=int(chunksize),
    )
    for chunk in reader:
        yield _normalize_lc_chunk(chunk)

def load_split_lightcurves(split_name: str, which: str):
    """
    Convenience: load entire split file (NOT recommended for huge files).
    Use only for quick debugging on small splits.
    """
    parts = []
    for ch in iter_lightcurve_chunks(split_name, which, chunksize=400_000):
        parts.append(ch)
    return pd.concat(parts, ignore_index=True) if parts else pd.DataFrame(columns=["object_id","mjd","flux","flux_err","filter"])

def load_object_lightcurve(object_id: str, which: str, chunksize: int = 400_000, sort_time: bool = True):
    """
    Debug-safe per-object extraction by streaming the relevant split file.
    This scans the split CSV in chunks (OK for occasional use; do NOT do this for all objects).
    """
    object_id = str(object_id).strip()
    if which == "train":
        if object_id not in df_train_meta.index:
            raise KeyError(f"object_id not found in df_train_meta: {object_id}")
        split_name = df_train_meta.loc[object_id, "split"]
    elif which == "test":
        if object_id not in df_test_meta.index:
            raise KeyError(f"object_id not found in df_test_meta: {object_id}")
        split_name = df_test_meta.loc[object_id, "split"]
    else:
        raise ValueError("which must be 'train' or 'test'")

    pieces = []
    for ch in iter_lightcurve_chunks(split_name, which, chunksize=chunksize):
        sub = ch[ch["object_id"] == object_id]
        if not sub.empty:
            pieces.append(sub)
    if not pieces:
        out = pd.DataFrame(columns=["object_id","mjd","flux","flux_err","filter"])
    else:
        out = pd.concat(pieces, ignore_index=True)
        if sort_time and len(out) > 1:
            out = out.sort_values(["mjd", "filter"], kind="mergesort").reset_index(drop=True)
    return out

# ----------------------------
# 5) Quick smoke test (lightweight, no heavy IO)
# ----------------------------
# Take 1 object from a few splits and ensure extraction works.
test_splits = ["split_01", "split_08", "split_17"]
for s in test_splits:
    if len(train_ids_by_split[s]) == 0 or len(test_ids_by_split[s]) == 0:
        raise RuntimeError(f"Split {s} has 0 objects in train/test log (unexpected).")
    oid_tr = train_ids_by_split[s][0]
    oid_te = test_ids_by_split[s][0]
    df_tr_obj = load_object_lightcurve(oid_tr, "train", chunksize=250_000)
    df_te_obj = load_object_lightcurve(oid_te, "test",  chunksize=250_000)

    if df_tr_obj.empty:
        raise RuntimeError(f"Smoke test failed: empty train lightcurve for {oid_tr} in {s}")
    if df_te_obj.empty:
        raise RuntimeError(f"Smoke test failed: empty test lightcurve for {oid_te} in {s}")

    # filter sanity (allow only u,g,r,i,z,y)
    badf_tr = sorted(set(df_tr_obj["filter"].unique()) - set(["u","g","r","i","z","y"]))
    badf_te = sorted(set(df_te_obj["filter"].unique()) - set(["u","g","r","i","z","y"]))
    if badf_tr or badf_te:
        raise ValueError(f"Unexpected filter values in smoke test split={s}: train_bad={badf_tr} test_bad={badf_te}")

print("LIGHTCURVE LOADING STRATEGY OK (split-wise + chunked)")
print(f"- Saved: {manifest_path}")
print(f"- Saved: {counts_path}")
print("- Ready for next stage: photometric preprocessing + sequence building (split-wise loop).")

# Export to globals for next stages
globals().update({
    "SPLIT_FILES": SPLIT_FILES,
    "train_ids_by_split": train_ids_by_split,
    "test_ids_by_split": test_ids_by_split,
    "iter_lightcurve_chunks": iter_lightcurve_chunks,
    "load_object_lightcurve": load_object_lightcurve,
})

gc.collect()


LIGHTCURVE LOADING STRATEGY OK (split-wise + chunked)
- Saved: /kaggle/working/mallorn_run/artifacts/split_file_manifest.csv
- Saved: /kaggle/working/mallorn_run/artifacts/object_counts_by_split.csv
- Ready for next stage: photometric preprocessing + sequence building (split-wise loop).


348

# Photometric Cleaning (De-extinction + Negative Flux Safe Transform)

In [6]:
# ============================================================
# STAGE 4 — Photometric Cleaning (De-extinction + Negative Flux Safe Transform)
# ONE CELL, Kaggle CPU-SAFE, split-wise + chunked
#
# Prasyarat (sudah ada dari stage sebelumnya):
# - iter_lightcurve_chunks (STAGE 3)
# - df_train_meta, df_test_meta (STAGE 2)
# - ART_DIR, SPLIT_LIST (STAGE 0/1)
#
# Output:
# - /kaggle/working/mallorn_run/artifacts/lc_clean/split_XX/{train|test}/part_*.parquet (atau .csv.gz fallback)
# - manifest CSV per split + summary stats
# ============================================================

import gc, json, math, warnings
from pathlib import Path

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=pd.errors.DtypeWarning)

# ----------------------------
# 0) Require previous stages
# ----------------------------
for need in ["iter_lightcurve_chunks", "df_train_meta", "df_test_meta", "ART_DIR", "SPLIT_LIST"]:
    if need not in globals():
        raise RuntimeError(f"Missing `{need}`. Jalankan STAGE 0 -> 1 -> 2 -> 3 dulu.")

# ----------------------------
# 1) Settings (CPU-safe defaults)
# ----------------------------
CHUNKSIZE = 350_000          # adjust if needed (bigger = faster but more RAM)
SNR_DET = 3.0                # simple detection proxy
ERR_EPS = 1e-6               # avoid div-by-zero
SCALE_SAMPLE_ROWS_TOTAL = 900_000  # sampling to estimate per-band scales (lightweight)
SCALE_SAMPLES_PER_BAND = 60_000    # cap samples stored per band for median
WRITE_FORMAT = "parquet"     # "parquet" recommended; auto-fallback to "csv.gz" if parquet fails

# For debugging: set a subset of splits to process, e.g. ["split_01","split_02"]
ONLY_SPLITS = None  # None = process all 20 splits

# ----------------------------
# 2) Extinction coefficients (R_lambda for LSST-like bands)
#    NOTE: If kamu punya koefisien resmi dari notebook Using_the_Data, ganti nilai di sini.
#    A_lambda = R_lambda * EBV
#    flux_deext = flux * 10^(0.4 * A_lambda)
# ----------------------------
EXT_RLAMBDA = {
    "u": 4.8,
    "g": 3.6,
    "r": 2.7,
    "i": 2.1,
    "z": 1.6,
    "y": 1.3,
}

BAND2ID = {"u": 0, "g": 1, "r": 2, "i": 3, "z": 4, "y": 5}
ID2BAND = {v: k for k, v in BAND2ID.items()}

# EBV mapping (Series with index object_id) for fast vectorized map
EBV_TRAIN_SER = df_train_meta["EBV"]
EBV_TEST_SER  = df_test_meta["EBV"]

# ----------------------------
# 3) Estimate per-band flux scale (robust, sample-based) for safe transforms
#    We estimate on RAW flux (not de-extincted) to keep this stage fast.
# ----------------------------
def estimate_band_scale_from_sample(which: str, splits, total_rows=SCALE_SAMPLE_ROWS_TOTAL, per_band_cap=SCALE_SAMPLES_PER_BAND):
    """
    Stream sample abs(flux) from split files to estimate median abs flux per band.
    Returns dict: {band: scale} with positive float values.
    """
    rng = np.random.default_rng(2025)
    samples = {b: [] for b in BAND2ID.keys()}
    seen = 0

    for s in splits:
        if seen >= total_rows:
            break
        for ch in iter_lightcurve_chunks(s, which, chunksize=CHUNKSIZE):
            # ch columns: object_id,mjd,flux,flux_err,filter
            f = ch["filter"].to_numpy(dtype="U1", copy=False)
            x = ch["flux"].to_numpy(dtype=np.float32, copy=False)
            ax = np.abs(x)

            # sample a small fraction to keep memory low
            n = len(ch)
            if n == 0:
                continue
            take = int(min(5000, n))  # small per-chunk sample
            idx = rng.choice(n, size=take, replace=False)
            f_s = f[idx]
            ax_s = ax[idx]

            for b in BAND2ID.keys():
                mask = (f_s == b)
                if not np.any(mask):
                    continue
                vals = ax_s[mask]
                if vals.size == 0:
                    continue

                # append with cap
                cur = samples[b]
                if len(cur) < per_band_cap:
                    need = per_band_cap - len(cur)
                    if vals.size > need:
                        vals = vals[:need]
                    cur.extend(vals.tolist())
                # else already full; keep

            seen += n
            if seen >= total_rows:
                break

    band_scale = {}
    for b in BAND2ID.keys():
        arr = np.asarray(samples[b], dtype=np.float32)
        arr = arr[np.isfinite(arr)]
        arr = arr[arr > 0]
        if arr.size == 0:
            band_scale[b] = 1.0
        else:
            med = float(np.median(arr))
            # clamp to avoid too small
            band_scale[b] = max(med, 1e-3)
    return band_scale

splits_to_use = ONLY_SPLITS if (ONLY_SPLITS is not None) else SPLIT_LIST
print(f"[Stage 4] Estimating per-band flux scales from sample ({SCALE_SAMPLE_ROWS_TOTAL:,} rows max) ...")
BAND_SCALE = estimate_band_scale_from_sample("train", splits_to_use)
print("[Stage 4] BAND_SCALE (median |flux| per band):")
for b in ["u","g","r","i","z","y"]:
    print(f"  - {b}: {BAND_SCALE[b]:.6f}")

# ----------------------------
# 4) Chunk photometric cleaning (de-extinction + safe transforms)
# ----------------------------
def clean_photometric_chunk(ch: pd.DataFrame, ebv_ser: pd.Series) -> pd.DataFrame:
    """
    Input chunk columns: object_id,mjd,flux,flux_err,filter (from iter_lightcurve_chunks)
    Output columns (float32/int8):
      object_id, mjd, band_id, flux_deext, err_deext, flux_asinh, err_log1p, snr, detected
    """
    # Map EBV and extinction coefficient
    ebv = ch["object_id"].map(ebv_ser).fillna(0.0).to_numpy(dtype=np.float32)
    filt = ch["filter"].to_numpy(dtype="U1", copy=False)

    r = pd.Series(filt).map(EXT_RLAMBDA).fillna(0.0).to_numpy(dtype=np.float32)
    A = r * ebv  # A_lambda

    # de-extinction multiplier: 10^(0.4*A)
    mul = np.power(10.0, (0.4 * A).astype(np.float32)).astype(np.float32)

    flux = ch["flux"].to_numpy(dtype=np.float32, copy=False)
    err  = ch["flux_err"].to_numpy(dtype=np.float32, copy=False)

    # clamp err
    err = np.maximum(err, np.float32(ERR_EPS))

    flux_deext = (flux * mul).astype(np.float32)
    err_deext  = (err  * mul).astype(np.float32)

    # per-band scale
    scale = pd.Series(filt).map(BAND_SCALE).fillna(1.0).to_numpy(dtype=np.float32)
    scale = np.maximum(scale, np.float32(1e-3))

    # safe transforms
    flux_asinh = np.arcsinh(flux_deext / scale).astype(np.float32)
    err_scaled = (err_deext / scale).astype(np.float32)
    err_log1p  = np.log1p(err_scaled).astype(np.float32)

    snr = (flux_deext / np.maximum(err_deext, np.float32(ERR_EPS))).astype(np.float32)
    detected = (snr > np.float32(SNR_DET)).astype(np.int8)

    # band_id
    band_id = pd.Series(filt).map(BAND2ID).fillna(-1).astype(np.int16).to_numpy()
    if np.any(band_id < 0):
        bad = sorted(set(pd.Series(filt)[band_id < 0].tolist()))
        raise ValueError(f"Unknown filter values encountered: {bad}")

    out = pd.DataFrame({
        "object_id": ch["object_id"].astype("string").to_numpy(),
        "mjd": ch["mjd"].to_numpy(dtype=np.float32, copy=False),
        "band_id": band_id.astype(np.int16),
        "flux_deext": flux_deext,
        "err_deext": err_deext,
        "flux_asinh": flux_asinh,
        "err_log1p": err_log1p,
        "snr": snr,
        "detected": detected,
    })
    return out

# ----------------------------
# 5) Writer (parquet preferred; fallback csv.gz)
# ----------------------------
def write_part(df: pd.DataFrame, out_path: Path, fmt: str):
    out_path.parent.mkdir(parents=True, exist_ok=True)
    if fmt == "parquet":
        try:
            df.to_parquet(out_path, index=False)
            return "parquet"
        except Exception as e:
            # fallback
            alt = out_path.with_suffix(".csv.gz")
            df.to_csv(alt, index=False, compression="gzip")
            return f"csv.gz (fallback from parquet: {type(e).__name__})"
    elif fmt == "csv.gz":
        df.to_csv(out_path.with_suffix(".csv.gz"), index=False, compression="gzip")
        return "csv.gz"
    else:
        raise ValueError("fmt must be 'parquet' or 'csv.gz'")

# ----------------------------
# 6) Process all splits split-wise (stream -> clean -> write parts)
# ----------------------------
LC_CLEAN_DIR = Path(ART_DIR) / "lc_clean"
LC_CLEAN_DIR.mkdir(parents=True, exist_ok=True)

summary_rows = []
manifest_rows = []

def process_split(split_name: str, which: str):
    ebv_ser = EBV_TRAIN_SER if which == "train" else EBV_TEST_SER
    out_dir = LC_CLEAN_DIR / split_name / which
    out_dir.mkdir(parents=True, exist_ok=True)

    part_idx = 0
    n_rows_total = 0
    n_neg_flux = 0
    n_det = 0

    for ch in iter_lightcurve_chunks(split_name, which, chunksize=CHUNKSIZE):
        # clean
        cleaned = clean_photometric_chunk(ch, ebv_ser)

        # simple stats
        n_rows = int(len(cleaned))
        n_rows_total += n_rows
        n_neg_flux += int((cleaned["flux_deext"].to_numpy() < 0).sum())
        n_det += int(cleaned["detected"].to_numpy().sum())

        # write part
        out_path = out_dir / f"part_{part_idx:04d}.parquet"
        used_fmt = write_part(cleaned, out_path, WRITE_FORMAT)

        manifest_rows.append({
            "split": split_name,
            "which": which,
            "part": part_idx,
            "path": str(out_path if used_fmt.startswith("parquet") else out_path.with_suffix(".csv.gz")),
            "rows": n_rows,
            "format": used_fmt,
        })

        part_idx += 1

        # free memory
        del cleaned, ch
        if part_idx % 10 == 0:
            gc.collect()

    summary_rows.append({
        "split": split_name,
        "which": which,
        "parts": part_idx,
        "rows": n_rows_total,
        "neg_flux_frac": (n_neg_flux / max(n_rows_total, 1)),
        "det_frac_snr_gt_3": (n_det / max(n_rows_total, 1)),
    })

    print(f"[Stage 4] {split_name}/{which}: parts={part_idx} | rows={n_rows_total:,} | neg%={100*(n_neg_flux/max(n_rows_total,1)):.2f}% | det%={100*(n_det/max(n_rows_total,1)):.2f}%")

print("[Stage 4] Building cleaned lightcurve cache (split-wise) ...")
for s in splits_to_use:
    process_split(s, "train")
    process_split(s, "test")

# ----------------------------
# 7) Save manifests + summary
# ----------------------------
df_manifest = pd.DataFrame(manifest_rows)
df_summary  = pd.DataFrame(summary_rows)

manifest_path = LC_CLEAN_DIR / "lc_clean_manifest.csv"
summary_path  = LC_CLEAN_DIR / "lc_clean_summary.csv"

df_manifest.to_csv(manifest_path, index=False)
df_summary.to_csv(summary_path, index=False)

# Save coefficients & scales for reproducibility
cfg_path = LC_CLEAN_DIR / "photometric_config.json"
with open(cfg_path, "w", encoding="utf-8") as f:
    json.dump({
        "EXT_RLAMBDA": EXT_RLAMBDA,
        "BAND_SCALE": BAND_SCALE,
        "SNR_DET": SNR_DET,
        "ERR_EPS": ERR_EPS,
        "CHUNKSIZE": CHUNKSIZE,
        "WRITE_FORMAT": WRITE_FORMAT,
        "ONLY_SPLITS": splits_to_use,
    }, f, indent=2)

print("\n[Stage 4] Done.")
print(f"- Saved manifest: {manifest_path}")
print(f"- Saved summary : {summary_path}")
print(f"- Saved config  : {cfg_path}")

# ----------------------------
# 8) Export globals for next stages
# ----------------------------
def get_clean_parts(split_name: str, which: str):
    """Return list of part file paths for cleaned split."""
    m = df_manifest[(df_manifest["split"] == split_name) & (df_manifest["which"] == which)].sort_values("part")
    return m["path"].tolist()

globals().update({
    "EXT_RLAMBDA": EXT_RLAMBDA,
    "BAND2ID": BAND2ID,
    "ID2BAND": ID2BAND,
    "BAND_SCALE": BAND_SCALE,
    "LC_CLEAN_DIR": LC_CLEAN_DIR,
    "lc_clean_manifest": df_manifest,
    "lc_clean_summary": df_summary,
    "get_clean_parts": get_clean_parts,
})

gc.collect()


[Stage 4] Estimating per-band flux scales from sample (900,000 rows max) ...
[Stage 4] BAND_SCALE (median |flux| per band):
  - u: 0.336883
  - g: 0.249368
  - r: 0.377052
  - i: 0.451290
  - z: 0.576561
  - y: 0.996014
[Stage 4] Building cleaned lightcurve cache (split-wise) ...
[Stage 4] split_01/train: parts=1 | rows=26,324 | neg%=38.95% | det%=19.34%
[Stage 4] split_01/test: parts=1 | rows=59,235 | neg%=37.74% | det%=23.02%
[Stage 4] split_02/train: parts=1 | rows=25,609 | neg%=34.02% | det%=24.45%
[Stage 4] split_02/test: parts=1 | rows=71,229 | neg%=36.48% | det%=21.69%
[Stage 4] split_03/train: parts=1 | rows=21,676 | neg%=36.82% | det%=21.65%
[Stage 4] split_03/test: parts=1 | rows=53,751 | neg%=36.70% | det%=21.90%
[Stage 4] split_04/train: parts=1 | rows=22,898 | neg%=38.36% | det%=21.11%
[Stage 4] split_04/test: parts=1 | rows=51,408 | neg%=38.16% | det%=21.70%
[Stage 4] split_05/train: parts=1 | rows=25,934 | neg%=39.19% | det%=18.33%
[Stage 4] split_05/test: parts=1 | rows

204

# Sequence Tokenization (Event-based Tokens)

In [7]:
# ============================================================
# STAGE 5 — Sequence Tokenization (Event-based Tokens) (ONE CELL, Kaggle CPU-SAFE)
#
# Tujuan:
# - Mengubah cleaned lightcurve (STAGE 4) -> token sequence per object_id
# - Token berbasis event/observasi: 1 baris observasi = 1 token
# - Output disimpan dalam shard .npz per split & (train/test)
#
# Input (dari stage sebelumnya):
# - LC_CLEAN_DIR, get_clean_parts, lc_clean_manifest (STAGE 4)
# - df_train_meta, df_test_meta, train_ids_by_split, test_ids_by_split, SPLIT_LIST (STAGE 2/3)
# - ART_DIR
#
# Output:
# - artifacts/seq_tokens/split_XX/{train|test}/shard_*.npz
# - artifacts/seq_tokens/seq_manifest_{train|test}.csv   (mapping object_id -> shard + slice)
# - artifacts/seq_tokens/seq_config.json                 (feature spec)
#
# Catatan:
# - Default mencoba "streaming contiguous object blocks" (hemat disk, cepat).
# - Jika dataset TIDAK contiguous per object_id, otomatis fallback bucket-hash (lebih berat tapi aman).
# ============================================================

import gc, json, math, warnings
from pathlib import Path

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=pd.errors.DtypeWarning)

# ----------------------------
# 0) Require previous stages
# ----------------------------
for need in ["LC_CLEAN_DIR", "get_clean_parts", "lc_clean_manifest",
             "df_train_meta", "df_test_meta", "train_ids_by_split", "test_ids_by_split",
             "SPLIT_LIST", "ART_DIR"]:
    if need not in globals():
        raise RuntimeError(f"Missing `{need}`. Jalankan STAGE 0 -> 1 -> 2 -> 3 -> 4 dulu.")

# ----------------------------
# 1) Settings (CPU-safe)
# ----------------------------
ONLY_SPLITS = None                 # None = proses semua; atau set list ["split_01","split_02"] untuk debug
COMPRESS_NPZ = False               # True lebih kecil disk tapi jauh lebih lambat di CPU
SHARD_MAX_OBJECTS = 1500           # jumlah object per shard file
SNR_TANH_SCALE = 10.0              # snr_tanh = tanh(snr / scale)
TIME_CLIP_MAX_DAYS = None          # None = no clip; atau mis. 2000.0
DROP_BAD_TIME_ROWS = True          # drop rows with NaN/inf mjd
FALLBACK_NUM_BUCKETS = 64          # dipakai jika fallback (hash bucket) diperlukan

SEQ_DIR = Path(ART_DIR) / "seq_tokens"
SEQ_DIR.mkdir(parents=True, exist_ok=True)

FEATURE_NAMES = ["t_rel_log", "dt_log", "flux_asinh", "err_log1p", "snr_tanh", "detected"]
FEATURE_DIM = len(FEATURE_NAMES)

# ----------------------------
# 2) Robust readers for cleaned parts (parquet or csv.gz)
# ----------------------------
REQ_CLEAN_COLS = {"object_id","mjd","band_id","flux_asinh","err_log1p","snr","detected"}

def _read_clean_part(path: str) -> pd.DataFrame:
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"Clean part missing: {p}")
    if p.suffix == ".parquet":
        df = pd.read_parquet(p)
    elif p.name.endswith(".csv.gz"):
        df = pd.read_csv(p, compression="gzip")
    else:
        # allow plain .csv
        df = pd.read_csv(p)
    df.columns = [c.strip() for c in df.columns]
    missing = sorted(list(REQ_CLEAN_COLS - set(df.columns)))
    if missing:
        raise ValueError(f"Clean part missing columns {missing}. Found: {list(df.columns)} | file={p}")
    # enforce dtypes lightly
    df["object_id"] = df["object_id"].astype("string").str.strip()
    df["mjd"] = pd.to_numeric(df["mjd"], errors="coerce").astype(np.float32)
    df["band_id"] = pd.to_numeric(df["band_id"], errors="coerce").astype(np.int16)
    for c in ["flux_asinh","err_log1p","snr"]:
        df[c] = pd.to_numeric(df[c], errors="coerce").astype(np.float32)
    df["detected"] = pd.to_numeric(df["detected"], errors="coerce").fillna(0).astype(np.int8)
    if DROP_BAD_TIME_ROWS:
        df = df[np.isfinite(df["mjd"].to_numpy())]
    return df

# ----------------------------
# 3) Build tokens for one object (sort by time inside object)
# ----------------------------
def build_object_tokens(df_obj: pd.DataFrame):
    """
    df_obj columns: mjd, band_id, flux_asinh, err_log1p, snr, detected
    returns:
      X: (L, FEATURE_DIM) float32
      B: (L,) int8 band_id
    """
    if df_obj.empty:
        return None, None

    mjd = df_obj["mjd"].to_numpy(dtype=np.float32, copy=False)
    band = df_obj["band_id"].to_numpy(dtype=np.int16, copy=False)
    flux = df_obj["flux_asinh"].to_numpy(dtype=np.float32, copy=False)
    elog = df_obj["err_log1p"].to_numpy(dtype=np.float32, copy=False)
    snr  = df_obj["snr"].to_numpy(dtype=np.float32, copy=False)
    det  = df_obj["detected"].to_numpy(dtype=np.int8, copy=False)

    # sort by mjd, tie-break by band
    order = np.lexsort((band, mjd))
    mjd = mjd[order]
    band = band[order]
    flux = flux[order]
    elog = elog[order]
    snr  = snr[order]
    det  = det[order]

    # time features
    t0 = mjd[0]
    t_rel = mjd - t0
    # dt: first token dt=0
    dt = np.empty_like(t_rel)
    dt[0] = 0.0
    if len(t_rel) > 1:
        dt[1:] = np.maximum(mjd[1:] - mjd[:-1], 0.0)

    if TIME_CLIP_MAX_DAYS is not None:
        t_rel = np.clip(t_rel, 0.0, float(TIME_CLIP_MAX_DAYS))
        dt    = np.clip(dt,    0.0, float(TIME_CLIP_MAX_DAYS))

    t_rel_log = np.log1p(t_rel).astype(np.float32)
    dt_log    = np.log1p(dt).astype(np.float32)

    # robust snr transform
    snr = np.nan_to_num(snr, nan=0.0, posinf=0.0, neginf=0.0)
    snr_tanh = np.tanh(snr / np.float32(SNR_TANH_SCALE)).astype(np.float32)

    # detected -> float32
    det_f = det.astype(np.float32)

    # sanitize other features
    flux = np.nan_to_num(flux, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32)
    elog = np.nan_to_num(elog, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32)

    X = np.stack([t_rel_log, dt_log, flux, elog, snr_tanh, det_f], axis=1).astype(np.float32)
    B = band.astype(np.int8)

    return X, B

# ----------------------------
# 4) Shard writer
# ----------------------------
def save_shard(out_path: Path, object_ids, X_concat, B_concat, offsets):
    out_path.parent.mkdir(parents=True, exist_ok=True)
    obj_arr = np.asarray(object_ids, dtype="S")  # bytes to store in npz
    if COMPRESS_NPZ:
        np.savez_compressed(out_path, object_id=obj_arr, x=X_concat, band=B_concat, offsets=offsets)
    else:
        np.savez(out_path, object_id=obj_arr, x=X_concat, band=B_concat, offsets=offsets)

# ----------------------------
# 5) Streaming builder (expects contiguous blocks by object_id across the cleaned parts)
# ----------------------------
def build_sequences_streaming(split_name: str, which: str, expected_ids: set, out_dir: Path):
    """
    Returns:
      manifest_rows (list of dict), n_objects_built (int), fallback_needed (bool)
    """
    parts = get_clean_parts(split_name, which)
    if not parts:
        raise RuntimeError(f"No cleaned parts for {split_name}/{which}. Pastikan STAGE 4 sukses.")

    # shard accumulators
    manifest_rows = []
    shard_idx = 0
    batch_obj_ids = []
    batch_X_list = []
    batch_B_list = []
    batch_lengths = []

    # streaming buffers for current object
    cur_oid = None
    cur_buf = []  # list of DataFrames (small blocks)
    seen_done = set()  # object_ids that are finalized
    fallback_needed = False

    def flush_object(oid, buf_blocks):
        nonlocal batch_obj_ids, batch_X_list, batch_B_list, batch_lengths
        if oid is None or not buf_blocks:
            return
        df_obj = pd.concat(buf_blocks, ignore_index=True)
        # drop rows not expected (safety), but ideally none
        if oid not in expected_ids:
            return
        X, B = build_object_tokens(df_obj)
        if X is None:
            return
        batch_obj_ids.append(oid)
        batch_X_list.append(X)
        batch_B_list.append(B)
        batch_lengths.append(X.shape[0])

    def flush_shard():
        nonlocal shard_idx, batch_obj_ids, batch_X_list, batch_B_list, batch_lengths, manifest_rows
        if not batch_obj_ids:
            return
        # build concat + offsets
        lengths = np.asarray(batch_lengths, dtype=np.int64)
        offsets = np.zeros(len(lengths) + 1, dtype=np.int64)
        offsets[1:] = np.cumsum(lengths)
        X_concat = np.concatenate(batch_X_list, axis=0).astype(np.float32)
        B_concat = np.concatenate(batch_B_list, axis=0).astype(np.int8)

        shard_path = out_dir / f"shard_{shard_idx:04d}.npz"
        save_shard(shard_path, batch_obj_ids, X_concat, B_concat, offsets)

        # manifest entries
        for i, oid in enumerate(batch_obj_ids):
            start = int(offsets[i])
            length = int(lengths[i])
            manifest_rows.append({
                "object_id": oid,
                "split": split_name,
                "which": which,
                "shard": str(shard_path),
                "start": start,
                "length": length
            })

        shard_idx += 1
        # reset batch
        batch_obj_ids = []
        batch_X_list = []
        batch_B_list = []
        batch_lengths = []
        gc.collect()

    # iterate parts sequentially (preserve original order)
    for pi, p in enumerate(parts):
        df = _read_clean_part(p)
        if df.empty:
            continue

        # IMPORTANT: use original row order (do NOT sort by object_id)
        oids = df["object_id"].to_numpy(dtype=object, copy=False)

        # detect contiguous segments where object_id constant
        # boundaries: start indices of segments
        change = np.empty(len(oids), dtype=bool)
        change[0] = True
        change[1:] = oids[1:] != oids[:-1]
        seg_starts = np.flatnonzero(change)
        seg_ends = np.append(seg_starts[1:], len(oids))

        for s_idx, e_idx in zip(seg_starts, seg_ends):
            oid = str(oids[s_idx])
            block = df.iloc[s_idx:e_idx]

            # If we see an oid that already finalized earlier and it's not the current ongoing oid,
            # then file is not contiguous-by-object => need fallback
            if (oid in seen_done) and (oid != cur_oid):
                fallback_needed = True
                break

            if cur_oid is None:
                cur_oid = oid
                cur_buf = [block]
            elif oid == cur_oid:
                cur_buf.append(block)
            else:
                # finalize previous
                flush_object(cur_oid, cur_buf)
                seen_done.add(cur_oid)

                # flush shard if too big
                if len(batch_obj_ids) >= SHARD_MAX_OBJECTS:
                    flush_shard()

                # start new
                cur_oid = oid
                cur_buf = [block]

        del df
        if fallback_needed:
            break

        if (pi + 1) % 10 == 0:
            gc.collect()

    # finalize last object
    if not fallback_needed:
        flush_object(cur_oid, cur_buf)
        if cur_oid is not None:
            seen_done.add(cur_oid)
        flush_shard()

    built = len(seen_done.intersection(expected_ids))
    return manifest_rows, built, fallback_needed

# ----------------------------
# 6) Fallback: Hash-bucket builder (robust if not contiguous), temp bucket files then delete
# ----------------------------
def build_sequences_fallback_bucket(split_name: str, which: str, expected_ids: set, out_dir: Path, num_buckets: int = 64):
    """
    Robust method:
    - Stream read cleaned parts -> write rows to temporary bucket parquet files based on hash(object_id)
    - Process each bucket file: groupby object_id -> build tokens -> write shards
    - Delete temp bucket files to save disk
    """
    try:
        import pyarrow as pa
        import pyarrow.parquet as pq
    except Exception as e:
        raise RuntimeError("Fallback bucketization needs pyarrow. It seems unavailable.") from e

    parts = get_clean_parts(split_name, which)
    if not parts:
        raise RuntimeError(f"No cleaned parts for {split_name}/{which}.")

    tmp_dir = Path(ART_DIR) / "tmp_buckets" / split_name / which
    tmp_dir.mkdir(parents=True, exist_ok=True)

    # Parquet writers dict
    writers = {}
    schemas = {}

    def _bucket_series(s: pd.Series) -> np.ndarray:
        # fast vectorized pandas hash (stable within this env), mod num_buckets
        h = pd.util.hash_pandas_object(s, index=False).to_numpy(dtype=np.uint64, copy=False)
        return (h % np.uint64(num_buckets)).astype(np.int16)

    # write buckets
    for p in parts:
        df = _read_clean_part(p)
        if df.empty:
            continue
        # keep only expected ids to reduce disk
        df = df[df["object_id"].isin(expected_ids)]
        if df.empty:
            continue

        bidx = _bucket_series(df["object_id"])
        df["_b"] = bidx

        for b in np.unique(bidx):
            sub = df[df["_b"] == b].drop(columns=["_b"])
            if sub.empty:
                continue
            file_path = tmp_dir / f"bucket_{int(b):03d}.parquet"

            table = pa.Table.from_pandas(sub, preserve_index=False)
            if int(b) not in writers:
                schemas[int(b)] = table.schema
                writers[int(b)] = pq.ParquetWriter(file_path, table.schema, compression="snappy")
            writers[int(b)].write_table(table)

        del df
        gc.collect()

    # close writers
    for w in writers.values():
        w.close()

    # now process each bucket file to create shards
    manifest_rows = []
    shard_idx = 0

    batch_obj_ids, batch_X_list, batch_B_list, batch_lengths = [], [], [], []

    def flush_shard_local():
        nonlocal shard_idx, batch_obj_ids, batch_X_list, batch_B_list, batch_lengths, manifest_rows
        if not batch_obj_ids:
            return
        lengths = np.asarray(batch_lengths, dtype=np.int64)
        offsets = np.zeros(len(lengths) + 1, dtype=np.int64)
        offsets[1:] = np.cumsum(lengths)
        X_concat = np.concatenate(batch_X_list, axis=0).astype(np.float32)
        B_concat = np.concatenate(batch_B_list, axis=0).astype(np.int8)

        shard_path = out_dir / f"shard_{shard_idx:04d}.npz"
        save_shard(shard_path, batch_obj_ids, X_concat, B_concat, offsets)

        for i, oid in enumerate(batch_obj_ids):
            manifest_rows.append({
                "object_id": oid,
                "split": split_name,
                "which": which,
                "shard": str(shard_path),
                "start": int(offsets[i]),
                "length": int(lengths[i])
            })

        shard_idx += 1
        batch_obj_ids, batch_X_list, batch_B_list, batch_lengths = [], [], [], []
        gc.collect()

    bucket_files = sorted(tmp_dir.glob("bucket_*.parquet"))
    built_ids = set()

    for bf in bucket_files:
        dfb = pd.read_parquet(bf)
        dfb.columns = [c.strip() for c in dfb.columns]
        if dfb.empty:
            bf.unlink(missing_ok=True)
            continue

        # groupby object_id (robust)
        for oid, g in dfb.groupby("object_id", sort=False):
            oid = str(oid)
            if oid in built_ids:
                continue
            X, B = build_object_tokens(g)
            if X is None:
                continue
            batch_obj_ids.append(oid)
            batch_X_list.append(X)
            batch_B_list.append(B)
            batch_lengths.append(X.shape[0])
            built_ids.add(oid)

            if len(batch_obj_ids) >= SHARD_MAX_OBJECTS:
                flush_shard_local()

        # delete bucket file to save disk
        bf.unlink(missing_ok=True)
        del dfb
        gc.collect()

    flush_shard_local()

    # cleanup tmp dir
    try:
        tmp_dir.rmdir()
    except Exception:
        pass

    return manifest_rows, len(built_ids)

# ----------------------------
# 7) Run tokenization for all splits (train & test)
# ----------------------------
splits_to_run = ONLY_SPLITS if (ONLY_SPLITS is not None) else SPLIT_LIST

all_manifest_train = []
all_manifest_test  = []

def expected_set_for(split_name: str, which: str) -> set:
    if which == "train":
        return set(train_ids_by_split[split_name])
    else:
        return set(test_ids_by_split[split_name])

for split_name in splits_to_run:
    for which in ["train", "test"]:
        out_dir = SEQ_DIR / split_name / which
        out_dir.mkdir(parents=True, exist_ok=True)

        expected_ids = expected_set_for(split_name, which)
        if len(expected_ids) == 0:
            raise RuntimeError(f"Expected ids empty for {split_name}/{which}. Cek log/split mapping.")

        print(f"\n[Stage 5] Building sequences: {split_name}/{which} | expected_objects={len(expected_ids):,}")

        manifest_rows, built, fallback_needed = build_sequences_streaming(
            split_name=split_name,
            which=which,
            expected_ids=expected_ids,
            out_dir=out_dir
        )

        # If streaming failed or missing many objects, fallback
        if fallback_needed or built != len(expected_ids):
            print(f"[Stage 5] Streaming not safe for {split_name}/{which} "
                  f"(built={built:,} vs expected={len(expected_ids):,}, fallback_needed={fallback_needed}).")
            print(f"[Stage 5] Switching to robust bucket fallback (temporary files, then cleaned).")

            # Clear any partial outputs in out_dir to avoid mixing
            for f in out_dir.glob("shard_*.npz"):
                try: f.unlink()
                except Exception: pass

            manifest_rows, built2 = build_sequences_fallback_bucket(
                split_name=split_name,
                which=which,
                expected_ids=expected_ids,
                out_dir=out_dir,
                num_buckets=FALLBACK_NUM_BUCKETS
            )
            if built2 != len(expected_ids):
                raise RuntimeError(f"Fallback still mismatch for {split_name}/{which}: built={built2:,} expected={len(expected_ids):,}")
            built = built2

        print(f"[Stage 5] OK: {split_name}/{which} built_objects={built:,} | shards={len(list(out_dir.glob('shard_*.npz'))):,}")

        if which == "train":
            all_manifest_train.extend(manifest_rows)
        else:
            all_manifest_test.extend(manifest_rows)

        gc.collect()

# ----------------------------
# 8) Save manifests + config
# ----------------------------
df_m_train = pd.DataFrame(all_manifest_train).sort_values(["split","shard","start"]).reset_index(drop=True)
df_m_test  = pd.DataFrame(all_manifest_test).sort_values(["split","shard","start"]).reset_index(drop=True)

mtrain_path = SEQ_DIR / "seq_manifest_train.csv"
mtest_path  = SEQ_DIR / "seq_manifest_test.csv"
df_m_train.to_csv(mtrain_path, index=False)
df_m_test.to_csv(mtest_path, index=False)

cfg = {
    "feature_names": FEATURE_NAMES,
    "feature_dim": FEATURE_DIM,
    "snr_tanh_scale": float(SNR_TANH_SCALE),
    "time_clip_max_days": None if TIME_CLIP_MAX_DAYS is None else float(TIME_CLIP_MAX_DAYS),
    "compress_npz": bool(COMPRESS_NPZ),
    "shard_max_objects": int(SHARD_MAX_OBJECTS),
    "fallback_num_buckets": int(FALLBACK_NUM_BUCKETS),
}
cfg_path = SEQ_DIR / "seq_config.json"
with open(cfg_path, "w", encoding="utf-8") as f:
    json.dump(cfg, f, indent=2)

print("\n[Stage 5] DONE")
print(f"- Saved: {mtrain_path} (rows={len(df_m_train):,})")
print(f"- Saved: {mtest_path}  (rows={len(df_m_test):,})")
print(f"- Saved: {cfg_path}")

# ----------------------------
# 9) Smoke test: load one object sequence
# ----------------------------
def load_sequence(object_id: str, which: str):
    """Load one object's sequence from manifest + shard."""
    object_id = str(object_id).strip()
    m = df_m_train if which == "train" else df_m_test
    row = m[m["object_id"] == object_id]
    if row.empty:
        raise KeyError(f"object_id not found in seq manifest ({which}): {object_id}")
    r = row.iloc[0]
    data = np.load(r["shard"], allow_pickle=False)
    start = int(r["start"])
    length = int(r["length"])
    X = data["x"][start:start+length]
    B = data["band"][start:start+length]
    return X, B

# pick one train object to test
_smoke_oid = df_train_meta.index[0]
X_sm, B_sm = load_sequence(_smoke_oid, "train")
print(f"\n[Stage 5] Smoke test object_id={_smoke_oid}")
print(f"- seq_len={len(X_sm)} | X_shape={X_sm.shape} | bands_unique={sorted(set(B_sm.tolist()))}")

# Export globals for next stages
globals().update({
    "SEQ_DIR": SEQ_DIR,
    "seq_manifest_train": df_m_train,
    "seq_manifest_test": df_m_test,
    "SEQ_FEATURE_NAMES": FEATURE_NAMES,
    "load_sequence": load_sequence,
})

gc.collect()



[Stage 5] Building sequences: split_01/train | expected_objects=155
[Stage 5] OK: split_01/train built_objects=155 | shards=1

[Stage 5] Building sequences: split_01/test | expected_objects=364
[Stage 5] OK: split_01/test built_objects=364 | shards=1

[Stage 5] Building sequences: split_02/train | expected_objects=170
[Stage 5] OK: split_02/train built_objects=170 | shards=1

[Stage 5] Building sequences: split_02/test | expected_objects=414
[Stage 5] OK: split_02/test built_objects=414 | shards=1

[Stage 5] Building sequences: split_03/train | expected_objects=138
[Stage 5] OK: split_03/train built_objects=138 | shards=1

[Stage 5] Building sequences: split_03/test | expected_objects=338
[Stage 5] OK: split_03/test built_objects=338 | shards=1

[Stage 5] Building sequences: split_04/train | expected_objects=145
[Stage 5] OK: split_04/train built_objects=145 | shards=1

[Stage 5] Building sequences: split_04/test | expected_objects=332
[Stage 5] OK: split_04/test built_objects=332 | s

55

# Sequence Length Policy (Padding, Truncation, Windowing)

In [8]:
# ============================================================
# STAGE 6 — Sequence Length Policy (Padding, Truncation, Windowing)
# ONE CELL, Kaggle CPU-SAFE, nyambung dengan STAGE 0..5
#
# Tujuan:
# - Tentukan MAX_LEN otomatis (berdasarkan distribusi panjang sequence)
# - Terapkan policy truncation/windowing yang informatif (center around peak)
# - Buat fixed-length cache (memmap) untuk TRAIN & TEST:
#     X: (N, MAX_LEN, F) float32
#     B: (N, MAX_LEN) int8   (band_id)
#     M: (N, MAX_LEN) int8   (attention mask: 1=real token, 0=pad)
#     y_train: (N,) int8
#     ids: (N,) bytes
# - Simpan config agar tahap training tidak error karena mismatch shape
#
# Input:
# - seq_manifest_train, seq_manifest_test, SEQ_FEATURE_NAMES, df_train_meta, df_test_meta
# - (opsional) df_sub dari STAGE 0 untuk urutan test yang sesuai submission
#
# Output:
# - /kaggle/working/mallorn_run/artifacts/fixed_seq/{train|test}_{X|B|M}.dat
# - /kaggle/working/mallorn_run/artifacts/fixed_seq/{train|test}_{ids}.npy
# - /kaggle/working/mallorn_run/artifacts/fixed_seq/train_y.npy
# - /kaggle/working/mallorn_run/artifacts/fixed_seq/length_policy_config.json
# ============================================================

import gc, json, math, warnings
from pathlib import Path

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=pd.errors.DtypeWarning)

# ----------------------------
# 0) Require previous stages
# ----------------------------
for need in ["seq_manifest_train", "seq_manifest_test", "SEQ_FEATURE_NAMES",
             "df_train_meta", "df_test_meta", "ART_DIR"]:
    if need not in globals():
        raise RuntimeError(f"Missing `{need}`. Jalankan STAGE 0 -> 1 -> 2 -> 3 -> 4 -> 5 dulu.")

m_train = seq_manifest_train.copy()
m_test  = seq_manifest_test.copy()

# feature indices (must match STAGE 5)
feat = {name: i for i, name in enumerate(SEQ_FEATURE_NAMES)}
REQ_FEATS = ["t_rel_log", "dt_log", "flux_asinh", "err_log1p", "snr_tanh", "detected"]
for k in REQ_FEATS:
    if k not in feat:
        raise ValueError(f"SEQ_FEATURE_NAMES must include '{k}'. Found: {SEQ_FEATURE_NAMES}")

# ----------------------------
# 1) Inspect length distribution -> choose MAX_LEN (CPU-friendly)
# ----------------------------
def describe_lengths(m: pd.DataFrame, name: str):
    L = m["length"].to_numpy(dtype=np.int32, copy=False)
    q = np.percentile(L, [0, 1, 5, 10, 25, 50, 75, 90, 95, 98, 99, 100])
    print(f"\n{name} length stats")
    print(f"- n_objects={len(L):,} | min={int(q[0])} | p50={int(q[5])} | p90={int(q[8])} | p95={int(q[9])} | p99={int(q[10])} | max={int(q[-1])}")
    return q

q_tr = describe_lengths(m_train, "TRAIN")
q_te = describe_lengths(m_test,  "TEST")

# auto pick based on max(p95_train, p95_test) rounded up to multiple of 32, capped for CPU
p95 = int(max(q_tr[9], q_te[9]))
def round_up(x, base=32):
    return int(base * math.ceil(x / base))

# CPU-safe caps:
# - if p95 <= 256 => 256
# - elif <= 384 => 384
# - else <= 512 => 512
if p95 <= 256:
    MAX_LEN = 256
elif p95 <= 384:
    MAX_LEN = 384
else:
    MAX_LEN = 512

# If you want to force smaller on CPU, set here:
FORCE_MAX_LEN = None  # e.g. 256
if FORCE_MAX_LEN is not None:
    MAX_LEN = int(FORCE_MAX_LEN)

print(f"\n[Stage 6] Chosen MAX_LEN = {MAX_LEN} (based on p95={p95})")

# ----------------------------
# 2) Windowing / truncation policy (informative on TDE-like peaks)
# ----------------------------
# Policy:
# - compute token score = w1*|snr_tanh| + w2*|flux_asinh| + w3*detected
# - center = argmax(score)
# - take window [center - MAX_LEN//2, center + MAX_LEN//2]
# - clamp to valid range
W_SNR = 1.0
W_FLX = 0.35
W_DET = 0.25

def select_window(X: np.ndarray, max_len: int) -> tuple[int, int, int]:
    """
    Returns (start, end, center).
    X shape (L, F).
    """
    L = int(X.shape[0])
    if L <= max_len:
        return 0, L, 0

    snr = np.abs(X[:, feat["snr_tanh"]])
    flx = np.abs(X[:, feat["flux_asinh"]])
    det = X[:, feat["detected"]]

    score = (W_SNR * snr) + (W_FLX * flx) + (W_DET * det)
    # if all zeros (rare), fallback to middle
    if not np.isfinite(score).any() or float(score.max()) <= 0.0:
        center = L // 2
    else:
        center = int(np.argmax(score))

    half = max_len // 2
    start = center - half
    start = max(0, start)
    start = min(start, L - max_len)
    end = start + max_len
    return start, end, center

def pad_to_fixed(X: np.ndarray, B: np.ndarray, max_len: int):
    """
    Returns:
      Xp: (max_len, F) float32
      Bp: (max_len,) int8
      Mp: (max_len,) int8  (1=real token)
      orig_len, win_start, win_end
    """
    L = int(X.shape[0])
    if L <= 0:
        # safety: should not happen
        Xp = np.zeros((max_len, X.shape[1]), dtype=np.float32)
        Bp = np.zeros((max_len,), dtype=np.int8)
        Mp = np.zeros((max_len,), dtype=np.int8)
        return Xp, Bp, Mp, 0, 0, 0

    s, e, _ = select_window(X, max_len=max_len)
    Xw = X[s:e]
    Bw = B[s:e]
    lw = int(Xw.shape[0])

    Xp = np.zeros((max_len, X.shape[1]), dtype=np.float32)
    Bp = np.zeros((max_len,), dtype=np.int8)
    Mp = np.zeros((max_len,), dtype=np.int8)

    Xp[:lw] = Xw.astype(np.float32, copy=False)
    Bp[:lw] = Bw.astype(np.int8, copy=False)
    Mp[:lw] = 1

    return Xp, Bp, Mp, L, s, e

# ----------------------------
# 3) Fixed cache builder (efficient: process per shard)
# ----------------------------
FIX_DIR = Path(ART_DIR) / "fixed_seq"
FIX_DIR.mkdir(parents=True, exist_ok=True)

# Decide ordering for train/test arrays (important!)
# - train: df_train_meta.index order (stable)
train_ids = df_train_meta.index.to_list()
y_train = df_train_meta["target"].to_numpy(dtype=np.int8, copy=False)

# - test: if df_sub exists, follow it (ensures submission order); else df_test_meta.index
if "df_sub" in globals() and isinstance(globals()["df_sub"], pd.DataFrame) and "object_id" in df_sub.columns:
    test_ids = df_sub["object_id"].astype(str).str.strip().to_list()
else:
    test_ids = df_test_meta.index.to_list()

# mapping to row index
train_row = {oid: i for i, oid in enumerate(train_ids)}
test_row  = {oid: i for i, oid in enumerate(test_ids)}

NTR = len(train_ids)
NTE = len(test_ids)
F = len(SEQ_FEATURE_NAMES)

# memmap paths
train_X_path = FIX_DIR / "train_X.dat"
train_B_path = FIX_DIR / "train_B.dat"
train_M_path = FIX_DIR / "train_M.dat"
test_X_path  = FIX_DIR / "test_X.dat"
test_B_path  = FIX_DIR / "test_B.dat"
test_M_path  = FIX_DIR / "test_M.dat"

# metadata arrays
train_len_path = FIX_DIR / "train_origlen.npy"
train_win_path = FIX_DIR / "train_winstart.npy"
test_len_path  = FIX_DIR / "test_origlen.npy"
test_win_path  = FIX_DIR / "test_winstart.npy"

# create memmaps
Xtr = np.memmap(train_X_path, dtype=np.float32, mode="w+", shape=(NTR, MAX_LEN, F))
Btr = np.memmap(train_B_path, dtype=np.int8,   mode="w+", shape=(NTR, MAX_LEN))
Mtr = np.memmap(train_M_path, dtype=np.int8,   mode="w+", shape=(NTR, MAX_LEN))

Xte = np.memmap(test_X_path, dtype=np.float32, mode="w+", shape=(NTE, MAX_LEN, F))
Bte = np.memmap(test_B_path, dtype=np.int8,   mode="w+", shape=(NTE, MAX_LEN))
Mte = np.memmap(test_M_path, dtype=np.int8,   mode="w+", shape=(NTE, MAX_LEN))

origlen_tr = np.zeros((NTR,), dtype=np.int32)
winstart_tr = np.zeros((NTR,), dtype=np.int32)
origlen_te = np.zeros((NTE,), dtype=np.int32)
winstart_te = np.zeros((NTE,), dtype=np.int32)

def process_manifest_into_memmap(m: pd.DataFrame, which: str):
    """
    Process manifest (train/test) into the appropriate memmaps by reading each shard once.
    """
    if which == "train":
        row_map = train_row
        Xmm, Bmm, Mmm = Xtr, Btr, Mtr
        origlen, winstart = origlen_tr, winstart_tr
        expected_n = NTR
    else:
        row_map = test_row
        Xmm, Bmm, Mmm = Xte, Bte, Mte
        origlen, winstart = origlen_te, winstart_te
        expected_n = NTE

    filled = 0
    # group by shard to minimize IO
    for shard_path, g in m.groupby("shard", sort=False):
        shard_path = str(shard_path)
        data = np.load(shard_path, allow_pickle=False)
        x_all = data["x"]      # (total_tokens, F)
        b_all = data["band"]   # (total_tokens,)
        # loop rows in this shard group
        for _, r in g.iterrows():
            oid = str(r["object_id"])
            idx = row_map.get(oid, None)
            if idx is None:
                # object_id not requested in ordering (should not happen, but skip safely)
                continue
            start = int(r["start"])
            length = int(r["length"])
            if length <= 0:
                continue

            X = x_all[start:start+length]
            B = b_all[start:start+length]

            Xp, Bp, Mp, L0, ws, _ = pad_to_fixed(X, B, max_len=MAX_LEN)

            Xmm[idx, :, :] = Xp
            Bmm[idx, :] = Bp
            Mmm[idx, :] = Mp
            origlen[idx] = int(L0)
            winstart[idx] = int(ws)
            filled += 1

        del data
        if filled % 2000 == 0:
            gc.collect()

    # sanity: some ids might not exist in manifest (should be 0 if pipeline correct)
    return filled, expected_n

print("\n[Stage 6] Building fixed-length cache (TRAIN)...")
filled_tr, exp_tr = process_manifest_into_memmap(m_train, "train")
print(f"[Stage 6] TRAIN filled: {filled_tr:,} rows (expected ordering size={exp_tr:,})")

print("\n[Stage 6] Building fixed-length cache (TEST)...")
filled_te, exp_te = process_manifest_into_memmap(m_test, "test")
print(f"[Stage 6] TEST filled:  {filled_te:,} rows (expected ordering size={exp_te:,})")

# Flush memmaps
Xtr.flush(); Btr.flush(); Mtr.flush()
Xte.flush(); Bte.flush(); Mte.flush()

# Save ids + y + meta
np.save(FIX_DIR / "train_ids.npy", np.asarray(train_ids, dtype="S"))
np.save(FIX_DIR / "test_ids.npy",  np.asarray(test_ids, dtype="S"))
np.save(FIX_DIR / "train_y.npy",   y_train)

np.save(train_len_path, origlen_tr)
np.save(train_win_path, winstart_tr)
np.save(test_len_path,  origlen_te)
np.save(test_win_path,  winstart_te)

# ----------------------------
# 4) Sanity checks (anti-error)
# ----------------------------
def sanity_check(which: str, n_show: int = 3):
    if which == "train":
        Xmm, Bmm, Mmm = Xtr, Btr, Mtr
        ids = train_ids
        ol = origlen_tr
    else:
        Xmm, Bmm, Mmm = Xte, Bte, Mte
        ids = test_ids
        ol = origlen_te

    # check masks and shapes
    assert Xmm.shape[1] == MAX_LEN and Xmm.shape[2] == F
    assert Bmm.shape[1] == MAX_LEN
    assert Mmm.shape[1] == MAX_LEN

    # show a few random examples
    rng = np.random.default_rng(2025)
    idxs = rng.choice(len(ids), size=min(n_show, len(ids)), replace=False)
    print(f"\n[Stage 6] Sanity samples ({which}):")
    for i in idxs:
        msum = int(Mmm[i].sum())
        print(f"- idx={i} oid={ids[i]} orig_len={int(ol[i])} kept={msum} | bands_unique={sorted(set(Bmm[i,:msum].tolist()))}")

sanity_check("train", n_show=3)
sanity_check("test", n_show=3)

# ----------------------------
# 5) Save config
# ----------------------------
policy_cfg = {
    "max_len": int(MAX_LEN),
    "feature_names": list(SEQ_FEATURE_NAMES),
    "weights": {"snr": float(W_SNR), "flux": float(W_FLX), "detected": float(W_DET)},
    "snr_tanh_scale_used_in_stage5": float(globals().get("SNR_TANH_SCALE", 10.0)) if "SNR_TANH_SCALE" in globals() else None,
    "train_order": "df_train_meta.index",
    "test_order": "df_sub.object_id" if ("df_sub" in globals() and "object_id" in df_sub.columns) else "df_test_meta.index",
    "files": {
        "train_X": str(train_X_path),
        "train_B": str(train_B_path),
        "train_M": str(train_M_path),
        "train_y": str(FIX_DIR / "train_y.npy"),
        "train_ids": str(FIX_DIR / "train_ids.npy"),
        "test_X": str(test_X_path),
        "test_B": str(test_B_path),
        "test_M": str(test_M_path),
        "test_ids": str(FIX_DIR / "test_ids.npy"),
    }
}
cfg_path = FIX_DIR / "length_policy_config.json"
with open(cfg_path, "w", encoding="utf-8") as f:
    json.dump(policy_cfg, f, indent=2)

print("\n[Stage 6] DONE")
print(f"- Saved fixed cache dir: {FIX_DIR}")
print(f"- Saved config: {cfg_path}")

# Export globals for training stage
globals().update({
    "FIX_DIR": FIX_DIR,
    "MAX_LEN": MAX_LEN,
    "FIX_TRAIN_X_PATH": train_X_path,
    "FIX_TRAIN_B_PATH": train_B_path,
    "FIX_TRAIN_M_PATH": train_M_path,
    "FIX_TEST_X_PATH": test_X_path,
    "FIX_TEST_B_PATH": test_B_path,
    "FIX_TEST_M_PATH": test_M_path,
    "FIX_TRAIN_Y_PATH": FIX_DIR / "train_y.npy",
    "FIX_TRAIN_IDS_PATH": FIX_DIR / "train_ids.npy",
    "FIX_TEST_IDS_PATH": FIX_DIR / "test_ids.npy",
    "FIX_POLICY_CFG_PATH": cfg_path,
})

gc.collect()



TRAIN length stats
- n_objects=3,043 | min=17 | p50=150 | p90=194 | p95=386 | p99=908 | max=1164

TEST length stats
- n_objects=7,135 | min=18 | p50=152 | p90=193 | p95=542 | p99=990 | max=1186

[Stage 6] Chosen MAX_LEN = 512 (based on p95=542)

[Stage 6] Building fixed-length cache (TRAIN)...
[Stage 6] TRAIN filled: 3,043 rows (expected ordering size=3,043)

[Stage 6] Building fixed-length cache (TEST)...
[Stage 6] TEST filled:  7,135 rows (expected ordering size=7,135)

[Stage 6] Sanity samples (train):
- idx=1360 oid=gwilwileth_adel_amloth orig_len=157 kept=157 | bands_unique=[0, 1, 2, 3, 4, 5]
- idx=3020 oid=vin_araf_gwador orig_len=151 kept=151 | bands_unique=[0, 1, 2, 3, 4, 5]
- idx=3025 oid=ylf_alph_mindon orig_len=167 kept=167 | bands_unique=[0, 1, 2, 3, 4, 5]

[Stage 6] Sanity samples (test):
- idx=3191 oid=rom_bellas_lebdas orig_len=142 kept=142 | bands_unique=[0, 1, 2, 3, 4, 5]
- idx=7082 oid=nim_nestad_thor orig_len=161 kept=161 | bands_unique=[0, 1, 2, 3, 4, 5]
- idx=7094

77

# CV Split (Object-Level, Stratified)

In [9]:
# ============================================================
# STAGE 7 — CV Split (Object-Level, Stratified) (ONE CELL, Kaggle CPU-SAFE)
#
# Tujuan:
# - Buat Stratified K-Fold di level object_id (bukan per baris lightcurve)
# - Konsisten dengan urutan TRAIN yang dipakai di STAGE 6 (fixed_seq/train_ids.npy)
#
# Output:
# - artifacts/cv_folds.csv                (object_id -> fold)
# - artifacts/cv_folds.npz                (val_idx per fold, untuk training cepat)
# - artifacts/cv_report.txt               (ringkasan distribusi kelas per fold)
# - globals: fold_assign, folds (list of dict)
# ============================================================

import gc, json
from pathlib import Path

import numpy as np
import pandas as pd

# ----------------------------
# 0) Require previous stages
# ----------------------------
for need in ["df_train_meta", "ART_DIR"]:
    if need not in globals():
        raise RuntimeError(f"Missing `{need}`. Jalankan STAGE 2 dulu (Load and Validate Train/Test Logs).")

SEED = int(globals().get("SEED", 2025))

# Prefer using the same ordering as STAGE 6 fixed cache (if available)
train_ids = None
if "FIX_DIR" in globals():
    p = Path(globals()["FIX_DIR"]) / "train_ids.npy"
    if p.exists():
        train_ids = np.load(p, allow_pickle=False).astype("S").astype(str).tolist()

if train_ids is None:
    # fallback: df_train_meta index order
    train_ids = df_train_meta.index.astype(str).tolist()

# y aligned to train_ids
y = df_train_meta.loc[train_ids, "target"].to_numpy(dtype=np.int8, copy=False)

N = len(train_ids)
pos = int((y == 1).sum())
neg = int((y == 0).sum())
if pos == 0 or neg == 0:
    raise RuntimeError(f"Invalid class distribution: pos={pos}, neg={neg}. Tidak bisa Stratified CV.")

# ----------------------------
# 1) Choose n_splits safely (anti-error when positives are very few)
# ----------------------------
DEFAULT_SPLITS = 5
# Ensure each fold can contain at least 1 positive and 1 negative
max_splits_by_pos = pos
max_splits_by_neg = neg
n_splits = min(DEFAULT_SPLITS, max_splits_by_pos, max_splits_by_neg)

if n_splits < 2:
    raise RuntimeError(
        f"Terlalu sedikit sampel untuk CV stratified. pos={pos}, neg={neg}. "
        "Minimal butuh >=2 per kelas untuk KFold."
    )

print(f"[Stage 7] Building StratifiedKFold: n_splits={n_splits} | N={N:,} | pos={pos:,} | neg={neg:,} | seed={SEED}")

# ----------------------------
# 2) Build folds
# ----------------------------
try:
    from sklearn.model_selection import StratifiedKFold
except Exception as e:
    raise RuntimeError("scikit-learn tidak tersedia. Pastikan Kaggle environment punya sklearn.") from e

skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=SEED)

fold_assign = np.full(N, -1, dtype=np.int16)
folds = []

for fold, (_, val_idx) in enumerate(skf.split(np.zeros(N), y)):
    fold_assign[val_idx] = fold
    folds.append({
        "fold": int(fold),
        "val_idx": val_idx.astype(np.int32),
    })

if (fold_assign < 0).any():
    raise RuntimeError("Fold assignment masih ada -1 (ada object belum ter-assign). Ini tidak seharusnya terjadi.")

# ----------------------------
# 3) Validate distribution per fold (anti-silent-bug)
# ----------------------------
lines = []
lines.append(f"StratifiedKFold n_splits={n_splits} seed={SEED}")
lines.append(f"Total: N={N} | pos={pos} | neg={neg} | pos%={pos/max(N,1)*100:.4f}%")
lines.append("Per-fold distribution:")

ok = True
for f in range(n_splits):
    idx = np.where(fold_assign == f)[0]
    yf = y[idx]
    pf = int((yf == 1).sum())
    nf = int((yf == 0).sum())
    lines.append(f"- fold {f}: n={len(idx):6d} | pos={pf:5d} | neg={nf:6d} | pos%={(pf/max(len(idx),1))*100:7.4f}%")
    # basic checks
    if pf == 0 or nf == 0:
        ok = False

if not ok:
    raise RuntimeError(
        "Ada fold yang tidak mengandung salah satu kelas (pos=0 atau neg=0). "
        "Coba turunkan n_splits atau pastikan stratify berjalan benar."
    )

# ----------------------------
# 4) Save artifacts
# ----------------------------
ART_DIR = Path(ART_DIR)
cv_dir = ART_DIR / "cv"
cv_dir.mkdir(parents=True, exist_ok=True)

df_folds = pd.DataFrame({"object_id": train_ids, "fold": fold_assign.astype(int)})
folds_csv = cv_dir / "cv_folds.csv"
df_folds.to_csv(folds_csv, index=False)

# Save val_idx per fold in one npz (train_idx can be derived as ~val_idx)
npz_path = cv_dir / "cv_folds.npz"
npz_kwargs = {f"val_idx_{f}": folds[f]["val_idx"] for f in range(n_splits)}
np.savez(npz_path, **npz_kwargs)

report_path = cv_dir / "cv_report.txt"
with open(report_path, "w", encoding="utf-8") as f:
    f.write("\n".join(lines) + "\n")

cfg_path = cv_dir / "cv_config.json"
with open(cfg_path, "w", encoding="utf-8") as f:
    json.dump(
        {
            "seed": SEED,
            "n_splits": int(n_splits),
            "order_source": "fixed_seq/train_ids.npy" if ("FIX_DIR" in globals() and (Path(globals()["FIX_DIR"]) / "train_ids.npy").exists()) else "df_train_meta.index",
            "artifacts": {
                "folds_csv": str(folds_csv),
                "folds_npz": str(npz_path),
                "report_txt": str(report_path),
            },
        },
        f,
        indent=2,
    )

print("\n[Stage 7] CV split OK")
print(f"- Saved: {folds_csv}")
print(f"- Saved: {npz_path}")
print(f"- Saved: {report_path}")
print(f"- Saved: {cfg_path}")
print("\n".join(lines[-(n_splits+2):]))  # show short tail

# Export globals for next stage
globals().update({
    "CV_DIR": cv_dir,
    "n_splits": n_splits,
    "train_ids_ordered": train_ids,  # matches fold_assign
    "y_ordered": y,
    "fold_assign": fold_assign,
    "folds": folds,
    "CV_FOLDS_CSV": folds_csv,
    "CV_FOLDS_NPZ": npz_path,
})

gc.collect()


[Stage 7] Building StratifiedKFold: n_splits=5 | N=3,043 | pos=148 | neg=2,895 | seed=2025

[Stage 7] CV split OK
- Saved: /kaggle/working/mallorn_run/artifacts/cv/cv_folds.csv
- Saved: /kaggle/working/mallorn_run/artifacts/cv/cv_folds.npz
- Saved: /kaggle/working/mallorn_run/artifacts/cv/cv_report.txt
- Saved: /kaggle/working/mallorn_run/artifacts/cv/cv_config.json
Total: N=3043 | pos=148 | neg=2895 | pos%=4.8636%
Per-fold distribution:
- fold 0: n=   609 | pos=   30 | neg=   579 | pos%= 4.9261%
- fold 1: n=   609 | pos=   30 | neg=   579 | pos%= 4.9261%
- fold 2: n=   609 | pos=   30 | neg=   579 | pos%= 4.9261%
- fold 3: n=   608 | pos=   29 | neg=   579 | pos%= 4.7697%
- fold 4: n=   608 | pos=   29 | neg=   579 | pos%= 4.7697%


33

# Train Multiband Event Transformer (CPU-Safe Configuration)

In [None]:
# ============================================================
# STAGE 8 — Train Multiband Event Transformer (CPU-Safe Configuration) (ONE CELL)
# - Kaggle Web (CPU only): small model, small batch, num_workers=0
# - Trains Stratified CV folds from STAGE 7
# - Saves best checkpoint per fold + OOF probabilities (for later threshold tuning)
#
# Requires globals from previous stages:
#   FIX_DIR, MAX_LEN, SEQ_FEATURE_NAMES
#   df_train_meta
#   n_splits, folds, train_ids_ordered, y_ordered
#   CKPT_DIR, OOF_DIR, LOG_DIR
# ============================================================

import os, gc, json, math, time, warnings
from pathlib import Path

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=FutureWarning)

# ----------------------------
# 0) Require previous stages
# ----------------------------
need = [
    "FIX_DIR","MAX_LEN","SEQ_FEATURE_NAMES",
    "df_train_meta",
    "n_splits","folds","train_ids_ordered","y_ordered",
    "CKPT_DIR","OOF_DIR","LOG_DIR",
]
for k in need:
    if k not in globals():
        raise RuntimeError(f"Missing `{k}`. Jalankan STAGE 0..7 dulu dengan urutan benar.")

# ----------------------------
# 1) Imports (torch) + CPU safety
# ----------------------------
try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
except Exception as e:
    raise RuntimeError("PyTorch tidak tersedia di environment ini.") from e

SEED = int(globals().get("SEED", 2025))
torch.manual_seed(SEED)
np.random.seed(SEED)

device = torch.device("cpu")

# CPU thread guard (avoid oversubscription)
try:
    torch.set_num_threads(int(os.environ.get("OMP_NUM_THREADS", "2")))
    torch.set_num_interop_threads(1)
except Exception:
    pass

# ----------------------------
# 2) Open fixed-length memmaps (do NOT load into RAM)
# ----------------------------
FIX_DIR = Path(FIX_DIR)
train_ids = list(train_ids_ordered)
y = np.asarray(y_ordered, dtype=np.int8)
N = len(train_ids)
L = int(MAX_LEN)
Fdim = len(SEQ_FEATURE_NAMES)

train_X_path = FIX_DIR / "train_X.dat"
train_B_path = FIX_DIR / "train_B.dat"
train_M_path = FIX_DIR / "train_M.dat"

for p in [train_X_path, train_B_path, train_M_path]:
    if not p.exists():
        raise FileNotFoundError(f"Missing fixed cache file: {p}. Pastikan STAGE 6 sukses.")

X_mm = np.memmap(train_X_path, dtype=np.float32, mode="r", shape=(N, L, Fdim))
B_mm = np.memmap(train_B_path, dtype=np.int8,   mode="r", shape=(N, L))
M_mm = np.memmap(train_M_path, dtype=np.int8,   mode="r", shape=(N, L))

# ----------------------------
# 3) Build global features (object-level) aligned to train_ids
# ----------------------------
# Use only safe, available columns; fill missing with 0
G_COLS = ["Z", "Z_err", "EBV", "Z_missing", "Z_err_missing", "EBV_missing", "is_photoz"]
for c in G_COLS:
    if c not in df_train_meta.columns:
        df_train_meta[c] = 0.0

G = df_train_meta.loc[train_ids, G_COLS].copy()
for c in G_COLS:
    G[c] = pd.to_numeric(G[c], errors="coerce").fillna(0.0).astype(np.float32)

G_np = G.to_numpy(dtype=np.float32, copy=False)

# Simple standardization for numeric stability (avoid leakage: only within train set)
# Standardize all cols (including flags) is okay; flags become small values.
g_mean = G_np.mean(axis=0).astype(np.float32)
g_std  = G_np.std(axis=0).astype(np.float32)
g_std  = np.where(g_std < 1e-6, 1.0, g_std).astype(np.float32)
G_np_z = ((G_np - g_mean) / g_std).astype(np.float32)

# Save scaler for later test inference stage
scaler_path = Path(LOG_DIR) / "global_scaler.json"
with open(scaler_path, "w", encoding="utf-8") as f:
    json.dump({"cols": G_COLS, "mean": g_mean.tolist(), "std": g_std.tolist()}, f, indent=2)

# ----------------------------
# 4) Dataset / DataLoader (num_workers=0)
# ----------------------------
class MemmapSeqDataset(torch.utils.data.Dataset):
    def __init__(self, idx, X_mm, B_mm, M_mm, G_np_z, y=None):
        self.idx = np.asarray(idx, dtype=np.int32)
        self.X_mm = X_mm
        self.B_mm = B_mm
        self.M_mm = M_mm
        self.G = G_np_z
        self.y = None if y is None else np.asarray(y, dtype=np.int8)

    def __len__(self):
        return len(self.idx)

    def __getitem__(self, i):
        j = int(self.idx[i])
        X = self.X_mm[j]  # (L,F) float32
        B = self.B_mm[j]  # (L,) int8
        M = self.M_mm[j]  # (L,) int8
        G = self.G[j]     # (G,) float32
        if self.y is None:
            return (
                torch.from_numpy(X),
                torch.from_numpy(B.astype(np.int64, copy=False)),
                torch.from_numpy(M.astype(np.int64, copy=False)),
                torch.from_numpy(G),
            )
        else:
            y = self.y[j]
            return (
                torch.from_numpy(X),
                torch.from_numpy(B.astype(np.int64, copy=False)),
                torch.from_numpy(M.astype(np.int64, copy=False)),
                torch.from_numpy(G),
                torch.tensor(float(y), dtype=torch.float32),
            )

def make_loader(ds, batch_size, shuffle):
    return torch.utils.data.DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=0,
        pin_memory=False,
        drop_last=False,
    )

# ----------------------------
# 5) Model: Multiband Event Transformer (CPU-safe small)
# ----------------------------
class MultibandEventTransformer(nn.Module):
    def __init__(self, feat_dim, n_bands=6, d_model=128, n_heads=4, n_layers=2, ff_mult=2, dropout=0.10, g_dim=7):
        super().__init__()
        self.feat_dim = feat_dim
        self.n_bands = n_bands
        self.d_model = d_model

        self.x_proj = nn.Linear(feat_dim, d_model)
        self.band_emb = nn.Embedding(n_bands, d_model)

        # learnable positional embedding (fixed MAX_LEN)
        self.pos_emb = nn.Parameter(torch.zeros(1, L, d_model))
        nn.init.normal_(self.pos_emb, mean=0.0, std=0.02)

        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=int(d_model * ff_mult),
            dropout=dropout,
            activation="gelu",
            batch_first=True,
            norm_first=True,
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=n_layers)

        # Attention pooling
        self.attn = nn.Linear(d_model, 1)

        # Global feature fusion
        self.g_proj = nn.Sequential(
            nn.Linear(g_dim, d_model // 2),
            nn.GELU(),
            nn.Dropout(dropout),
        )

        self.head = nn.Sequential(
            nn.Linear(d_model + (d_model // 2), d_model),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_model, 1),
        )

    def forward(self, X, band_id, mask, G):
        """
        X: (B,L,F) float
        band_id: (B,L) long in [0..5]
        mask: (B,L) long/int (1=real token, 0=pad)
        G: (B,g_dim) float
        """
        # ensure correct dtypes
        X = X.to(torch.float32)
        band_id = band_id.clamp(0, self.n_bands - 1).to(torch.long)
        mask = mask.to(torch.long)

        h = self.x_proj(X) + self.band_emb(band_id) + self.pos_emb[:, :X.shape[1], :]

        # Transformer expects src_key_padding_mask with True for pads
        pad_mask = (mask == 0)  # (B,L) bool
        h = self.encoder(h, src_key_padding_mask=pad_mask)

        # Attention pooling with mask
        a = self.attn(h).squeeze(-1)  # (B,L)
        a = a.masked_fill(pad_mask, -1e9)
        w = torch.softmax(a, dim=1)   # (B,L)
        pooled = torch.sum(h * w.unsqueeze(-1), dim=1)  # (B,d_model)

        g = self.g_proj(G.to(torch.float32))
        z = torch.cat([pooled, g], dim=1)
        logit = self.head(z).squeeze(-1)  # (B,)
        return logit

# ----------------------------
# 6) Training hyperparams (CPU-safe)
# ----------------------------
CFG = {
    "d_model": 128,
    "n_heads": 4,
    "n_layers": 2,
    "ff_mult": 2,
    "dropout": 0.10,
    "batch_size": 16,          # keep modest for CPU
    "grad_accum": 2,           # effective batch = 32
    "epochs": 10,
    "lr": 3e-4,
    "weight_decay": 0.01,
    "patience": 3,             # early stopping on val_loss
    "max_grad_norm": 1.0,
}
cfg_path = Path(LOG_DIR) / "train_cfg.json"
with open(cfg_path, "w", encoding="utf-8") as f:
    json.dump(CFG, f, indent=2)

# ----------------------------
# 7) Loss: imbalance handling
# ----------------------------
pos = int((y == 1).sum())
neg = int((y == 0).sum())
if pos == 0:
    raise RuntimeError("No positive samples in training. Tidak bisa training classifier.")
pos_weight = float(neg / max(pos, 1))
pos_weight_t = torch.tensor([pos_weight], dtype=torch.float32, device=device)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight_t)

print("[Stage 8] TRAIN CONFIG (CPU)")
print(f"- N={N:,} | pos={pos:,} | neg={neg:,} | pos_weight={pos_weight:.4f}")
print(f"- Model: d_model={CFG['d_model']} heads={CFG['n_heads']} layers={CFG['n_layers']} dropout={CFG['dropout']}")
print(f"- Batch={CFG['batch_size']} grad_accum={CFG['grad_accum']} epochs={CFG['epochs']} lr={CFG['lr']}")
print(f"- Saved cfg: {cfg_path}")
print(f"- Saved global scaler: {scaler_path}")

# ----------------------------
# 8) Metrics helpers (no sklearn dependency)
# ----------------------------
def sigmoid_np(x):
    x = np.clip(x, -50, 50)
    return 1.0 / (1.0 + np.exp(-x))

def f1_binary(y_true, y_pred01):
    y_true = y_true.astype(np.int32)
    y_pred01 = y_pred01.astype(np.int32)
    tp = int(((y_true == 1) & (y_pred01 == 1)).sum())
    fp = int(((y_true == 0) & (y_pred01 == 1)).sum())
    fn = int(((y_true == 1) & (y_pred01 == 0)).sum())
    if tp == 0:
        return 0.0
    prec = tp / max(tp + fp, 1)
    rec  = tp / max(tp + fn, 1)
    if prec + rec == 0:
        return 0.0
    return float(2 * prec * rec / (prec + rec))

@torch.no_grad()
def eval_model(model, loader):
    model.eval()
    losses = []
    logits_all = []
    y_all = []
    for batch in loader:
        Xb, Bb, Mb, Gb, yb = batch
        Xb = Xb.to(device)
        Bb = Bb.to(device)
        Mb = Mb.to(device)
        Gb = Gb.to(device)
        yb = yb.to(device)

        logit = model(Xb, Bb, Mb, Gb)
        loss = criterion(logit, yb)
        losses.append(float(loss.item()))
        logits_all.append(logit.detach().cpu().numpy())
        y_all.append(yb.detach().cpu().numpy())

    logits_all = np.concatenate(logits_all, axis=0) if logits_all else np.zeros((0,), dtype=np.float32)
    y_all = np.concatenate(y_all, axis=0).astype(np.int8) if y_all else np.zeros((0,), dtype=np.int8)

    probs = sigmoid_np(logits_all)
    pred01 = (probs >= 0.5).astype(np.int8)
    f1 = f1_binary(y_all, pred01)
    return float(np.mean(losses) if losses else np.nan), probs, y_all, f1

# ----------------------------
# 9) Train CV
# ----------------------------
OOF_DIR = Path(OOF_DIR); OOF_DIR.mkdir(parents=True, exist_ok=True)
CKPT_DIR = Path(CKPT_DIR); CKPT_DIR.mkdir(parents=True, exist_ok=True)
LOG_DIR = Path(LOG_DIR); LOG_DIR.mkdir(parents=True, exist_ok=True)

oof_prob = np.zeros((N,), dtype=np.float32)
fold_metrics = []

# Precompute all indices
all_idx = np.arange(N, dtype=np.int32)

start_time = time.time()
for fold_info in folds:
    fold = int(fold_info["fold"])
    val_idx = np.asarray(fold_info["val_idx"], dtype=np.int32)
    val_mask = np.zeros(N, dtype=bool)
    val_mask[val_idx] = True
    tr_idx = all_idx[~val_mask]

    print(f"\n[Stage 8] FOLD {fold}/{n_splits-1} | train={len(tr_idx):,} val={len(val_idx):,}")

    # Datasets/Loaders
    ds_tr = MemmapSeqDataset(tr_idx, X_mm, B_mm, M_mm, G_np_z, y=y)
    ds_va = MemmapSeqDataset(val_idx, X_mm, B_mm, M_mm, G_np_z, y=y)

    dl_tr = make_loader(ds_tr, batch_size=CFG["batch_size"], shuffle=True)
    dl_va = make_loader(ds_va, batch_size=CFG["batch_size"], shuffle=False)

    # Model + optim
    model = MultibandEventTransformer(
        feat_dim=Fdim,
        n_bands=6,
        d_model=CFG["d_model"],
        n_heads=CFG["n_heads"],
        n_layers=CFG["n_layers"],
        ff_mult=CFG["ff_mult"],
        dropout=CFG["dropout"],
        g_dim=G_np_z.shape[1],
    ).to(device)

    opt = torch.optim.AdamW(model.parameters(), lr=CFG["lr"], weight_decay=CFG["weight_decay"])

    best_val_loss = float("inf")
    best_epoch = -1
    best_probs = None
    patience_left = int(CFG["patience"])

    step = 0
    for epoch in range(1, int(CFG["epochs"]) + 1):
        model.train()
        opt.zero_grad(set_to_none=True)

        running = 0.0
        n_batches = 0

        for batch in dl_tr:
            Xb, Bb, Mb, Gb, yb = batch
            Xb = Xb.to(device)
            Bb = Bb.to(device)
            Mb = Mb.to(device)
            Gb = Gb.to(device)
            yb = yb.to(device)

            logit = model(Xb, Bb, Mb, Gb)
            loss = criterion(logit, yb)
            loss = loss / float(CFG["grad_accum"])
            loss.backward()

            if (step + 1) % int(CFG["grad_accum"]) == 0:
                if CFG["max_grad_norm"] is not None:
                    torch.nn.utils.clip_grad_norm_(model.parameters(), float(CFG["max_grad_norm"]))
                opt.step()
                opt.zero_grad(set_to_none=True)

            running += float(loss.item()) * float(CFG["grad_accum"])
            n_batches += 1
            step += 1

        train_loss = running / max(n_batches, 1)

        # Validate
        val_loss, probs, y_val, f1_05 = eval_model(model, dl_va)

        improved = val_loss < (best_val_loss - 1e-6)
        if improved:
            best_val_loss = val_loss
            best_epoch = epoch
            best_probs = probs.copy()

            ckpt_path = CKPT_DIR / f"fold_{fold}.pt"
            torch.save(
                {
                    "fold": fold,
                    "epoch": epoch,
                    "model_state": model.state_dict(),
                    "cfg": CFG,
                    "seq_feature_names": SEQ_FEATURE_NAMES,
                    "max_len": L,
                    "global_cols": G_COLS,
                    "global_scaler": {"mean": g_mean, "std": g_std},
                },
                ckpt_path,
            )
            patience_left = int(CFG["patience"])
        else:
            patience_left -= 1

        print(f"  epoch {epoch:02d} | train_loss={train_loss:.5f} | val_loss={val_loss:.5f} | f1@0.5={f1_05:.4f} | best_epoch={best_epoch} | patience_left={patience_left}")

        if patience_left <= 0:
            break

    if best_probs is None:
        raise RuntimeError(f"Fold {fold}: best_probs is None (unexpected).")

    # Fill OOF
    oof_prob[val_idx] = best_probs.astype(np.float32)

    # Fold summary
    pred01 = (best_probs >= 0.5).astype(np.int8)
    best_f1_05 = f1_binary(y[val_idx], pred01)

    fold_metrics.append({
        "fold": fold,
        "val_size": int(len(val_idx)),
        "best_epoch": int(best_epoch),
        "best_val_loss": float(best_val_loss),
        "f1_at_0p5": float(best_f1_05),
    })

    # Cleanup
    del model, opt, ds_tr, ds_va, dl_tr, dl_va
    gc.collect()

elapsed = time.time() - start_time

# ----------------------------
# 10) Save OOF artifacts
# ----------------------------
oof_path_npy = OOF_DIR / "oof_prob.npy"
np.save(oof_path_npy, oof_prob)

# Also save CSV for convenience
df_oof = pd.DataFrame({
    "object_id": train_ids,
    "target": y.astype(int),
    "oof_prob": oof_prob.astype(np.float32),
})
oof_path_csv = OOF_DIR / "oof_prob.csv"
df_oof.to_csv(oof_path_csv, index=False)

metrics_path = OOF_DIR / "fold_metrics.json"
with open(metrics_path, "w", encoding="utf-8") as f:
    json.dump({"fold_metrics": fold_metrics, "elapsed_sec": float(elapsed)}, f, indent=2)

# quick overall f1@0.5 on OOF (not final; threshold tuning comes later)
oof_pred01 = (oof_prob >= 0.5).astype(np.int8)
oof_f1_05 = f1_binary(y, oof_pred01)

print("\n[Stage 8] CV TRAIN DONE")
print(f"- elapsed: {elapsed/60:.2f} min")
print(f"- OOF saved: {oof_path_npy}")
print(f"- OOF saved: {oof_path_csv}")
print(f"- fold metrics: {metrics_path}")
print(f"- OOF f1@0.5 (rough): {oof_f1_05:.4f}")

# Export globals for next stages (threshold tuning + test inference)
globals().update({
    "oof_prob": oof_prob,
    "OOF_PROB_PATH": oof_path_npy,
    "OOF_CSV_PATH": oof_path_csv,
    "FOLD_METRICS_PATH": metrics_path,
    "GLOBAL_SCALER_PATH": scaler_path,
    "TRAIN_CFG_PATH": cfg_path,
})


[Stage 8] TRAIN CONFIG (CPU)
- N=3,043 | pos=148 | neg=2,895 | pos_weight=19.5608
- Model: d_model=128 heads=4 layers=2 dropout=0.1
- Batch=16 grad_accum=2 epochs=10 lr=0.0003
- Saved cfg: /kaggle/working/mallorn_run/logs/train_cfg.json
- Saved global scaler: /kaggle/working/mallorn_run/logs/global_scaler.json

[Stage 8] FOLD 0/4 | train=2,434 val=609


  torch.from_numpy(X),


  epoch 01 | train_loss=1.35337 | val_loss=1.30779 | f1@0.5=0.0986 | best_epoch=1 | patience_left=3
  epoch 02 | train_loss=1.33828 | val_loss=1.31824 | f1@0.5=0.0000 | best_epoch=1 | patience_left=2
  epoch 03 | train_loss=1.31927 | val_loss=1.31393 | f1@0.5=0.0990 | best_epoch=1 | patience_left=1
  epoch 04 | train_loss=1.32508 | val_loss=1.32877 | f1@0.5=0.0997 | best_epoch=1 | patience_left=0

[Stage 8] FOLD 1/4 | train=2,434 val=609




# OOF Prediction + Threshold Tuning

In [None]:
# ============================================================
# STAGE 9 — OOF Prediction + Threshold Tuning (ONE CELL, Kaggle CPU-SAFE)
#
# Tujuan:
# - Pakai oof_prob (STAGE 8) untuk cari threshold terbaik (maksimalkan F1)
# - Simpan:
#   * best_threshold
#   * curve ringkas (opsional)
#   * report txt/json
#
# Output:
# - /kaggle/working/mallorn_run/oof/threshold_tuning.json
# - /kaggle/working/mallorn_run/oof/threshold_report.txt
# - globals: BEST_THR, thr_table
# ============================================================

import gc, json, warnings
from pathlib import Path
import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=FutureWarning)

# ----------------------------
# 0) Require previous stages
# ----------------------------
need = ["OOF_DIR", "df_train_meta"]
for k in need:
    if k not in globals():
        raise RuntimeError(f"Missing `{k}`. Jalankan STAGE 0..8 dulu.")

OOF_DIR = Path(OOF_DIR)
OOF_DIR.mkdir(parents=True, exist_ok=True)

# Load OOF probabilities
oof_prob = None
if "oof_prob" in globals():
    oof_prob = np.asarray(globals()["oof_prob"], dtype=np.float32)
else:
    p = OOF_DIR / "oof_prob.npy"
    if not p.exists():
        raise FileNotFoundError("OOF prob not found. Jalankan STAGE 8 (training) dulu.")
    oof_prob = np.load(p).astype(np.float32)

# Align y ordering: prefer train_ids_ordered if available
if "train_ids_ordered" in globals():
    train_ids = list(globals()["train_ids_ordered"])
    y = df_train_meta.loc[train_ids, "target"].to_numpy(dtype=np.int8, copy=False)
else:
    # fallback: df_train_meta order must match oof_prob length; if not, fail
    if len(oof_prob) != len(df_train_meta):
        raise RuntimeError("Cannot align y: missing train_ids_ordered and lengths mismatch.")
    y = df_train_meta["target"].to_numpy(dtype=np.int8, copy=False)

if len(oof_prob) != len(y):
    raise RuntimeError(f"Length mismatch: oof_prob={len(oof_prob)} vs y={len(y)}")

# ----------------------------
# 1) F1 metric (binary)
# ----------------------------
def f1_binary(y_true, y_pred01):
    y_true = y_true.astype(np.int32)
    y_pred01 = y_pred01.astype(np.int32)
    tp = int(((y_true == 1) & (y_pred01 == 1)).sum())
    fp = int(((y_true == 0) & (y_pred01 == 1)).sum())
    fn = int(((y_true == 1) & (y_pred01 == 0)).sum())
    if tp == 0:
        return 0.0
    prec = tp / max(tp + fp, 1)
    rec  = tp / max(tp + fn, 1)
    if prec + rec == 0:
        return 0.0
    return float(2 * prec * rec / (prec + rec))

def precision_recall(y_true, y_pred01):
    y_true = y_true.astype(np.int32)
    y_pred01 = y_pred01.astype(np.int32)
    tp = int(((y_true == 1) & (y_pred01 == 1)).sum())
    fp = int(((y_true == 0) & (y_pred01 == 1)).sum())
    fn = int(((y_true == 1) & (y_pred01 == 0)).sum())
    prec = tp / max(tp + fp, 1)
    rec  = tp / max(tp + fn, 1)
    return float(prec), float(rec), tp, fp, fn

# ----------------------------
# 2) Threshold sweep (fast + robust)
# ----------------------------
# Option A: sweep fixed grid
grid = np.concatenate([
    np.linspace(0.01, 0.10, 19),
    np.linspace(0.10, 0.90, 81),
    np.linspace(0.90, 0.99, 19),
]).astype(np.float32)

# Option B: also evaluate thresholds at unique prob quantiles (more adaptive)
qs = np.linspace(0.01, 0.99, 99, dtype=np.float32)
quant_thr = np.quantile(oof_prob, qs).astype(np.float32)
thr_candidates = np.unique(np.clip(np.concatenate([grid, quant_thr]), 0.0, 1.0))

best = {"thr": 0.5, "f1": -1.0, "prec": 0.0, "rec": 0.0, "tp": 0, "fp": 0, "fn": 0, "pos_pred": 0}

rows = []
for thr in thr_candidates:
    pred = (oof_prob >= thr).astype(np.int8)
    f1 = f1_binary(y, pred)
    prec, rec, tp, fp, fn = precision_recall(y, pred)
    pos_pred = int(pred.sum())
    rows.append((float(thr), float(f1), float(prec), float(rec), int(tp), int(fp), int(fn), pos_pred))

    # tie-breaker: prefer higher recall if F1 equal, then fewer false positives
    if (f1 > best["f1"] + 1e-12) or (
        abs(f1 - best["f1"]) <= 1e-12 and (rec > best["rec"] + 1e-12)
    ) or (
        abs(f1 - best["f1"]) <= 1e-12 and abs(rec - best["rec"]) <= 1e-12 and (fp < best["fp"])
    ):
        best.update({"thr": float(thr), "f1": float(f1), "prec": float(prec), "rec": float(rec),
                     "tp": int(tp), "fp": int(fp), "fn": int(fn), "pos_pred": int(pos_pred)})

thr_table = pd.DataFrame(rows, columns=["thr","f1","precision","recall","tp","fp","fn","pos_pred"])
thr_table = thr_table.sort_values(["f1","recall","precision"], ascending=[False, False, False]).reset_index(drop=True)

BEST_THR = float(best["thr"])

# ----------------------------
# 3) Additional sanity: compare to default 0.5
# ----------------------------
pred05 = (oof_prob >= 0.5).astype(np.int8)
f1_05 = f1_binary(y, pred05)
prec_05, rec_05, tp_05, fp_05, fn_05 = precision_recall(y, pred05)

# ----------------------------
# 4) Save report
# ----------------------------
out_json = OOF_DIR / "threshold_tuning.json"
out_txt  = OOF_DIR / "threshold_report.txt"
out_csv  = OOF_DIR / "threshold_table_top200.csv"

payload = {
    "best_threshold": BEST_THR,
    "best_f1": best["f1"],
    "best_precision": best["prec"],
    "best_recall": best["rec"],
    "best_counts": {"tp": best["tp"], "fp": best["fp"], "fn": best["fn"], "pos_pred": best["pos_pred"]},
    "baseline_thr_0p5": {"f1": f1_05, "precision": prec_05, "recall": rec_05, "tp": tp_05, "fp": fp_05, "fn": fn_05, "pos_pred": int(pred05.sum())},
    "n_objects": int(len(y)),
    "pos": int((y == 1).sum()),
    "neg": int((y == 0).sum()),
}

with open(out_json, "w", encoding="utf-8") as f:
    json.dump(payload, f, indent=2)

lines = []
lines.append("OOF Threshold Tuning Report")
lines.append(f"- N={payload['n_objects']} | pos={payload['pos']} | neg={payload['neg']} | pos%={payload['pos']/max(payload['n_objects'],1)*100:.4f}%")
lines.append("")
lines.append("Baseline @ thr=0.5")
lines.append(f"- F1={f1_05:.6f} | P={prec_05:.6f} | R={rec_05:.6f} | tp={tp_05} fp={fp_05} fn={fn_05} | pos_pred={int(pred05.sum())}")
lines.append("")
lines.append(f"BEST @ thr={BEST_THR:.6f}")
lines.append(f"- F1={best['f1']:.6f} | P={best['prec']:.6f} | R={best['rec']:.6f} | tp={best['tp']} fp={best['fp']} fn={best['fn']} | pos_pred={best['pos_pred']}")
lines.append("")
lines.append("Top 10 thresholds by (F1, recall, precision):")
for i in range(min(10, len(thr_table))):
    r = thr_table.iloc[i]
    lines.append(
        f"{i+1:02d}. thr={r['thr']:.6f} | f1={r['f1']:.6f} | P={r['precision']:.6f} | R={r['recall']:.6f} | tp={int(r['tp'])} fp={int(r['fp'])} fn={int(r['fn'])} | pos_pred={int(r['pos_pred'])}"
    )

with open(out_txt, "w", encoding="utf-8") as f:
    f.write("\n".join(lines) + "\n")

thr_table.head(200).to_csv(out_csv, index=False)

print("[Stage 9] Threshold tuning DONE")
print(f"- Best threshold: {BEST_THR:.6f}")
print(f"- Best F1:        {best['f1']:.6f}  (P={best['prec']:.6f} R={best['rec']:.6f})")
print(f"- Baseline F1@0.5:{f1_05:.6f}  (P={prec_05:.6f} R={rec_05:.6f})")
print(f"- Saved: {out_json}")
print(f"- Saved: {out_txt}")
print(f"- Saved: {out_csv}")

# Export globals for next stages (test inference + submission)
globals().update({
    "BEST_THR": BEST_THR,
    "thr_table": thr_table,
    "THR_JSON_PATH": out_json,
    "THR_REPORT_PATH": out_txt,
})

gc.collect()


# Test Inference (Fold Ensemble)

In [None]:
# ============================================================
# STAGE 10 — Test Inference (Fold Ensemble) (ONE CELL, Kaggle CPU-SAFE)
#
# Tujuan:
# - Load fixed-length TEST cache (memmap) dari STAGE 6
# - Load checkpoint fold_*.pt dari STAGE 8
# - Predict probability untuk test set per fold
# - Ensemble (mean) antar fold -> test_prob_ens.npy + test_prob_ens.csv
#
# Output:
# - /kaggle/working/mallorn_run/artifacts/test_prob_fold.npy    (N_test, n_folds)
# - /kaggle/working/mallorn_run/artifacts/test_prob_ens.npy     (N_test,)
# - /kaggle/working/mallorn_run/artifacts/test_prob_ens.csv     (object_id, prob)
# - globals: test_prob_ens
# ============================================================

import os, gc, json, warnings
from pathlib import Path

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=FutureWarning)

# ----------------------------
# 0) Require previous stages
# ----------------------------
need = [
    "FIX_DIR","MAX_LEN","SEQ_FEATURE_NAMES",
    "df_test_meta",
    "CKPT_DIR","LOG_DIR",
    "n_splits",
]
for k in need:
    if k not in globals():
        raise RuntimeError(f"Missing `{k}`. Jalankan STAGE 0..9 dulu.")

# Torch
try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
except Exception as e:
    raise RuntimeError("PyTorch tidak tersedia di environment ini.") from e

device = torch.device("cpu")
SEED = int(globals().get("SEED", 2025))
torch.manual_seed(SEED)
np.random.seed(SEED)

# Thread guard
try:
    torch.set_num_threads(int(os.environ.get("OMP_NUM_THREADS", "2")))
    torch.set_num_interop_threads(1)
except Exception:
    pass

FIX_DIR = Path(FIX_DIR)
ART_DIR = Path(globals()["ART_DIR"])
ART_DIR.mkdir(parents=True, exist_ok=True)

# ----------------------------
# 1) Load TEST ordering (must match STAGE 6)
# ----------------------------
test_ids_path = FIX_DIR / "test_ids.npy"
if not test_ids_path.exists():
    raise FileNotFoundError(f"Missing: {test_ids_path}. Pastikan STAGE 6 sukses.")
test_ids = np.load(test_ids_path, allow_pickle=False).astype("S").astype(str).tolist()
NTE = len(test_ids)

# ----------------------------
# 2) Open fixed-length TEST memmaps
# ----------------------------
Fdim = len(SEQ_FEATURE_NAMES)
L = int(MAX_LEN)

test_X_path = FIX_DIR / "test_X.dat"
test_B_path = FIX_DIR / "test_B.dat"
test_M_path = FIX_DIR / "test_M.dat"

for p in [test_X_path, test_B_path, test_M_path]:
    if not p.exists():
        raise FileNotFoundError(f"Missing fixed cache file: {p}. Pastikan STAGE 6 sukses.")

Xte = np.memmap(test_X_path, dtype=np.float32, mode="r", shape=(NTE, L, Fdim))
Bte = np.memmap(test_B_path, dtype=np.int8,   mode="r", shape=(NTE, L))
Mte = np.memmap(test_M_path, dtype=np.int8,   mode="r", shape=(NTE, L))

# ----------------------------
# 3) Load global scaler + build standardized global features for TEST
# ----------------------------
# Prefer global scaler from STAGE 8
scaler_json = Path(globals().get("GLOBAL_SCALER_PATH", Path(LOG_DIR) / "global_scaler.json"))
if not scaler_json.exists():
    raise FileNotFoundError(f"Missing global scaler json: {scaler_json}. Jalankan STAGE 8 dulu.")

with open(scaler_json, "r", encoding="utf-8") as f:
    scaler = json.load(f)
G_COLS = scaler["cols"]
g_mean = np.asarray(scaler["mean"], dtype=np.float32)
g_std  = np.asarray(scaler["std"], dtype=np.float32)
g_std  = np.where(g_std < 1e-6, 1.0, g_std).astype(np.float32)

# Ensure columns exist
for c in G_COLS:
    if c not in df_test_meta.columns:
        df_test_meta[c] = 0.0

G = df_test_meta.loc[test_ids, G_COLS].copy()
for c in G_COLS:
    G[c] = pd.to_numeric(G[c], errors="coerce").fillna(0.0).astype(np.float32)

G_np = G.to_numpy(dtype=np.float32, copy=False)
G_np_z = ((G_np - g_mean) / g_std).astype(np.float32)

# ----------------------------
# 4) Dataset/Loader for inference
# ----------------------------
class TestMemmapDataset(torch.utils.data.Dataset):
    def __init__(self, Xmm, Bmm, Mmm, G_np_z):
        self.Xmm = Xmm
        self.Bmm = Bmm
        self.Mmm = Mmm
        self.G = G_np_z

    def __len__(self):
        return self.Xmm.shape[0]

    def __getitem__(self, i):
        X = self.Xmm[i]
        B = self.Bmm[i].astype(np.int64, copy=False)
        M = self.Mmm[i].astype(np.int64, copy=False)
        G = self.G[i]
        return (
            torch.from_numpy(X),
            torch.from_numpy(B),
            torch.from_numpy(M),
            torch.from_numpy(G),
        )

def make_loader(ds, batch_size=64):
    return torch.utils.data.DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=False,
        num_workers=0,
        pin_memory=False,
        drop_last=False,
    )

# ----------------------------
# 5) Model definition (must match STAGE 8)
# ----------------------------
class MultibandEventTransformer(nn.Module):
    def __init__(self, feat_dim, n_bands=6, d_model=128, n_heads=4, n_layers=2, ff_mult=2, dropout=0.10, g_dim=7, max_len=512):
        super().__init__()
        self.feat_dim = feat_dim
        self.n_bands = n_bands
        self.d_model = d_model
        self.max_len = max_len

        self.x_proj = nn.Linear(feat_dim, d_model)
        self.band_emb = nn.Embedding(n_bands, d_model)

        self.pos_emb = nn.Parameter(torch.zeros(1, max_len, d_model))
        nn.init.normal_(self.pos_emb, mean=0.0, std=0.02)

        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=int(d_model * ff_mult),
            dropout=dropout,
            activation="gelu",
            batch_first=True,
            norm_first=True,
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=n_layers)

        self.attn = nn.Linear(d_model, 1)

        self.g_proj = nn.Sequential(
            nn.Linear(g_dim, d_model // 2),
            nn.GELU(),
            nn.Dropout(dropout),
        )

        self.head = nn.Sequential(
            nn.Linear(d_model + (d_model // 2), d_model),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_model, 1),
        )

    def forward(self, X, band_id, mask, G):
        X = X.to(torch.float32)
        band_id = band_id.clamp(0, self.n_bands - 1).to(torch.long)
        mask = mask.to(torch.long)

        h = self.x_proj(X) + self.band_emb(band_id) + self.pos_emb[:, :X.shape[1], :]

        pad_mask = (mask == 0)
        h = self.encoder(h, src_key_padding_mask=pad_mask)

        a = self.attn(h).squeeze(-1)
        a = a.masked_fill(pad_mask, -1e9)
        w = torch.softmax(a, dim=1)
        pooled = torch.sum(h * w.unsqueeze(-1), dim=1)

        g = self.g_proj(G.to(torch.float32))
        z = torch.cat([pooled, g], dim=1)
        logit = self.head(z).squeeze(-1)
        return logit

def sigmoid_np(x):
    x = np.clip(x, -50, 50)
    return 1.0 / (1.0 + np.exp(-x))

@torch.no_grad()
def predict_probs(model, loader):
    model.eval()
    outs = []
    for batch in loader:
        Xb, Bb, Mb, Gb = batch
        Xb = Xb.to(device)
        Bb = Bb.to(device)
        Mb = Mb.to(device)
        Gb = Gb.to(device)
        logit = model(Xb, Bb, Mb, Gb)
        outs.append(logit.detach().cpu().numpy())
    logits = np.concatenate(outs, axis=0) if outs else np.zeros((0,), dtype=np.float32)
    return sigmoid_np(logits).astype(np.float32)

# ----------------------------
# 6) Load fold checkpoints + infer
# ----------------------------
CKPT_DIR = Path(CKPT_DIR)
ckpts = []
for f in range(int(n_splits)):
    p = CKPT_DIR / f"fold_{f}.pt"
    if not p.exists():
        raise FileNotFoundError(f"Missing checkpoint: {p}. Pastikan STAGE 8 menyimpan ckpt per fold.")
    ckpts.append(p)

# Small batch for CPU safety (increase if fast enough)
BATCH_SIZE = 64

ds_test = TestMemmapDataset(Xte, Bte, Mte, G_np_z)
dl_test = make_loader(ds_test, batch_size=BATCH_SIZE)

test_prob_folds = np.zeros((NTE, int(n_splits)), dtype=np.float32)

print(f"[Stage 10] Test inference: N_test={NTE:,} | folds={n_splits} | batch={BATCH_SIZE} (CPU)")
for fold, ckpt_path in enumerate(ckpts):
    ckpt = torch.load(ckpt_path, map_location="cpu")

    cfg = ckpt.get("cfg", {})
    d_model  = int(cfg.get("d_model", 128))
    n_heads  = int(cfg.get("n_heads", 4))
    n_layers = int(cfg.get("n_layers", 2))
    ff_mult  = int(cfg.get("ff_mult", 2))
    dropout  = float(cfg.get("dropout", 0.10))

    model = MultibandEventTransformer(
        feat_dim=Fdim,
        n_bands=6,
        d_model=d_model,
        n_heads=n_heads,
        n_layers=n_layers,
        ff_mult=ff_mult,
        dropout=dropout,
        g_dim=G_np_z.shape[1],
        max_len=L,
    ).to(device)

    model.load_state_dict(ckpt["model_state"], strict=True)

    probs = predict_probs(model, dl_test)
    if len(probs) != NTE:
        raise RuntimeError(f"Fold {fold}: probs length mismatch {len(probs)} vs {NTE}")

    test_prob_folds[:, fold] = probs
    print(f"  fold {fold}: prob_mean={float(probs.mean()):.6f} | prob_std={float(probs.std()):.6f}")

    del model, ckpt, probs
    gc.collect()

# Ensemble mean
test_prob_ens = test_prob_folds.mean(axis=1).astype(np.float32)

# ----------------------------
# 7) Save artifacts
# ----------------------------
fold_path = ART_DIR / "test_prob_fold.npy"
ens_path  = ART_DIR / "test_prob_ens.npy"
csv_path  = ART_DIR / "test_prob_ens.csv"

np.save(fold_path, test_prob_folds)
np.save(ens_path, test_prob_ens)

df_pred = pd.DataFrame({"object_id": test_ids, "prob": test_prob_ens})
df_pred.to_csv(csv_path, index=False)

print("\n[Stage 10] DONE")
print(f"- Saved fold probs: {fold_path}")
print(f"- Saved ens probs : {ens_path}")
print(f"- Saved csv       : {csv_path}")
print(f"- ens mean={float(test_prob_ens.mean()):.6f} | std={float(test_prob_ens.std()):.6f} | min={float(test_prob_ens.min()):.6f} | max={float(test_prob_ens.max()):.6f}")

# Export globals for submission
globals().update({
    "test_ids": test_ids,
    "test_prob_folds": test_prob_folds,
    "test_prob_ens": test_prob_ens,
    "TEST_PROB_FOLD_PATH": fold_path,
    "TEST_PROB_ENS_PATH": ens_path,
    "TEST_PROB_CSV_PATH": csv_path,
})

gc.collect()


# Submission Build

In [None]:
# ============================================================
# STAGE 11 — Submission Build (ONE CELL, Kaggle CPU-SAFE)
#
# Tujuan:
# - Ambil test_prob_ens (STAGE 10) + threshold BEST_THR (STAGE 9, optional)
# - Buat submission.csv sesuai sample_submission.csv
# - Strict checks: order, missing ids, duplicates, NaN/inf
#
# Output:
# - /kaggle/working/submission.csv
# - /kaggle/working/mallorn_run/submissions/submission.csv (copy)
# - /kaggle/working/mallorn_run/submissions/submission_proba.csv (optional)
# ============================================================

import gc, warnings
from pathlib import Path
import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=FutureWarning)

# ----------------------------
# 0) Require previous stages
# ----------------------------
for need in ["PATHS", "SUB_DIR"]:
    if need not in globals():
        raise RuntimeError(f"Missing `{need}`. Jalankan STAGE 0 dulu (setup).")

sample_path = Path(PATHS["SAMPLE_SUB"])
if not sample_path.exists():
    raise FileNotFoundError(f"Missing sample_submission.csv: {sample_path}")

df_sub = pd.read_csv(sample_path)
if not {"object_id", "prediction"}.issubset(df_sub.columns):
    raise ValueError(f"sample_submission must have columns object_id,prediction. Found: {list(df_sub.columns)}")

# Get test probabilities
test_prob = None
test_ids = None

if "test_prob_ens" in globals() and "test_ids" in globals():
    test_prob = np.asarray(globals()["test_prob_ens"], dtype=np.float32)
    test_ids = list(globals()["test_ids"])
else:
    # fallback to saved artifact
    if "TEST_PROB_ENS_PATH" in globals():
        p = Path(globals()["TEST_PROB_ENS_PATH"])
        if p.exists():
            test_prob = np.load(p).astype(np.float32)
    if test_prob is None:
        # try default location
        p = Path(globals().get("ART_DIR", "/kaggle/working")) / "test_prob_ens.npy"
        if p.exists():
            test_prob = np.load(p).astype(np.float32)
    # ids
    fix_dir = Path(globals().get("FIX_DIR", "/kaggle/working/mallorn_run/artifacts/fixed_seq"))
    pids = fix_dir / "test_ids.npy"
    if pids.exists():
        test_ids = np.load(pids, allow_pickle=False).astype("S").astype(str).tolist()

if test_prob is None or test_ids is None:
    raise RuntimeError("Missing test predictions. Jalankan STAGE 10 dulu (Test Inference).")

if len(test_prob) != len(test_ids):
    raise RuntimeError(f"Length mismatch: test_prob={len(test_prob)} vs test_ids={len(test_ids)}")

# Determine threshold (optional)
thr = float(globals().get("BEST_THR", 0.5))

# ----------------------------
# 1) Build mapping object_id -> prob
# ----------------------------
df_pred = pd.DataFrame({"object_id": test_ids, "prob": test_prob})
# strict checks
if df_pred["object_id"].duplicated().any():
    dup = df_pred.loc[df_pred["object_id"].duplicated(), "object_id"].iloc[:5].tolist()
    raise ValueError(f"Duplicated object_id in predictions (examples): {dup}")

# sanitize probs
p = df_pred["prob"].to_numpy(dtype=np.float32, copy=False)
if not np.isfinite(p).all():
    bad = int((~np.isfinite(p)).sum())
    raise ValueError(f"Found non-finite probabilities in test predictions: {bad} rows")
p = np.clip(p, 0.0, 1.0)
df_pred["prob"] = p

# ----------------------------
# 2) Align to sample_submission order
# ----------------------------
df_sub["object_id"] = df_sub["object_id"].astype(str).str.strip()
df_pred["object_id"] = df_pred["object_id"].astype(str).str.strip()

# merge in sample order
df_out = df_sub[["object_id"]].merge(df_pred, on="object_id", how="left")

if df_out["prob"].isna().any():
    missing_n = int(df_out["prob"].isna().sum())
    miss_ids = df_out.loc[df_out["prob"].isna(), "object_id"].iloc[:5].tolist()
    raise ValueError(
        f"Some sample_submission object_id have no prediction: {missing_n} missing. "
        f"Examples: {miss_ids}"
    )

# Build final prediction column:
# If competition expects PROBABILITY: set prediction=prob
# If expects BINARY: set prediction=(prob>=thr)
# Default: probability (safer for most Kaggle binary classification submissions).
SUBMISSION_MODE = "prob"  # choose: "prob" or "binary"
if SUBMISSION_MODE == "binary":
    df_out["prediction"] = (df_out["prob"].to_numpy(dtype=np.float32) >= np.float32(thr)).astype(np.int8)
else:
    df_out["prediction"] = df_out["prob"].astype(np.float32)

df_out = df_out[["object_id", "prediction"]]

# final checks
if df_out["object_id"].duplicated().any():
    raise ValueError("submission has duplicate object_id (unexpected).")
if len(df_out) != len(df_sub):
    raise RuntimeError("submission row count mismatch with sample_submission.")

# ----------------------------
# 3) Write submission files
# ----------------------------
SUB_DIR = Path(SUB_DIR)
SUB_DIR.mkdir(parents=True, exist_ok=True)

out_main = Path("/kaggle/working/submission.csv")
out_copy = SUB_DIR / "submission.csv"
out_proba = SUB_DIR / "submission_proba.csv"

df_out.to_csv(out_main, index=False)
df_out.to_csv(out_copy, index=False)

# also save proba version for debugging (always probability)
df_dbg = df_sub[["object_id"]].merge(df_pred, on="object_id", how="left")
df_dbg = df_dbg.rename(columns={"prob": "prediction"}).astype({"prediction": np.float32})
df_dbg.to_csv(out_proba, index=False)

print("[Stage 11] SUBMISSION READY")
print(f"- mode: {SUBMISSION_MODE} | threshold_used={thr:.6f} (only relevant if binary)")
print(f"- wrote: {out_main}")
print(f"- copy : {out_copy}")
print(f"- debug proba: {out_proba}")
print(f"- rows: {len(df_out):,}")

# quick preview
print("\nPreview:")
print(df_out.head(5).to_string(index=False))

# Export globals
globals().update({
    "SUBMISSION_PATH": out_main,
    "SUBMISSION_COPY_PATH": out_copy,
    "SUBMISSION_MODE": SUBMISSION_MODE,
    "SUBMISSION_THRESHOLD": thr,
})

gc.collect()
