# In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
HHO + XGBoost for Soil Thickness Prediction
-------------------------------------------
- Two feature groups: SSSI & SI (each plus common features)
- For each iteration i, SSSI and SI share the SAME train/test split via random_state=i
- HHO optimizes [max_depth, learning_rate, n_estimators]
- Metrics recorded: R2, RMSE, MAE (train/test)
- Results saved to: HHO_XGB_results.xlsx

Usage
-----
$ python hho_xgb_soil.py

Notes
-----
This refactor keeps the original behavior and randomness identical:
- No new regularization or early stopping added
- Parameter bounds unchanged
- The HHO inner randomness remains as in the original script
"""

from __future__ import annotations

import random
from dataclasses import dataclass
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from xgboost import XGBRegressor


# ==============================
# Configuration
# ==============================

# NOTE: Keep your actual file name here; change if you rename the source data file.
DATA_PATH: str = "Train_data.xlsx"
# Workbook that receives one row per (group, iteration) run.
OUTPUT_XLSX: str = "HHO_XGB_results.xlsx"

# Predictors shared by both feature groups.
COMMON_COLS: List[str] = ["DEM", "MRRTF", "NDVI", "MRVBF", "RD"]
# Group name -> full predictor list (group-specific columns first).
GROUPS: Dict[str, List[str]] = {
    "SSSI": ["SSSI", "SSP", *COMMON_COLS],
    "SI": ["SI", "P", *COMMON_COLS],
}

# HHO search box, ordered as [max_depth, learning_rate, n_estimators].
LB: np.ndarray = np.array([1.0, 0.01, 1.0], dtype=np.float64)
UB: np.ndarray = np.array([30.0, 0.5, 500.0], dtype=np.float64)

# Experiment settings: number of repeated splits per group, HHO population
# size, and number of HHO update rounds.
ITERATIONS: int = 100
SEARCH_AGENTS: int = 15
MAX_ITER: int = 5


# ==============================
# Data Structures
# ==============================

@dataclass
class HHOResult:
    """
    Container for the best solution vector found by HHO.

    Returned by ``hho``. The vector is kept as raw floats; consumers cast
    the depth and estimator-count entries to ``int`` before building a model.
    """
    # Best position, ordered as [max_depth, learning_rate, n_estimators].
    best_params: np.ndarray


# ==============================
# Core Functions
# ==============================

def boundary(position: np.ndarray, lb: np.ndarray, ub: np.ndarray) -> np.ndarray:
    """
    Project a position vector back into the box ``[lb, ub]``.

    Each component below ``lb`` is raised to ``lb`` and each component above
    ``ub`` is lowered to ``ub``; the input is not modified.
    """
    raised = np.maximum(position, lb)
    return np.minimum(raised, ub)


def fitness(solution: np.ndarray,
            X_train: pd.DataFrame, y_train: pd.Series,
            X_val: pd.DataFrame, y_val: pd.Series) -> float:
    """
    Objective for HHO: ``1 - R2`` on the validation data (lower is better).

    ``solution`` is read as [max_depth, learning_rate, n_estimators]; the
    first and third entries are truncated to ``int``. An XGBRegressor with
    those settings is fitted on ``X_train``/``y_train`` and scored on
    ``X_val``/``y_val``. No other model options are set.
    """
    depth, eta, n_trees = solution[0], solution[1], solution[2]

    regressor = XGBRegressor(
        max_depth=int(depth),
        learning_rate=float(eta),
        n_estimators=int(n_trees),
        verbosity=0,
    )
    regressor.fit(X_train, y_train)

    val_predictions = regressor.predict(X_val)
    return 1.0 - r2_score(y_val, val_predictions)


def hho(search_agents: int, max_iter: int,
        X_train: pd.DataFrame, y_train: pd.Series,
        X_val: pd.DataFrame, y_val: pd.Series) -> HHOResult:
    """
    Simplified Harris Hawks Optimizer (HHO).

    Searches the box ``[LB, UB]`` for the vector
    [max_depth, learning_rate, n_estimators] that minimizes ``fitness``.

    Parameters
    ----------
    search_agents : number of candidate positions (hawks); must be >= 1.
    max_iter      : number of position-update rounds.
    X_train, y_train, X_val, y_val : forwarded unchanged to ``fitness``.

    Returns
    -------
    HHOResult whose ``best_params`` is the best position evaluated. If no
    evaluation yields a score below ``inf`` (e.g. every score is NaN), the
    first initial position is returned so the result is always inside the
    bounds.

    Raises
    ------
    ValueError if ``search_agents`` is smaller than 1.

    Notes
    -----
    Each position is scored exactly once, right after it is created, giving
    ``search_agents * (max_iter + 1)`` model fits. Re-scoring the unchanged
    population at the top of every round cannot change the leader (the
    comparison is strict), provided ``fitness`` is deterministic for a given
    position, so that pass is not performed. The sequence of calls into
    ``random`` is not affected by this.
    """
    if search_agents < 1:
        raise ValueError("search_agents must be at least 1")

    dim = len(LB)

    # Initialize population (draw order: agent-major, dimension-minor).
    positions = np.array([
        [random.uniform(LB[d], UB[d]) for d in range(dim)]
        for _ in range(search_agents)
    ], dtype=float)

    # Start from an in-bounds leader rather than an all-zero vector, so a
    # degenerate run can never hand back invalid hyper-parameters.
    leader_pos = positions[0].copy()
    leader_score = float("inf")

    def _consider(candidate: np.ndarray) -> None:
        """Score one position and promote it to leader if strictly better."""
        nonlocal leader_pos, leader_score
        score = fitness(candidate, X_train, y_train, X_val, y_val)
        if score < leader_score:
            leader_score = score
            leader_pos = candidate.copy()

    # Score the initial population once.
    for i in range(search_agents):
        _consider(positions[i])

    # Optimization loop
    for t in range(max_iter):
        # Escaping-energy envelope: decays linearly from 2 towards 0.
        E1 = 2 * (1 - t / max_iter)

        for i in range(search_agents):
            E0 = 2 * random.random() - 1
            E = E1 * E0
            Q = random.random()
            J = 2 * (1 - random.random())

            if abs(E) >= 1:
                # Exploration: move relative to a randomly chosen agent.
                rand_idx = random.randint(0, search_agents - 1)
                X_rand = positions[rand_idx]
                r1 = random.random()
                r2 = random.random()
                positions[i] = X_rand - r1 * abs(X_rand - 2 * r2 * positions[i])
            elif Q < 0.5:
                # Exploitation around the current leader.
                positions[i] = leader_pos - E * abs(J * leader_pos - positions[i])
            else:
                positions[i] = (leader_pos - positions[i]) - E * abs(J * leader_pos - positions[i])

            # Project back into the search box, then score the new position
            # immediately so later agents in this round see an updated leader.
            positions[i] = boundary(positions[i], LB, UB)
            _consider(positions[i])

    return HHOResult(best_params=leader_pos)


def train_and_evaluate(group_name: str, features: List[str], iteration: int,
                       df_clean: pd.DataFrame) -> Dict[str, float]:
    """
    Run one iteration for a feature group and return its result row.

    Steps
    -----
    1. Outer 70/30 train/test split with ``random_state=iteration`` so every
       feature group sees the same rows for a given iteration.
    2. Inner 70/30 split of the training rows (same seed) into fit and
       validation parts. HHO tunes on these only.
    3. HHO -> best [max_depth, learning_rate, n_estimators].
    4. Refit XGB with those settings on the full training rows and compute
       R2 / RMSE / MAE on the training and test rows.

    NOTE: the test rows are deliberately kept out of the HHO search.
    Selecting hyper-parameters on the same rows that are later reported as
    ``Test_*`` makes those metrics optimistically biased. Result files
    produced while the search was scored on the test split are therefore
    not directly comparable with the output of this function.

    Parameters
    ----------
    group_name : label written to the ``Group`` field of the result.
    features   : predictor column names to select from ``df_clean``.
    iteration  : zero-based run index; used as split seed and reported
                 one-based in the ``Iteration`` field.
    df_clean   : data containing ``features`` and a ``thickness`` column.

    Returns
    -------
    Dict with the group label, iteration number, train/test metrics and the
    selected hyper-parameters.
    """
    X = df_clean[features]
    y = df_clean["thickness"]

    # Outer split: the test part is touched only for the final metrics.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=0.7, random_state=iteration
    )

    # Inner split: validation data for the hyper-parameter search.
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train, y_train, train_size=0.7, random_state=iteration
    )

    # Search with HHO
    hho_res = hho(
        search_agents=SEARCH_AGENTS,
        max_iter=MAX_ITER,
        X_train=X_fit, y_train=y_fit,
        X_val=X_val, y_val=y_val
    )

    # Extract params (integer-valued settings are truncated, as in fitness)
    max_depth = int(hho_res.best_params[0])
    learning_rate = float(hho_res.best_params[1])
    n_estimators = int(hho_res.best_params[2])

    # Final model, refitted on all training rows
    model = XGBRegressor(
        max_depth=max_depth,
        learning_rate=learning_rate,
        n_estimators=n_estimators,
        verbosity=0
    )
    model.fit(X_train, y_train)

    # Metrics
    yhat_tr = model.predict(X_train)
    yhat_te = model.predict(X_test)

    result = {
        "Group": group_name,
        "Iteration": iteration + 1,
        "Train_R2": r2_score(y_train, yhat_tr),
        "Test_R2": r2_score(y_test, yhat_te),
        "Train_RMSE": float(np.sqrt(mean_squared_error(y_train, yhat_tr))),
        "Test_RMSE": float(np.sqrt(mean_squared_error(y_test, yhat_te))),
        "Train_MAE": float(mean_absolute_error(y_train, yhat_tr)),
        "Test_MAE": float(mean_absolute_error(y_test, yhat_te)),
        "max_depth": max_depth,
        "learning_rate": round(learning_rate, 4),
        "n_estimators": n_estimators
    }
    return result


def summarize_results(df_results: pd.DataFrame) -> pd.DataFrame:
    """
    Average the headline metrics per feature group.

    Returns one row per distinct ``Group`` value with the mean of
    Train_R2, Test_R2, Test_RMSE and Test_MAE; ``Group`` is a regular column
    of the result, not its index.
    """
    metric_columns = ["Train_R2", "Test_R2", "Test_RMSE", "Test_MAE"]
    per_group = df_results.groupby("Group")
    averaged = per_group[metric_columns].mean()
    return averaged.reset_index()


# ==============================
# Entry Point
# ==============================

def main() -> None:
    """
    Run the full experiment and persist the per-run results.

    Reads DATA_PATH, drops every row containing a missing value, runs
    ITERATIONS evaluations for each entry of GROUPS, prints the group-wise
    mean metrics and writes all individual runs to OUTPUT_XLSX.
    """
    # Load the workbook and discard incomplete rows in one go.
    cleaned = pd.read_excel(DATA_PATH).dropna()

    # One result dict per (group, iteration); groups are processed in turn,
    # each with its own progress bar.
    records: List[Dict[str, float]] = []
    for label, columns in GROUPS.items():
        with tqdm(total=ITERATIONS, desc=f"HHO + XGB for {label}") as progress:
            for run_idx in range(ITERATIONS):
                outcome = train_and_evaluate(label, columns, run_idx, cleaned)
                records.append(outcome)
                progress.update(1)

    # Assemble the table of runs and its per-group summary.
    all_runs = pd.DataFrame(records)
    group_means = summarize_results(all_runs)

    # Report to the console, then save the raw runs.
    print("\n📊 Group-wise mean results:")
    print(group_means)

    all_runs.to_excel(OUTPUT_XLSX, index=False)


# Run the experiment only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()
