In [None]:
"""
Implementing and Optimizing Gradient Boosting Machines (GBM) from Scratch

- Custom GradientBoostingRegressor implemented only with NumPy + basic Python
- Base learner: small regression trees (CART-style)
- Supports: n_estimators, learning_rate, max_depth, min_samples_split, subsample
- Hyperparameter tuning with simple grid search
- Benchmark vs sklearn.ensemble.GradientBoostingRegressor
"""

import time
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.datasets import make_regression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.ensemble import GradientBoostingRegressor as SklearnGBR


# --------------------------------------------------------------------
# 1. Simple regression tree (base learner) implemented from scratch
# --------------------------------------------------------------------

class TreeNode:
    def __init__(self, feature_index=None, threshold=None,
                 left=None, right=None, value=None):
        self.feature_index = feature_index  # int
        self.threshold = threshold          # float
        self.left = left                    # TreeNode
        self.right = right                  # TreeNode
        self.value = value                  # float (for leaf)


class RegressionTree:
    """
    Very small CART-style regression tree.

    - Splitting criterion: squared error (MSE)
    - Greedy, depth-limited tree
    """

    def __init__(self, max_depth=3, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def fit(self, X, y):
        """
        X: (n_samples, n_features)
        y: (n_samples,)
        """
        X = np.asarray(X)
        y = np.asarray(y)
        self.root = self._build_tree(X, y, depth=0)

    def predict(self, X):
        X = np.asarray(X)
        return np.array([self._predict_row(row, self.root) for row in X])

    # ----- Internal helpers -----

    def _build_tree(self, X, y, depth):
        n_samples, n_features = X.shape

        # Stopping criteria
        if (depth >= self.max_depth or
                n_samples < self.min_samples_split or
                np.unique(y).shape[0] == 1):
            leaf_value = np.mean(y)
            return TreeNode(value=leaf_value)

        # Find best split
        best_feat, best_thresh, best_loss, left_idx, right_idx = \
            self._best_split(X, y)

        # If no useful split was found, make a leaf
        if best_feat is None:
            return TreeNode(value=np.mean(y))

        # Recursively build subtrees
        left = self._build_tree(X[left_idx], y[left_idx], depth + 1)
        right = self._build_tree(X[right_idx], y[right_idx], depth + 1)
        return TreeNode(feature_index=best_feat,
                        threshold=best_thresh,
                        left=left,
                        right=right)

    def _best_split(self, X, y):
        n_samples, n_features = X.shape
        if n_samples <= 1:
            return None, None, None, None, None

        best_feat = None
        best_thresh = None
        best_loss = np.inf
        best_left_idx = None
        best_right_idx = None

        # Precompute some sums for SSE
        for feature_index in range(n_features):
            # Sort by this feature
            sorted_idx = np.argsort(X[:, feature_index])
            x_sorted = X[sorted_idx, feature_index]
            y_sorted = y[sorted_idx]

            sum_total = np.sum(y_sorted)
            sum_sq_total = np.sum(y_sorted ** 2)

            sum_left = 0.0
            sum_sq_left = 0.0
            n_left = 0

            # We split between positions i-1 and i
            for i in range(1, n_samples):
                yi = y_sorted[i - 1]
                sum_left += yi
                sum_sq_left += yi ** 2
                n_left += 1

                n_right = n_samples - n_left
                if n_left < self.min_samples_split or n_right < self.min_samples_split:
                    continue

                # If threshold would be identical, skip
                if x_sorted[i] == x_sorted[i - 1]:
                    continue

                sum_right = sum_total - sum_left
                sum_sq_right = sum_sq_total - sum_sq_left

                # SSE_left = sum(y^2) - sum(y)^2 / n
                sse_left = sum_sq_left - (sum_left ** 2) / n_left
                sse_right = sum_sq_right - (sum_right ** 2) / n_right
                loss = sse_left + sse_right

                if loss < best_loss:
                    best_loss = loss
                    best_feat = feature_index
                    best_thresh = (x_sorted[i] + x_sorted[i - 1]) / 2.0
                    best_left_idx = sorted_idx[:i]
                    best_right_idx = sorted_idx[i:]

        return best_feat, best_thresh, best_loss, best_left_idx, best_right_idx

    def _predict_row(self, row, node):
        if node.value is not None:
            return node.value
        if row[node.feature_index] <= node.threshold:
            return self._predict_row(row, node.left)
        else:
            return self._predict_row(row, node.right)


# --------------------------------------------------------------------
# 2. Gradient Boosting Regressor (from scratch)
# --------------------------------------------------------------------

class GradientBoostingRegressorScratch:
    """
    Gradient Boosting for regression, using custom RegressionTree base learners.

    Loss: Squared error
    """

    def __init__(
        self,
        n_estimators=100,
        learning_rate=0.1,
        max_depth=3,
        min_samples_split=2,
        subsample=1.0,
        random_state=None,
    ):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.subsample = subsample
        self.random_state = random_state

        self.trees_ = []
        self.gammas_ = []
        self.init_ = None
        self.train_loss_ = []

    def fit(self, X, y):
        X = np.asarray(X)
        y = np.asarray(y)

        rng = np.random.RandomState(self.random_state)

        # Initial model: mean of y
        self.init_ = np.mean(y)
        y_pred = np.full_like(y, fill_value=self.init_, dtype=float)

        for m in range(self.n_estimators):
            # Negative gradient for squared error: residuals
            residuals = y - y_pred

            # Subsampling for stochastic gradient boosting
            if 0 < self.subsample < 1.0:
                n_samples = X.shape[0]
                n_sub = int(self.subsample * n_samples)
                indices = rng.choice(n_samples, size=n_sub, replace=False)
                X_sub = X[indices]
                r_sub = residuals[indices]
            else:
                X_sub = X
                r_sub = residuals

            # Fit base learner to residuals
            tree = RegressionTree(max_depth=self.max_depth,
                                  min_samples_split=self.min_samples_split)
            tree.fit(X_sub, r_sub)

            # Compute optimal step size gamma (line search)
            h_full = tree.predict(X)
            numerator = np.dot(residuals, h_full)
            denom = np.dot(h_full, h_full) + 1e-12
            gamma = numerator / denom

            # Update ensemble prediction
            y_pred += self.learning_rate * gamma * h_full

            # Store stuff
            self.trees_.append(tree)
            self.gammas_.append(gamma)

            # Track training loss
            loss = mean_squared_error(y, y_pred)
            self.train_loss_.append(loss)

        return self

    def predict(self, X):
        X = np.asarray(X)
        y_pred = np.full(shape=(X.shape[0],), fill_value=self.init_, dtype=float)
        for tree, gamma in zip(self.trees_, self.gammas_):
            y_pred += self.learning_rate * gamma * tree.predict(X)
        return y_pred


# --------------------------------------------------------------------
# 3. Data loading: housing.csv OR synthetic regression
# --------------------------------------------------------------------

def load_housing_data_from_csv(path="housing.csv", target_column="MEDV"):
    """
    Load your housing.csv file.

    IMPORTANT:
    - Change `target_column` to match the name of your target variable in the CSV.
      Common examples: "MEDV", "SalePrice", "price", "target", etc.
    """
    df = pd.read_csv(path)
    X = df.drop(columns=[target_column]).values
    y = df[target_column].values
    return X, y


def generate_synthetic_regression(n_samples=2000, n_features=10, noise=10.0, random_state=42):
    X, y = make_regression(
        n_samples=n_samples,
        n_features=n_features,
        noise=noise,
        random_state=random_state,
    )
    return X, y


# --------------------------------------------------------------------
# 4. Hyperparameter tuning
# --------------------------------------------------------------------

def grid_search_gbm(
    X_train,
    y_train,
    X_val,
    y_val,
    learning_rates=(0.05, 0.1),
    n_estimators_list=(50, 100),
    max_depths=(2, 3),
):
    best_config = None
    best_mse = np.inf
    results = []

    for lr in learning_rates:
        for n_estimators in n_estimators_list:
            for depth in max_depths:
                model = GradientBoostingRegressorScratch(
                    n_estimators=n_estimators,
                    learning_rate=lr,
                    max_depth=depth,
                    min_samples_split=5,
                    subsample=0.8,
                    random_state=42,
                )
                model.fit(X_train, y_train)
                y_val_pred = model.predict(X_val)
                mse = mean_squared_error(y_val, y_val_pred)

                config = {
                    "learning_rate": lr,
                    "n_estimators": n_estimators,
                    "max_depth": depth,
                    "val_mse": mse,
                }
                results.append(config)

                if mse < best_mse:
                    best_mse = mse
                    best_config = config

    return best_config, results


# --------------------------------------------------------------------
# 5. Benchmark vs sklearn's GradientBoostingRegressor
# --------------------------------------------------------------------

def benchmark_models(X_train, X_test, y_train, y_test, best_config):
    print("\n=== Training custom GBM (from scratch) with best hyperparameters ===")
    print(best_config)

    custom_gbm = GradientBoostingRegressorScratch(
        n_estimators=best_config["n_estimators"],
        learning_rate=best_config["learning_rate"],
        max_depth=best_config["max_depth"],
        min_samples_split=5,
        subsample=0.8,
        random_state=42,
    )

    t0 = time.time()
    custom_gbm.fit(X_train, y_train)
    custom_train_time = time.time() - t0

    y_pred_custom = custom_gbm.predict(X_test)
    custom_mse = mean_squared_error(y_test, y_pred_custom)
    custom_r2 = r2_score(y_test, y_pred_custom)

    print(f"Custom GBM - Test MSE: {custom_mse:.4f}, R^2: {custom_r2:.4f}, "
          f"Train time: {custom_train_time:.3f}s")

    print("\n=== Training sklearn GradientBoostingRegressor (benchmark) ===")

    sklearn_gbm = SklearnGBR(
        n_estimators=best_config["n_estimators"],
        learning_rate=best_config["learning_rate"],
        max_depth=best_config["max_depth"],
        random_state=42,
    )

    t0 = time.time()
    sklearn_gbm.fit(X_train, y_train)
    sklearn_train_time = time.time() - t0

    y_pred_sklearn = sklearn_gbm.predict(X_test)
    sk_mse = mean_squared_error(y_test, y_pred_sklearn)
    sk_r2 = r2_score(y_test, y_pred_sklearn)

    print(f"sklearn GBM - Test MSE: {sk_mse:.4f}, R^2: {sk_r2:.4f}, "
          f"Train time: {sklearn_train_time:.3f}s")

    return {
        "custom": {"mse": custom_mse, "r2": custom_r2, "time": custom_train_time},
        "sklearn": {"mse": sk_mse, "r2": sk_r2, "time": sklearn_train_time},
        "custom_model": custom_gbm,
        "sklearn_model": sklearn_gbm,
    }


# --------------------------------------------------------------------
# 6. Main script
# --------------------------------------------------------------------

def main(use_housing_csv=True):
    # ========= Choose dataset =========
    if use_housing_csv:
        # --- Your real data ---
        # Change TARGET_COLUMN to the actual target in housing.csv
        CSV_PATH = "housing.csv"
        TARGET_COLUMN = "MEDV"  # <-- CHANGE THIS if needed
        X, y = load_housing_data_from_csv(CSV_PATH, TARGET_COLUMN)
        print(f"Loaded housing data from {CSV_PATH} with shape: {X.shape}")
    else:
        # --- Synthetic regression demo ---
        X, y = generate_synthetic_regression()
        print(f"Generated synthetic regression data with shape: {X.shape}")

    # Train/val/test split
    X_train_full, X_test, y_train_full, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_full, y_train_full, test_size=0.25, random_state=42
    )  # 0.25 * 0.8 = 0.2 -> 60/20/20 split

    print(f"Train size: {X_train.shape[0]}, "
          f"Val size: {X_val.shape[0]}, Test size: {X_test.shape[0]}")

    # ========= Hyperparameter tuning =========
    best_config, all_results = grid_search_gbm(
        X_train,
        y_train,
        X_val,
        y_val,
        learning_rates=(0.05, 0.1, 0.2),
        n_estimators_list=(50, 100, 150),
        max_depths=(2, 3, 4),
    )
    print("\n=== Grid search results (top 5 by val MSE) ===")
    all_results_sorted = sorted(all_results, key=lambda d: d["val_mse"])
    for r in all_results_sorted[:5]:
        print(r)

    # ========= Benchmark vs sklearn GBM =========
    benchmark_results = benchmark_models(X_train_full, X_test, y_train_full, y_test, best_config)

    print("\n=== Final comparison ===")
    print("Custom GBM:", benchmark_results["custom"])
    print("sklearn GBM:", benchmark_results["sklearn"])


if __name__ == "__main__":
    main(use_housing_csv=True)
