In [None]:
import numpy as np
from sklearn.tree import DecisionTreeRegressor
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from typing import Literal, Optional, Union

class GradientBoosting:
    """
    Gradient Boosting implementation supporting both regression and classification.
    For classification, implements binary classification using logistic loss.
    """
    def __init__(
        self,
        n_estimators: int = 100,
        learning_rate: float = 0.1,
        max_depth: int = 3,
        min_samples_split: int = 2,
        task: Literal['regression', 'classification'] = 'regression',
        random_state: Optional[int] = None
    ):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.task = task
        self.random_state = random_state
        self.trees = []
        self.initial_prediction = None

    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        """Compute sigmoid function."""
        return 1 / (1 + np.exp(-x))

    def _compute_residuals(self, y: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        """Compute residuals based on task type."""
        if self.task == 'regression':
            return y - y_pred
        else:
            # For binary classification, compute negative gradient of log loss
            return y - self._sigmoid(y_pred)

    def _initialize_prediction(self, y: np.ndarray) -> float:
        """Initialize prediction with mean for regression or log odds for classification."""
        if self.task == 'regression':
            return np.mean(y)
        else:
            # For binary classification, initialize with log odds
            pos = np.sum(y == 1)
            neg = np.sum(y == 0)
            return np.log(pos / neg)

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """
        Fit the gradient boosting model.
        
        Args:
            X: Training features
            y: Target values
        """
        if self.random_state is not None:
            np.random.seed(self.random_state)

        # Initialize prediction
        self.initial_prediction = self._initialize_prediction(y)
        F = np.full(X.shape[0], self.initial_prediction)

        for _ in range(self.n_estimators):
            # Compute residuals
            residuals = self._compute_residuals(y, F)

            # Fit tree on residuals
            tree = DecisionTreeRegressor(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                random_state=self.random_state
            )
            tree.fit(X, residuals)

            # Update predictions
            update = tree.predict(X)
            F += self.learning_rate * update

            # Store the tree
            self.trees.append(tree)

    def predict(self, X: np.ndarray, raw_output: bool = False) -> np.ndarray:
        """
        Make predictions using the gradient boosting model.
        
        Args:
            X: Features to predict
            raw_output: If True, return raw scores for classification
            
        Returns:
            Predictions (probabilities for classification if raw_output=False)
        """
        # Start with initial prediction
        F = np.full(X.shape[0], self.initial_prediction)

        # Add predictions from all trees
        for tree in self.trees:
            F += self.learning_rate * tree.predict(X)

        if self.task == 'classification' and not raw_output:
            return self._sigmoid(F)
        return F

    def staged_predict(self, X: np.ndarray) -> np.ndarray:
        """
        Make predictions at each stage of the boosting process.
        Useful for analyzing the learning curve.
        """
        F = np.full(X.shape[0], self.initial_prediction)
        predictions = []

        for tree in self.trees:
            F += self.learning_rate * tree.predict(X)
            if self.task == 'classification':
                predictions.append(self._sigmoid(F.copy()))
            else:
                predictions.append(F.copy())

        return np.array(predictions)

# Example usage with California Housing dataset
def demonstrate_gradient_boosting():
    # Load and prepare data
    data = fetch_california_housing()
    X, y = data.data, data.target
    
    # Use a subset of data for faster execution
    n_samples = 5000
    indices = np.random.choice(len(X), n_samples, replace=False)
    X, y = X[indices], y[indices]
    
    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    # Scale the features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Create and train gradient boosting model
    gb = GradientBoosting(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=3,
        task='regression',
        random_state=42
    )
    
    gb.fit(X_train_scaled, y_train)
    
    # Make predictions
    y_pred = gb.predict(X_test_scaled)
    
    # Calculate metrics
    mse = np.mean((y_test - y_pred) ** 2)
    r2 = 1 - np.sum((y_test - y_pred) ** 2) / np.sum((y_test - np.mean(y_test)) ** 2)
    
    print(f"Mean Squared Error: {mse:.4f}")
    print(f"R-squared Score: {r2:.4f}")
    
    # Demonstrate staged predictions
    staged_predictions = gb.staged_predict(X_test_scaled)
    staged_mse = [np.mean((y_test - pred) ** 2) for pred in staged_predictions]
    print(f"\nFinal MSE after each stage:")
    for i in range(0, 100, 10):
        print(f"Stage {i}: {staged_mse[i]:.4f}")

if __name__ == "__main__":
    demonstrate_gradient_boosting()