In [7]:


import os
import numpy as np
from dataclasses import dataclass
from typing import Optional, List, Tuple

from sklearn.datasets import make_regression
from sklearn.model_selection import KFold
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingRegressor

# -----------------------------
# Utilities
# -----------------------------
def set_global_seed(seed: int = 42):
    np.random.seed(seed)

def ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)

# -----------------------------
# Simple regression tree (NumPy-only)
# -----------------------------
@dataclass
class _TreeNode:
    feature_index: Optional[int] = None
    threshold: Optional[float] = None
    left: Optional['_TreeNode'] = None
    right: Optional['_TreeNode'] = None
    value: Optional[float] = None

class NumpyDecisionTreeRegressor:
    def __init__(self, max_depth: int = 2, min_samples_split: int = 2,
                 n_thresholds_per_feature: int = 8, random_state: int = 42):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.n_thresholds_per_feature = n_thresholds_per_feature
        self.root: Optional[_TreeNode] = None
        self.random_state = random_state

    def fit(self, X: np.ndarray, y: np.ndarray):
        np.random.seed(self.random_state)
        self.root = self._build_tree(X, y, depth=0)

    def predict(self, X: np.ndarray) -> np.ndarray:
        return np.array([self._predict_row(self.root, row) for row in X])

    def _predict_row(self, node: _TreeNode, x_row: np.ndarray) -> float:
        while node.value is None:
            node = node.left if x_row[node.feature_index] <= node.threshold else node.right
        return node.value

    def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int) -> _TreeNode:
        if depth >= self.max_depth or X.shape[0] < self.min_samples_split:
            return _TreeNode(value=float(y.mean()))
        if np.allclose(y, y[0]):
            return _TreeNode(value=float(y[0]))
        feat, thr, gain, left_mask, right_mask = self._best_split(X, y)
        if feat is None:
            return _TreeNode(value=float(y.mean()))
        left_node = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right_node = self._build_tree(X[right_mask], y[right_mask], depth + 1)
        return _TreeNode(feature_index=feat, threshold=thr, left=left_node, right=right_node)

    def _best_split(self, X: np.ndarray, y: np.ndarray):
        current_sse = np.sum((y - y.mean()) ** 2)
        best_feature, best_threshold, best_gain = None, None, 0.0
        best_left_mask, best_right_mask = None, None
        for j in range(X.shape[1]):
            xj = X[:, j]
            if np.unique(xj).size < 2:
                continue
            thresholds = np.quantile(xj, np.linspace(0, 1, self.n_thresholds_per_feature + 2)[1:-1])
            for thr in thresholds:
                left_mask = xj <= thr
                right_mask = ~left_mask
                if left_mask.sum() < self.min_samples_split or right_mask.sum() < self.min_samples_split:
                    continue
                yl, yr = y[left_mask], y[right_mask]
                sse_left = np.sum((yl - yl.mean()) ** 2)
                sse_right = np.sum((yr - yr.mean()) ** 2)
                gain = current_sse - (sse_left + sse_right)
                if gain > best_gain + 1e-12:
                    best_gain, best_feature, best_threshold = gain, j, float(thr)
                    best_left_mask, best_right_mask = left_mask, right_mask
        if best_feature is None:
            return None, None, 0.0, None, None
        return best_feature, best_threshold, best_gain, best_left_mask, best_right_mask

# -----------------------------
# Gradient Boosting Regressor (NumPy-only)
# -----------------------------
class NumpyGradientBoostingRegressor:
    def __init__(self, n_estimators: int = 50, learning_rate: float = 0.1,
                 max_depth: int = 2, random_state: int = 42):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.random_state = random_state
        self.init_ = 0.0
        self.trees: List[NumpyDecisionTreeRegressor] = []

    def fit(self, X: np.ndarray, y: np.ndarray):
        np.random.seed(self.random_state)
        self.init_ = float(np.mean(y))
        y_pred = np.full(X.shape[0], self.init_, dtype=float)
        self.trees = []
        for m in range(self.n_estimators):
            residuals = y - y_pred
            tree = NumpyDecisionTreeRegressor(
                max_depth=self.max_depth, min_samples_split=2, n_thresholds_per_feature=8,
                random_state=self.random_state + m
            )
            tree.fit(X, residuals)
            y_pred = y_pred + self.learning_rate * tree.predict(X)
            self.trees.append(tree)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        y_pred = np.full(X.shape[0], self.init_, dtype=float)
        for tree in self.trees:
            y_pred += self.learning_rate * tree.predict(X)
        return y_pred

# -----------------------------
# Data: generate -> save -> load -> preprocess
# -----------------------------
def generate_and_save_dataset(csv_dir="data", csv_name="regression_dataset.csv",
                              n_samples=500, n_features=10, n_informative=8, noise=10.0, random_state=42):
    ensure_dir(csv_dir)
    X, y = make_regression(
        n_samples=n_samples,
        n_features=n_features,
        n_informative=n_informative,
        noise=noise,
        random_state=random_state
    )
    data = np.concatenate([X, y.reshape(-1, 1)], axis=1)
    header = [f"x{i}" for i in range(X.shape[1])] + ["target"]
    path = os.path.join(csv_dir, csv_name)
    np.savetxt(path, data, delimiter=",", header=",".join(header), comments="", fmt="%.6f")
    return path

def load_and_preprocess_dataset(path: str):
    raw = np.genfromtxt(path, delimiter=",", names=True)
    feature_names = [n for n in raw.dtype.names if n != "target"]
    X = np.vstack([raw[n] for n in feature_names]).T
    y = raw["target"]
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    return X_scaled, y

# -----------------------------
# CV + hyperparameter sweep
# -----------------------------
def evaluate_models_cv(X: np.ndarray, y: np.ndarray,
                       learning_rates=(0.1, 0.05),
                       n_estimators_list=(50, 100),
                       max_depth_list=(2,),
                       n_splits=3, random_state=42):
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    results_custom, results_sklearn = [], []
    for lr in learning_rates:
        for n_est in n_estimators_list:
            for md in max_depth_list:
                mae_c, rmse_c, mae_s, rmse_s = [], [], [], []
                for tr_idx, va_idx in kf.split(X):
                    X_tr, X_va = X[tr_idx], X[va_idx]
                    y_tr, y_va = y[tr_idx], y[va_idx]
                    gbm = NumpyGradientBoostingRegressor(n_estimators=n_est, learning_rate=lr, max_depth=md)
                    gbm.fit(X_tr, y_tr)
                    y_pred_c = gbm.predict(X_va)
                    mae_c.append(mean_absolute_error(y_va, y_pred_c))
                    rmse_c.append(np.sqrt(mean_squared_error(y_va, y_pred_c)))
                    sk = GradientBoostingRegressor(loss='squared_error', learning_rate=lr,
                                                   n_estimators=n_est, max_depth=md, random_state=random_state)
                    sk.fit(X_tr, y_tr)
                    y_pred_s = sk.predict(X_va)
                    mae_s.append(mean_absolute_error(y_va, y_pred_s))
                    rmse_s.append(np.sqrt(mean_squared_error(y_va, y_pred_s)))
                results_custom.append({
                    'lr': lr, 'n_est': n_est, 'depth': md,
                    'mae': np.mean(mae_c), 'rmse': np.mean(rmse_c)
                })
                results_sklearn.append({
                    'lr': lr, 'n_est': n_est, 'depth': md,
                    'mae': np.mean(mae_s), 'rmse': np.mean(rmse_s)
                })
    return results_custom, results_sklearn

# -----------------------------
# Main execution block
# -----------------------------
if __name__ == "__main__":
    set_global_seed(42)

    # 1. Generate and save dataset
    print("Generating and saving dataset...")
    csv_path = generate_and_save_dataset(random_state=42)
    print(f"Dataset saved to {csv_path}")

    # 2. Load and preprocess dataset
    print("Loading and preprocessing dataset...")
    X, y = load_and_preprocess_dataset(csv_path)
    print(f"Dataset loaded with X shape: {X.shape}, y shape: {y.shape}")

    # 3. Define hyperparameters for sweep
    learning_rates = (0.1, 0.05, 0.01)
    n_estimators_list = (50, 100, 200)
    max_depth_list = (2, 4)

    # 4. Run cross-validation and hyperparameter sweep
    print("Running hyperparameter sweep with 5-fold CV...")
    custom_results, sklearn_results = evaluate_models_cv(
        X, y,
        learning_rates=learning_rates,
        n_estimators_list=n_estimators_list,
        max_depth_list=max_depth_list,
        n_splits=5, random_state=42
    )

    print("\n--- Custom GBM Results ---")
    for r in custom_results:
        print(f"LR: {r['lr']}, Estimators: {r['n_est']}, Depth: {r['depth']} => MAE: {r['mae']:.2f}, RMSE: {r['rmse']:.2f}")

    print("\n--- Scikit-learn GBM Results ---")
    for r in sklearn_results:
        print(f"LR: {r['lr']}, Estimators: {r['n_est']}, Depth: {r['depth']} => MAE: {r['mae']:.2f}, RMSE: {r['rmse']:.2f}")

    # 5. Find best models
    best_custom = min(custom_results, key=lambda x: x['mae'])
    best_sklearn = min(sklearn_results, key=lambda x: x['mae'])

    print("\n--- Best Models ---")
    print(f"Custom GBM Best: LR: {best_custom['lr']}, Estimators: {best_custom['n_est']}, Depth: {best_custom['depth']} | MAE: {best_custom['mae']:.2f}, RMSE: {best_custom['rmse']:.2f}")
    print(f"Scikit-learn GBM Best: LR: {best_sklearn['lr']}, Estimators: {best_sklearn['n_est']}, Depth: {best_sklearn['depth']} | MAE: {best_sklearn['mae']:.2f}, RMSE: {best_sklearn['rmse']:.2f}")

    # 6. Generate Markdown report
    report_content = "# Gradient Boosting Regressor Comparison Report\n\n"
    report_content += "This report compares a custom NumPy-only Gradient Boosting Regressor against scikit-learn's implementation.\n\n"

    report_content += "## Hyperparameters Explored\n"
    report_content += f"- Learning Rates: {', '.join(map(str, learning_rates))}\n"
    report_content += f"- Number of Estimators: {', '.join(map(str, n_estimators_list))}\n"
    report_content += f"- Max Depth: {', '.join(map(str, max_depth_list))}\n"
    report_content += "- Cross-validation: 5-fold\n\n"

    report_content += "## Custom GBM Results\n"
    report_content += "| LR | Estimators | Depth | MAE | RMSE |\n"
    report_content += "|----|------------|-------|-----|------|\n"
    for r in custom_results:
        report_content += f"| {r['lr']} | {r['n_est']} | {r['depth']} | {r['mae']:.2f} | {r['rmse']:.2f} |\n"
    report_content += "\n"

    report_content += "## Scikit-learn GBM Results\n"
    report_content += "| LR | Estimators | Depth | MAE | RMSE |\n"
    report_content += "|----|------------|-------|-----|------|\n"
    for r in sklearn_results:
        report_content += f"| {r['lr']} | {r['n_est']} | {r['depth']} | {r['mae']:.2f} | {r['rmse']:.2f} |\n"
    report_content += "\n"

    report_content += "## Best Performing Models\n"
    report_content += "Based on Mean Absolute Error (MAE):\n\n"
    report_content += f"- **Custom GBM Best:** LR: {best_custom['lr']}, Estimators: {best_custom['n_est']}, Depth: {best_custom['depth']} | MAE: {best_custom['mae']:.2f}, RMSE: {best_custom['rmse']:.2f}\n"
    report_content += f"- **Scikit-learn GBM Best:** LR: {best_sklearn['lr']}, Estimators: {best_sklearn['n_est']}, Depth: {best_sklearn['depth']} | MAE: {best_sklearn['mae']:.2f}, RMSE: {best_sklearn['rmse']:.2f}\n"

    report_filename = "gbm_comparison_report.md"
    with open(report_filename, "w") as f:
        f.write(report_content)
    print(f"\nReport saved to {report_filename}")




Generating and saving dataset...
Dataset saved to data/regression_dataset.csv
Loading and preprocessing dataset...
Dataset loaded with X shape: (500, 10), y shape: (500,)
Running hyperparameter sweep with 5-fold CV...

--- Custom GBM Results ---
LR: 0.1, Estimators: 50, Depth: 2 => MAE: 50.57, RMSE: 64.64
LR: 0.1, Estimators: 50, Depth: 4 => MAE: 41.34, RMSE: 53.13
LR: 0.1, Estimators: 100, Depth: 2 => MAE: 37.42, RMSE: 48.47
LR: 0.1, Estimators: 100, Depth: 4 => MAE: 36.16, RMSE: 46.70
LR: 0.1, Estimators: 200, Depth: 2 => MAE: 30.13, RMSE: 39.12
LR: 0.1, Estimators: 200, Depth: 4 => MAE: 34.99, RMSE: 45.05
LR: 0.05, Estimators: 50, Depth: 2 => MAE: 66.28, RMSE: 83.75
LR: 0.05, Estimators: 50, Depth: 4 => MAE: 53.18, RMSE: 67.39
LR: 0.05, Estimators: 100, Depth: 2 => MAE: 50.45, RMSE: 64.71
LR: 0.05, Estimators: 100, Depth: 4 => MAE: 41.38, RMSE: 53.18
LR: 0.05, Estimators: 200, Depth: 2 => MAE: 36.92, RMSE: 47.98
LR: 0.05, Estimators: 200, Depth: 4 => MAE: 35.87, RMSE: 46.32
LR: 0.01