# In [None]:
import numpy as np
from sklearn.base import BaseEstimator, RegressorMixin, clone
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.model_selection import train_test_split
from cvxopt import matrix, solvers

class HedgedForecastCombination(BaseEstimator, RegressorMixin):
    """
    Implements hedged forecast combinations by solving a convex optimization problem
    to find optimal weights for combining individual forecasts.

    The weights w minimize (w^T mu)^2 + w^T Sigma w, where mu and Sigma are the
    mean and covariance of the in-sample residuals of the base models, subject to
    sum(w) = 1 and the gross-exposure constraint ||w||_1 <= kappa. Individual
    weights may be negative when kappa > 1.

    Attributes set by fit:
    - n_samples_, n_features_: Shape of the training features.
    - p_: Number of base models.
    - fitted_models_: Clones of base_models fitted on the training data.
    - predictions_: In-sample predictions, one column per base model.
    - mu_: Mean in-sample residual of each base model.
    - Sigma_: Estimated covariance matrix of the in-sample residuals.
    - weights_: Combination weights, one per base model.
    """

    def __init__(self, base_models, kappa=2.0, shrinkage='ledoit_wolf'):
        """
        Parameters:
        - base_models: List of sklearn-like regressors.
        - kappa: Gross-exposure constraint parameter (upper bound on ||w||_1);
          must be a finite number >= 1, which is checked in fit.
        - shrinkage: Method for covariance estimation ('ledoit_wolf' or 'sample').
          Any value other than 'ledoit_wolf' uses the sample covariance.
        """
        self.base_models = base_models
        self.kappa = kappa
        self.shrinkage = shrinkage

    def fit(self, X, y):
        """
        Fit the base models and compute optimal weights.

        Parameters:
        - X: Training features.
        - y: Training targets.

        Returns:
        - self.

        Raises:
        - ValueError: If base_models is empty or kappa is not a finite number >= 1.
        """
        X, y = check_X_y(X, y)
        if len(self.base_models) == 0:
            raise ValueError("base_models must contain at least one regressor.")
        self.n_samples_, self.n_features_ = X.shape
        self.p_ = len(self.base_models)

        # Fit every base model exactly once and keep those fitted clones, so the
        # weights are computed from the same estimators that predict() uses.
        self.fitted_models_ = [clone(model).fit(X, y) for model in self.base_models]
        predictions = np.zeros((self.n_samples_, self.p_))
        for idx, fitted_model in enumerate(self.fitted_models_):
            predictions[:, idx] = fitted_model.predict(X)
        self.predictions_ = predictions

        # Compute residuals (one column per base model)
        residuals = y.reshape(-1, 1) - self.predictions_

        # Estimate mean and covariance of residuals
        self.mu_ = residuals.mean(axis=0)
        if self.shrinkage == 'ledoit_wolf':
            self.Sigma_ = self._ledoit_wolf_shrinkage(residuals)
        else:
            # np.cov returns a 0-d array for a single model; keep it 2-D.
            self.Sigma_ = np.atleast_2d(np.cov(residuals, rowvar=False))

        # Solve the optimization problem to find weights
        self.weights_ = self._solve_optimization()

        return self

    def predict(self, X):
        """
        Make predictions using the hedged forecast combination.

        Parameters:
        - X: Test features.

        Returns:
        - Combined predictions.
        """
        check_is_fitted(self, ['weights_', 'fitted_models_'])
        X = check_array(X)

        # Collect predictions from base models
        predictions = np.column_stack([
            model.predict(X) for model in self.fitted_models_
        ])

        # Compute weighted combination
        combined_predictions = predictions @ self.weights_
        return combined_predictions

    def _solve_optimization(self):
        """
        Solve the convex optimization problem to find optimal weights.

        Minimizes (w^T mu)^2 + w^T Sigma w subject to sum(w) = 1 and
        ||w||_1 <= kappa, reading self.mu_, self.Sigma_ and self.kappa.

        Returns:
        - 1-D array with one weight per base model.

        Raises:
        - ValueError: If kappa is not a finite number >= 1. The constraints
          sum(w) = 1 and ||w||_1 <= kappa cannot both hold for kappa < 1.
        """
        import warnings

        mu = np.asarray(self.mu_, dtype=float).reshape(-1, 1)
        p = mu.shape[0]
        Sigma = np.asarray(self.Sigma_, dtype=float).reshape(p, p)

        kappa = float(self.kappa)
        long_only = bool(np.isclose(kappa, 1.0))
        if not np.isfinite(kappa) or (kappa < 1.0 and not long_only):
            raise ValueError(
                "kappa must be a finite number >= 1 because the weights sum to 1; "
                f"got {self.kappa!r}."
            )

        # Objective in w: (w^T mu)^2 + w^T Sigma w = w^T (Sigma + mu mu^T) w.
        # cvxopt minimizes (1/2) x^T P x + q^T x, hence the factor 2.
        P_w = 2.0 * (Sigma + mu @ mu.T)
        eye = np.eye(p)

        if long_only:
            # With sum(w) = 1, ||w||_1 <= 1 holds exactly when every w_i >= 0.
            # Solved directly in w because the auxiliary-variable formulation
            # below has no strictly feasible point at kappa = 1.
            n_vars = p
            P = P_w
            G = -eye
            h = np.zeros((p, 1))
            A = np.ones((1, p))
        else:
            # Variables x = [w; t] with |w_i| <= t_i and sum(t) <= kappa, which
            # is a linear encoding of the gross-exposure bound ||w||_1 <= kappa.
            n_vars = 2 * p
            P = np.zeros((n_vars, n_vars))
            P[:p, :p] = P_w
            G = np.vstack([
                np.hstack([eye, -eye]),                           #  w - t <= 0
                np.hstack([-eye, -eye]),                          # -w - t <= 0
                np.hstack([np.zeros((1, p)), np.ones((1, p))]),   # sum(t) <= kappa
            ])
            h = np.vstack([np.zeros((2 * p, 1)), np.array([[kappa]])])
            A = np.hstack([np.ones((1, p)), np.zeros((1, p))])

        q = np.zeros((n_vars, 1))
        # Sum of weights equals 1
        b = np.array([[1.0]])

        # Silence the solver for this call only: copy the module-wide options
        # instead of mutating cvxopt.solvers.options.
        options = dict(solvers.options)
        options['show_progress'] = False

        # Solve the quadratic program
        solution = solvers.qp(
            matrix(P), matrix(q), matrix(G), matrix(h), matrix(A), matrix(b),
            options=options,
        )
        if solution['status'] != 'optimal':
            warnings.warn(
                "cvxopt did not report an optimal solution "
                f"(status: {solution['status']}); weights may be inaccurate.",
                RuntimeWarning,
            )

        # Drop the auxiliary t variables (if any); only w is returned.
        weights = np.array(solution['x']).flatten()[:p]
        return weights

    def _ledoit_wolf_shrinkage(self, residuals):
        """
        Estimate the covariance matrix using Ledoit-Wolf shrinkage.

        Parameters:
        - residuals: Residuals matrix.

        Returns:
        - Shrinkage covariance matrix.
        """
        from sklearn.covariance import ledoit_wolf
        # ledoit_wolf returns (covariance, shrinkage coefficient); only the
        # covariance is needed here.
        Sigma, _ = ledoit_wolf(residuals)
        return Sigma

class HedgedRandomForestRegressor(BaseEstimator, RegressorMixin):
    """
    Implements a hedged random forest by applying hedged forecast combinations
    to the individual trees of a random forest.

    Each tree is a DecisionTreeRegressor fitted on a bootstrap sample. The tree
    weights w minimize (w^T mu)^2 + w^T Sigma w, where mu and Sigma are the mean
    and covariance of the trees' residuals on the full training set, subject to
    sum(w) = 1 and the gross-exposure constraint ||w||_1 <= kappa. Individual
    weights may be negative when kappa > 1.

    Attributes set by fit:
    - fitted_models_: The fitted trees.
    - predictions_: Training-set predictions, one column per tree.
    - mu_: Mean residual of each tree.
    - Sigma_: Estimated covariance matrix of the residuals.
    - weights_: Combination weights, one per tree.
    """

    def __init__(self, n_estimators=100, max_depth=None, kappa=2.0,
                 shrinkage='ledoit_wolf', random_state=None):
        """
        Parameters:
        - n_estimators: Number of trees in the forest.
        - max_depth: Maximum depth of the trees.
        - kappa: Gross-exposure constraint parameter (upper bound on ||w||_1);
          must be a finite number >= 1, which is checked in fit.
        - shrinkage: Method for covariance estimation ('ledoit_wolf' selects
          Ledoit-Wolf shrinkage; any other value uses the sample covariance).
        - random_state: Seed or numpy RandomState for the bootstrap draws.
          None (the default) uses numpy's global random state. The trees
          themselves are always seeded with their index.
        """
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.kappa = kappa
        self.shrinkage = shrinkage
        self.random_state = random_state

    def fit(self, X, y):
        """
        Fit the hedged random forest.

        Parameters:
        - X: Training features.
        - y: Training targets.

        Returns:
        - self.

        Raises:
        - ValueError: If n_estimators < 1 or kappa is not a finite number >= 1.
        """
        from sklearn.utils import check_random_state

        # Validate and convert the inputs to arrays: the bootstrap indexing and
        # y.reshape below need array semantics.
        X, y = check_X_y(X, y)
        if self.n_estimators < 1:
            raise ValueError("n_estimators must be at least 1.")
        rng = check_random_state(self.random_state)

        # Initialize base models (individual trees)
        base_models = [
            DecisionTreeRegressor(max_depth=self.max_depth, random_state=i)
            for i in range(self.n_estimators)
        ]

        # Create bootstrap samples and fit base models
        n_samples = X.shape[0]
        predictions = np.zeros((n_samples, self.n_estimators))
        for idx, model in enumerate(base_models):
            X_sample, y_sample = self._bootstrap_sample(X, y, rng)
            model.fit(X_sample, y_sample)
            # Predict on the full training set, not only the bootstrap sample.
            predictions[:, idx] = model.predict(X)
        self.predictions_ = predictions

        # Compute residuals (one column per tree)
        residuals = y.reshape(-1, 1) - self.predictions_

        # Estimate mean and covariance of residuals
        self.mu_ = residuals.mean(axis=0)
        if self.shrinkage == 'ledoit_wolf':
            self.Sigma_ = self._ledoit_wolf_shrinkage(residuals)
        else:
            # np.cov returns a 0-d array for a single tree; keep it 2-D.
            self.Sigma_ = np.atleast_2d(np.cov(residuals, rowvar=False))

        # Solve the optimization problem to find weights
        self.weights_ = self._solve_optimization()

        # Store fitted base models
        self.fitted_models_ = base_models

        return self

    def predict(self, X):
        """
        Make predictions using the hedged random forest.

        Parameters:
        - X: Test features.

        Returns:
        - Combined predictions.
        """
        check_is_fitted(self, ['weights_', 'fitted_models_'])
        X = check_array(X)

        # Collect predictions from base models
        predictions = np.column_stack([
            model.predict(X) for model in self.fitted_models_
        ])

        # Compute weighted combination
        combined_predictions = predictions @ self.weights_
        return combined_predictions

    def _bootstrap_sample(self, X, y, rng=None):
        """
        Generate a bootstrap sample from the training data.

        Parameters:
        - X: Features.
        - y: Targets.
        - rng: Object providing choice() (e.g. a numpy RandomState). When None,
          numpy's global random state is used.

        Returns:
        - X_sample: Bootstrapped features.
        - y_sample: Bootstrapped targets.
        """
        if rng is None:
            rng = np.random
        n_samples = X.shape[0]
        # Draw n_samples row indices with replacement.
        indices = rng.choice(n_samples, n_samples, replace=True)
        return X[indices], y[indices]

    def _solve_optimization(self):
        """
        Solve the convex optimization problem to find optimal weights.

        Minimizes (w^T mu)^2 + w^T Sigma w subject to sum(w) = 1 and
        ||w||_1 <= kappa, reading self.mu_, self.Sigma_ and self.kappa.

        Returns:
        - 1-D array with one weight per tree.

        Raises:
        - ValueError: If kappa is not a finite number >= 1. The constraints
          sum(w) = 1 and ||w||_1 <= kappa cannot both hold for kappa < 1.
        """
        import warnings

        mu = np.asarray(self.mu_, dtype=float).reshape(-1, 1)
        p = mu.shape[0]
        Sigma = np.asarray(self.Sigma_, dtype=float).reshape(p, p)

        kappa = float(self.kappa)
        long_only = bool(np.isclose(kappa, 1.0))
        if not np.isfinite(kappa) or (kappa < 1.0 and not long_only):
            raise ValueError(
                "kappa must be a finite number >= 1 because the weights sum to 1; "
                f"got {self.kappa!r}."
            )

        # Objective in w: (w^T mu)^2 + w^T Sigma w = w^T (Sigma + mu mu^T) w.
        # cvxopt minimizes (1/2) x^T P x + q^T x, hence the factor 2.
        P_w = 2.0 * (Sigma + mu @ mu.T)
        eye = np.eye(p)

        if long_only:
            # With sum(w) = 1, ||w||_1 <= 1 holds exactly when every w_i >= 0.
            # Solved directly in w because the auxiliary-variable formulation
            # below has no strictly feasible point at kappa = 1.
            n_vars = p
            P = P_w
            G = -eye
            h = np.zeros((p, 1))
            A = np.ones((1, p))
        else:
            # Variables x = [w; t] with |w_i| <= t_i and sum(t) <= kappa, which
            # is a linear encoding of the gross-exposure bound ||w||_1 <= kappa.
            n_vars = 2 * p
            P = np.zeros((n_vars, n_vars))
            P[:p, :p] = P_w
            G = np.vstack([
                np.hstack([eye, -eye]),                           #  w - t <= 0
                np.hstack([-eye, -eye]),                          # -w - t <= 0
                np.hstack([np.zeros((1, p)), np.ones((1, p))]),   # sum(t) <= kappa
            ])
            h = np.vstack([np.zeros((2 * p, 1)), np.array([[kappa]])])
            A = np.hstack([np.ones((1, p)), np.zeros((1, p))])

        q = np.zeros((n_vars, 1))
        # Sum of weights equals 1
        b = np.array([[1.0]])

        # Silence the solver for this call only: copy the module-wide options
        # instead of mutating cvxopt.solvers.options.
        options = dict(solvers.options)
        options['show_progress'] = False

        # Solve the quadratic program
        solution = solvers.qp(
            matrix(P), matrix(q), matrix(G), matrix(h), matrix(A), matrix(b),
            options=options,
        )
        if solution['status'] != 'optimal':
            warnings.warn(
                "cvxopt did not report an optimal solution "
                f"(status: {solution['status']}); weights may be inaccurate.",
                RuntimeWarning,
            )

        # Drop the auxiliary t variables (if any); only w is returned.
        weights = np.array(solution['x']).flatten()[:p]
        return weights

    def _ledoit_wolf_shrinkage(self, residuals):
        """
        Estimate the covariance matrix using Ledoit-Wolf shrinkage.

        Parameters:
        - residuals: Residuals matrix.

        Returns:
        - Shrinkage covariance matrix.
        """
        from sklearn.covariance import ledoit_wolf
        # ledoit_wolf returns (covariance, shrinkage coefficient); only the
        # covariance is needed here.
        Sigma, _ = ledoit_wolf(residuals)
        return Sigma

# Example usage: fit the hedged forest on synthetic data and compare its
# test-set MSE with scikit-learn's standard random forest.
if __name__ == '__main__':
    from sklearn.datasets import make_regression
    from sklearn.metrics import mean_squared_error

    # Synthetic regression problem (fixed seed)
    X, y = make_regression(
        n_samples=1000,
        n_features=20,
        noise=0.1,
        random_state=42,
    )

    # Hold out 20% of the rows for evaluation
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Hedged random forest: fit, predict, score
    hrf = HedgedRandomForestRegressor(
        n_estimators=100,
        max_depth=None,
        kappa=2.0,
        shrinkage='ledoit_wolf',
    )
    hrf.fit(X_train, y_train)
    y_pred = hrf.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    print(f"Hedged Random Forest MSE: {mse:.4f}")

    # Baseline: standard random forest with the same number of trees
    rf = RandomForestRegressor(n_estimators=100, max_depth=None, random_state=42)
    rf.fit(X_train, y_train)
    y_pred_rf = rf.predict(X_test)
    mse_rf = mean_squared_error(y_test, y_pred_rf)
    print(f"Standard Random Forest MSE: {mse_rf:.4f}")