In [1]:
# Import libraries
import os
import random
import sys

import numpy as np
import pandas as pd
from deap import base, creator, tools
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_percentage_error
from sklearn.model_selection import TimeSeriesSplit

# Reproducibility
# Two independent global generators are in play and both must be seeded:
#   * NumPy's (np.random.*) is used by generate_individual, custom_mutation
#     and the crossover/mutation coin flips in the evolutionary loop;
#   * the standard-library `random` module is what DEAP's built-in operators
#     (tools.selTournament, tools.cxUniform) draw from.
# Seeding only NumPy leaves selection and crossover unseeded, so two runs of
# this script would not produce the same result.
random.seed(42)
np.random.seed(42)

# Load dataset
df = pd.read_csv("final_processed_data.csv", low_memory=False)

# Select features
# The order of this list fixes the column order of X.
FEATURES = [
    "TOTAL_KM", "QTY_LOADS", "QTY_DELIVERIES",
    "COD_DP_MEAN_PRICE_PER_KM", "COD_LP_MEAN_PRICE_PER_KM",
    "START_DELIVERY_TIME_MEAN_PRICE_PER_KM", "ENTRY_WEEKDAY_MEAN_PRICE_PER_KM",
    "HU_KM_PERC", "TEMP_MIN", "TEMP_MAX"
]

# Basic check to catch typos early. The target column is validated together
# with the features so that a missing "EUR" column yields the same explicit
# error instead of a bare KeyError on the lookup below.
missing = [c for c in [*FEATURES, "EUR"] if c not in df.columns]
if missing:
    raise ValueError(f"Missing columns in final_processed_data.csv: {missing}")

# Plain NumPy arrays: eval_gb indexes these with the integer index arrays
# produced by TimeSeriesSplit.
# NOTE(review): TimeSeriesSplit splits by row position, so this presumably
# relies on the CSV rows being in chronological order -- confirm upstream.
X = df[FEATURES].values
y = df["EUR"].values

# Define sMAPE (percent)
def smape(y_true, y_pred):
    """Return the symmetric mean absolute percentage error, in percent.

    Computes ``mean(|y_true - y_pred| / ((|y_true| + |y_pred|) / 2)) * 100``.

    Args:
        y_true: Array-like of observed values.
        y_pred: Array-like of predicted values, compared element-wise by
            position with ``y_true``.

    Returns:
        The sMAPE as a NumPy float scaled by 100. Positions where both values
        are zero contribute 0 to the mean.
    """
    # Coerce to float arrays so that plain lists are accepted and so that
    # integer inputs (unsigned ones in particular) cannot wrap around in the
    # subtraction below.
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    denom = (np.abs(y_true) + np.abs(y_pred)) / 2.0
    # A zero denominator only occurs when both values are zero; substituting
    # a tiny constant avoids 0/0 while keeping that term's contribution at 0.
    denom = np.where(denom == 0, 1e-12, denom)
    return np.mean(np.abs(y_true - y_pred) / denom) * 100.0

# DEAP setup (minimize fitness)
# Each class is created only when the creator module does not already expose
# that name, so executing this code again in the same session skips creation.
# The single weight of -1.0 makes the one-element fitness a minimisation target.
if not hasattr(creator, "FitnessMin"):
    creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
if not hasattr(creator, "Individual"):
    # An individual is a plain list of genes carrying a FitnessMin instance.
    creator.create("Individual", list, fitness=creator.FitnessMin)

# Registry for the generation, evaluation and variation operators.
toolbox = base.Toolbox()

# Hyperparameter bounds
# Maps each tuned GradientBoostingRegressor hyperparameter to a (low, high)
# pair. Two properties of this dict are relied on elsewhere in the file:
#   * Insertion order is the gene order of an individual: custom_mutation
#     takes list(HYPERPARAMETER_BOUNDS.values()) and pairs entry i with
#     gene i, and eval_gb decodes genes by the same positions 0..4.
#   * The Python type of `low` selects the mutation branch in
#     custom_mutation (int -> integer step, otherwise float step), so
#     integer-valued hyperparameters must keep int bounds.
# Note that eval_gb applies its own hard-coded limits when decoding genes
# and does not read this dict.
HYPERPARAMETER_BOUNDS = {
    "n_estimators": (50, 1000),       # gene 0, integer
    "learning_rate": (0.01, 0.3),     # gene 1, float
    "max_depth": (3, 15),             # gene 2, integer
    "subsample": (0.5, 1.0),          # gene 3, float
    "min_samples_split": (2, 10),     # gene 4, integer
}

# Random individual
def generate_individual():
    """Create one random individual within HYPERPARAMETER_BOUNDS.

    Genes are drawn from NumPy's global generator in chromosome order:
    ``[n_estimators, learning_rate, max_depth, subsample, min_samples_split]``.

    Returns:
        A ``creator.Individual`` holding three integer genes (positions 0, 2
        and 4) and two float genes (positions 1 and 3).
    """
    def _rand_int(name):
        low, high = HYPERPARAMETER_BOUNDS[name]
        # np.random.randint excludes its upper argument. Adding 1 makes the
        # configured upper bound reachable, which agrees with
        # custom_mutation, where genes are clipped to [low, high] inclusive.
        return np.random.randint(low, high + 1)

    def _rand_float(name):
        return np.random.uniform(*HYPERPARAMETER_BOUNDS[name])

    return creator.Individual([
        _rand_int("n_estimators"),
        _rand_float("learning_rate"),
        _rand_int("max_depth"),
        _rand_float("subsample"),
        _rand_int("min_samples_split"),
    ])


# toolbox.individual() builds one individual; toolbox.population(n=...) repeats
# it n times into a list.
toolbox.register("individual", generate_individual)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# Evaluation: returns fitness (MAPE); stores sMAPE for logging
def eval_gb(individual, n_splits=5):
    """Score one individual with time-series cross-validation.

    The five genes are decoded positionally into GradientBoostingRegressor
    arguments, with local safety limits applied while decoding:
    ``n_estimators`` is truncated to int and floored at 50, ``learning_rate``
    is clipped to [0.01, 0.3], ``max_depth`` is rounded and floored at 1,
    ``subsample`` is clipped to [0.5, 1.0] and ``min_samples_split`` is
    rounded and floored at 2.

    The model is fitted and scored on every fold of
    ``TimeSeriesSplit(n_splits)`` over the module-level ``X`` and ``y``.

    Args:
        individual: Sequence of five genes; it must accept attribute
            assignment, because the mean sMAPE is stored on it as ``smape``.
        n_splits: Number of TimeSeriesSplit folds.

    Returns:
        A one-element tuple with the mean MAPE across folds (the fitness).
        If anything raises ``Exception``, a warning is printed to stderr,
        ``individual.smape`` is set to ``np.inf`` and ``(1e9,)`` is returned
        as a penalty fitness.
    """
    try:
        regressor = GradientBoostingRegressor(
            n_estimators=max(50, int(individual[0])),
            learning_rate=float(np.clip(individual[1], 0.01, 0.3)),
            max_depth=max(1, int(round(individual[2]))),
            subsample=float(np.clip(individual[3], 0.5, 1.0)),
            min_samples_split=max(2, int(round(individual[4]))),
            random_state=42,
        )
        splitter = TimeSeriesSplit(n_splits=n_splits)

        fold_mape, fold_smape = [], []
        for train_idx, test_idx in splitter.split(X):
            # The same estimator object is refitted on each fold's training part.
            regressor.fit(X[train_idx], y[train_idx])
            predicted = regressor.predict(X[test_idx])
            actual = y[test_idx]
            fold_mape.append(mean_absolute_percentage_error(actual, predicted))
            fold_smape.append(smape(actual, predicted))

        # sMAPE is kept on the individual for logging only; MAPE is the fitness.
        individual.smape = float(np.mean(fold_smape))
        return (float(np.mean(fold_mape)),)
    except Exception as e:
        # Best effort: a failing configuration is penalised rather than
        # aborting the whole evolutionary run.
        individual.smape = np.inf
        print(f"[warn] eval_gb failed for {list(individual)}: {repr(e)}", file=sys.stderr)
        return (1e9,)

# Register EA operators
# toolbox.register binds the extra keyword arguments to the registered
# function, so the evaluator is registered the same way as the other
# operators instead of through a wrapper lambda; toolbox.evaluate(ind) still
# calls eval_gb(ind, n_splits=5).
toolbox.register("evaluate", eval_gb, n_splits=5)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("mate", tools.cxUniform, indpb=0.5)

def custom_mutation(individual, indpb, bounds=None):
    """Mutate an individual in place, keeping every gene inside its bounds.

    Each gene is perturbed independently with probability ``indpb``:

    * integer genes (those whose lower bound is an ``int``) move by a uniform
      integer step in ``[-s, s]`` where ``s = max(1, min(50, (high - low) // 4))``;
    * float genes move by a uniform step in ``[-0.05, 0.05)``.

    The result is clipped to ``[low, high]``. The integer step is scaled to
    the width of the range because a fixed step of 50 is far wider than
    narrow ranges such as (3, 15), which made a mutated gene land on one of
    its two bounds almost every time. For wide ranges the step stays at 50.

    Args:
        individual: Mutable sequence of genes, modified in place.
        indpb: Per-gene mutation probability.
        bounds: Optional sequence of ``(low, high)`` pairs, one per gene.
            Defaults to the values of ``HYPERPARAMETER_BOUNDS`` in insertion
            order.

    Returns:
        A one-element tuple containing the same (mutated) individual, which
        is the shape DEAP expects from a mutation operator.
    """
    if bounds is None:
        bounds = list(HYPERPARAMETER_BOUNDS.values())

    for i, gene in enumerate(individual):
        if np.random.rand() >= indpb:
            continue
        low, high = bounds[i]
        if isinstance(low, int):
            step = max(1, min(50, (high - low) // 4))
            # randint excludes its upper argument; +1 keeps the step symmetric.
            delta = np.random.randint(-step, step + 1)
            individual[i] = int(np.clip(gene + delta, low, high))
        else:
            delta = np.random.uniform(-0.05, 0.05)
            individual[i] = float(np.clip(gene + delta, low, high))
    return individual,

# Mutation operator: each gene is considered for mutation with probability 0.3.
toolbox.register("mutate", custom_mutation, indpb=0.3)

# EA parameters (full run)
population_size = 20    # individuals per generation
num_generations = 10    # variation rounds; generation 0 is the initial population
cxpb = 0.6              # probability that a pair of offspring is crossed
mutpb = 0.3             # probability that an offspring is mutated

# Initialize population and HOF
population = toolbox.population(n=population_size)
hof = tools.HallOfFame(1)   # keeps the single best individual seen so far

# Prepare flat CSV (append mode per generation)
# Start from a clean file that contains only the header row; the loop below
# appends one row per generation.
LOG_PATH = "logbook_smape.csv"
if os.path.exists(LOG_PATH):
    os.remove(LOG_PATH)
pd.DataFrame(
    columns=["gen", "avg", "min", "avg_smape", "min_smape", "nevals"]
).to_csv(LOG_PATH, index=False)

# Evolutionary loop with per-generation checkpointing
def _invalidate(ind):
    """Drop the cached sMAPE and fitness of a modified individual."""
    if hasattr(ind, "smape"):
        delattr(ind, "smape")
    if hasattr(ind.fitness, "values"):
        del ind.fitness.values


# Generations 0..num_generations are evaluated and logged; variation runs
# after every generation except the last one.
for gen in range(num_generations + 1):
    # Only individuals without a valid fitness are (re-)evaluated.
    invalid = [ind for ind in population if not ind.fitness.valid]
    fits = [toolbox.evaluate(ind) for ind in invalid]
    for ind, fit in zip(invalid, fits):
        ind.fitness.values = fit

    hof.update(population)

    # "avg"/"min" summarise the fitness (MAPE as returned by eval_gb);
    # the *_smape columns summarise the sMAPE stored on each individual,
    # with NaN standing in for individuals that carry no smape attribute.
    mape_vals = np.fromiter(
        (ind.fitness.values[0] for ind in population), dtype=float
    )
    smape_vals = np.fromiter(
        (getattr(ind, "smape", np.nan) for ind in population), dtype=float
    )

    record = {
        "gen": gen,
        "avg": float(np.nanmean(mape_vals)),
        "min": float(np.nanmin(mape_vals)),
        "avg_smape": float(np.nanmean(smape_vals)),
        "min_smape": float(np.nanmin(smape_vals)),
        "nevals": len(invalid),
    }
    # Appending one row per generation means a partial run still leaves a log.
    pd.DataFrame([record]).to_csv(LOG_PATH, mode="a", header=False, index=False)

    if gen == num_generations:
        break

    # Variation
    # Work on clones so the selected parents are never modified in place.
    offspring = [
        toolbox.clone(ind) for ind in toolbox.select(population, len(population))
    ]

    # Crossover on neighbouring pairs (0, 1), (2, 3), ...
    for c1, c2 in zip(offspring[::2], offspring[1::2]):
        if np.random.rand() < cxpb:
            toolbox.mate(c1, c2)
            _invalidate(c1)
            _invalidate(c2)

    for m in offspring:
        if np.random.rand() < mutpb:
            toolbox.mutate(m)
            _invalidate(m)

    # The offspring replace the whole population.
    population = offspring

# Report best
# hof[0] is the best individual recorded by the hall of fame. Its genes are
# decoded with the same int/round/float conversions eval_gb uses, but without
# the extra clamping eval_gb applies.
best = hof[0]
best_params = dict(
    n_estimators=int(best[0]),
    learning_rate=float(best[1]),
    max_depth=int(round(best[2])),
    subsample=float(best[3]),
    min_samples_split=int(round(best[4])),
)
print("Best hyperparameters:", best_params)
print("Best MAPE:", float(best.fitness.values[0]))
print("CSV saved to:", LOG_PATH)

Best hyperparameters: {'n_estimators': 890, 'learning_rate': 0.02886496196573106, 'max_depth': 6, 'subsample': 0.9711008778424264, 'min_samples_split': 7}
Best MAPE: 0.06245690450057456
CSV saved to: logbook_smape.csv
