In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found under /kaggle/input, one path per line.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Data Loading & Sanity Checks

In [None]:
# ============================================================
# STEP 1 — Data Loading & Sanity Checks (ONE CELL) — CatBoost Track (FINAL CONTEXT SAFE)
# Fixes:
# - Series.lower() bug -> use .str.lower()
# - Robust parsing + dedup retained
# - LABEL SANITIZATION (critical):
#     * normalize kategori (strip+upper)
#     * remove invalid kategori that are actually pollutants (e.g., "O3", "PM10", ...)
#     * handle ultra-rare classes (BERBAHAYA) -> merge into "SANGAT TIDAK SEHAT" (safe default)
#
# Outputs (globals):
#   sub, ID_COL, SUB_TARGET_COL
#   df_ispu_all, df_train, df_ispu_unlabeled
#   df_ndvi, df_holiday, df_weather, df_pop, df_river
#   test_candidates
# ============================================================

import re
from pathlib import Path
import numpy as np
import pandas as pd

# Root of the competition dataset; every loader in this cell builds its path from it.
DATA_ROOT = Path("/kaggle/input/penyisihan-datavidia-10")
assert DATA_ROOT.exists(), f"DATA_ROOT not found: {DATA_ROOT}"

# Wider pandas display so the sanity prints / previews below are not truncated horizontally.
pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 200)

# ----------------------------
# Helpers
# ----------------------------
def _read_csv_smart(path: Path) -> pd.DataFrame:
    seps = [",", ";", "\t", "|"]
    encs = ["utf-8", "utf-8-sig", "latin1"]
    last_err = None
    for sep in seps:
        for enc in encs:
            try:
                df = pd.read_csv(path, sep=sep, encoding=enc, low_memory=False)
                if df.shape[1] >= 2:
                    return df
            except Exception as e:
                last_err = e
    raise RuntimeError(f"Failed to read: {path}\nLast error: {last_err}")

def _norm_col(c: str) -> str:
    c = str(c).strip().lower()
    c = re.sub(r"[^\w]+", "_", c)
    c = re.sub(r"_+", "_", c).strip("_")
    return c

def _standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize column names and map known synonyms onto canonical names.

    Every header first goes through ``_norm_col``. Known aliases are then renamed to one
    canonical name (e.g. ``date``/``waktu`` -> ``tanggal``, ``pm_duakomalima``/``pm2_5``
    -> ``pm25``). Two common typos become ``*_alt`` columns (``categori`` ->
    ``kategori_alt``, ``critical`` -> ``parameter_pencemar_kritis_alt``), so the caller
    can merge them explicitly instead of overwriting the real column.

    A rename is skipped when its canonical name is already taken, either by a column that
    already carries that name or by an earlier alias in the same frame. This means the
    result never gains duplicate labels, which would make ``df["tanggal"]`` return a
    DataFrame instead of a Series.
    """
    aliases = {
        "tanggal": ("tanggal", "date", "time", "waktu"),
        "stasiun": ("stasiun", "station", "stasiun_id", "id_stasiun"),
        "periode_data": ("periode_data", "periode"),
        "pm10": ("pm_sepuluh", "pm10", "pm_10"),
        "pm25": ("pm_duakomalima", "pm2_5", "pm25", "pm_2_5"),
        "so2": ("sulfur_dioksida", "so2"),
        "co": ("karbon_monoksida", "co"),
        "o3": ("ozon", "o3"),
        "no2": ("nitrogen_dioksida", "no2"),
        "parameter_pencemar_kritis": ("parameter_pencemar_kritis", "parameter_pencemar", "pencemar_kritis"),
        "max": ("max", "maks", "nilai_maks", "indeks_maks"),
        "ndvi": ("ndvi", "vegetation_index"),
        "is_holiday_nasional": ("is_holiday_nasional", "holiday_nasional", "is_holiday"),
        "is_weekend": ("is_weekend", "weekend"),
        "day_name": ("day_name", "nama_hari"),
        "nama_libur": ("nama_libur", "holiday_name"),
        # common typos are kept as *_alt
        "kategori_alt": ("categori",),
        "parameter_pencemar_kritis_alt": ("critical",),
    }
    lookup = {src: dst for dst, srcs in aliases.items() for src in srcs}

    df = df.rename(columns={c: _norm_col(c) for c in df.columns})

    # Names that stay as they are (non-aliases and already-canonical columns) are reserved
    # first, so an existing "tanggal" column wins over a "date" column in the same file.
    taken = {c for c in df.columns if lookup.get(c, c) == c}
    rename = {}
    for c in df.columns:
        target = lookup.get(c, c)
        if target == c or target in taken:
            continue
        rename[c] = target
        taken.add(target)
    return df.rename(columns=rename)

def parse_date_twopass(s: pd.Series) -> pd.Series:
    """Parse a column of date-like values into datetimes, tolerating mixed formats.

    Values are stringified and stripped, and ""/"nan"/"NaN"/"None" are treated as missing.
    Parsing is done in passes, and each later pass only touches values that are still NaT:

    1. strict ISO-8601 (``YYYY-MM-DD[ HH:MM:SS]``). This format is unambiguous, so it is
       parsed first; ``dayfirst=True`` is documented as "not strict" and can otherwise
       reinterpret such strings as year-day-month.
    2. day-first (``DD/MM/YYYY``).
    3. month-first (``MM/DD/YYYY``).

    Anything still unparseable ends up as NaT.
    """
    s = s.astype(str).str.strip()
    s = s.replace({"": np.nan, "nan": np.nan, "NaN": np.nan, "None": np.nan})
    try:
        out = pd.to_datetime(s, errors="coerce", format="ISO8601")
    except (ValueError, TypeError):
        # format="ISO8601" needs pandas >= 2.0; older versions simply skip this pass.
        out = pd.Series(pd.NaT, index=s.index, dtype="datetime64[ns]")
    for dayfirst in (True, False):
        m = out.isna() & s.notna()
        if not m.any():
            break
        out.loc[m] = pd.to_datetime(s[m], errors="coerce", dayfirst=dayfirst)
    return out

def _coerce_numeric(df: pd.DataFrame, cols):
    for c in cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    return df

def _dedup_keep_most_complete(df: pd.DataFrame, key_cols):
    if not all(k in df.columns for k in key_cols):
        return df
    df = df.copy()
    df["_nn"] = df.notna().sum(axis=1)
    idx = df.groupby(key_cols)["_nn"].idxmax()
    df = df.loc[idx].drop(columns=["_nn"]).reset_index(drop=True)
    return df

def _basic_sanity(name: str, df: pd.DataFrame, key_cols=None, date_col="tanggal"):
    print(f"\n--- {name} ---")
    print("shape:", df.shape)
    if key_cols is not None and all(k in df.columns for k in key_cols):
        print(f"duplicates on {key_cols}:", int(df.duplicated(key_cols).sum()))
    if date_col in df.columns:
        print(f"{date_col}: NaT={int(df[date_col].isna().sum())} | range=[{df[date_col].min()} .. {df[date_col].max()}]")
    miss = (df.isna().mean().sort_values(ascending=False).head(8) * 100).round(2)
    print("top missing% cols:")
    print(miss.to_string())

# ============================================================
# 0) sample_submission
# ============================================================
# sample_submission fixes the expected number of test rows and the id / target column names.
sub = _standardize_columns(_read_csv_smart(DATA_ROOT / "sample_submission.csv"))
# Fall back to the first / last column when the expected names are absent.
ID_COL = "id" if "id" in sub.columns else sub.columns[0]
SUB_TARGET_COL = "category" if "category" in sub.columns else sub.columns[-1]
# Used further down to recognise the test mapping file by its row count.
n_test_expected = len(sub)

print("Loaded sample_submission:", sub.shape, "cols:", list(sub.columns))
print("ID_COL:", ID_COL, "| SUB_TARGET_COL:", SUB_TARGET_COL)
print("submission ID unique:", bool(sub[ID_COL].is_unique))

# ============================================================
# 1) ISPU (concat all years) + CLEAN
# ============================================================
ispu_files = sorted((DATA_ROOT / "ISPU").glob("*.csv"))
assert len(ispu_files) > 0, "No ISPU CSV files found."

# One frame per ISPU file; remember which file each row came from (debugging aid).
frames = []
for p in ispu_files:
    df0 = _standardize_columns(_read_csv_smart(p))
    df0["source_file"] = p.name
    frames.append(df0)

df_ispu_all = pd.concat(frames, ignore_index=True, sort=False)

# unify label + critical columns into canonical names
# (the typo'd "*_alt" columns only fill gaps in the canonical ones)
if "kategori_alt" in df_ispu_all.columns:
    if "kategori" not in df_ispu_all.columns:
        df_ispu_all["kategori"] = df_ispu_all["kategori_alt"]
    else:
        df_ispu_all["kategori"] = df_ispu_all["kategori"].fillna(df_ispu_all["kategori_alt"])

if "parameter_pencemar_kritis_alt" in df_ispu_all.columns:
    if "parameter_pencemar_kritis" not in df_ispu_all.columns:
        df_ispu_all["parameter_pencemar_kritis"] = df_ispu_all["parameter_pencemar_kritis_alt"]
    else:
        df_ispu_all["parameter_pencemar_kritis"] = df_ispu_all["parameter_pencemar_kritis"].fillna(
            df_ispu_all["parameter_pencemar_kritis_alt"]
        )

# Every later step keys rows on (tanggal, stasiun). Fail here with a clear message instead
# of with a KeyError inside dropna()/sort_values() further down.
missing_keys = [k for k in ("tanggal", "stasiun") if k not in df_ispu_all.columns]
if missing_keys:
    raise RuntimeError(f"ISPU files are missing key column(s): {missing_keys}")

# robust date parse
df_ispu_all["tanggal"] = parse_date_twopass(df_ispu_all["tanggal"])

# stasiun cleanup + code.
# Strip names but keep genuinely missing values as NaN. A bare .astype(str) would turn NaN
# into the literal string "nan", and the dropna() below would then never remove those rows.
st = df_ispu_all["stasiun"].astype(object)
df_ispu_all["stasiun"] = st.where(st.isna(), st.astype(str).str.strip()).replace("", np.nan)
df_ispu_all["stasiun_code"] = (
    df_ispu_all["stasiun"]
    .astype(object)
    .str.upper()
    .str.extract(r"(DKI\s*\d+)", expand=False)
    .str.replace(" ", "", regex=False)
)

# numeric casts
df_ispu_all = _coerce_numeric(df_ispu_all, ["pm10", "pm25", "so2", "co", "o3", "no2", "max"])

# drop rows missing key fields
df_ispu_all = df_ispu_all.dropna(subset=["tanggal", "stasiun"]).copy()

# dedup by key keep most complete
df_ispu_all = _dedup_keep_most_complete(df_ispu_all, ["tanggal", "stasiun"])
df_ispu_all = df_ispu_all.sort_values(["tanggal", "stasiun"]).reset_index(drop=True)

_basic_sanity("ISPU (ALL) CLEAN (pre-label-sanitize)", df_ispu_all, key_cols=["tanggal", "stasiun"])

# ============================================================
# [CRITICAL] LABEL SANITIZATION & TRAIN/UNLABELED SPLIT
# ============================================================
# 1) normalize kategori (upper, strip)
if "kategori" in df_ispu_all.columns:
    df_ispu_all["kategori"] = (
        df_ispu_all["kategori"]
        .astype(str)
        .str.strip()
        .str.upper()
        .replace({"": np.nan, "NAN": np.nan, "NONE": np.nan})
    )

# 2) remove invalid kategori that are actually pollutant names.
#    The comparison uses letters/digits only, so spelling variants such as "PM2.5" or
#    "PM 10" are caught as well as "PM25" / "PM10".
INVALID_KATEGORI = {"PM10", "PM25", "SO2", "CO", "O3", "NO2", "MAX"}
if "kategori" in df_ispu_all.columns:
    kat_key = df_ispu_all["kategori"].astype(object).str.replace(r"[^0-9A-Z]", "", regex=True)
    bad = kat_key.isin(INVALID_KATEGORI)
    n_bad = int(bad.sum())
    if n_bad > 0:
        df_ispu_all.loc[bad, "kategori"] = np.nan
    print(f"[LABEL] invalid kategori removed (pollutant-as-label): {n_bad}")

# 3) handle ultra-rare class "BERBAHAYA": when it has fewer than RARE_CLASS_MIN rows it is
#    merged into "SANGAT TIDAK SEHAT" (default merge; keeps training stable). The merge and
#    its log line only happen when such rows actually exist.
RARE_CLASS_MIN = 10
if "kategori" in df_ispu_all.columns:
    vc = df_ispu_all["kategori"].value_counts(dropna=True)
    n_berbahaya = int(vc.get("BERBAHAYA", 0))
    if 0 < n_berbahaya < RARE_CLASS_MIN:
        df_ispu_all.loc[df_ispu_all["kategori"] == "BERBAHAYA", "kategori"] = "SANGAT TIDAK SEHAT"
        print(f"[LABEL] merged BERBAHAYA -> SANGAT TIDAK SEHAT (rare class, n={n_berbahaya})")

# 4) build train vs unlabeled: labelled = non-missing, non-empty, not the text "nan"
if "kategori" in df_ispu_all.columns:
    lab = df_ispu_all["kategori"].astype(str).str.strip()
    lab_low = lab.str.lower()
    m_train = df_ispu_all["kategori"].notna() & (lab != "") & (lab_low != "nan")
    df_train = df_ispu_all.loc[m_train].copy()
    df_ispu_unlabeled = df_ispu_all.loc[~m_train].copy()
else:
    df_train = df_ispu_all.copy()
    df_ispu_unlabeled = df_ispu_all.iloc[0:0].copy()

print("\nTrain/unlabeled split:")
print("df_train:", df_train.shape, "| df_ispu_unlabeled:", df_ispu_unlabeled.shape)
if "kategori" in df_train.columns:
    print("\nTarget distribution (df_train):")
    print(df_train["kategori"].astype(str).str.strip().value_counts(dropna=False).to_string())

_basic_sanity("ISPU (ALL) CLEAN (post-label-sanitize)", df_ispu_all, key_cols=["tanggal", "stasiun"])

# ============================================================
# 2) NDVI + stasiun_code
# ============================================================
# NDVI per (date, station). Station names are upper-cased and have their spaces removed, so a
# name like "DKI 1 ..." yields the code "DKI1" via the DKI<digits> pattern.
df_ndvi = _standardize_columns(_read_csv_smart(DATA_ROOT / "NDVI (vegetation index)" / "indeks-ndvi-jakarta.csv"))
if "tanggal" in df_ndvi.columns:
    df_ndvi["tanggal"] = parse_date_twopass(df_ndvi["tanggal"])
if "stasiun" in df_ndvi.columns:
    # NOTE(review): astype(str) turns a missing name into "NAN", which the dropna below will
    # not remove; such rows simply get stasiun_code NaN.
    df_ndvi["stasiun"] = df_ndvi["stasiun"].astype(str).str.strip().str.upper().str.replace(" ", "", regex=False)
    df_ndvi["stasiun_code"] = df_ndvi["stasiun"].str.extract(r"(DKI\d+)", expand=False)
df_ndvi = _coerce_numeric(df_ndvi, ["ndvi"])
df_ndvi = df_ndvi.dropna(subset=["tanggal", "stasiun"]).copy()
# one row per (tanggal, stasiun): keep the most complete duplicate
df_ndvi = _dedup_keep_most_complete(df_ndvi, ["tanggal", "stasiun"])
_basic_sanity("NDVI CLEAN", df_ndvi, key_cols=["tanggal", "stasiun"])

# ============================================================
# 3) Holidays (clean to one row per date)
# ============================================================
# National holidays / weekends, reduced below to exactly one row per date.
df_holiday = _standardize_columns(_read_csv_smart(DATA_ROOT / "libur-nasional" / "dataset-libur-nasional-dan-weekend.csv"))
if "tanggal" in df_holiday.columns:
    df_holiday["tanggal"] = parse_date_twopass(df_holiday["tanggal"])
df_holiday = df_holiday.dropna(subset=["tanggal"]).sort_values("tanggal").copy()

# Flags become plain ints; missing or unparseable values count as 0.
for c in ["is_holiday_nasional", "is_weekend"]:
    if c in df_holiday.columns:
        df_holiday[c] = pd.to_numeric(df_holiday[c], errors="coerce").fillna(0).astype(int)

# Collapse duplicate dates: a flag is set if any row for that date sets it ("max"), and the
# holiday name comes from the "first" aggregation. Columns not listed in `agg` are dropped;
# day_name is recomputed from the date right after.
# NOTE(review): if none of the three columns exists, `agg` is empty — confirm that
# groupby(...).agg({}) behaves as intended in that case.
agg = {}
if "is_holiday_nasional" in df_holiday.columns: agg["is_holiday_nasional"] = "max"
if "is_weekend" in df_holiday.columns: agg["is_weekend"] = "max"
if "nama_libur" in df_holiday.columns: agg["nama_libur"] = "first"
df_holiday = df_holiday.groupby("tanggal", as_index=False).agg(agg)
df_holiday["day_name"] = df_holiday["tanggal"].dt.day_name()

_basic_sanity("HOLIDAYS CLEAN", df_holiday, key_cols=["tanggal"])

# ============================================================
# 4) Weather (multiple stations) clean
# ============================================================
weather_files = sorted((DATA_ROOT / "cuaca-harian").glob("*.csv"))
assert len(weather_files) > 0, "No weather CSV files found."

w_frames = []
for p in weather_files:
    w = _standardize_columns(_read_csv_smart(p))
    # station tag = file stem without the "cuaca_harian_" / "cuaca-harian-" prefix
    tag = p.stem.lower().replace("cuaca_harian_", "").replace("cuaca-harian-", "")
    w["weather_station"] = tag
    # Station code (e.g. "DKI3") is parsed once from the tag; it is identical for every row of
    # the file. \d+ rather than \d, so a multi-digit code is not truncated to its first digit
    # (and matched to the wrong ISPU station). NaN when the tag has no "dki<n>" part.
    m_code = re.search(r"dki\d+", tag)
    w["weather_code"] = m_code.group(0).upper() if m_code else np.nan
    if "tanggal" in w.columns:
        w["tanggal"] = parse_date_twopass(w["tanggal"])
    w_frames.append(w)

df_weather = pd.concat(w_frames, ignore_index=True, sort=False)
df_weather = df_weather.dropna(subset=["tanggal"]).copy()

# Convert object columns to numbers when *every* value parses; otherwise keep them as they
# are. This is exactly what pd.to_numeric(errors="ignore") did, but that option is
# deprecated (pandas 2.2), so the keep-on-failure is written out explicitly.
for c in df_weather.columns:
    if c in ["tanggal", "weather_station", "weather_code"] or df_weather[c].dtype != object:
        continue
    try:
        df_weather[c] = pd.to_numeric(df_weather[c])
    except (ValueError, TypeError):
        pass  # genuinely non-numeric text column: deliberately left unchanged

df_weather = _dedup_keep_most_complete(df_weather, ["tanggal", "weather_station"])
df_weather = df_weather.sort_values(["weather_station", "tanggal"]).reset_index(drop=True)

_basic_sanity("WEATHER (ALL) CLEAN", df_weather, key_cols=["tanggal", "weather_station"])

# ============================================================
# 5) Population
# ============================================================
# Population table (per its file name: broken down by age group and sex, 2013-2021).
# Only the year and head-count columns are cast here; aggregation happens in Step 2.
df_pop = _standardize_columns(_read_csv_smart(DATA_ROOT / "jumlah-penduduk" / "data-jumlah-penduduk-provinsi-dki-jakarta-berdasarkan-kelompok-usia-dan-jenis-kelamin-tahun-2013-2021-komponen-data.csv"))
if "tahun" in df_pop.columns:
    df_pop["tahun"] = pd.to_numeric(df_pop["tahun"], errors="coerce")
if "jumlah_penduduk" in df_pop.columns:
    df_pop["jumlah_penduduk"] = pd.to_numeric(df_pop["jumlah_penduduk"], errors="coerce")
_basic_sanity("POPULATION", df_pop)

# ============================================================
# 6) River Quality
# ============================================================
# River water-quality measurements; numeric columns are cast (bad values -> NaN) and the
# year/month aggregation is done in Step 2.
df_river = _standardize_columns(_read_csv_smart(DATA_ROOT / "kualitas-air-sungai" / "data-kualitas-air-sungai-komponen-data.csv"))
for c in ["latitude", "longitude", "baku_mutu", "hasil_pengukuran", "bulan_sampling"]:
    if c in df_river.columns:
        df_river[c] = pd.to_numeric(df_river[c], errors="coerce")
_basic_sanity("RIVER QUALITY", df_river)

# ============================================================
# 7) Find test mapping file candidates (rows == sample_submission) with an 'id' column
# ============================================================
# Candidate test-mapping files: name contains "test"/"submission", has an "id" column, and
# has exactly as many rows as sample_submission. Entries are (path_str, column_list).
# Paths are scanned in sorted order, so "the first candidate" (used by Step 2) is
# deterministic.
test_candidates = []
for p in sorted(DATA_ROOT.rglob("*.csv")):
    if p.name == "sample_submission.csv":
        continue
    name = p.name.lower()
    # cheap filename filter before parsing anything
    if ("test" not in name) and ("submission" not in name):
        continue
    try:
        # _read_csv_smart parses the whole file anyway, so read it once only
        df_full = _standardize_columns(_read_csv_smart(p))
    except RuntimeError as e:
        # best-effort scan: report unreadable files instead of silently hiding them
        print(f"[WARN] could not read candidate {p}: {e}")
        continue
    if "id" in df_full.columns and len(df_full) == n_test_expected:
        test_candidates.append((str(p), list(df_full.columns)))

print("\n--- Test mapping file candidates (rows == sample_submission) ---")
if len(test_candidates) == 0:
    print("None found by heuristic. Likely there is a separate test file not matching this heuristic name.")
else:
    for fp, cols in test_candidates:
        print(fp, "| cols:", cols)

# ============================================================
# Preview
# ============================================================
# Quick visual check of the cleaned frames. display() is not imported in this cell —
# presumably the notebook's (IPython) built-in; this will fail outside a notebook.
print("\n--- Preview heads ---")
display(df_train.head(3))
display(df_ndvi.head(3))
display(df_holiday.head(3))
display(df_weather.head(3))

# Master Table Building (Correct Joins)

In [None]:
# ============================================================
# STEP 2 — Master Table Building (Correct Joins) (ONE CELL) — FINAL CONTEXT SAFE
# Builds:
#   df_train_master  (for training)
#   df_test_master   (for inference; requires test mapping file with id)
# Also provides:
#   df_test_like     (alias for Step 5 requirement)
#
# Notes:
# - Joins are leakage-safe (no rolling/lag here; that is Step 3).
# - Safe joins:
#     Holiday (by tanggal)
#     NDVI (tanggal+stasiun_code)
#     Weather local (tanggal+stasiun_code) + global fallback (tanggal)
#     Population (year aggregate; global)
#     River (year-month aggregate; global)
# ============================================================

import re
from pathlib import Path
import numpy as np
import pandas as pd

# ---- guards (assumes Step 1 already ran) ----
# Names this cell reads from the notebook's global namespace (all produced by Step 1).
need = ["sub","ID_COL","SUB_TARGET_COL","df_train","df_ndvi","df_holiday","df_weather","df_pop","df_river"]
miss = [k for k in need if k not in globals()]
if miss:
    # The message ends with Indonesian for "Run Step 1 first."
    raise RuntimeError(f"Missing globals from Step 1: {miss}. Jalankan Step 1 dulu.")

DATA_ROOT = Path("/kaggle/input/penyisihan-datavidia-10")
assert DATA_ROOT.exists(), f"DATA_ROOT not found: {DATA_ROOT}"

# ----------------------------
# Helpers (keep consistent with Step 1)
# ----------------------------
def _norm_col(c: str) -> str:
    c = str(c).strip().lower()
    c = re.sub(r"[^\w]+", "_", c)
    c = re.sub(r"_+", "_", c).strip("_")
    return c

def _standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize column names and map known synonyms onto canonical names.

    This cell redefines the notebook-global helper, so it must apply the same alias
    mapping as the Step 1 loader. A test file with ``date``/``station``/``stasiun_id``
    headers is then understood the same way as the training files.

    Every header first goes through ``_norm_col``. Known aliases are then renamed to one
    canonical name (e.g. ``date``/``waktu`` -> ``tanggal``). Two typos become ``*_alt``
    columns (``categori`` -> ``kategori_alt``, ``critical`` ->
    ``parameter_pencemar_kritis_alt``).

    A rename is skipped when its canonical name is already taken, by an existing column
    or by an earlier alias. The result therefore never gains duplicate labels.
    """
    aliases = {
        "tanggal": ("tanggal", "date", "time", "waktu"),
        "stasiun": ("stasiun", "station", "stasiun_id", "id_stasiun"),
        "periode_data": ("periode_data", "periode"),
        "pm10": ("pm_sepuluh", "pm10", "pm_10"),
        "pm25": ("pm_duakomalima", "pm2_5", "pm25", "pm_2_5"),
        "so2": ("sulfur_dioksida", "so2"),
        "co": ("karbon_monoksida", "co"),
        "o3": ("ozon", "o3"),
        "no2": ("nitrogen_dioksida", "no2"),
        "parameter_pencemar_kritis": ("parameter_pencemar_kritis", "parameter_pencemar", "pencemar_kritis"),
        "max": ("max", "maks", "nilai_maks", "indeks_maks"),
        "ndvi": ("ndvi", "vegetation_index"),
        "is_holiday_nasional": ("is_holiday_nasional", "holiday_nasional", "is_holiday"),
        "is_weekend": ("is_weekend", "weekend"),
        "day_name": ("day_name", "nama_hari"),
        "nama_libur": ("nama_libur", "holiday_name"),
        # common typos are kept as *_alt
        "kategori_alt": ("categori",),
        "parameter_pencemar_kritis_alt": ("critical",),
    }
    lookup = {src: dst for dst, srcs in aliases.items() for src in srcs}

    df = df.rename(columns={c: _norm_col(c) for c in df.columns})

    # Columns that keep their name are reserved first, so e.g. "tanggal" wins over "date".
    taken = {c for c in df.columns if lookup.get(c, c) == c}
    rename = {}
    for c in df.columns:
        target = lookup.get(c, c)
        if target == c or target in taken:
            continue
        rename[c] = target
        taken.add(target)
    return df.rename(columns=rename)

def parse_date_twopass(s: pd.Series) -> pd.Series:
    """Parse a column of date-like values into datetimes, tolerating mixed formats.

    Values are stringified and stripped, and ""/"nan"/"NaN"/"None" are treated as missing.
    Parsing is done in passes, and each later pass only touches values that are still NaT:

    1. strict ISO-8601 (``YYYY-MM-DD[ HH:MM:SS]``). This format is unambiguous, so it is
       parsed first; ``dayfirst=True`` is documented as "not strict" and can otherwise
       reinterpret such strings as year-day-month.
    2. day-first (``DD/MM/YYYY``).
    3. month-first (``MM/DD/YYYY``).

    Anything still unparseable ends up as NaT.
    """
    s = s.astype(str).str.strip()
    s = s.replace({"": np.nan, "nan": np.nan, "NaN": np.nan, "None": np.nan})
    try:
        out = pd.to_datetime(s, errors="coerce", format="ISO8601")
    except (ValueError, TypeError):
        # format="ISO8601" needs pandas >= 2.0; older versions simply skip this pass.
        out = pd.Series(pd.NaT, index=s.index, dtype="datetime64[ns]")
    for dayfirst in (True, False):
        m = out.isna() & s.notna()
        if not m.any():
            break
        out.loc[m] = pd.to_datetime(s[m], errors="coerce", dayfirst=dayfirst)
    return out

def _mk_stasiun_code(stasiun_series: pd.Series) -> pd.Series:
    st = stasiun_series.astype(str).str.strip().str.upper()
    code = st.str.extract(r"(DKI\s*\d+)", expand=False).str.replace(" ", "", regex=False)
    return code

def _prefix_cols(df: pd.DataFrame, prefix: str, keep: set) -> pd.DataFrame:
    ren = {c: f"{prefix}{c}" for c in df.columns if c not in keep}
    return df.rename(columns=ren)

def _dedup_keep_most_complete(df: pd.DataFrame, key_cols):
    if not all(k in df.columns for k in key_cols):
        return df
    df = df.copy()
    df["_nn"] = df.notna().sum(axis=1)
    idx = df.groupby(key_cols)["_nn"].idxmax()
    df = df.loc[idx].drop(columns=["_nn"]).reset_index(drop=True)
    return df

def _find_test_mapping_file(data_root: Path, n_rows: int) -> Path | None:
    """Find a CSV (other than sample_submission.csv) with an 'id' column and ``n_rows`` rows.

    Files are scanned in sorted path order, so the result is deterministic. Only the id
    column is parsed: ``usecols`` receives a callable that matches any header normalizing
    to ``"id"`` (e.g. "ID", " Id "). The cheap path therefore works even when the raw
    header is not literally ``id``. Files that cannot be read or parsed are skipped.

    Returns:
        The first matching path, or None when no file qualifies.
    """
    def _is_id_col(col) -> bool:
        return _norm_col(col) == "id"

    for p in sorted(data_root.rglob("*.csv")):
        if p.name == "sample_submission.csv":
            continue
        try:
            ids = pd.read_csv(p, usecols=_is_id_col)
        except (OSError, ValueError):
            # unreadable / empty / unparseable file: not a candidate
            continue
        if ids.shape[1] > 0 and len(ids) == n_rows:
            return p
    return None

def _ensure_datetime(df: pd.DataFrame, col="tanggal") -> pd.DataFrame:
    df = df.copy()
    if col in df.columns and not np.issubdtype(df[col].dtype, np.datetime64):
        df[col] = parse_date_twopass(df[col])
    return df

def _select_numeric_cols(df: pd.DataFrame, keep: set):
    cols = []
    for c in df.columns:
        if c in keep or pd.api.types.is_numeric_dtype(df[c]):
            cols.append(c)
    return cols

# ----------------------------
# Core builder
# ----------------------------
def build_master(df_base: pd.DataFrame, *, has_target: bool) -> pd.DataFrame:
    """Enrich a (tanggal, station) base frame with every auxiliary data source.

    Steps, in order:
      * calendar columns year/month/day/dow/dayofyear (nullable Int64) from ``tanggal``;
      * holidays: left join on ``tanggal`` (notebook global ``df_holiday``);
      * NDVI: left join on ``tanggal`` + ``stasiun_code`` (``df_ndvi``);
      * weather (``df_weather``): station-local numeric columns prefixed ``wx_`` (joined on
        ``tanggal`` + ``stasiun_code``) and the cross-station daily mean prefixed ``wxg_``
        (joined on ``tanggal``); missing ``wx_X`` values are filled from ``wxg_X``;
      * ``pop_total_year`` from ``df_pop``, joined on year;
      * river-quality aggregates per (year, month) from ``df_river``;
      * a final ``_dedup_keep_most_complete`` on (tanggal, stasiun_code).
    Each right-hand frame is first reduced to one row per join key, so the left joins do not
    add rows. NOTE(review): row order / rows with a missing key after the final dedup depend
    on that helper's implementation; check it before aligning the result positionally.
    NOTE(review): any non-key column present in both ``df`` and a right frame gets pandas'
    default ``_x`` / ``_y`` suffixes.

    Args:
        df_base: base rows; must contain ``tanggal``. ``stasiun_code`` is derived from
            ``stasiun`` when absent (NaN when neither exists).
        has_target: when True and ``kategori`` exists, it is cast to stripped ``str``.

    Returns:
        A new DataFrame (``df_base`` is copied, not modified).

    Raises:
        RuntimeError: if ``df_base`` has no ``tanggal`` column.
    """
    df = df_base.copy()

    # ensure tanggal exists + datetime
    if "tanggal" not in df.columns:
        raise RuntimeError("Base df missing 'tanggal'.")
    df = _ensure_datetime(df, "tanggal")

    # ensure stasiun_code (derived from the raw station name when not already present)
    if "stasiun_code" not in df.columns:
        if "stasiun" in df.columns:
            df["stasiun_code"] = _mk_stasiun_code(df["stasiun"])
        else:
            df["stasiun_code"] = np.nan

    # basic calendar (safe)
    df["year"] = df["tanggal"].dt.year.astype("Int64")
    df["month"] = df["tanggal"].dt.month.astype("Int64")
    df["day"] = df["tanggal"].dt.day.astype("Int64")
    df["dow"] = df["tanggal"].dt.dayofweek.astype("Int64")
    df["dayofyear"] = df["tanggal"].dt.dayofyear.astype("Int64")

    # ---- Holiday join: by tanggal ----
    hol = df_holiday.copy()
    hol = _ensure_datetime(hol, "tanggal")
    # one row per date on the right side, so this left join cannot duplicate base rows
    hol = hol.dropna(subset=["tanggal"]).drop_duplicates(["tanggal"])
    df = df.merge(hol, on="tanggal", how="left")

    # ---- NDVI join: by tanggal + stasiun_code ----
    nd = df_ndvi.copy()
    nd = _ensure_datetime(nd, "tanggal")
    if "stasiun_code" not in nd.columns:
        if "stasiun" in nd.columns:
            nd["stasiun_code"] = _mk_stasiun_code(nd["stasiun"])
    # one row per (tanggal, stasiun_code); rows without a code cannot be joined
    nd = nd.dropna(subset=["tanggal","stasiun_code"]).drop_duplicates(["tanggal","stasiun_code"])
    # drop duplicated raw station name to avoid messy columns
    drop_raw = [c for c in ["stasiun"] if c in nd.columns]
    if drop_raw:
        nd = nd.drop(columns=drop_raw)
    df = df.merge(nd, on=["tanggal","stasiun_code"], how="left")

    # ---- Weather join: local (tanggal+stasiun_code) + global fallback (tanggal) ----
    wx = df_weather.copy()
    wx = _ensure_datetime(wx, "tanggal")
    if "weather_code" in wx.columns:
        # NOTE(review): astype(str) turns a missing code into the string "NAN", so the
        # dropna below does not remove such rows; they simply never match a station code.
        wx["weather_code"] = wx["weather_code"].astype(str).str.strip().str.upper()
    else:
        wx["weather_code"] = np.nan

    # local
    wx_loc = wx.dropna(subset=["tanggal","weather_code"]).copy()
    wx_loc = wx_loc.rename(columns={"weather_code":"stasiun_code"})
    # keep numeric weather columns plus the key columns (weather_station survives as text)
    keep_keys = {"tanggal","stasiun_code","weather_station"}
    wx_loc_cols = _select_numeric_cols(wx_loc, keep=keep_keys)
    wx_loc = wx_loc[wx_loc_cols].drop_duplicates(["tanggal","stasiun_code"])
    # NOTE(review): weather_station is not in this keep-set, so it becomes
    # "wx_weather_station"; the categorical list further down names "weather_station".
    wx_loc = _prefix_cols(wx_loc, "wx_", keep={"tanggal","stasiun_code"})
    df = df.merge(wx_loc, on=["tanggal","stasiun_code"], how="left")

    # global mean by tanggal (across all weather stations), prefixed wxg_
    wx_g = wx.dropna(subset=["tanggal"]).copy()
    wx_g_num = [c for c in wx_g.columns if pd.api.types.is_numeric_dtype(wx_g[c])]
    if len(wx_g_num) > 0:
        wx_g = wx_g.groupby("tanggal", as_index=False)[wx_g_num].mean(numeric_only=True)
        wx_g = _prefix_cols(wx_g, "wxg_", keep={"tanggal"})
        df = df.merge(wx_g, on="tanggal", how="left")

        # fill local wx_ with global wxg_ (same base name)
        for c in list(df.columns):
            if c.startswith("wx_"):
                base = c.replace("wx_", "")
                cg = "wxg_" + base
                if cg in df.columns:
                    df[c] = df[c].fillna(df[cg])

    # ---- Population join: total per year (global) ----
    pop = df_pop.copy()
    # try normalize if column names differ
    if "tahun" in pop.columns:
        pop["tahun"] = pd.to_numeric(pop["tahun"], errors="coerce")
    if "jumlah_penduduk" in pop.columns:
        pop["jumlah_penduduk"] = pd.to_numeric(pop["jumlah_penduduk"], errors="coerce")
    if {"tahun","jumlah_penduduk"}.issubset(pop.columns):
        # pop_total_year = sum of jumlah_penduduk over all rows of a year.
        # NOTE(review): per its file name df_pop is split by age group and sex; confirm this
        # sum is the intended provincial total (no subtotal rows double-counted).
        pop_y = pop.dropna(subset=["tahun","jumlah_penduduk"]).groupby("tahun", as_index=False)["jumlah_penduduk"].sum()
        pop_y = pop_y.rename(columns={"tahun":"year","jumlah_penduduk":"pop_total_year"})
        df = df.merge(pop_y, on="year", how="left")

    # ---- River join: global year-month aggregates ----
    riv = df_river.copy()
    # convert to numeric if exist
    for cc in ["periode_data","bulan_sampling","baku_mutu","hasil_pengukuran"]:
        if cc in riv.columns:
            riv[cc] = pd.to_numeric(riv[cc], errors="coerce")

    if {"periode_data","bulan_sampling","baku_mutu","hasil_pengukuran"}.issubset(riv.columns):
        r = riv.dropna(subset=["periode_data","bulan_sampling","baku_mutu","hasil_pengukuran"]).copy()
        # measurement / standard; a zero standard gives NaN instead of a division by zero
        r["ratio_to_std"] = r["hasil_pengukuran"] / (r["baku_mutu"].replace(0, np.nan))
        r["exceed"] = (r["hasil_pengukuran"] > r["baku_mutu"]).astype(int)
        # share of measurements above the standard, mean ratio, and measurement count
        r_agg = r.groupby(["periode_data","bulan_sampling"], as_index=False).agg(
            river_exceed_rate=("exceed","mean"),
            river_ratio_mean=("ratio_to_std","mean"),
            river_n=("exceed","size"),
        )
        # NOTE(review): periode_data is used as the calendar year and bulan_sampling as the
        # month — confirm against the raw river file.
        r_agg = r_agg.rename(columns={"periode_data":"year", "bulan_sampling":"month"})
        df = df.merge(r_agg, on=["year","month"], how="left")

    # ---- keep CatBoost-friendly categoricals ----
    for c in ["stasiun","stasiun_code","parameter_pencemar_kritis","day_name","nama_libur","weather_station"]:
        if c in df.columns:
            # keep object (not forced to string "nan")
            df[c] = df[c].where(df[c].notna(), np.nan).astype("object")

    # ---- final dedup on key ----
    key_cols = ["tanggal","stasiun_code"] if "stasiun_code" in df.columns else ["tanggal"]
    df = _dedup_keep_most_complete(df, key_cols)

    # target cleanup (safe)
    # NOTE(review): astype(str) would turn a missing label into "nan"; the train caller
    # passes df_train, which Step 1 filtered to labelled rows.
    if has_target and "kategori" in df.columns:
        df["kategori"] = df["kategori"].astype(str).str.strip()

    return df

# ============================================================
# 1) Build TRAIN master
# ============================================================
# Train master: labelled ISPU rows (df_train from Step 1) enriched with all auxiliary sources.
df_train_master = build_master(df_train, has_target=True)
print("df_train_master:", df_train_master.shape)
# Sanity check on the (tanggal, stasiun_code) key after build_master's final dedup.
if "stasiun_code" in df_train_master.columns:
    print("train key duplicates:", int(df_train_master.duplicated(["tanggal","stasiun_code"]).sum()))

# ============================================================
# 2) Build TEST master (needs id mapping file)
# ============================================================
n_test_expected = len(sub)

test_path = None
# Prefer Step 1 test_candidates if available
if "test_candidates" in globals() and isinstance(test_candidates, list) and len(test_candidates) > 0:
    # pick the first candidate; entries are (path_str, column_list) tuples
    try:
        test_path = Path(test_candidates[0][0])
    except (IndexError, TypeError):
        test_path = None

if test_path is None:
    test_path = _find_test_mapping_file(DATA_ROOT, n_test_expected)

if test_path is None:
    print("\n[WARN] Test mapping file (id -> tanggal/stasiun) not found yet.")
    print("       df_test_master=None, but you can still run Step 3–4 on train.")
    df_test_master = None
    df_test_like = None
else:
    print("\nTest mapping file:", str(test_path))
    df_test = _standardize_columns(pd.read_csv(test_path))

    # basic checks
    if "id" not in df_test.columns:
        raise RuntimeError(f"Test file has no 'id': {test_path}")
    if len(df_test) != n_test_expected:
        raise RuntimeError(f"Test file rows != submission rows: {len(df_test)} vs {n_test_expected}")

    # ensure tanggal
    if "tanggal" not in df_test.columns:
        raise RuntimeError("Test mapping file missing 'tanggal'.")
    df_test["tanggal"] = parse_date_twopass(df_test["tanggal"])

    # ensure stasiun_code
    if "stasiun_code" not in df_test.columns:
        if "stasiun" in df_test.columns:
            df_test["stasiun_code"] = _mk_stasiun_code(df_test["stasiun"])
        elif "stasiun_id" in df_test.columns:
            df_test["stasiun_code"] = _mk_stasiun_code(df_test["stasiun_id"])
        else:
            # fallback: find any col containing DKI
            cand = None
            for c in df_test.columns:
                if df_test[c].astype(str).str.contains("dki", case=False, na=False).any():
                    cand = c
                    break
            if cand is None:
                raise RuntimeError("Cannot infer station from test mapping file.")
            df_test["stasiun_code"] = _mk_stasiun_code(df_test[cand])

    # build master.
    # build_master's final dedup on (tanggal, stasiun_code) may reorder rows (and drop some).
    # Pasting df_test["id"] back on *by position* then gives rows the wrong ids. Instead, tag
    # every test row with its original position and re-attach the id through that tag.
    df_test_master = build_master(
        df_test.assign(_row_pos=np.arange(len(df_test))), has_target=False
    )
    if len(df_test_master) != len(df_test):
        raise RuntimeError(
            f"build_master changed the number of test rows: {len(df_test)} -> {len(df_test_master)} "
            "(e.g. duplicated (tanggal, stasiun_code) keys or rows without a station code). "
            "Every test id needs exactly one feature row."
        )
    # restore the test file's row order, then attach ids via the row tag (not by position)
    df_test_master = df_test_master.sort_values("_row_pos", kind="stable").reset_index(drop=True)
    df_test_master["id"] = df_test["id"].to_numpy()[df_test_master["_row_pos"].to_numpy()]
    df_test_master = df_test_master.drop(columns=["_row_pos"])

    # IMPORTANT: Provide alias required by your Step 5 code
    df_test_like = df_test_master.copy()

    print("df_test_master:", df_test_master.shape)
    print("[OK] df_test_like created for Step 5:", df_test_like.shape)

# ============================================================
# Preview
# ============================================================
# Visual check of the joined tables (df_test_master is None when no test mapping was found).
display(df_train_master.head(3))
if df_test_master is not None:
    display(df_test_master.head(3))

# Feature Engineering (Time-Series + Calendar + Robustness)

In [None]:
# ============================================================
# STEP 3 — Feature Engineering (Time-Series + Calendar + Robustness) — FINAL
# Requires:
#   df_train_master  (from Step 2)
#   df_test_master   (optional; from Step 2)
# Produces:
#   df_train_fe, df_test_fe
# Notes:
# - STRICT leakage-safe (all rolling on shifted values only)
# - Station-aware (grouped by stasiun_code)
# - CatBoost-friendly (categorical kept as object)
# ============================================================

import numpy as np
import pandas as pd

# ----------------------------
# Guards
# ----------------------------
# This cell reads df_train_master from the notebook's global namespace (built by Step 2).
if "df_train_master" not in globals():
    raise RuntimeError("Missing df_train_master. Run Step 2 first.")

# The lag/rolling features below sort and group on these two columns.
for c in ["tanggal", "stasiun_code"]:
    if c not in df_train_master.columns:
        raise RuntimeError(f"df_train_master missing required column: {c}")

# ----------------------------
# Calendar / time features
# ----------------------------
def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with calendar and cyclic date features derived from ``tanggal``.

    Adds the following columns:
      * nullable-integer ``year``, ``month``, ``day``, ``dow`` (pandas dayofweek, Monday=0)
        and ``dayofyear``;
      * sine/cosine encodings of day-of-year (period 365.25) and of month (period 12);
      * an ``is_weekend`` flag (dow 5 or 6), only when the frame does not already carry one.
    """
    out = df.copy()
    stamp = out["tanggal"].dt
    calendar_parts = {
        "year": stamp.year,
        "month": stamp.month,
        "day": stamp.day,
        "dow": stamp.dayofweek,
        "dayofyear": stamp.dayofyear,
    }
    for col, values in calendar_parts.items():
        out[col] = values.astype("Int64")

    two_pi = 2 * np.pi
    day_frac = out["dayofyear"].astype(float)
    out["doy_sin"] = np.sin(two_pi * day_frac / 365.25)
    out["doy_cos"] = np.cos(two_pi * day_frac / 365.25)

    month_frac = out["month"].astype(float)
    out["mon_sin"] = np.sin(two_pi * month_frac / 12.0)
    out["mon_cos"] = np.cos(two_pi * month_frac / 12.0)

    if "is_weekend" not in out.columns:
        out["is_weekend"] = out["dow"].isin([5, 6]).astype(int)

    return out

# ----------------------------
# Lag + rolling (STRICT leakage-safe)
# ----------------------------
def add_station_lag_rolling(
    df: pd.DataFrame,
    group_col: str = "stasiun_code",
    base_cols = ("pm10","pm25","so2","co","o3","no2","max"),
    lags = (1, 2, 3, 7, 14),
    windows = (3, 7, 14, 30),
) -> pd.DataFrame:
    """Add per-station lag, rolling-statistic and delta features (leakage-safe).

    The result is sorted by ``(group_col, tanggal)`` with a fresh RangeIndex,
    i.e. it is NOT returned in the input row order.

    For every column ``c`` of ``base_cols`` that exists in ``df``:

    * ``c_lag{L}`` — value ``L`` rows earlier within the same station. This is
      positional: if a station has gaps in its dates, it is the L-th previous
      available row, not the value from L days earlier.
    * ``c_rmean{w}`` / ``c_rstd{w}`` — mean / sample std over the ``w``
      previous rows of the same station (the current row is excluded via
      ``shift(1)``); NaN until ``max(2, w // 3)`` past values are available.
    * ``c_d12 = c_lag1 - c_lag2`` and ``c_d1_rm7 = c_lag1 - c_rmean7`` when the
      required inputs were generated.

    Raises:
        RuntimeError: if none of ``base_cols`` is present in ``df``.
    """
    df = df.copy()
    df = df.sort_values([group_col, "tanggal"]).reset_index(drop=True)

    base_cols = [c for c in base_cols if c in df.columns]
    if len(base_cols) == 0:
        raise RuntimeError("No pollutant columns found for lag/rolling features.")

    station_keys = df[group_col]
    g = df.groupby(group_col, sort=False)

    # --- lags ---
    for c in base_cols:
        for L in lags:
            df[f"{c}_lag{L}"] = g[c].shift(L)

    # --- rolling (on shifted series only, computed PER STATION) ---
    # The shifted series must be re-grouped before rolling: a plain
    # `Series.rolling` would let the first windows of a station reach into the
    # last rows of the previous station (rows are sorted station by station).
    for c in base_cols:
        past = g[c].shift(1)  # critical: past only
        past_by_station = past.groupby(station_keys, sort=False)
        for w in windows:
            roll = past_by_station.rolling(w, min_periods=max(2, w // 3))
            # groupby-rolling returns a (station, row) MultiIndex; drop the
            # station level so the result aligns back onto df's row index.
            df[f"{c}_rmean{w}"] = roll.mean().reset_index(level=0, drop=True)
            df[f"{c}_rstd{w}"] = roll.std().reset_index(level=0, drop=True)

    # --- deltas (safe, lag-based) ---
    for c in base_cols:
        if f"{c}_lag1" in df.columns and f"{c}_lag2" in df.columns:
            df[f"{c}_d12"] = df[f"{c}_lag1"] - df[f"{c}_lag2"]
        if f"{c}_lag1" in df.columns and f"{c}_rmean7" in df.columns:
            df[f"{c}_d1_rm7"] = df[f"{c}_lag1"] - df[f"{c}_rmean7"]

    return df

# ----------------------------
# Weather interactions (robust)
# ----------------------------
def add_weather_interactions(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with pollutant-lag x weather interaction features.

    For each weather quantity, the first existing column among its candidates
    is used; candidates are tried in the order ``wx_*``, ``wxg_*``, un-prefixed.
    A feature is only created when both its lag-1 pollutant column and its
    weather column exist. Created features, in order:

    * ``pm25_lag1_x_wind``, ``pm10_lag1_x_wind``  = lag1 * wind speed
    * ``o3_lag1_x_rad``                           = lag1 * shortwave radiation
    * ``pm25_lag1_div_prec``                      = lag1 / (precipitation + 1),
      missing precipitation treated as 0
    * ``co_lag1_x_rh``                            = lag1 * relative humidity
    * ``pm25_lag1_x_temp``                        = lag1 * temperature

    Only lag-1 pollutant values are used, so no same-day target information
    enters these features. The input frame is not modified.
    """
    out = df.copy()

    weather_candidates = {
        "wind": (
            "wx_wind_speed_10m_mean_km_h",
            "wxg_wind_speed_10m_mean_km_h",
            "wind_speed_10m_mean_km_h",
        ),
        "prec": (
            "wx_precipitation_sum_mm",
            "wxg_precipitation_sum_mm",
            "precipitation_sum_mm",
        ),
        "rad": (
            "wx_shortwave_radiation_sum_mj_m²",
            "wxg_shortwave_radiation_sum_mj_m²",
            "shortwave_radiation_sum_mj_m²",
        ),
        "rh": (
            "wx_relative_humidity_2m_mean",
            "wxg_relative_humidity_2m_mean",
            "relative_humidity_2m_mean",
        ),
        "tmp": (
            "wx_temperature_2m_mean_c",
            "wxg_temperature_2m_mean_c",
            "temperature_2m_mean_c",
        ),
    }
    # Resolve every weather source up front (before any new column is added).
    source = {
        quantity: next((name for name in names if name in out.columns), None)
        for quantity, names in weather_candidates.items()
    }

    def multiply(lag, wx):
        return lag * wx

    def damp_by_rain(lag, wx):
        return lag / (wx.fillna(0) + 1.0)

    recipe = (
        ("pm25_lag1_x_wind", "pm25_lag1", "wind", multiply),
        ("pm10_lag1_x_wind", "pm10_lag1", "wind", multiply),
        ("o3_lag1_x_rad", "o3_lag1", "rad", multiply),
        ("pm25_lag1_div_prec", "pm25_lag1", "prec", damp_by_rain),
        ("co_lag1_x_rh", "co_lag1", "rh", multiply),
        ("pm25_lag1_x_temp", "pm25_lag1", "tmp", multiply),
    )
    for feature, lag_col, quantity, combine in recipe:
        wx_col = source[quantity]
        if wx_col is None or lag_col not in out.columns:
            continue
        out[feature] = combine(out[lag_col], out[wx_col])

    return out

# ----------------------------
# Final type fixing for CatBoost
# ----------------------------
def finalize_types_for_catboost(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with dtypes normalised for CatBoost.

    * Known categorical columns (station, critical pollutant, day name, holiday
      name, weather station), when present, are cast to plain ``object``.
    * Binary flags ``is_weekend`` / ``is_holiday_nasional``, when present, are
      coerced to numbers (unparseable -> NaN), NaN filled with 0, then cast to int.

    Columns not listed are left untouched; the input frame is not modified.
    """
    out = df.copy()

    categorical = (
        "stasiun",
        "stasiun_code",
        "parameter_pencemar_kritis",
        "day_name",
        "nama_libur",
        "weather_station",
    )
    for col in (name for name in categorical if name in out.columns):
        out[col] = out[col].astype("object")

    for col in ("is_weekend", "is_holiday_nasional"):
        if col not in out.columns:
            continue
        as_number = pd.to_numeric(out[col], errors="coerce")
        out[col] = as_number.fillna(0).astype(int)

    return out

# ----------------------------
# Build FE
# ----------------------------
def build_fe(df: pd.DataFrame) -> pd.DataFrame:
    """Run the full Step-3 feature pipeline on one master table.

    Stages, applied in order (each returns a new frame):
    calendar features -> per-station lags/rolling stats -> weather
    interactions -> CatBoost dtype normalisation.
    """
    pipeline = (
        add_time_features,
        add_station_lag_rolling,
        add_weather_interactions,
        finalize_types_for_catboost,
    )
    result = df
    for stage in pipeline:
        result = stage(result)
    return result

# ----------------------------
# Apply to train / test
# ----------------------------
# Train and test are featurised independently.
# NOTE(review): lag/rolling features for the test set are computed only from
# df_test_master rows, so each station's first test rows get NaN lags unless
# df_test_master already carries preceding history — confirm against Step 2.
df_train_fe = build_fe(df_train_master)

# The test table is optional: it may be absent from the session or None.
if "df_test_master" in globals() and df_test_master is not None:
    df_test_fe = build_fe(df_test_master)
else:
    df_test_fe = None

# ----------------------------
# Sanity checks
# ----------------------------
print("df_train_fe:", df_train_fe.shape)
if df_test_fe is not None:
    print("df_test_fe :", df_test_fe.shape)

# Percentage of NaN per column, 15 worst columns. Long lags / wide rolling
# windows are expected to rank high because each station's first rows have
# no history.
top_miss = (df_train_fe.isna().mean()
            .sort_values(ascending=False)
            .head(15) * 100).round(2)

print("\nTop missing% (train_fe):")
print(top_miss.to_string())

display(df_train_fe.head(3))

# Model Training (Time-Based CV + CatBoost Optimization)

In [None]:
# ============================================================
# STEP 4 — Model Training (Time-Based CV + CatBoost) — REVISED (OOF FIX)
# Fixes vs your last:
# - OOF computed ONLY on rows that actually appear in validation (no zero-proba rows)
# - Time folds do NOT require train to contain ALL classes (rare-class safe)
# - Still CPU only, auto categorical detection + sanitization
# Outputs:
# - feature_cols, cat_cols, classes, class_to_id, id_to_class
# - folds
# - models
# - oof_proba (only valid rows filled), oof_pred (for valid rows), oof_macro_f1
# - oof_valid_mask, oof_valid_idx
# ============================================================

import numpy as np
import pandas as pd
from catboost import CatBoostClassifier, Pool
from sklearn.metrics import f1_score, classification_report

# Fail fast if the Step 3 feature table is not in this session.
if "df_train_fe" not in globals():
    raise RuntimeError("Missing df_train_fe. Run Step 3 first.")

# ----------------------------
# Config
# ----------------------------
N_SPLITS = 4      # maximum number of time folds (passed to make_time_folds)
GAP_DAYS = 0      # embargo (days) between last train date and first valid date; 0 = none
SEEDS = [42]      # one CatBoost model per (seed, fold); OOF probabilities are averaged over seeds

ITERATIONS = 8000 # upper bound on boosting rounds; early stopping (od_wait below) usually stops sooner
LR = 0.05         # CatBoost learning_rate
DEPTH = 8         # CatBoost tree depth
L2 = 6.0          # CatBoost l2_leaf_reg

MISSING_CAT = "__MISSING__"  # sentinel string substituted for missing categorical values

# ----------------------------
# Prepare data
# ----------------------------
df = df_train_fe.copy()
if "tanggal" not in df.columns or "kategori" not in df.columns:
    raise RuntimeError("df_train_fe must contain 'tanggal' and 'kategori'.")

# Chronological order (ties broken by station) with a fresh RangeIndex: the
# time folds return index labels that are later used positionally (.iloc) and
# as rows of the OOF arrays, so labels must equal positions.
df = df.dropna(subset=["tanggal"]).sort_values(
    ["tanggal", "stasiun_code"] if "stasiun_code" in df.columns else ["tanggal"]
).reset_index(drop=True)

# Never used as model inputs: the target, the raw date, the `id` column, and
# source_file / periode_data (presumably provenance metadata from Step 1).
drop_cols = {"kategori", "tanggal", "source_file", "periode_data"}
if "id" in df.columns:
    drop_cols.add("id")

# align features with test (if exists): only columns available at inference time
if "df_test_fe" in globals() and df_test_fe is not None:
    common = [c for c in df.columns if c in df_test_fe.columns]
    feature_cols = [c for c in common if c not in drop_cols]
else:
    feature_cols = [c for c in df.columns if c not in drop_cols]

X = df[feature_cols].copy()

# clean target: drop rows whose label is missing or blank.
# `astype(str)` renders NaN as "nan", None as "None" and pd.NA as "<NA>" (and any
# upstream upper-casing after astype(str) yields "NAN"/"NONE"), so every one of
# these spellings — not only "nan" — must be rejected, or it becomes a fake class.
y_str = df["kategori"].astype(str).str.strip()
_label_ok = (
    df["kategori"].notna()
    & y_str.ne("")
    & ~y_str.str.lower().isin(["nan", "none", "<na>"])
)
df = df.loc[_label_ok].reset_index(drop=True)
X  = X.loc[_label_ok].reset_index(drop=True)
y_str = y_str.loc[_label_ok].reset_index(drop=True)
del _label_ok

if y_str.empty:
    raise RuntimeError("No labeled rows left in df_train_fe after cleaning 'kategori'.")

# ----------------------------
# AUTO categorical detection + sanitization
# ----------------------------
# Every feature column whose dtype pandas does not consider numeric is passed
# to CatBoost as a categorical feature.
is_num = X.apply(pd.api.types.is_numeric_dtype)
cat_cols = X.columns[~is_num].tolist()

# Categorical values become strings, with missing values (and their common
# string renderings "nan" / "None" / "") mapped to one sentinel token —
# presumably because CatBoost does not accept NaN in categorical features.
for c in cat_cols:
    X[c] = X[c].where(X[c].notna(), MISSING_CAT).astype(str)
    X[c] = X[c].replace({"nan": MISSING_CAT, "None": MISSING_CAT, "": MISSING_CAT})

# Re-coerce numeric columns; unparseable entries become NaN.
num_cols = X.columns[is_num].tolist()
for c in num_cols:
    X[c] = pd.to_numeric(X[c], errors="coerce")

# Defensive check: no numeric-typed column should still be object dtype here.
bad_num = [c for c in num_cols if X[c].dtype == object]
if bad_num:
    raise RuntimeError(f"Numeric columns still object after coercion: {bad_num[:10]}")

# ----------------------------
# classes + weights (handles rare classes)
# ----------------------------
# The sorted label list fixes the class -> integer id mapping used for the
# CatBoost targets and for the column order of every probability matrix below.
classes = sorted(y_str.unique().tolist())
counts = y_str.value_counts()

# "Balanced" weights: n_rows / (n_classes * count_c), so rare classes weigh more.
# Every class in `classes` is taken from y_str, hence counts[c] >= 1 (no division by zero).
class_weights = [float(len(y_str) / (len(classes) * counts[c])) for c in classes]

class_to_id = {c: i for i, c in enumerate(classes)}
id_to_class = {i: c for c, i in class_to_id.items()}
y = y_str.map(class_to_id).astype(int)  # integer targets 0..K-1, row-aligned with X

print("Train rows:", len(df), "| n_features:", len(feature_cols), "| n_cat(auto):", len(cat_cols))
print("Classes:", classes)
print("Class counts:\n", counts.to_string())

# ----------------------------
# Time folds (year-based preferred; no "all-classes-in-train" constraint)
# ----------------------------
def make_time_folds(df_in: pd.DataFrame, n_splits=4, gap_days=0):
    """Build forward-chaining time folds, newest validation window first.

    Strategies, tried in order:

    1. Year-based (needs >= 2 distinct years): each calendar year, newest
       first, is a validation set trained on all strictly earlier years; at
       most ``n_splits`` folds.
    2. Date blocks: the sorted unique dates (NaT excluded) are split into
       ``n_splits + 1`` contiguous blocks; every block except the first is
       validated against all strictly earlier dates. Empty blocks (fewer unique
       dates than blocks) are skipped instead of crashing.
    3. Last resort: first 80% of rows train, remaining 20% validate.

    Folds are NOT required to contain every class in their training part.

    Args:
        df_in: frame with a datetime ``tanggal`` column.
        n_splits: maximum number of folds.
        gap_days: if > 0, training rows must be dated at or before
            ``first validation date - gap_days`` (embargo).

    Returns:
        list of ``(train_idx, valid_idx)`` numpy arrays holding ``df_in.index``
        labels. Callers that index with ``.iloc`` must pass a frame with a
        default RangeIndex (labels == positions).
    """
    dates = df_in["tanggal"]
    row_years = dates.dt.year
    years = sorted(row_years.dropna().unique().tolist())
    folds = []

    def _make_fold(tr_mask, va_mask, va_start):
        # Apply the optional embargo, then turn boolean masks into index arrays;
        # returns None when either side would be empty.
        if gap_days > 0:
            tr_mask = tr_mask & (dates <= (va_start - pd.Timedelta(days=gap_days)))
        tr_idx = df_in.index[tr_mask.to_numpy()].to_numpy()
        va_idx = df_in.index[va_mask.to_numpy()].to_numpy()
        if len(tr_idx) == 0 or len(va_idx) == 0:
            return None
        return tr_idx, va_idx

    # 1) year-based: newest years as validation (classic time split)
    if len(years) >= 2:
        for vy in reversed(years):
            va_mask = row_years == vy
            if not va_mask.any():
                continue
            fold = _make_fold(row_years < vy, va_mask, dates[va_mask].min())
            if fold is None:
                continue
            folds.append(fold)
            if len(folds) >= n_splits:
                break

        if folds:
            return folds

    # 2) fallback: contiguous blocks of unique dates
    uniq_dates = dates.dropna().drop_duplicates().sort_values().to_numpy()
    blocks = np.array_split(uniq_dates, n_splits + 1)
    for block in reversed(blocks[1:]):
        if len(block) == 0:
            continue  # np.array_split yields empty trailing blocks when dates are scarce
        va_start, va_end = pd.Timestamp(block[0]), pd.Timestamp(block[-1])
        va_mask = (dates >= va_start) & (dates <= va_end)
        fold = _make_fold(dates < va_start, va_mask, va_start)
        if fold is None:
            continue
        folds.append(fold)
        if len(folds) >= n_splits:
            break

    # 3) final fallback: last 20% of rows
    if not folds:
        cut = int(len(df_in) * 0.8)
        folds = [(df_in.index[:cut].to_numpy(), df_in.index[cut:].to_numpy())]
    return folds

# Folds hold df index labels; df has a fresh RangeIndex, so they double as
# row positions for X.iloc / y.iloc and the OOF arrays below.
folds = make_time_folds(df, n_splits=N_SPLITS, gap_days=GAP_DAYS)

# Print each fold's size and date span to eyeball that validation always follows training.
print("\nFolds:")
for i, (tr, va) in enumerate(folds):
    dtr = (df.loc[tr, "tanggal"].min(), df.loc[tr, "tanggal"].max())
    dva = (df.loc[va, "tanggal"].min(), df.loc[va, "tanggal"].max())
    print(f"fold{i}: train={len(tr)} [{dtr[0]}..{dtr[1]}] | valid={len(va)} [{dva[0]}..{dva[1]}]")

# ----------------------------
# Train CV (CPU only) + OOF (VALID-ONLY)
# ----------------------------
# oof_proba: one row per df row, one column per class (order = `classes`).
# Only rows that fall in some validation window are filled; oof_valid_mask
# records which ones, so untouched all-zero rows are never scored.
K = len(classes)
oof_proba = np.zeros((len(df), K), dtype=np.float32)
oof_valid_mask = np.zeros(len(df), dtype=bool)

models = []       # every fitted model, in (seed, fold) order
fold_scores = []  # macro-F1 per (seed, fold), same order as `models`

for seed in SEEDS:
    print(f"\n=== SEED {seed} | task_type=CPU ===")
    for fi, (tr_idx, va_idx) in enumerate(folds):
        # Positional indexing is valid because df/X/y share a fresh RangeIndex.
        X_tr, y_tr = X.iloc[tr_idx], y.iloc[tr_idx]
        X_va, y_va = X.iloc[va_idx], y.iloc[va_idx]

        train_pool = Pool(X_tr, y_tr, cat_features=cat_cols)
        valid_pool = Pool(X_va, y_va, cat_features=cat_cols)

        # classes_count=K fixes the output width to all K classes even when a
        # training fold lacks some class.
        model = CatBoostClassifier(
            loss_function="MultiClass",
            eval_metric="TotalF1",
            classes_count=K,
            class_weights=class_weights,
            iterations=ITERATIONS,
            learning_rate=LR,
            depth=DEPTH,
            l2_leaf_reg=L2,
            random_strength=1.0,
            bagging_temperature=0.5,
            border_count=128,
            random_seed=seed,
            od_type="Iter",
            od_wait=400,
            task_type="CPU",
            thread_count=-1,
            verbose=250
        )

        # NOTE(review): early stopping / use_best_model pick the iteration on the
        # same validation fold that is scored below, so fold and OOF scores are
        # somewhat optimistic.
        model.fit(train_pool, eval_set=valid_pool, use_best_model=True)

        proba = model.predict_proba(X_va)
        pred_int = np.argmax(proba, axis=1)
        score = f1_score(y_va, pred_int, average="macro")

        # fill OOF only for valid rows; validation windows from make_time_folds
        # are disjoint, so each row receives one fold's probabilities per seed,
        # averaged over seeds via the 1/len(SEEDS) factor.
        oof_proba[va_idx] += (proba / len(SEEDS))
        oof_valid_mask[va_idx] = True

        fold_scores.append(float(score))
        models.append(model)

        print(f"[seed {seed} fold {fi}] macroF1={score:.5f} | best_iter={model.get_best_iteration()}")

# ----------------------------
# OOF summary (VALID ONLY, IMPORTANT)
# ----------------------------
# Rows of the earliest period are train-only (never validated), so coverage
# is normally below 100% and only covered rows are scored.
oof_valid_idx = np.where(oof_valid_mask)[0]
if len(oof_valid_idx) == 0:
    raise RuntimeError("No OOF validation rows were filled. Check folds logic.")

# Map argmax column ids back to label strings (columns follow `classes`).
oof_pred_int = np.argmax(oof_proba[oof_valid_idx], axis=1)
oof_pred = np.array([id_to_class[i] for i in oof_pred_int])

# Pooled macro-F1 over all covered rows; this generally differs from the mean
# of the per-fold scores printed below.
oof_macro_f1 = f1_score(y_str.iloc[oof_valid_idx], oof_pred, average="macro")

print("\n=== OOF RESULTS (VALID ONLY) ===")
print("Fold macroF1:", [round(s, 5) for s in fold_scores])
print("OOF macroF1 :", round(oof_macro_f1, 6))
print(f"OOF covered rows: {len(oof_valid_idx)} / {len(df)}")

print("\nOOF classification report (valid-only):")
print(classification_report(y_str.iloc[oof_valid_idx], oof_pred, digits=4))

# Inference, Ensembling, Submission & QA