In [7]:
import gc, random, psutil, pynvml, polars as pl, numpy as np, optuna, mlflow, mlflow.catboost
from pathlib import Path
from catboost import CatBoostRegressor
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error
from optuna.integration.mlflow import MLflowCallback

# ────────────────────────────────────  CONFIG  ─────────────────────────────────── #
SEED          = 42
DATA_DIR      = Path("data")          # <- full 12-month parquet set
TARGET        = "tip_amount"
N_TRIALS      = 50
N_SPLITS      = 5
MAX_ITERS     = 10_000
EARLY_STOP    = 100
EXPERIMENT    = "CatBoost_TimeSeries_Optuna"

random.seed(SEED)
np.random.seed(SEED)

# ───────────────────────────────  HELPER: SYS METRICS  ─────────────────────────── #
def log_sys(prefix: str = "") -> None:
    mlflow.log_metric(f"{prefix}cpu_pct",   psutil.cpu_percent())
    mlflow.log_metric(f"{prefix}mem_pct",   psutil.virtual_memory().percent)
    try:
        pynvml.nvmlInit()
        h = pynvml.nvmlDeviceGetHandleByIndex(0)
        u = pynvml.nvmlDeviceGetUtilizationRates(h)
        m = pynvml.nvmlDeviceGetMemoryInfo(h)
        mlflow.log_metric(f"{prefix}gpu_util_pct",      u.gpu)
        mlflow.log_metric(f"{prefix}gpu_mem_used_mb",   m.used / 2**20)
        pynvml.nvmlShutdown()
    except Exception:
        pass

# ───────────────────────────────  DATA  PIPELINE  ─────────────────────────────── #
def load_one(parquet_file: Path) -> pl.DataFrame:
    df = pl.read_parquet(parquet_file, low_memory=True)

    # unify datetime precision ────────────────────────────────────────────
    for col in ("tpep_pickup_datetime", "pickup_datetime",
                "tpep_dropoff_datetime", "dropoff_datetime"):
        if col in df.columns:
            df = df.with_columns(pl.col(col).cast(pl.Datetime("ns")))

    # remove negative targets
    df = df.filter(pl.col(TARGET) >= 0)

    # trip duration + calendar features ───────────────────────────────────
    ns = 60_000_000_000
    pick = next((c for c in ("tpep_pickup_datetime", "pickup_datetime") if c in df.columns), None)
    drop = next((c for c in ("tpep_dropoff_datetime", "dropoff_datetime") if c in df.columns), None)

    if pick and drop:
        df = (
            df
            .with_columns((
                (pl.col(drop).cast(pl.Int64) - pl.col(pick).cast(pl.Int64)) / ns
            ).cast(pl.Float32).alias("trip_duration_min"))
            .with_columns([
                pl.col(pick).dt.month().cast(pl.Int8).alias("pickup_month"),
                pl.col(pick).dt.day().cast(pl.Int8).alias("pickup_day"),
                pl.col(pick).dt.hour().cast(pl.Int8).alias("pickup_hour"),
                pl.col(pick).dt.weekday().cast(pl.Int8).alias("pickup_dow"),
            ])
            .drop([pick, drop])
        )
    else:   # fallback for rare corrupt rows
        df = df.with_columns([
            pl.lit(0).cast(pl.Float32).alias("trip_duration_min"),
            pl.lit(0).cast(pl.Int8).alias("pickup_month"),
            pl.lit(0).cast(pl.Int8).alias("pickup_day"),
            pl.lit(0).cast(pl.Int8).alias("pickup_hour"),
            pl.lit(0).cast(pl.Int8).alias("pickup_dow"),
        ])

    # guarantee existence of every fee column (missing for early months)
    for c, t in {
        "cbd_congestion_fee": pl.Float32,
        "airport_fee":        pl.Float32,
        "congestion_surcharge": pl.Float32,
    }.items():
        if c not in df.columns:
            df = df.with_columns(pl.lit(0).cast(t).alias(c))

    # categorical handling ────────────────────────────────────────────────
    int_cats   = ["VendorID","RatecodeID","PULocationID","DOLocationID",
                  "payment_type","pickup_month","pickup_day","pickup_hour","pickup_dow"]
    str_cats   = ["store_and_fwd_flag"]

    for c in int_cats:
        if c not in df.columns:
            df = df.with_columns(pl.lit(-1).cast(pl.Int32).alias(c))
        else:
            df = df.with_columns(pl.col(c).cast(pl.Int32))

    for c in str_cats:                 # only *string* categoricals to Categorical
        if c not in df.columns:
            df = df.with_columns(pl.lit("missing").cast(pl.Utf8).alias(c))
        df = df.with_columns(pl.col(c).cast(pl.Categorical))

    return df

frames = [load_one(f) for f in sorted(DATA_DIR.glob("*.parquet"))]
df = pl.concat(frames)
del frames; gc.collect()

pdf   = df.to_pandas(use_pyarrow_extension_array=True)
y     = pdf[TARGET]
X     = pdf.drop(columns=[TARGET])
cat_cols = [c for c in ["store_and_fwd_flag"]  # only the real string category
            if c in X.columns]

# ──────────────────  OPTUNA + MLFLOW  ────────────────── #
tscv  = TimeSeriesSplit(n_splits=N_SPLITS)
mlflow.set_experiment(EXPERIMENT)

mlcb  = MLflowCallback(metric_name="val_rmse",
                       create_experiment=False,
                       mlflow_kwargs={"nested": True})

@mlcb.track_in_mlflow()
def objective(trial):
    params = {
        "depth":              trial.suggest_int("depth", 4, 10),
        "learning_rate":      trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
        "l2_leaf_reg":        trial.suggest_float("l2_leaf_reg", 1e-3, 10,  log=True),
        "subsample":          trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bylevel":  trial.suggest_float("colsample_bylevel", 0.5, 1.0),
        "min_data_in_leaf":   trial.suggest_int("min_data_in_leaf", 1, 100),
        "iterations":         MAX_ITERS,
        "early_stopping_rounds": EARLY_STOP,
        "eval_metric":        "RMSE",
        "random_seed":        SEED,
        "task_type":          "GPU",
        "verbose":            0,
        "cat_features":       cat_cols,
    }

    cv_rmse, best_iters = [], []
    for tr, vl in tscv.split(X):
        model = CatBoostRegressor(**params)
        model.fit(X.iloc[tr], y.iloc[tr],
                  eval_set=(X.iloc[vl], y.iloc[vl]),
                  verbose=False)
        cv_rmse.append(mean_squared_error(y.iloc[vl],
                                          model.predict(X.iloc[vl]),
                                          squared=False))
        best_iters.append(model.get_best_iteration())
        del model; gc.collect()

    mean_rmse = float(np.mean(cv_rmse))
    trial.set_user_attr("best_iterations", int(np.mean(best_iters)))
    mlflow.log_metric("rmse_cv", mean_rmse)
    mlflow.log_metric("best_iterations", trial.user_attrs["best_iterations"])
    log_sys()
    return mean_rmse

with mlflow.start_run(run_name="optuna_catboost"):
    # static HW snapshot
    mlflow.log_params({
        "cpu_cores": psutil.cpu_count(logical=True),
        "mem_total_gb": round(psutil.virtual_memory().total / 2**30, 2),
        "gpu_name": (pynvml.nvmlDeviceGetName(
                        pynvml.nvmlDeviceGetHandleByIndex(0)).decode()
                     if pynvml.nvmlInit() is None else "NA")
    })
    study = optuna.create_study(direction="minimize",
                                sampler=optuna.samplers.TPESampler(seed=SEED))
    study.optimize(objective, n_trials=N_TRIALS, callbacks=[mlcb])

    best_params = study.best_trial.params
    final_iter  = study.best_trial.user_attrs["best_iterations"]

    final = {
        **best_params,
        "iterations": final_iter,
        "random_seed": SEED,
        "task_type": "GPU",
        "verbose": 0,
        "cat_features": cat_cols,
    }

    model = CatBoostRegressor(**final)
    model.fit(X, y, verbose=False)
    mlflow.log_params(final)
    mlflow.catboost.log_model(model, artifact_path="model")
    log_sys("final_")

print(f"Best CV RMSE: {study.best_value:.5f}")


2025/05/26 15:23:27 INFO mlflow.tracking.fluent: Experiment with name 'CatBoost_TimeSeries_Optuna' does not exist. Creating a new experiment.
  mlcb  = MLflowCallback(metric_name="val_rmse",
  @mlcb.track_in_mlflow()


AttributeError: 'str' object has no attribute 'decode'

In [None]:
# plt.figure(figsize=(6, 6))
# sns.scatterplot(x=y_test, y=preds, alpha=0.3)
# plt.xlabel("Actual")
# plt.ylabel("Predicted")
# plt.tight_layout()
# plt.savefig("scatter_actual_vs_pred.png", dpi=300)

# residuals = y_test - preds
# plt.figure(figsize=(6, 4))
# sns.histplot(residuals, bins=100, kde=True)
# plt.xlabel("Residuals")
# plt.tight_layout()
# plt.savefig("residual_hist.png", dpi=300)

# importances = model.get_feature_importance(type="PredictionValuesChange")
# imp_df = pd.DataFrame({"feature": features, "importance": importances}).sort_values("importance", ascending=False)
# plt.figure(figsize=(8, 6))
# sns.barplot(x="importance", y="feature", data=imp_df.head(20))
# plt.tight_layout()
# plt.savefig("feature_importance.png", dpi=300)

# model.save_model("tip_model_catboost.cbm")
# imp_df.to_csv("feature_importance.csv", index=False)