

># Generate a **YAML** version of the same schema (some teams prefer human-editable YAML in `config/`), plus a helper to **validate** that the current dataset still matches the schema before training.

Awesome — here’s a **YAML-first** setup plus a **validator/helper** so your preprocessing is driven by a human-readable config.

---

# 1) `config/feature_schema.yaml`

```yaml
# Telco Churn — feature schema & strategies
# Consumed by src/pipeline/schema_loader.py: the validator reads `target` and
# every `columns` list; the preprocessor builder reads `columns`, `missing`,
# `default_scale`, `onehot_params` and `overrides`.
version: 1
target: Churn_flag

binary:
  columns:
    - SeniorCitizen
    - Churn_flag               # the target; listed here so the validator's two-unique-values check covers it
  encode: passthrough          # keep as 0/1
  dtype: int8                  # optional: cast in preprocessing (the builder in schema_loader.py does not read this key)
  missing: most_frequent       # SimpleImputer strategy for binary columns

continuous:
  columns:
    - tenure
    - MonthlyCharges
    - TotalCharges
  default_scale: StandardScaler   # used for any continuous column without an entry under `overrides`
  missing: median                 # SimpleImputer strategy for continuous columns

categorical:
  columns:
    - gender
    - Partner
    - Dependents
    - PhoneService
    - MultipleLines
    - InternetService
    - OnlineSecurity
    - OnlineBackup
    - DeviceProtection
    - TechSupport
    - StreamingTV
    - StreamingMovies
    - Contract
    - PaperlessBilling
    - PaymentMethod
  encode: OneHotEncoder        # informational: the builder in schema_loader.py always one-hot encodes this group
  missing: most_frequent       # SimpleImputer strategy for categorical columns
  # Passed verbatim as keyword arguments to OneHotEncoder.
  onehot_params:
    handle_unknown: ignore
    sparse_output: false       # needs scikit-learn >= 1.2 (older releases call this `sparse`)

# Column-level overrides (take precedence over defaults)
# Only consulted for columns listed under `continuous`: `scale` picks the
# scaler by name and `params` is handed to the scaler factory in schema_loader.py.
overrides:
  TotalCharges:
    scale: QuantileTransformer
    params:
      output_distribution: normal
  MonthlyCharges:
    scale: RobustScaler
```

> Edit the lists to match your dataset if columns differ.

---

# 2) Validator + Preprocessor Builder (`src/pipeline/schema_loader.py`)

```python
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Dict, Any, List

import numpy as np
import pandas as pd
import yaml
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler, RobustScaler, QuantileTransformer


# ---------- Loader ----------

def load_schema(path: str) -> Dict[str, Any]:
    """Load the feature schema from a YAML file.

    Args:
        path: Location of the YAML schema file.

    Returns:
        The parsed top-level mapping of the YAML document.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
        ValueError: If the document is empty or its top level is not a
            mapping (``yaml.safe_load`` would otherwise hand back ``None``
            or a list, which only fails later with an unrelated error).
    """
    # Explicit UTF-8 so the result does not depend on the platform's
    # default text encoding.
    with open(path, "r", encoding="utf-8") as f:
        schema = yaml.safe_load(f)

    if not isinstance(schema, dict):
        raise ValueError(
            f"Schema file {path!r} must contain a YAML mapping at the top level, "
            f"got {type(schema).__name__}"
        )
    return schema


# ---------- Validator ----------

@dataclass
class SchemaReport:
    """Result of comparing a DataFrame's columns with the feature schema."""

    # Columns the schema expects that the DataFrame does not have.
    missing_columns: List[str]
    # DataFrame columns the schema does not mention.
    unexpected_columns: List[str]
    # Entries describing binary columns whose count of distinct values is not 2.
    binary_not_two_unique: List[str]
    # Free-form informational messages; they never affect ``ok()``.
    notes: List[str]

    def ok(self) -> bool:
        """Return True when no column is missing and no binary column failed.

        ``unexpected_columns`` and ``notes`` are deliberately not considered.
        """
        has_missing = bool(self.missing_columns)
        has_bad_binary = bool(self.binary_not_two_unique)
        return not has_missing and not has_bad_binary

def validate_dataset_against_schema(df: pd.DataFrame, schema: Dict[str, Any]) -> SchemaReport:
    """Check that ``df`` still matches the columns declared in ``schema``.

    Args:
        df: Dataset to check.
        schema: Parsed schema mapping. Must contain ``"target"``; may contain
            ``"binary"``, ``"continuous"`` and ``"categorical"`` sections,
            each with a ``"columns"`` list.

    Returns:
        A ``SchemaReport`` listing schema columns missing from ``df``,
        columns of ``df`` the schema does not mention, binary columns whose
        number of distinct non-null values is not 2, and a note when the
        target itself does not have exactly 2 distinct non-null values.

    Raises:
        KeyError: If ``schema`` has no ``"target"`` entry.
    """

    def _columns(group: str) -> List[str]:
        # A section (or its `columns` key) left empty in YAML loads as None,
        # so fall back to empty containers instead of calling .get on None.
        section = schema.get(group) or {}
        return list(section.get("columns") or [])

    tgt = schema["target"]
    expected_cols = {tgt}
    for group in ("binary", "continuous", "categorical"):
        expected_cols.update(_columns(group))

    present = set(df.columns)
    missing = sorted(expected_cols - present)
    unexpected = sorted(col for col in df.columns if col not in expected_cols)

    binary_not_two = []
    for col in _columns("binary"):
        if col not in present:
            continue  # already reported under missing_columns
        nunq = df[col].dropna().nunique()
        if nunq != 2:
            binary_not_two.append(f"{col} (nunique={nunq})")

    notes = []
    if tgt in present:
        target_nunique = df[tgt].dropna().nunique()
        if target_nunique != 2:
            notes.append(f"Target {tgt} has nunique={target_nunique} (expected 2)")

    return SchemaReport(missing_columns=missing,
                        unexpected_columns=unexpected,
                        binary_not_two_unique=binary_not_two,
                        notes=notes)


# ---------- Preprocessor Builder ----------

def _scaler_from_name(name: str, params: Dict[str, Any] | None = None):
    """Instantiate the scaler identified by ``name``.

    Args:
        name: One of ``"StandardScaler"``, ``"RobustScaler"`` or
            ``"QuantileTransformer"``.
        params: Optional keyword arguments for the scaler's constructor.
            They are forwarded to every supported scaler and take precedence
            over the built-in defaults.

    Returns:
        A new, unfitted scaler instance.

    Raises:
        ValueError: If ``name`` is not a supported scaler.
    """
    # name -> (constructor, default keyword arguments)
    registry = {
        "StandardScaler": (StandardScaler, {}),
        "RobustScaler": (RobustScaler, {}),
        "QuantileTransformer": (QuantileTransformer, {"output_distribution": "normal"}),
    }
    if name not in registry:
        raise ValueError(f"Unknown scaler: {name}")

    factory, defaults = registry[name]
    # Unpacking instead of `dict | dict` keeps this working before Python 3.9.
    return factory(**{**defaults, **(params or {})})

def build_preprocessor_from_schema(schema: Dict[str, Any]) -> ColumnTransformer:
    """Build the feature preprocessor described by ``schema``.

    Binary columns are imputed and passed through, categorical columns are
    imputed and one-hot encoded, and each continuous column gets its own
    impute + scale pipeline so per-column ``overrides`` can pick a different
    scaler. Columns not named in the schema are dropped.

    The schema's ``target`` column is never treated as a feature, even when it
    also appears in one of the column lists. Callers fit on a frame without
    the target, and ``ColumnTransformer`` rejects column names that are absent
    from its input.

    The ``encode`` and ``dtype`` keys of the schema are not consulted here.

    Args:
        schema: Parsed schema mapping with optional ``binary``,
            ``continuous``, ``categorical`` and ``overrides`` sections.

    Returns:
        An unfitted ``ColumnTransformer``.

    Raises:
        ValueError: If a continuous column names an unknown scaler.
    """

    def _section(key: str) -> Dict[str, Any]:
        # A section left empty in YAML loads as None.
        return schema.get(key) or {}

    target = schema.get("target")

    def _feature_columns(spec: Dict[str, Any]) -> List[str]:
        return [c for c in (spec.get("columns") or []) if c != target]

    bin_spec = _section("binary")
    cont_spec = _section("continuous")
    cat_spec = _section("categorical")
    overrides = _section("overrides")

    bin_cols = _feature_columns(bin_spec)
    cont_cols = _feature_columns(cont_spec)
    cat_cols = _feature_columns(cat_spec)

    # Binary: impute then passthrough
    bin_pipe = Pipeline(steps=[
        ("impute", SimpleImputer(strategy=bin_spec.get("missing", "most_frequent"))),
        ("passthrough", "passthrough")
    ])

    # Continuous: one pipeline per column so overrides can swap the scaler
    cont_transformers = []
    for c in cont_cols:
        ov = overrides.get(c) or {}
        scaler_name = ov.get("scale", cont_spec.get("default_scale", "StandardScaler"))
        scaler = _scaler_from_name(scaler_name, ov.get("params"))
        cont_transformers.append((
            f"cont__{c}",
            Pipeline(steps=[
                ("impute", SimpleImputer(strategy=cont_spec.get("missing", "median"))),
                ("scale", scaler)
            ]),
            [c]
        ))

    # Categorical: impute + onehot. Only a missing/null `onehot_params` falls
    # back to the defaults; an explicit empty mapping is respected.
    oh_params = cat_spec.get("onehot_params")
    if oh_params is None:
        oh_params = {"handle_unknown": "ignore", "sparse_output": False}
    cat_pipe = Pipeline(steps=[
        ("impute", SimpleImputer(strategy=cat_spec.get("missing", "most_frequent"))),
        ("onehot", OneHotEncoder(**oh_params))
    ])

    pre = ColumnTransformer(
        transformers=[
            ("binary", bin_pipe, bin_cols),
            ("categorical", cat_pipe, cat_cols),
            *cont_transformers
        ],
        remainder="drop"
    )
    return pre
```

---

# 3) Usage Example in Modeling Notebook

```python
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

# Assumes the project root is on sys.path; adjust to your project layout.
from src.pipeline.schema_loader import (
    build_preprocessor_from_schema,
    load_schema,
    validate_dataset_against_schema,
)

# Load data (ensure Churn_flag exists)
# df = ...

# Load + validate
schema = load_schema("config/feature_schema.yaml")
report = validate_dataset_against_schema(df, schema)

# Report success only when validation actually passed.
if report.ok():
    print("✅ Schema looks good.")
else:
    print("❌ Schema validation issues:")
    if report.missing_columns:
        print("  - Missing:", report.missing_columns)
    if report.binary_not_two_unique:
        print("  - Binary not 2 unique:", report.binary_not_two_unique)
    if report.notes:
        print("  - Notes:", report.notes)
    # decide: raise or continue
    # raise ValueError("Schema validation failed.")

# Not a failure, but worth knowing: the preprocessor drops these columns.
if report.unexpected_columns:
    print("ℹ️ Columns not in schema (ignored by the preprocessor):", report.unexpected_columns)

# Build preprocessor
pre = build_preprocessor_from_schema(schema)

X = df.drop(columns=[schema["target"]])
y = df[schema["target"]]

clf = Pipeline(steps=[
    ("pre", pre),
    ("model", LogisticRegression(max_iter=1000, class_weight="balanced"))
])

clf.fit(X, y)
print("✅ Model trained.")
```

---

## Pro tips

* Keep the YAML as your **single source of truth**; update overrides as EDA reveals skew/heavy tails.
* Pin schema versions (`version:`) and save `describe().T`/`skew`/`kurtosis` snapshots to `reports/` for auditability.
* If you later add **imputation rules per column** (e.g., specific medians), add them under `overrides:<col>:impute_value` and build that column's imputer as `SimpleImputer(strategy="constant", fill_value=<impute_value>)` instead of the group-level strategy.
