In [1]:
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.base import BaseEstimator
from sklearn.datasets import fetch_california_housing, load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, accuracy_score
from sklearn.ensemble import GradientBoostingRegressor, GradientBoostingClassifier

from collections import deque
from sklearn.base import BaseEstimator, RegressorMixin, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted

import xgboost
import lightgbm

from xgboost import XGBRegressor, XGBClassifier
from lightgbm import LGBMRegressor, LGBMClassifier

In [2]:
class DecisionTreeRegressor:
    def __init__(self, max_depth: int = 3):
        """
        A simple decision tree regressor that splits to minimize variance.

        Args:
            max_depth (int): Maximum depth of the tree.
        """
        self.max_depth = max_depth
        self.tree = None

    def _find_best_split(self, X: np.ndarray, y: np.ndarray) -> tuple:
        """
        Find the best feature and threshold to split the data by minimizing variance.

        Args:
            X (np.ndarray): Feature matrix of shape (n_samples, n_features).
            y (np.ndarray): Target vector of shape (n_samples,).

        Returns:
            tuple: (best_feature_index, best_threshold)
        """
        best_feature, best_threshold, best_mse = None, None, float('inf')

        for feature in range(X.shape[1]):
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                left_mask = X[:, feature] <= threshold
                left_y, right_y = y[left_mask], y[~left_mask]

                if len(left_y) == 0 or len(right_y) == 0:
                    continue

                # Weighted sum of variances
                mse = np.var(left_y) * len(left_y) + np.var(right_y) * len(right_y)
                if mse < best_mse:
                    best_feature, best_threshold, best_mse = feature, threshold, mse

        return best_feature, best_threshold

    def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0):
        """
        Recursively build the regression tree.

        Args:
            X (np.ndarray): Feature matrix.
            y (np.ndarray): Target values.
            depth (int): Current tree depth.

        Returns:
            Tree node or prediction value.
        """
        if depth >= self.max_depth or len(y) < 2:
            return np.mean(y)

        feature, threshold = self._find_best_split(X, y)
        if feature is None:
            return np.mean(y)

        left_mask = X[:, feature] <= threshold
        left_tree = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right_tree = self._build_tree(X[~left_mask], y[~left_mask], depth + 1)

        return (feature, threshold, left_tree, right_tree)

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """
        Train the decision tree regressor.

        Args:
            X (np.ndarray): Feature matrix.
            y (np.ndarray): Target vector.
        """
        self.tree = self._build_tree(X, y)

    def _predict_sample(self, x: np.ndarray, tree) -> float:
        """
        Predict the target value for a single sample.

        Args:
            x (np.ndarray): A single sample.
            tree: The tree structure.

        Returns:
            float: Predicted value.
        """
        if not isinstance(tree, tuple):
            return tree
        feature, threshold, left_tree, right_tree = tree
        subtree = left_tree if x[feature] <= threshold else right_tree
        return self._predict_sample(x, subtree)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict target values for input samples.

        Args:
            X (np.ndarray): Feature matrix.

        Returns:
            np.ndarray: Predicted values.
        """
        return np.array([self._predict_sample(x, self.tree) for x in X])


class CustomGBDTBase(BaseEstimator):
    def __init__(self, n_estimators: int = 100, learning_rate: float = 0.1, max_depth: int = 3):
        """
        Base class for Gradient Boosted Decision Trees.

        Args:
            n_estimators (int): Number of boosting rounds.
            learning_rate (float): Step size shrinkage.
            max_depth (int): Maximum depth of each regression tree.
        """
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.trees = []

    def _create_tree(self) -> DecisionTreeRegressor:
        """Factory method to create a new regression tree."""
        return DecisionTreeRegressor(max_depth=self.max_depth)

    def _initial_prediction(self, y: np.ndarray) -> float:
        raise NotImplementedError

    def _calculate_gradient(self, y: np.ndarray, predictions: np.ndarray) -> np.ndarray:
        raise NotImplementedError

    def _transform_output(self, raw_predictions: np.ndarray) -> np.ndarray:
        return raw_predictions

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """
        Fit the gradient boosting model.

        Args:
            X (np.ndarray): Training features.
            y (np.ndarray): Training targets.
        """
        self.base_pred = self._initial_prediction(y)
        predictions = np.full(y.shape, self.base_pred)

        for _ in range(self.n_estimators):
            residuals = self._calculate_gradient(y, predictions)
            tree = self._create_tree()
            tree.fit(X, residuals)
            predictions += self.learning_rate * tree.predict(X)
            self.trees.append(tree)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict using the gradient boosted ensemble.

        Args:
            X (np.ndarray): Input features.

        Returns:
            np.ndarray: Model predictions.
        """
        raw_preds = np.full(X.shape[0], self.base_pred)
        for tree in self.trees:
            raw_preds += self.learning_rate * tree.predict(X)
        return self._transform_output(raw_preds)


class CustomGBR(CustomGBDTBase):
    """Custom Gradient Boosted Regressor."""

    def _initial_prediction(self, y: np.ndarray) -> float:
        return np.mean(y)

    def _calculate_gradient(self, y: np.ndarray, predictions: np.ndarray) -> np.ndarray:
        return y - predictions  # Negative gradient for MSE

    def _transform_output(self, raw_predictions: np.ndarray) -> np.ndarray:
        return raw_predictions


class CustomGBC(CustomGBDTBase):
    """Custom Gradient Boosted Classifier."""

    def _initial_prediction(self, y: np.ndarray) -> float:
        pos = np.mean(y)
        epsilon = 1e-8
        return np.log((pos + epsilon) / (1 - pos + epsilon))  # Log-odds

    def _calculate_gradient(self, y: np.ndarray, predictions: np.ndarray) -> np.ndarray:
        p = 1 / (1 + np.exp(-predictions))  # Sigmoid
        return y - p  # Negative log-loss gradient

    def _transform_output(self, raw_predictions: np.ndarray) -> np.ndarray:
        probs = 1 / (1 + np.exp(-raw_predictions))
        return (probs >= 0.5).astype(int)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        raw_preds = np.full(X.shape[0], self.base_pred)
        for tree in self.trees:
            raw_preds += self.learning_rate * tree.predict(X)
        probs = 1 / (1 + np.exp(-raw_preds))
        return np.vstack([1 - probs, probs]).T


# Optimizations: 

In [4]:
class OptimizedDecisionTree:
    """Histogram-based decision tree with proper node indexing"""
    def __init__(self, max_depth=3, min_samples_split=2, bins=128):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.bins = bins
        self.nodes = []
        self.root = None
        
    def _find_best_split(self, X, y):
        """Find best split using histogram approximation"""
        n_samples, n_features = X.shape
        best_feature, best_threshold, best_mse = None, None, float('inf')
        
        for f in range(n_features):
            # Create histogram for this feature
            hist, edges = np.histogram(X[:, f], bins=self.bins)
            n_bins = len(hist)
            bin_indices = np.digitize(X[:, f], edges)
            
            # Precompute bin statistics
            bin_sums = np.zeros(n_bins)
            bin_counts = np.zeros(n_bins)
            
            for i in range(n_samples):
                bin_idx = bin_indices[i] - 1
                if bin_idx >= n_bins:
                    bin_idx = n_bins - 1
                bin_sums[bin_idx] += y[i]
                bin_counts[bin_idx] += 1
            
            # Find best split in histogram space
            cumsum_left = 0
            count_left = 0
            
            for i in range(n_bins - 1):
                # Update left statistics
                cumsum_left += bin_sums[i]
                count_left += bin_counts[i]
                
                # Skip if not enough samples
                if count_left < self.min_samples_split or (n_samples - count_left) < self.min_samples_split:
                    continue
                
                # Compute right statistics
                cumsum_right = np.sum(bin_sums) - cumsum_left
                count_right = n_samples - count_left
                
                # Compute MSE
                mse_left = (np.sum(bin_sums[i:]**2 / np.maximum(bin_counts[i:], 1)) - cumsum_left**2 / count_left)
                mse_right = (np.sum(bin_sums[:i]**2 / np.maximum(bin_counts[:i], 1)) - cumsum_right**2 / count_right)
                mse_total = mse_left + mse_right
                
                if mse_total < best_mse:
                    best_mse = mse_total
                    best_feature = f
                    best_threshold = edges[i+1]
        
        return best_feature, best_threshold, best_mse
    
    def _build_tree(self, X, y):
        """Build tree using BFS with proper node indexing"""
        n_samples = X.shape[0]
        root = {
            'id': 0,
            'indices': np.arange(n_samples),
            'depth': 0,
            'is_leaf': False,
            'value': np.mean(y)
        }
        self.nodes = [root]
        queue = deque([root])
        next_id = 1
        
        while queue:
            node = queue.popleft()
            indices = node['indices']
            X_node = X[indices]
            y_node = y[indices]
            
            # Check stopping criteria
            if (node['depth'] >= self.max_depth or 
                len(y_node) < self.min_samples_split or
                np.var(y_node) < 1e-6):
                node['is_leaf'] = True
                continue
                
            # Find best split
            feature, threshold, mse = self._find_best_split(X_node, y_node)
            
            if feature is None:
                node['is_leaf'] = True
                continue
                
            # Split data
            left_mask = X_node[:, feature] <= threshold
            right_mask = ~left_mask
            
            # Create child nodes
            left_child = {
                'id': next_id,
                'indices': indices[left_mask],
                'depth': node['depth'] + 1,
                'is_leaf': False,
                'value': np.mean(y_node[left_mask])
            }
            next_id += 1
            
            right_child = {
                'id': next_id,
                'indices': indices[right_mask],
                'depth': node['depth'] + 1,
                'is_leaf': False,
                'value': np.mean(y_node[right_mask])
            }
            next_id += 1
            
            # Update current node
            node['feature'] = feature
            node['threshold'] = threshold
            node['left'] = left_child
            node['right'] = right_child
            node['is_leaf'] = False
            
            # Add children to queue and node list
            self.nodes.extend([left_child, right_child])
            queue.append(left_child)
            queue.append(right_child)
        
        return root
        
    def fit(self, X, y):
        self.root = self._build_tree(X, y)
        return self
        
    def predict(self, X):
        """Vectorized prediction using tree traversal"""
        n_samples = X.shape[0]
        preds = np.zeros(n_samples)
        current_nodes = [self.root] * n_samples
        
        # Process nodes until all are leaves
        while any(not node.get('is_leaf', True) for node in current_nodes):
            # Process each sample
            for i in range(n_samples):
                node = current_nodes[i]
                
                if node.get('is_leaf', True):
                    # Already at leaf, store prediction
                    preds[i] = node.get('value', 0)
                    continue
                    
                # Traverse to next node
                if X[i, node['feature']] <= node['threshold']:
                    current_nodes[i] = node['left']
                else:
                    current_nodes[i] = node['right']
        
        # Handle any remaining non-leaf nodes
        for i in range(n_samples):
            if not current_nodes[i].get('is_leaf', False):
                preds[i] = current_nodes[i].get('value', 0)
                
        return preds


class OptimizedGradientBoostingRegressor(BaseEstimator, RegressorMixin):
    """Optimized Gradient Boosting for Regression"""
    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3, 
                 min_samples_split=2, bins=128, subsample=0.8, verbose=0):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.bins = bins
        self.subsample = subsample
        self.verbose = verbose
        self.trees = []
        self.base_pred = None
        
    def fit(self, X, y):
        X, y = check_X_y(X, y)
        n_samples = X.shape[0]
        
        # Initial prediction
        self.base_pred = np.mean(y)
        predictions = np.full(n_samples, self.base_pred)
        
        for i in range(self.n_estimators):
            # Compute residuals
            residuals = y - predictions
            
            # Create and fit tree
            tree = OptimizedDecisionTree(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                bins=self.bins
            )
            
            # Subsample data
            if self.subsample < 1.0:
                idx = np.random.choice(n_samples, int(n_samples * self.subsample), replace=False)
                tree.fit(X[idx], residuals[idx])
            else:
                tree.fit(X, residuals)
            
            # Update predictions
            tree_pred = tree.predict(X)
            predictions += self.learning_rate * tree_pred
            
            # Store tree
            self.trees.append(tree)
            
            # Progress logging
            if self.verbose and (i+1) % 10 == 0:
                mse = mean_squared_error(y, predictions)
                print(f"Tree {i+1}/{self.n_estimators} - MSE: {mse:.4f}")
        
        return self
        
    def predict(self, X):
        # check_is_fitted(self)
        # X = check_array(X)
        preds = np.full(X.shape[0], self.base_pred)
        
        for tree in self.trees:
            preds += self.learning_rate * tree.predict(X)
            
        return preds


class OptimizedGradientBoostingClassifier(BaseEstimator, ClassifierMixin):
    """Optimized Gradient Boosting for Classification"""
    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3, 
                 min_samples_split=2, bins=128, subsample=0.8, verbose=0):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.bins = bins
        self.subsample = subsample
        self.verbose = verbose
        self.trees = []
        self.base_log_odds = None
        
    def fit(self, X, y):
        X, y = check_X_y(X, y)
        n_samples = X.shape[0]
        
        # Convert to binary classification
        self.classes_, y = np.unique(y, return_inverse=True)
        y = y.astype(float)
        
        # Initial prediction (log-odds)
        pos = np.mean(y)
        epsilon = 1e-8
        self.base_log_odds = np.log((pos + epsilon) / (1 - pos + epsilon))
        log_odds = np.full(n_samples, self.base_log_odds)
        probabilities = 1 / (1 + np.exp(-log_odds))
        
        for i in range(self.n_estimators):
            # Compute gradients
            gradients = y - probabilities
            
            # Create and fit tree
            tree = OptimizedDecisionTree(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                bins=self.bins
            )
            
            # Subsample data
            if self.subsample < 1.0:
                idx = np.random.choice(n_samples, int(n_samples * self.subsample), replace=False)
                tree.fit(X[idx], gradients[idx])
            else:
                tree.fit(X, gradients)
            
            # Update predictions
            tree_pred = tree.predict(X)
            log_odds += self.learning_rate * tree_pred
            probabilities = 1 / (1 + np.exp(-log_odds))
            
            # Store tree
            self.trees.append(tree)
            
            # Progress logging
            if self.verbose and (i+1) % 10 == 0:
                acc = np.mean((probabilities > 0.5) == y)
                print(f"Tree {i+1}/{self.n_estimators} - Accuracy: {acc:.4f}")
        
        return self
        
    def predict_proba(self, X):
        # check_is_fitted(self)
        # X = check_array(X)
        log_odds = np.full(X.shape[0], self.base_log_odds)
        
        for tree in self.trees:
            log_odds += self.learning_rate * tree.predict(X)
            
        probabilities = 1 / (1 + np.exp(-log_odds))
        return np.vstack([1 - probabilities, probabilities]).T
        
    def predict(self, X):
        proba = self.predict_proba(X)
        return self.classes_[(proba[:, 1] > 0.5).astype(int)]

In [5]:
# Configure experiment settings
RANDOM_STATE = 42
TEST_SIZE = 0.2
N_ESTIMATORS = 100
LEARNING_RATE = 0.1
MAX_DEPTH = 3

# Initialize logging
results = []

def run_experiment(model, model_name, task, X_train, X_test, y_train, y_test):
    """Train and evaluate a model, returning metrics and time"""
    start_time = time.time()
    model.fit(X_train, y_train)
    train_time = time.time() - start_time
    
    preds = model.predict(X_test)
    
    if task == "regression":
        metric = mean_squared_error(y_test, preds)
        metric_name = "MSE"
    else:
        metric = accuracy_score(y_test, preds)
        metric_name = "Accuracy"
    
    return {
        "Model": model_name,
        "Task": task,
        metric_name: metric,
        "Train Time (s)": train_time,
        "n_estimators": N_ESTIMATORS,
        "learning_rate": LEARNING_RATE,
        "max_depth": MAX_DEPTH
    }

# Run regression experiment
def regression_experiment():
    data = fetch_california_housing()
    X, y = data.data, data.target
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE
    )
    
    # Initialize models
    models = {
        "Custom GBDT": CustomGBR(n_estimators=N_ESTIMATORS, 
                                learning_rate=LEARNING_RATE, 
                                max_depth=MAX_DEPTH),
        "Custom Optimized GBDT": OptimizedGradientBoostingRegressor(n_estimators = N_ESTIMATORS,
                                                                   learning_rate = LEARNING_RATE,
                                                                   max_depth = MAX_DEPTH),
        "Scikit-learn": GradientBoostingRegressor(n_estimators=N_ESTIMATORS, 
                                                 learning_rate=LEARNING_RATE, 
                                                 max_depth=MAX_DEPTH,
                                                 random_state=RANDOM_STATE),
        "XGBoost": XGBRegressor(n_estimators=N_ESTIMATORS, 
                               learning_rate=LEARNING_RATE, 
                               max_depth=MAX_DEPTH,
                               random_state=RANDOM_STATE,
                               verbosity=0),
        "LightGBM": LGBMRegressor(n_estimators=N_ESTIMATORS, 
                                 learning_rate=LEARNING_RATE, 
                                 max_depth=MAX_DEPTH,
                                 random_state=RANDOM_STATE,
                                 verbose=-1)
    }
    
    for name, model in models.items():
        results.append(run_experiment(
            model, name, "regression", X_train, X_test, y_train, y_test
        ))

# Run classification experiment
def classification_experiment():
    data = load_breast_cancer()
    X, y = data.data, data.target
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE
    )
    
    # Initialize models
    models = {
        "Custom GBDT": CustomGBC(n_estimators=N_ESTIMATORS, 
                                learning_rate=LEARNING_RATE, 
                                max_depth=MAX_DEPTH),
        "Custom Optimized GBDT": OptimizedGradientBoostingClassifier(n_estimators=N_ESTIMATORS,
                                                              learning_rate = LEARNING_RATE,
                                                              max_depth=MAX_DEPTH),
        "Scikit-learn": GradientBoostingClassifier(n_estimators=N_ESTIMATORS, 
                                                 learning_rate=LEARNING_RATE, 
                                                 max_depth=MAX_DEPTH,
                                                 random_state=RANDOM_STATE),
        "XGBoost": XGBClassifier(n_estimators=N_ESTIMATORS, 
                                learning_rate=LEARNING_RATE, 
                                max_depth=MAX_DEPTH,
                                random_state=RANDOM_STATE,
                                verbosity=0),
        "LightGBM": LGBMClassifier(n_estimators=N_ESTIMATORS, 
                                  learning_rate=LEARNING_RATE, 
                                  max_depth=MAX_DEPTH,
                                  random_state=RANDOM_STATE,
                                  verbose=-1)
    }
    
    for name, model in models.items():
        results.append(run_experiment(
            model, name, "classification", X_train, X_test, y_train, y_test
        ))

# Run experiments
regression_experiment()
classification_experiment()

# Create results table
results_df = pd.DataFrame(results)
print("\n" + "="*120)
print("GBDT Algorithm Comparison Summary")
print("="*120)
print(results_df.to_markdown(index=False))
print("="*120)


GBDT Algorithm Comparison Summary
| Model                 | Task           |        MSE |   Train Time (s) |   n_estimators |   learning_rate |   max_depth |   Accuracy |
|:----------------------|:---------------|-----------:|-----------------:|---------------:|----------------:|------------:|-----------:|
| Custom GBDT           | regression     |   0.293848 |     1848.16      |            100 |             0.1 |           3 | nan        |
| Custom Optimized GBDT | regression     |   1.31063  |       19.8692    |            100 |             0.1 |           3 | nan        |
| Scikit-learn          | regression     |   0.293997 |        2.94729   |            100 |             0.1 |           3 | nan        |
| XGBoost               | regression     |   0.295227 |        0.0657778 |            100 |             0.1 |           3 | nan        |
| LightGBM              | regression     |   0.289398 |        0.10589   |            100 |             0.1 |           3 | nan        |
| Cust