## 01. Library import

**Purpose**: IO/compute stack, deterministic seed, clean logs & version snapshot.

In [10]:
import os
import sys
import json
import time
import gc
import warnings
from pathlib import Path
from typing import List, Dict, Any

import numpy as np
import pandas as pd
from pandas.api.types import is_bool_dtype, is_integer_dtype, is_float_dtype

import pyarrow as pa
import pyarrow.parquet as pq

# Fixed seed so NumPy's global random state starts from the same point on every run
SEED = 42
np.random.seed(SEED)

# Keep cell output readable: hide FutureWarning messages raised from pyarrow modules
warnings.filterwarnings(action="ignore", category=FutureWarning, module="pyarrow")

# Wider pandas rendering, useful when inspecting frames while debugging
for _option, _value in (("display.max_columns", 200), ("display.width", 120)):
    pd.set_option(_option, _value)

# Versions snapshot
def lib_versions() -> Dict[str, str]:
    """Return the installed versions of pandas, numpy and pyarrow, keyed by package name."""
    tracked = (("pandas", pd), ("numpy", np), ("pyarrow", pa))
    return {package: module.__version__ for package, module in tracked}

# One-line version banner, rendered from the same snapshot the helper returns
print(" | ".join(f"{package}: {version}" for package, version in lib_versions().items()))


pandas: 2.2.2 | numpy: 1.26.4 | pyarrow: 21.0.0


## 02. Project paths

**Purpose**: Define input/output folders and constants.

In [11]:
# Repository root: step one level up when the kernel was started inside notebooks/
PROJECT_ROOT = Path.cwd()
if PROJECT_ROOT.name == "notebooks":
    PROJECT_ROOT = PROJECT_ROOT.parent
PROJECT_DIR = PROJECT_ROOT / "notebooks"

# Input and output locations, resolved to absolute paths
DATA_DIR = PROJECT_ROOT.joinpath("data", "train-data").resolve()  # raw parquet shards + target CSV
ARTIFACTS_DIR = PROJECT_ROOT.joinpath("artifacts").resolve()
REPORTS_DIR = PROJECT_ROOT.joinpath("reports").resolve()
SRC_DIR = PROJECT_ROOT.joinpath("src").resolve()

# Make modules under src/ importable; the membership test avoids duplicates on re-run
src_entry = str(SRC_DIR)
if src_entry not in sys.path:
    sys.path.insert(0, src_entry)

# Output folders are created on demand
for out_dir in (ARTIFACTS_DIR, REPORTS_DIR):
    out_dir.mkdir(parents=True, exist_ok=True)

# Files this notebook writes (final dataset) and reads (labels)
FINAL_DS_PATH = ARTIFACTS_DIR / "final_dataset.parquet"
TARGET_CSV = DATA_DIR / "train_target.csv"

# Fail fast when the inputs are not where they are expected
if not DATA_DIR.exists():
    raise FileNotFoundError(f"DATA_DIR not found: {DATA_DIR}")
if not TARGET_CSV.exists():
    raise FileNotFoundError(f"Target CSV not found: {TARGET_CSV}")

for label, location in (
    ("DATA_DIR", DATA_DIR),
    ("ARTIFACTS_DIR", ARTIFACTS_DIR),
    ("REPORTS_DIR", REPORTS_DIR),
):
    print(f"{label}={location}")

DATA_DIR=D:\final_v2\credit-risk-management\data\train-data
ARTIFACTS_DIR=D:\final_v2\credit-risk-management\artifacts
REPORTS_DIR=D:\final_v2\credit-risk-management\reports


## 03.1 Global dtype rules

**Purpose**: Combine the explicit casts from features.py with prefix-based heuristics.

In [12]:
# Maps a pandas dtype name to the list of columns assigned to that dtype.
# Within this notebook the table is used in three places:
#   - extend_dtype_rules_with_enc_paym() appends enc_paym_* columns to "Int16";
#   - harmonize_dtypes() treats every listed column as "managed" and leaves it untouched;
#   - _rule_dtype_of() (streaming loop) looks up the dtype for columns it has to back-fill.
# NOTE(review): the per-column casts themselves are presumably done in features.py — confirm.
DTYPE_RULES: dict[str, list[str]] = {
    # --- integers (counters, durations, amounts) ---
    "Int32": [
        "pre_since_opened", "pre_since_confirmed",
        "pre_pterm", "pre_fterm",
        "pre_till_pclose", "pre_till_fclose",
        "pre_loans_credit_limit", "pre_loans_next_pay_summ",
        "pre_loans_outstanding", "pre_loans_total_overdue",
        "pre_loans_max_overdue_sum", "pre_loans_credit_cost_rate",
        "pre_loans5", "pre_loans530", "pre_loans3060",
        "pre_loans6090", "pre_loans90",
    ],

    # --- ratios (float signals) ---
    "Float32": [
        "pre_util", "pre_over2limit", "pre_maxover2limit",
        "debt_to_limit", "overdue_to_limit", "maxoverdue_to_limit",
        "loan_term_ratio", "since_ratio", "till_close_gap",
        "serious_delay_ratio", "paym_good_count", "paym_bad_count",
        "paym_last_status",  # added in features.py
    ],

    # --- binary flags ---
    "UInt8": [
        "is_zero_loans5", "is_zero_loans530", "is_zero_loans3060",
        "is_zero_loans6090", "is_zero_loans90",
        "is_zero_util", "is_zero_over2limit", "is_zero_maxover2limit",
        "pclose_flag", "fclose_flag",
        # all OHE dummies will also be uint8
    ],

    # --- small encoded categoricals ---
    "Int16": [
        "enc_loans_account_holder_type",
        "enc_loans_credit_status",
        "enc_loans_account_cur",
        "enc_loans_credit_type",
        "paym_last_clean_streak",  # added in features.py
        # enc_paym_* will be appended dynamically
    ],
}

def extend_dtype_rules_with_enc_paym(existing_cols: list[str]) -> None:
    """Register every enc_paym_* column found in `existing_cols` under the Int16 rule.

    Mutates the module-level DTYPE_RULES in place. Columns already listed under
    "Int16" are left alone; new ones are appended in sorted order, each at most once.
    """
    known = set(DTYPE_RULES.get("Int16", []))
    candidates = sorted(col for col in existing_cols if col.startswith("enc_paym_"))
    # dict.fromkeys drops repeated names while keeping the sorted order
    fresh = [col for col in dict.fromkeys(candidates) if col not in known]
    if fresh:
        DTYPE_RULES["Int16"].extend(fresh)

## 03.2 Helpers — ID normalizer & dtype harmonizer

**Purpose**: 

> Enforce consistent id column (string[pyarrow]).

> Prevent accidental use of 'rn' unless explicitly allowed.

> Harmonize dtypes to be Arrow-friendly (avoid ArrowTypeError).

In [13]:
def ensure_id_column(
    df: pd.DataFrame,
    allow_rn_as_id: bool = False,
    source: str | None = None
) -> None:
    """Ensure df has 'id' column as string[pyarrow]."""
    if "id" in df.columns:
        df["id"] = df["id"].astype("string")
        return

    for c in ["ID", "Id", "client_id", "customer_id", "loan_id", "account_id", "user_id", "uid"]:
        if c in df.columns:
            df["id"] = df[c].astype("string")
            return

    if allow_rn_as_id and "rn" in df.columns:
        df["id"] = df["rn"].astype("string")
        df["_id_from_rn"] = True
        print(f"Using rn as id in {source or '<df>'}. This may cause collisions/leakage.")
        return

    raise KeyError(
        f"Missing 'id' in {source or '<df>'}. Columns: {list(df.columns)}. "
        "Make sure READ_COLUMNS includes 'id'."
    )


def harmonize_dtypes(
    df: pd.DataFrame,
    dtype_rules: dict[str, list[str]] | None = None
) -> pd.DataFrame:
    """
    Convert dtypes to Arrow-friendly formats:
    - id → string[pyarrow]
    - bool → UInt8
    - int → nullable Int32/Int64
    - object → string[pyarrow]
    """
    if dtype_rules is None:
        dtype_rules = DTYPE_RULES

    managed: set[str] = {"id"}
    for dt, cols in dtype_rules.items():
        managed.update(cols)

    if "id" in df.columns:
        try:
            df["id"] = df["id"].astype("string[pyarrow]")
        except Exception:
            df["id"] = df["id"].astype("string")

    for col in (c for c in df.columns if c not in managed):
        s = df[col]

        if is_bool_dtype(s):
            df[col] = s.astype("UInt8")
            continue

        if is_integer_dtype(s):
            try:
                m, M = s.min(), s.max()
                if (
                    pd.api.types.is_integer(m) and 
                    pd.api.types.is_integer(M) and 
                    -2**31 <= m < 2**31 and -2**31 <= M < 2**31
                ):
                    df[col] = pd.to_numeric(s, errors="coerce").astype("Int32")
                else:
                    df[col] = pd.to_numeric(s, errors="coerce").astype("Int64")
            except Exception:
                df[col] = pd.to_numeric(s, errors="coerce").astype("Int32")
            continue

        if df[col].dtype == "object":
            try:
                df[col] = s.astype("string[pyarrow]")
            except Exception:
                df[col] = s.astype("string")

    return df

## 04. Load target CSV

**Purpose**: Read target once, normalize id dtype, keep column name as 'target', quick sanity checks.

In [14]:
target = pd.read_csv(
    TARGET_CSV,
    dtype={"id": "string", "flag": "int8"}  # enforce stable dtypes on load
)

# Expect ['id','flag'] in file
assert {"id", "flag"} <= set(target.columns), \
    f"train_target.csv must contain ['id','flag'], got {list(target.columns)}"

# 'id' becomes the index that labels are looked up by (Series.map in the streaming
# loop). A non-unique index makes that lookup fail with an opaque reindexing error,
# and a missing id can never be matched, so validate the key here with a clear message.
assert target["id"].notna().all(), "train_target.csv contains rows with a missing 'id'"
assert target["id"].is_unique, (
    f"train_target.csv contains {int(target['id'].duplicated().sum())} duplicated 'id' values"
)

# Normalize to 'target'
target = target.rename(columns={"flag": "target"})
target = target.set_index("id")

# Sanity checks
pos_rate = float(target["target"].mean())
print(f"Target shape: {target.shape} | positive rate: {pos_rate:.4f}")

Target shape: (3000000, 1) | positive rate: 0.0355


## 05. Import feature pipeline

**Purpose**: Reuse the exact same FeatureGenerator from src/utils/features.py as in notebook 02.

In [15]:
from utils.features import FeatureGenerator, FeatureConfig

# One shared generator built from the default FeatureConfig; the streaming loop
# calls fg.transform(...) on every shard.
fg = FeatureGenerator(FeatureConfig())

## 05.1 FeatureGenerator contract check

**Purpose**: Ensure FG exposes a callable `.transform()`; it is used as a stateless transformer, so no `.fit()` call is made anywhere in this notebook.

In [16]:
# A missing attribute resolves to None, which is not callable, so one check covers both cases
assert callable(getattr(fg, "transform", None)), "FeatureGenerator must implement .transform(df)"

## 06. Locate raw parquet shards

**Purpose**: Collect and sort all source parquet files to stream through.

In [18]:
# Every *.pq file directly under DATA_DIR, ordered by path.
# Ordering is lexicographic, so e.g. train_data_10 sorts before train_data_2.
parquet_files: List[Path] = list(DATA_DIR.glob("*.pq"))
parquet_files.sort()
assert parquet_files, "No parquet shards found in data/"

print("Found shards:", len(parquet_files))
parquet_files[:5]

Found shards: 12


[WindowsPath('D:/final_v2/credit-risk-management/data/train-data/train_data_0.pq'),
 WindowsPath('D:/final_v2/credit-risk-management/data/train-data/train_data_1.pq'),
 WindowsPath('D:/final_v2/credit-risk-management/data/train-data/train_data_10.pq'),
 WindowsPath('D:/final_v2/credit-risk-management/data/train-data/train_data_11.pq'),
 WindowsPath('D:/final_v2/credit-risk-management/data/train-data/train_data_2.pq')]

## 07. Choose input columns

**Purpose**: Auto-build a minimal column whitelist from shard schema

In [19]:
AUTO_BUILD_WHITELIST = True

if not AUTO_BUILD_WHITELIST:
    READ_COLUMNS = None
    print("READ_COLUMNS=None → all columns will be read")
else:
    # Column names available in the data, taken from the first shard's schema
    schema_cols = set(pq.ParquetFile(parquet_files[0]).schema.names)
    cfg = FeatureConfig()  # from utils.features

    # Columns requested by exact name: keys, raw numeric inputs, categorical inputs
    needed_exact = {
        "id", "rn",
        "pre_loans_credit_limit", "pre_loans_outstanding", "pre_loans_total_overdue", "pre_loans_max_overdue_sum",
        "pre_pterm", "pre_fterm", "pre_since_opened", "pre_since_confirmed", "pre_till_pclose", "pre_till_fclose",
        "pre_loans5", "pre_loans530", "pre_loans3060", "pre_loans6090", "pre_loans90",
        *cfg.cat_cols,  # categorical inputs for OHE
    }

    # Column families requested by name prefix
    prefixes = ["enc_paym_"]

    # Keep only what the schema actually offers, then add the prefix families
    whitelist = needed_exact & schema_cols
    family_prefixes = tuple(prefixes)
    whitelist.update(name for name in schema_cols if name.startswith(family_prefixes))

    READ_COLUMNS = sorted(whitelist)

    # Persist the whitelist next to the other artifacts
    WHITELIST_JSON = ARTIFACTS_DIR / "feature_whitelist.json"
    with open(WHITELIST_JSON, "w", encoding="utf-8") as f:
        f.write(json.dumps(READ_COLUMNS, indent=2))

    print(f"Column whitelist built: {len(READ_COLUMNS)} columns → {WHITELIST_JSON}")

Column whitelist built: 42 columns → D:\final_v2\credit-risk-management\artifacts\feature_whitelist.json


## 08. Parquet writer setup + options

**Purpose**: Declare the writer state and the IO performance options. The single-file parquet writer itself is created in section 11, using the schema of the first processed batch.

In [20]:
# Writer state: no writer or schema yet, all counters at zero, clock started
writer = schema = None
written_rows = matched_labels = batches = 0
t0 = time.time()

# Parquet output options
WRITE_COMPRESSION = "snappy"  # fast & efficient
ROW_GROUP_SIZE = 1_000_000    # rows per written row group
USE_DICTIONARY = True         # good for strings/low-cardinality ints

## 09. Sanity: verify whitelist against all shards

**Purpose**: Make sure every shard contains every column listed in READ_COLUMNS; list any that are missing.

In [21]:
# shard file name -> whitelisted columns that the shard does not contain
problems = {}

if READ_COLUMNS is None:
    # Whitelist disabled (AUTO_BUILD_WHITELIST=False): every column is read, so there
    # is nothing to compare against. Iterating over None would raise TypeError.
    print("READ_COLUMNS=None → all columns are read; nothing to verify.")
else:
    for fp in parquet_files:
        cols = set(pq.ParquetFile(fp).schema.names)
        miss = [c for c in READ_COLUMNS if c not in cols]
        if miss:
            problems[fp.name] = miss

    if not problems:
        print("Whitelist OK across all shards.")
    else:
        print("Missing columns detected:")
        for fn, miss in problems.items():
            print(f" - {fn}: {miss}")

Whitelist OK across all shards.


## 10. Build last-rn map (light pre-pass)

**Purpose**: Scan only ['id','rn'] across all shards to compute the latest rn per id (cheap, 2nd pass uses it to filter)

In [22]:
# id -> largest rn seen for that id across all shards
id_last_rn: dict[str, int] = {}
t0 = time.time()

for shard_no, fp in enumerate(parquet_files, start=1):
    pf = pq.ParquetFile(fp)

    # Only the two key columns are read, in batches of up to one million rows
    for batch in pf.iter_batches(columns=["id", "rn"], batch_size=1_000_000):
        # Prefer Arrow-backed pandas dtypes; fall back to the default conversion
        try:
            b = batch.to_pandas(types_mapper=pd.ArrowDtype)  # pandas >=1.5
        except Exception:
            b = batch.to_pandas()

        # ids as strings, rn as nullable integers (unparseable values become NA)
        b["id"] = b["id"].astype("string")
        b["rn"] = pd.to_numeric(b["rn"], errors="coerce").astype("Int64")

        # Largest rn per id within this batch
        batch_max = b.groupby("id", observed=True)["rn"].max()

        # Fold the batch result into the global map, keeping the larger value
        for key, rn_max in batch_max.items():
            if pd.isna(rn_max):
                continue
            rn_max = int(rn_max)
            previous = id_last_rn.get(key)
            if previous is not None and previous >= rn_max:
                continue
            id_last_rn[key] = rn_max

    if not shard_no % 5:
        print(f"[map {shard_no}/{len(parquet_files)}] ids so far = {len(id_last_rn):,}")

print(f"Built id→last_rn map for {len(id_last_rn):,} ids in {time.time() - t0:.1f}s")

[map 5/12] ids so far = 1,250,000
[map 10/12] ids so far = 2,500,000
Built id→last_rn map for 3,000,000 ids in 23.8s


## 11. Streaming loop (client-level, with dtype harmonization)

**Purpose**: Read whitelisted cols → keep only (id, rn==last_rn[id]) → feature-engineer → join labels → enforce dtypes → write.

In [25]:
from utils import read_parquet_safe

# 'id' and 'rn' drive the last-row filter below, so a whitelist must always carry them
if READ_COLUMNS is not None:
    for key_col in ("id", "rn"):
        if key_col not in READ_COLUMNS:
            READ_COLUMNS.append(key_col)

# Label lookup: Series of targets indexed by 'id' (string)
target_map = target["target"]
TARGET_DTYPE = "Int8"  # nullable, so ids without a label can stay missing

# Reuse the id -> last rn map from the pre-pass when it exists and is non-empty;
# otherwise rebuild it here by reading only the two key columns of every shard.
if "id_last_rn" not in globals() or not id_last_rn:
    id_last_rn = {}
    t0 = time.time()
    for i, fp in enumerate(parquet_files, 1):
        shard_name = getattr(fp, "name", str(fp))
        df_keys = read_parquet_safe(fp, columns=["id", "rn"])
        ensure_id_column(df_keys, source=shard_name)
        if "rn" not in df_keys.columns:
            raise KeyError(f"Missing 'rn' in shard {shard_name}.")
        df_keys["id"] = df_keys["id"].astype("string")
        df_keys["rn"] = pd.to_numeric(df_keys["rn"], errors="coerce").astype("Int32")

        # Per-shard maximum, folded into the global map (larger value wins)
        shard_max = df_keys.groupby("id", observed=True)["rn"].max()
        for key, rn_max in shard_max.items():
            if pd.isna(rn_max):
                continue
            rn_max = int(rn_max)
            previous = id_last_rn.get(key)
            if previous is not None and previous >= rn_max:
                continue
            id_last_rn[key] = rn_max

        if not i % 5:
            print(f"[map {i}/{len(parquet_files)}] ids so far = {len(id_last_rn):,}")
    print(f"Built last-rn map for {len(id_last_rn):,} ids in {time.time()-t0:.1f}s")

# Writer/init state
writer = None
all_cols: list[str] | None = None
written_rows = matched_labels = batches = 0
t0 = time.time()

# Helper: pick pandas dtype string from DTYPE_RULES
def _rule_dtype_of(col: str) -> str:
    """Return the pandas dtype name to use when back-filling a missing column.

    The result is always a nullable (masked) dtype. The streaming loop builds the
    filler as an all-pd.NA Series and casts it to this dtype; plain NumPy "float32"
    cannot represent pd.NA and makes that cast raise TypeError, so the nullable
    "Float32" is returned instead (also as the default for unlisted columns).

    Args:
        col: column name to look up in DTYPE_RULES.

    Returns:
        "Int32", "Int16", "UInt8" or "Float32".
    """
    nullable_names = {"Int32", "Int16", "UInt8", "Float32"}
    for dt, cols in DTYPE_RULES.items():
        if col in cols:
            return dt if dt in nullable_names else "Float32"
    return "Float32"

# The whole loop runs under try/finally so the parquet writer is closed (file
# finalized, handle released) even when a shard fails part-way through.
try:
    for i, fp in enumerate(parquet_files, 1):
        shard_name = getattr(fp, "name", str(fp))

        # Read & normalize
        df = read_parquet_safe(fp, columns=READ_COLUMNS)
        if df.empty:
            continue
        ensure_id_column(df, source=shard_name)
        if "rn" not in df.columns:
            raise KeyError(f"Missing 'rn' in shard {shard_name}.")
        df["id"] = df["id"].astype("string")
        df["rn"] = pd.to_numeric(df["rn"], errors="coerce").astype("Int32")

        # Keep only last (id, rn)
        df["_last_rn"] = df["id"].map(id_last_rn)
        keep = df["rn"].eq(df["_last_rn"])
        if not keep.any():
            continue
        df = df.loc[keep].drop(columns=["_last_rn"])

        # Tie-break if duplicates with same (id, rn)
        if df.duplicated(subset=["id", "rn"]).any():
            sec = [c for c in ["pre_since_opened", "pre_since_confirmed", "pre_fterm"] if c in df.columns]
            df = df.sort_values(["id", "rn"] + sec).drop_duplicates(["id", "rn"], keep="last")

        # On the first batch, extend dtype rules with actual enc_paym_* present
        if all_cols is None:
            extend_dtype_rules_with_enc_paym(df.columns)

        # Feature engineering (our FG expects (X, paym); pass df twice)
        feat = fg.transform(df, df)
        feat["id"] = feat["id"].astype("string")

        # Join labels (nullable int to allow NaNs for OOS ids)
        feat["target"] = feat["id"].map(target_map).astype(TARGET_DTYPE)

        # Enforce Arrow-friendly dtypes
        feat = harmonize_dtypes(feat, dtype_rules=DTYPE_RULES)

        # Union schema across batches (id first)
        if all_cols is None:
            all_cols = ["id"] + [c for c in feat.columns if c != "id"]
        else:
            for c in feat.columns:
                if c not in all_cols:
                    all_cols.append(c)

        # Ensure every expected column exists in this batch.
        # pd.NA cannot be cast to NumPy "float32" (TypeError), so float fillers start
        # from NaN; nullable dtypes (Int32/Int16/UInt8/Float32) accept pd.NA directly.
        for c in all_cols:
            if c not in feat.columns:
                fill_dtype = _rule_dtype_of(c)
                fill_value = np.nan if fill_dtype == "float32" else pd.NA
                feat[c] = pd.Series(fill_value, index=feat.index).astype(fill_dtype)

        merged = feat[all_cols]

        # Write (single-file Parquet with chunked row groups)
        table = pa.Table.from_pandas(merged, preserve_index=False)
        if writer is None:
            if FINAL_DS_PATH.exists():
                FINAL_DS_PATH.unlink()
            writer = pq.ParquetWriter(
                FINAL_DS_PATH,
                schema=table.schema,
                compression=WRITE_COMPRESSION,
                use_dictionary=USE_DICTIONARY,
            )
        elif not table.schema.equals(writer.schema):
            # The file schema is fixed by the first batch. A column that first appears
            # later cannot be stored, so fail with an explicit message.
            unexpected = [c for c in table.schema.names if c not in writer.schema.names]
            if unexpected:
                raise ValueError(
                    f"Shard {shard_name} produced columns that are absent from the "
                    f"schema of the first written batch: {unexpected}"
                )
            # Same columns but different types (e.g. Int32 in one batch, Int64 in
            # another, since harmonize_dtypes decides per batch): cast to the file
            # schema. The cast is checked and raises if a value does not fit.
            table = table.cast(writer.schema)

        if ROW_GROUP_SIZE and len(merged) > ROW_GROUP_SIZE:
            # slice in row-group chunks
            for start in range(0, len(merged), ROW_GROUP_SIZE):
                writer.write_table(table.slice(start, min(ROW_GROUP_SIZE, len(merged) - start)))
        else:
            writer.write_table(table)

        written_rows   += len(merged)
        matched_labels += int(merged["target"].notna().sum())
        batches += 1

        if batches % 5 == 0:
            gc.collect()
            print(f"[{batches}/{len(parquet_files)}] rows={written_rows:,} matched={matched_labels:,}")
finally:
    if writer is not None:
        writer.close()

print(f"Done (client-level). Rows: {written_rows:,} | matched: {matched_labels:,} | time: {time.time()-t0:.1f}s")

[5/12] rows=1,250,000 matched=1,250,000
[10/12] rows=2,500,000 matched=2,500,000
Done (client-level). Rows: 3,000,000 | matched: 3,000,000 | time: 907.0s


## 12.1 Post checks

**Purpose**: Read back only the `id` and `target` columns (all rows) to verify the written file and compute key stats for the report.

**Note**: `reading the full file may be heavy; sample a few row groups instead.`

In [26]:
# Only the key and the label are loaded back from the final dataset
sample = pd.read_parquet(FINAL_DS_PATH, engine="pyarrow", columns=["id","target"])

n_total = int(len(sample))
n_labeled = int(sample["target"].count())  # count() ignores missing labels
labeled_share = n_labeled / max(n_total, 1)  # max(..., 1) guards against an empty file
miss_rate = 1.0 - labeled_share

print({"rows_total": n_total, "rows_with_target": n_labeled, "missing_target_rate": round(miss_rate, 4)})

{'rows_total': 3000000, 'rows_with_target': 3000000, 'missing_target_rate': 0.0}


## 12.2 Post-checks (extended)

**Purpose**: Validate schema, duplicates, label coverage, and basic NaN profile.

In [27]:
# Column count straight from the file's Arrow schema
pf = pq.ParquetFile(FINAL_DS_PATH)
meta_cols = pf.schema_arrow.names
print("Final columns:", len(meta_cols))

# Full read-back: all rows, all columns
sample = pd.read_parquet(FINAL_DS_PATH, engine="pyarrow")
n_total = int(len(sample))

if "target" in sample.columns:
    n_labeled = int(sample["target"].count())  # non-missing labels
else:
    n_labeled = 0
miss_rate = 1.0 - (n_labeled / max(n_total, 1))

# -1 signals "no id column to check"
if "id" in sample.columns:
    dup_ids = int(sample["id"].duplicated().sum())
else:
    dup_ids = -1

# Share of the target CSV rows that received a label in the final dataset
label_coverage = n_labeled / max(len(target), 1)

# Ten columns with the highest share of missing values
nan_by_column = sample.isna().mean()
nan_share = nan_by_column.sort_values(ascending=False).head(10)

summary = {
    "rows_total": n_total,
    "rows_with_target": n_labeled,
    "missing_target_rate": round(miss_rate,4),
    "duplicate_ids": dup_ids,
    "label_coverage_vs_target_csv": round(float(label_coverage), 4)
}
print(summary)
print("Top-10 NaN share:\n", nan_share)

Final columns: 45
{'rows_total': 3000000, 'rows_with_target': 3000000, 'missing_target_rate': 0.0, 'duplicate_ids': 0, 'label_coverage_vs_target_csv': 1.0}
Top-10 NaN share:
 id                        0.0
enc_paym_7                0.0
enc_paym_9                0.0
pre_fterm                 0.0
pre_loans3060             0.0
pre_loans5                0.0
pre_loans530              0.0
pre_loans6090             0.0
pre_loans90               0.0
pre_loans_credit_limit    0.0
dtype: float64


## 13.1 Report JSON

**Purpose**: Persist assembly metrics for reproducibility (counts, timing, file list).

In [28]:
# Assembly metrics collected by the streaming loop, plus provenance fields
report: Dict[str, Any] = dict(
    n_files=len(parquet_files),
    rows_written=int(written_rows),
    labels_matched=int(matched_labels),
    final_dataset_path=str(FINAL_DS_PATH),
    created_at=time.strftime("%Y-%m-%d %H:%M:%S"),
    notes="Built by streaming all shards, applying FeatureGenerator and left-joining train_target.csv on 'id'.",
)

REPORT_JSON = REPORTS_DIR / "03_final_dataset_report.json"
with open(REPORT_JSON, "w", encoding="utf-8") as f:
    f.write(json.dumps(report, ensure_ascii=False, indent=2))
print("Saved:", REPORT_JSON)

Saved: D:\final_v2\credit-risk-management\reports\03_final_dataset_report.json


## 13.2 Enrich report

**Purpose**: Persist schema, file list, label coverage, and FeatureGenerator config.

In [29]:
pf = pq.ParquetFile(FINAL_DS_PATH)
meta_cols = pf.schema_arrow.names

# label_coverage may be computed in your extended post-check; fall back to None
try:
    _ = label_coverage
except NameError:
    label_coverage = None  # will be written as null in JSON

# FeatureGenerator config as a plain dict. A generator without `cfg`, a `cfg` that is
# None, or a config object without __dict__ (e.g. slots-based) is reported as null
# instead of raising AttributeError.
fg_cfg = getattr(fg, "cfg", None)
feature_config = dict(vars(fg_cfg)) if hasattr(fg_cfg, "__dict__") else None

report.update({
    "parquet_files": [str(p) for p in parquet_files],
    "schema_columns": meta_cols,
    "label_coverage_vs_target_csv": (float(label_coverage) if label_coverage is not None else None),
    "feature_config": feature_config,
})

# Serialize BEFORE opening the file: opening with "w" truncates the existing report,
# so a serialization error raised afterwards would destroy it. default=str stringifies
# config values JSON cannot encode natively (e.g. Path or set instances).
report_text = json.dumps(report, ensure_ascii=False, indent=2, default=str)

# also persist schema columns to a txt for quick diffing in git
with open(ARTIFACTS_DIR / "final_dataset_schema.txt", "w", encoding="utf-8") as f:
    f.write("\n".join(meta_cols))

# rewrite the report with enriched fields
with open(REPORT_JSON, "w", encoding="utf-8") as f:
    f.write(report_text)

print("Saved (enriched):", REPORT_JSON, "| schema →", ARTIFACTS_DIR / "final_dataset_schema.txt")

Saved (enriched): D:\final_v2\credit-risk-management\reports\03_final_dataset_report.json | schema → D:\final_v2\credit-risk-management\artifacts\final_dataset_schema.txt


## 14. Audit: cardinalities

**Purpose**: Quick sanity checks — total rows vs unique ids and label coverage.

In [30]:
pf = pq.ParquetFile(FINAL_DS_PATH)
total_rows = pf.metadata.num_rows  # row count from the file metadata, no data read

# Distinct ids, gathered by streaming only the 'id' column in batches
unique_ids = set()
for rb in pf.iter_batches(columns=["id"], batch_size=1_000_000):
    unique_ids |= set(rb.column(0).to_pylist())
n_ids = len(unique_ids)

# Ratios use max(n_ids, 1) so an empty file cannot cause a division by zero
rows_per_id_avg = total_rows / max(n_ids, 1)
y_ids = set(target.index)
n_ids_with_label = len(unique_ids.intersection(y_ids))
id_coverage = n_ids_with_label / max(n_ids, 1)

audit = {
    "total_rows": total_rows,
    "unique_ids": n_ids,
    "rows_per_id_avg": round(rows_per_id_avg, 3),
    "id_coverage": round(id_coverage, 4)
}
print(audit)

{'total_rows': 3000000, 'unique_ids': 3000000, 'rows_per_id_avg': 1.0, 'id_coverage': 1.0}


## 15. Guidance

**Purpose**: Explain next steps for modeling notebooks (04).

In [31]:
# Pointer to the follow-up notebook, assembled from the same three fragments
next_steps = "".join([
    "Next → 04_modeling_baseline: split train/test (use only rows with target),\n",
    "train baseline models (LogReg/RF/LGBM/XGB), measure ROC-AUC ≥ 0.75. ",
    "Keep unlabeled rows for later inference if needed.",
])
print(next_steps)

Next → 04_modeling_baseline: split train/test (use only rows with target),
train baseline models (LogReg/RF/LGBM/XGB), measure ROC-AUC ≥ 0.75. Keep unlabeled rows for later inference if needed.
