
# CATBOOST — Pré-processamento, Treino, CV (no treino), Optuna e Avaliação no Teste (holdout do treino)


In [None]:

import os, re, json, math, warnings
from pathlib import Path
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, StratifiedGroupKFold
from sklearn.metrics import mean_squared_log_error, mean_absolute_percentage_error

from catboost import CatBoostRegressor, Pool
import optuna

# Notebook-wide configuration: seed constant, display/warning policy, output folder.
SEED = 42

# Show every column when a DataFrame is displayed.
pd.options.display.max_columns = None
# Silence all warnings for the rest of the session.
warnings.simplefilter("ignore")

# Absolute working directory and the folder that receives the saved artefacts.
BASE = Path(".").resolve()
SAVE_DIR = BASE.joinpath("models_catboost_corrigido")
SAVE_DIR.mkdir(parents=True, exist_ok=True)

print("Base:", BASE)
print("Save dir:", SAVE_DIR)


In [None]:

# Data loading: reuse an in-memory DataFrame called `df` when one exists,
# otherwise read the first candidate CSV that is present in BASE.
CSV_CANDIDATES = [
    BASE / fname
    for fname in ("train_houses.csv", "df_train.csv", "dados_treino.csv")
]

if isinstance(globals().get("df"), pd.DataFrame):
    # Work on a copy so the caller's frame is left untouched.
    train_raw = df.copy()
else:
    # First existing candidate, or None when no file is found.
    csv_path = next(filter(Path.exists, CSV_CANDIDATES), None)
    if csv_path is None:
        raise FileNotFoundError("Não encontrei um CSV de treino. Defina um DataFrame 'df' ou coloque 'train_houses.csv' na pasta.")
    train_raw = pd.read_csv(csv_path)

# The feature lists below use lower-case names, so normalise the headers.
train_raw.columns = [name.lower() for name in train_raw.columns]

print(train_raw.shape, "linhas x colunas")
# Preview of the first rows (rendered as the notebook cell output).
train_raw.head(3)


In [None]:

# Feature definitions.
# X_COLS: every model input, in the column order used to build the matrix.
X_COLS = [
    # columns also listed in CAT_COLS are handled as categorical
    "type", "region", "municipalitycode", "municipality", "districtname",
    "mintimetoneareststation", "maxtimetoneareststation",
    "floorplan", "landshape",
    "buildingyear",
    "structure", "use", "purpose", "direction", "classification",
    "cityplanning",
    "coverageratio", "floorarearatio",
    "year", "quarter",
    "renovation",
    # columns created by build_features_cb
    "time_to_station_mean",
    "log_totalfloorarea", "log_area", "log_frontage", "log_breadth",
]

# CAT_COLS: the subset of X_COLS that is cast to string and passed to CatBoost
# as categorical features.
CAT_COLS = [
    "type", "region", "municipalitycode", "municipality", "districtname",
    "floorplan", "landshape",
    "structure", "use", "purpose",
    "direction", "classification", "cityplanning",
    "renovation",
]


In [None]:

# Funções de features (CatBoost-friendly)
def build_features_cb(df: pd.DataFrame, X_COLS: list[str], cat_cols: list[str]) -> pd.DataFrame:
    """Return a copy of *df* with the engineered columns the model expects.

    Steps, in order:

    1. ``time_to_station_mean``: the numeric ``timetoneareststation`` value,
       falling back to the midpoint of ``mintimetoneareststation`` and
       ``maxtimetoneareststation`` where it is missing. Any of the three
       source columns may be absent; an absent column counts as all-NaN.
    2. ``log_<col>`` = ``log1p`` of ``totalfloorarea``, ``area``, ``frontage``
       and ``breadth`` (only for source columns that exist; unparsable or
       missing values are treated as 0 before the log).
    3. Numeric coercion of ``buildingyear``, ``coverageratio``,
       ``floorarearatio``, ``year`` and ``quarter``.
    4. Normalisation of ``renovation`` labels ("Yes" -> "Done", "No" -> "Not yet").
    5. Every name in *X_COLS* that is still missing is created
       ("Missing" for categorical names, NaN otherwise).
    6. Categorical columns become plain ``str`` with "Missing" for nulls;
       the remaining *X_COLS* become ``float``.

    Args:
        df: Raw input frame; it is not modified.
        X_COLS: Names of all model input columns.
        cat_cols: Names of the columns to treat as categorical.

    Returns:
        A new DataFrame holding the original columns plus the engineered ones.
    """
    df = df.copy()

    def _num_col(name: str) -> pd.Series:
        # Always hand back an index-aligned Series. A missing column becomes
        # all-NaN instead of a bare scalar, which could not be mask-indexed.
        if name in df.columns:
            return pd.to_numeric(df[name], errors="coerce")
        return pd.Series(np.nan, index=df.index, dtype=float)

    time_mean = _num_col("timetoneareststation").copy()
    min_t = _num_col("mintimetoneareststation")
    max_t = _num_col("maxtimetoneareststation")
    # Fill only the rows without a direct value, using the min/max midpoint.
    mnan = time_mean.isna()
    time_mean[mnan] = (min_t[mnan] + max_t[mnan]) / 2.0
    df["time_to_station_mean"] = time_mean

    for src, dst in [
        ("totalfloorarea", "log_totalfloorarea"),
        ("area",           "log_area"),
        ("frontage",       "log_frontage"),
        ("breadth",        "log_breadth"),
    ]:
        if src in df.columns:
            # Missing/unparsable sizes are mapped to 0, so their log1p is 0.
            df[dst] = np.log1p(pd.to_numeric(df[src], errors="coerce").fillna(0))

    for c in ["buildingyear", "coverageratio", "floorarearatio", "year", "quarter"]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    if "renovation" in df.columns:
        # NOTE(review): after astype("string") real missing values are <NA>,
        # so the "nan"/"NaN" keys only match those literal texts; nulls are
        # turned into "Missing" further down when "renovation" is categorical.
        # Confirm that this is the intended label for missing values.
        df["renovation"] = (
            df["renovation"].astype("string").str.strip()
              .replace({"Yes": "Done", "No": "Not yet", "nan": "Not yet", "NaN": "Not yet"})
        )

    # Guarantee that every expected input column exists.
    for c in X_COLS:
        if c not in df.columns:
            df[c] = "Missing" if c in cat_cols else np.nan

    # Final dtypes: plain Python strings for categoricals, float for the rest.
    for c in cat_cols:
        df[c] = df[c].astype("string").fillna("Missing").astype(str)
    for c in [col for col in X_COLS if col not in cat_cols]:
        df[c] = pd.to_numeric(df[c], errors="coerce").astype(float)

    return df

def prepare_for_catboost(X: pd.DataFrame, cat_cols: list[str]) -> pd.DataFrame:
    """Return a copy of *X* with the dtypes used to build CatBoost pools.

    Categorical columns are cast to plain ``str`` with nulls replaced by
    "Missing"; every other column is coerced to ``float`` (unparsable values
    become NaN).

    Args:
        X: Feature matrix; it is not modified.
        cat_cols: Names of the categorical columns. Names that are not
            columns of *X* are ignored, matching the ``if c in X.columns``
            filter used when the categorical indices are computed.

    Returns:
        A new DataFrame with the same columns, in the same order.
    """
    X = X.copy()
    cat_set = set(cat_cols)
    # Snapshot the column names before assigning back into the frame.
    columns = list(X.columns)
    for c in columns:
        if c in cat_set:
            X[c] = X[c].astype("string").fillna("Missing").astype(str)
        else:
            X[c] = pd.to_numeric(X[c], errors="coerce").astype(float)
    return X


In [None]:

# Feature construction: run the feature builder over the raw training frame.
train_f = build_features_cb(train_raw, X_COLS, CAT_COLS)
print(train_f.shape)
# Preview of the first rows (rendered as the notebook cell output).
train_f.head(3)


In [None]:

# Safe target + train/holdout split.
if "tradeprice" not in train_f.columns:
    raise KeyError("Coluna 'tradeprice' não encontrada no dataset.")
tp_raw = train_f["tradeprice"]

# First try a plain numeric parse. If more than half of the values fail,
# strip everything except digits, dots and minus signs and parse again.
y_raw = pd.to_numeric(tp_raw, errors="coerce")
if y_raw.isna().mean() > 0.5:
    y_raw = pd.to_numeric(tp_raw.astype(str).str.replace(r"[^\d\.\-]", "", regex=True), errors="coerce")

# Keep only rows with a strictly positive, parseable price.
mask = y_raw.notna() & (y_raw > 0)

print("Total linhas:", len(y_raw))
print("Com preço válido (>0):", int(mask.sum()))
print("Descartadas:", int((~mask).sum()))

# Fail early with a clear message instead of an opaque error inside the split.
if not mask.any():
    raise ValueError("Nenhuma linha com preço válido (>0) para treinar.")

X_all = train_f.loc[mask, X_COLS].copy()
y_raw = y_raw.loc[mask]
# The model is trained on log1p(price); predictions are mapped back with expm1.
y_log = np.log1p(y_raw)
# Explicit raise rather than `assert`, which is stripped under `python -O`.
if not np.isfinite(y_log).all():
    raise ValueError("Ainda há NaN/inf em y_log após filtragem.")

# Stratify the holdout by transaction year when that is possible.
strata = None
if "year" in train_f.columns:
    strata = pd.to_numeric(train_f.loc[mask, "year"], errors="coerce").fillna(-1).astype(int)

stratify_arg = None
if strata is not None and strata.nunique() > 1:
    # A stratified split needs at least two rows in every stratum; otherwise
    # train_test_split raises, so fall back to a plain random split.
    if strata.value_counts().min() >= 2:
        stratify_arg = strata
    else:
        print("Aviso: há anos com menos de 2 linhas; split sem estratificação.")

X_tr, X_te, y_tr, y_te = train_test_split(
    X_all, y_log, test_size=0.2, random_state=SEED, stratify=stratify_arg
)
print("Shapes -> X_tr:", X_tr.shape, "| X_te:", X_te.shape)


In [None]:

# Optuna + CV on the TRAIN split.
# Stratification labels for the CV folds: transaction year, restricted to the
# training rows (a constant label when there is no "year" column).
if "year" in train_f.columns:
    cv_strata = pd.to_numeric(train_f.loc[mask, "year"], errors="coerce").fillna(-1).astype(int)
    cv_strata = cv_strata.loc[X_tr.index]
else:
    cv_strata = pd.Series(-1, index=X_tr.index)

# Fold groups: rows from the same district are kept in the same fold.
if "districtname" in train_f.columns:
    # Fill nulls BEFORE the string cast; after astype(str) a null is already
    # the text "nan" and fillna would never match it.
    cv_groups = train_f.loc[X_tr.index, "districtname"].fillna("Missing").astype(str)
else:
    # NOTE(review): a single constant group cannot be spread over several
    # folds; confirm the group-based CV below copes with this fallback.
    cv_groups = pd.Series("All", index=X_tr.index)

# Positional indices of the categorical columns inside X_tr.
CAT_IDX = [X_tr.columns.get_loc(c) for c in CAT_COLS if c in X_tr.columns]

def objective_cv(trial):
    """Optuna objective: mean RMSLE of a CatBoost regressor over 5 CV folds.

    Reads the module-level ``X_tr``, ``y_tr`` (log1p of the price),
    ``cv_strata`` and ``cv_groups``. Folds come from ``StratifiedGroupKFold``,
    stratified on ``cv_strata`` and grouped on ``cv_groups``. Predictions and
    targets are mapped back with ``expm1`` before the RMSLE is computed.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        The mean RMSLE across the folds (lower is better).
    """
    params = {
        "depth": trial.suggest_int("depth", 6, 10),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.15, log=True),
        "l2_leaf_reg": trial.suggest_float("l2_leaf_reg", 1.0, 10.0),
        "subsample": trial.suggest_float("subsample", 0.6, 1.0),
        "colsample_bylevel": trial.suggest_float("colsample_bylevel", 0.6, 1.0),
        "iterations": trial.suggest_int("iterations", 800, 4000),
        "random_seed": SEED,
        "loss_function": "RMSE",
        # Stop when the eval metric has not improved for 200 iterations.
        "od_type": "Iter",
        "od_wait": 200,
        "verbose": False,
    }

    cv = StratifiedGroupKFold(n_splits=5, shuffle=True, random_state=SEED)

    # The dtype clean-up is row-wise and the column layout is the same for
    # every fold, so both are computed once instead of once per fold.
    X_ready = prepare_for_catboost(X_tr, CAT_COLS)
    cat_idx = [X_ready.columns.get_loc(c) for c in CAT_COLS if c in X_ready.columns]

    rmsle_scores = []
    for tr_idx, va_idx in cv.split(X_ready, cv_strata, cv_groups):
        yva = y_tr.iloc[va_idx]
        train_pool = Pool(X_ready.iloc[tr_idx], y_tr.iloc[tr_idx], cat_features=cat_idx)
        valid_pool = Pool(X_ready.iloc[va_idx], yva, cat_features=cat_idx)

        # NOTE(review): the validation fold is both the early-stopping
        # eval_set and the scoring set, so the fold score is presumably a
        # little optimistic. Confirm this is acceptable.
        cb = CatBoostRegressor(**params)
        cb.fit(train_pool, eval_set=valid_pool, verbose=False)

        # Back to the price scale; negative predictions are clipped to 0
        # because RMSLE is undefined for them.
        pred = np.maximum(np.expm1(cb.predict(valid_pool)), 0)
        true = np.expm1(yva)
        rmsle_scores.append(math.sqrt(mean_squared_log_error(true, pred)))

    return float(np.mean(rmsle_scores))

# Hyper-parameter search. The sampler is seeded so that re-running the
# notebook explores the same sequence of trials; TPE is Optuna's default
# sampler, so only the seed is new here.
study = optuna.create_study(
    direction="minimize",
    study_name="catboost_rmsle_cv",
    sampler=optuna.samplers.TPESampler(seed=SEED),
)
study.optimize(objective_cv, n_trials=40, show_progress_bar=True)

print("Best RMSLE (CV mean):", study.best_value)
print("Best params:", study.best_params)

# Persist the winning parameters next to the other artefacts.
with open(SAVE_DIR / "catboost_best_params.json", "w", encoding="utf-8") as f:
    json.dump(study.best_params, f, ensure_ascii=False, indent=2)


In [None]:

# Final fit on the TRAIN split + evaluation on the holdout (TEST) split.
best_params = dict(study.best_params)
best_params.update({"random_seed": SEED, "loss_function": "RMSE", "verbose": False})

X_tr_cb = prepare_for_catboost(X_tr, CAT_COLS)
X_te_cb = prepare_for_catboost(X_te, CAT_COLS)
cat_idx_final = [X_tr_cb.columns.get_loc(c) for c in CAT_COLS if c in X_tr_cb.columns]

train_pool = Pool(X_tr_cb, y_tr, cat_features=cat_idx_final)
test_pool  = Pool(X_te_cb, y_te, cat_features=cat_idx_final)

# No eval_set here: the holdout is used for the final evaluation only.
final_cb = CatBoostRegressor(**best_params)
final_cb.fit(train_pool)

# Every artefact goes to SAVE_DIR (an absolute path created at start-up)
# rather than to a path relative to whatever the current directory is now.
model_path = SAVE_DIR / "catboost_model_final.cbm"
final_cb.save_model(str(model_path))

# Column lists needed to rebuild the feature matrix at inference time.
meta = {"X_cols": X_COLS, "cat_cols": CAT_COLS}
with open(SAVE_DIR / "inference_meta.json", "w", encoding="utf-8") as f:
    json.dump(meta, f, ensure_ascii=False, indent=2)

# Predict on the holdout pool and map log1p-scale values back to prices;
# negative predictions are clipped to 0 because RMSLE is undefined for them.
y_pred_log = final_cb.predict(test_pool)
y_pred = np.maximum(np.expm1(y_pred_log), 0)
y_true = np.expm1(y_te)

rmsle = math.sqrt(mean_squared_log_error(y_true, y_pred))
mape  = mean_absolute_percentage_error(y_true, y_pred) * 100.0

print(f"RMSLE (teste holdout): {rmsle:.4f}")
print(f"MAPE  (teste holdout): {mape:.2f}%")

pred_df = pd.DataFrame({"y_true": y_true.values, "y_pred": y_pred})
pred_df.to_csv(SAVE_DIR / "holdout_predictions.csv", index=False)
print("Previsões do holdout salvas.")

# Feature importances. Best-effort: a failure here is reported but must not
# discard the model and metrics produced above.
try:
    importances = final_cb.get_feature_importance(train_pool, type="PredictionValuesChange")
    fi = pd.DataFrame({"feature": X_tr_cb.columns, "importance": importances}).sort_values("importance", ascending=False)
    fi.to_csv(SAVE_DIR / "feature_importances.csv", index=False)
    print("Importâncias salvas.")
except Exception as e:
    print("Aviso (importâncias):", e)
