# In [1]:
#!/usr/bin/env python3
from __future__ import annotations
import argparse
from pathlib import Path
from typing import List, Dict
from collections import Counter
import json
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, StandardScaler
from imblearn.over_sampling import SMOTE, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler

# Numeric feature columns that are screened for IQR outliers when present.
# Names are compared verbatim against the CSV header, so spellings are kept
# exactly as written here.
NUMERIC_PREF: List[str] = [
    "Age",
    "Bilirubin",
    "Cholesterol",
    "Albumin",
    "Copper",
    "Alk_Phos",
    "SGOT",
    "Tryglicerides",
    "Platelets",
    "Prothrombin",
]

# Categorical columns that are label-encoded to integer codes.
BINARY_COLS: List[str] = [
    "Sex",
    "Ascites",
    "Hepatomegaly",
    "Spiders",
    "Edema",
]

# Column names tried, in this order, when no target is given explicitly.
TARGET_CANDIDATES: List[str] = [
    "Status",
    "status",
    "Target",
    "target",
    "Class",
    "class",
    "Outcome",
    "outcome",
]

# Seed shared by every resampling step so runs are reproducible.
RANDOM_STATE: int = 42

def find_raw_path(cli_raw: str | None) -> Path:
    """Locate the raw CSV file to load.

    The explicitly supplied path (if any) is tried first, followed by a fixed
    list of conventional locations relative to the current working directory.
    A location qualifies only if it exists and has a non-zero size.

    Args:
        cli_raw: Path given on the command line, or ``None``/empty to rely on
            the built-in locations only.

    Returns:
        The first qualifying location as a ``Path``.

    Raises:
        FileNotFoundError: If no location qualifies.
    """
    fallbacks = (
        "liver_cirrhosis.csv",
        "../data/raw/liver_cirrhosis.csv",
        "../Data/raw/liver_cirrhosis.csv",
        "../data/external/raw/liver_cirrhosis.csv",
        "../Data/external/raw/liver_cirrhosis.csv",
        "../../Data/external/raw/liver_cirrhosis.csv",
    )
    # An unusable explicit path is not an error by itself: the search simply
    # continues with the fallback locations.
    search_order = ([cli_raw] if cli_raw else []) + list(fallbacks)
    for location in search_order:
        candidate = Path(location)
        if candidate.exists() and candidate.stat().st_size > 0:
            return candidate
    raise FileNotFoundError("raw csv not found; pass --raw")

def pick_target(df: pd.DataFrame, cli_target: str | None) -> str:
    """Decide which column of ``df`` is the prediction target.

    Args:
        df: Frame whose columns are searched.
        cli_target: Column name requested by the user, or ``None``/empty to
            auto-detect using ``TARGET_CANDIDATES``.

    Returns:
        ``cli_target`` when it was given and exists; otherwise the first entry
        of ``TARGET_CANDIDATES`` that is a column of ``df``.

    Raises:
        KeyError: If the requested column is absent, or if auto-detection
            finds no candidate column.
    """
    available = df.columns

    if not cli_target:
        # Auto-detect: candidate order decides which column wins.
        detected = next((name for name in TARGET_CANDIDATES if name in available), None)
        if detected is None:
            raise KeyError("target column not found")
        return detected

    if cli_target not in available:
        raise KeyError(f"target '{cli_target}' not in columns")
    return cli_target

def iqr_mask(s: pd.Series, k: float = 1.5) -> pd.Series:
    """Return a boolean mask marking values inside the IQR fences.

    Values are first coerced to numbers (unparseable entries become NaN).
    A value is kept when it lies in ``[Q1 - k*IQR, Q3 + k*IQR]``, bounds
    included. NaN entries compare as False and are therefore masked out.

    Args:
        s: Series to screen.
        k: Fence width as a multiple of the inter-quartile range.

    Returns:
        Boolean Series aligned with ``s``; True means "not an outlier".
    """
    values = pd.to_numeric(s, errors="coerce")
    lower_quartile = values.quantile(0.25)
    upper_quartile = values.quantile(0.75)
    spread = upper_quartile - lower_quartile
    low_fence = lower_quartile - k * spread
    high_fence = upper_quartile + k * spread
    # Series.between is inclusive on both ends by default.
    return values.between(low_fence, high_fence)

def safe_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """Return a fully numeric, gap-free ``float32`` copy of ``df``.

    Boolean columns become 0/1, every column is coerced to numbers
    (unparseable entries become NaN), infinities are treated as missing,
    missing values are filled with the column median, and anything still
    missing (e.g. a column with no valid value at all) is filled with 0.

    Args:
        df: Input frame; it is not modified.

    Returns:
        A new frame with the same shape and labels, dtype ``float32``.
    """
    out = df.copy()

    # Booleans are converted up front so they take part in the median step
    # as plain 0/1 integers.
    bool_columns = [name for name in out.columns if out[name].dtype == bool]
    for name in bool_columns:
        out[name] = out[name].astype(int)

    out = out.apply(pd.to_numeric, errors="coerce").replace([np.inf, -np.inf], np.nan)

    medians = out.median(numeric_only=True)
    return out.fillna(medians).fillna(0).astype(np.float32)

def balanced_resize(X: np.ndarray, y: np.ndarray, target_rows: int, random_state: int = 42) -> tuple[np.ndarray, np.ndarray]:
    """Resample ``(X, y)`` to ``target_rows`` rows split evenly across classes.

    Each class receives ``target_rows // n_classes`` rows; the first
    ``target_rows % n_classes`` classes (in sorted label order) receive one
    extra row so the total is exactly ``target_rows``.

    Classes that are too large are randomly under-sampled and classes that are
    too small are randomly over-sampled. Both directions are handled in the
    same call, so inputs where some classes must shrink while others must grow
    are supported.

    Args:
        X: Feature matrix with one row per sample.
        y: Class label per row of ``X``.
        target_rows: Total number of rows wanted in the result.
        random_state: Seed passed to the samplers.

    Returns:
        ``(X_resized, y_resized)``. The inputs are returned unchanged when the
        class counts already match the desired ones.

    Raises:
        ValueError: If ``y`` is empty, so no class exists to resample.
    """
    counts = Counter(y)
    classes = sorted(counts)
    k = len(classes)
    if k == 0:
        raise ValueError("cannot resize an empty dataset: y contains no classes")

    base, rem = divmod(target_rows, k)
    desired: Dict[int, int] = {c: base + (1 if i < rem else 0) for i, c in enumerate(classes)}

    # The desired counts sum to target_rows, so matching per-class counts
    # already implies the total matches as well.
    if all(counts[c] == desired[c] for c in classes):
        return X, y

    # With a dict strategy each sampler accepts targets in one direction only
    # (under-sampling: target <= current, over-sampling: target >= current),
    # so shrinking and growing are done as two separate passes.
    if any(counts[c] > desired[c] for c in classes):
        # Classes that still need to grow keep their current size here.
        shrink_to = {c: min(counts[c], desired[c]) for c in classes}
        rus = RandomUnderSampler(sampling_strategy=shrink_to, random_state=random_state)
        X, y = rus.fit_resample(X, y)
    if any(counts[c] < desired[c] for c in classes):
        ros = RandomOverSampler(sampling_strategy=desired, random_state=random_state)
        X, y = ros.fit_resample(X, y)
    return X, y

def main():
    """Build a balanced, standardized feature table from the raw CSV.

    Pipeline, in order:

    1. Parse command-line options and locate the raw CSV.
    2. Drop rows that are IQR outliers in any ``NUMERIC_PREF`` column.
    3. Label-encode ``BINARY_COLS`` and one-hot encode the remaining
       non-numeric feature columns.
    4. Add ``Age_Group_*`` indicator columns and a bilirubin/albumin ratio.
    5. Coerce features to numbers, balance classes with SMOTE, standardize.
    6. Resize to ``--rows`` rows with equal class shares.
    7. Write the table to ``--out`` (features renamed ``f0000``, ``f0001``,
       ...) and the integer-to-label mapping to ``--mapping_out``.

    Raises:
        FileNotFoundError: If no raw CSV can be located.
        KeyError: If the target column cannot be determined.
        ValueError: If no rows remain after loading and outlier filtering.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--raw", default=None)
    ap.add_argument("--out", default="../results/outputs/final_rows_15000.csv")
    ap.add_argument("--mapping_out", default="../results/outputs/final_rows_15000_mapping.json")
    ap.add_argument("--rows", type=int, default=15000)
    ap.add_argument("--target", default=None)
    # parse_known_args tolerates unrecognized argv entries instead of exiting.
    args, _ = ap.parse_known_args()

    raw_path = find_raw_path(args.raw)
    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    map_path = Path(args.mapping_out) if args.mapping_out else None
    if map_path is not None:
        # The mapping may live in a different directory than the CSV output.
        map_path.parent.mkdir(parents=True, exist_ok=True)
    target_rows = int(args.rows)

    df = pd.read_csv(raw_path)
    target = pick_target(df, args.target)

    # Keep only rows that are inliers in every known numeric column.
    present_numeric = [c for c in NUMERIC_PREF if c in df.columns]
    if present_numeric:
        mask = pd.Series(True, index=df.index)
        for c in present_numeric:
            mask &= iqr_mask(df[c])
        df = df.loc[mask].reset_index(drop=True)

    if df.empty:
        # Fail here with a clear message rather than deep inside the scaler.
        raise ValueError("no rows available after loading and IQR outlier filtering")

    for col in BINARY_COLS:
        if col in df.columns:
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col].astype(str))

    # One-hot encode whatever is still non-numeric; sorting the columns by
    # name makes the feature order independent of the CSV column order.
    non_target = df.drop(columns=[target])
    non_target = pd.get_dummies(non_target, drop_first=False)
    non_target = non_target.reindex(sorted(non_target.columns), axis=1)
    df = pd.concat([non_target, df[[target]]], axis=1)

    if "Age" in df.columns:
        df["Age"] = pd.to_numeric(df["Age"], errors="coerce")
        if df["Age"].notna().any():
            # The last bin edge is stretched so the largest observed age
            # always falls inside a bin (bins are left-closed).
            # NOTE(review): the edges 40/60/80 look like years; if the raw
            # column is recorded in another unit (e.g. days) every row lands
            # in the last bin. Confirm the unit of Age.
            upper = max(120, int(np.nanmax(df["Age"])) + 1)
            bins = [0, 40, 60, 80, upper]
            labels = ["Young", "Middle", "Senior", "Elderly"]
            df["Age_Group"] = pd.cut(df["Age"], bins=bins, labels=labels, right=False, include_lowest=True)
            age_d = pd.get_dummies(df["Age_Group"], prefix="Age_Group", drop_first=False)
            age_d = age_d.reindex(sorted(age_d.columns), axis=1)
            df = pd.concat([df.drop(columns=["Age_Group"]), age_d], axis=1)

    if {"Bilirubin", "Albumin"}.issubset(df.columns):
        # The small constant avoids division by zero.
        df["Bilirubin_Albumin_Ratio"] = (
            pd.to_numeric(df["Bilirubin"], errors="coerce") /
            (pd.to_numeric(df["Albumin"], errors="coerce") + 1e-6)
        )

    y_str = df[target].astype(str)
    X = df.drop(columns=[target])
    X = safe_numeric(X)
    y_le = LabelEncoder()
    y = y_le.fit_transform(y_str)

    # SMOTE needs at least two classes and k_neighbors < smallest class size;
    # it is skipped when that cannot be satisfied.
    # NOTE(review): SMOTE runs before standardization, so it sees unscaled
    # features. Confirm this ordering is intended.
    counts = Counter(y)
    if len(counts) >= 2 and min(counts.values()) >= 2:
        k_neighbors = max(1, min(5, min(counts.values()) - 1))
        sm = SMOTE(random_state=RANDOM_STATE, k_neighbors=k_neighbors)
        X, y = sm.fit_resample(X, y)

    scaler = StandardScaler(with_mean=True, with_std=True)
    X = scaler.fit_transform(X).astype(np.float32)

    X_res, y_res = balanced_resize(X, y, target_rows=target_rows, random_state=RANDOM_STATE)

    # Original feature names are replaced by positional identifiers.
    cols = [f"f{i:04d}" for i in range(X_res.shape[1])]
    final_df = pd.DataFrame(X_res, columns=cols)
    final_df[target] = y_res.astype(np.int32)
    final_df.to_csv(out_path, index=False)

    if map_path is not None:
        # Maps each encoded integer label back to the original class string.
        with map_path.open("w", encoding="utf-8") as f:
            json.dump({int(i): cls for i, cls in enumerate(y_le.classes_)}, f, indent=2)

# Run the pipeline only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()
