In [1]:
!pip install imodels

Collecting imodels
  Downloading imodels-1.4.1-py3-none-any.whl (231 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/231.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m231.2/231.2 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: imodels
Successfully installed imodels-1.4.1


In [2]:
!!pip install pmlb

['Collecting pmlb',
 '  Downloading pmlb-1.0.1.post3-py3-none-any.whl (19 kB)',
 'Installing collected packages: pmlb',
 'Successfully installed pmlb-1.0.1.post3']

In [3]:
import numpy as np
import pandas as pd
from math import sqrt
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_squared_error
from sklearn.utils import check_random_state
from imodels.util.data_util import get_clean_dataset
from sklearn.metrics import roc_auc_score, mean_squared_error
from sklearn.model_selection import train_test_split

#######################
#     Functions       #
#######################

# Function to generate indices for random samples from a dataset
def generate_sample_indices(random_state, n_samples, n_samples_bootstrap=None):
    """
    Draw bootstrap (in-bag) row indices with replacement.

    Parameters:
    - random_state: seed, RandomState instance or None; resolved through
      sklearn's check_random_state.
    - n_samples: size of the dataset; drawn indices lie in [0, n_samples).
    - n_samples_bootstrap (optional): how many indices to draw. Defaults to
      n_samples, i.e. a bootstrap sample as large as the dataset.

    Returns:
    - sample_indices: 1-D integer array with n_samples_bootstrap entries.
    """
    if n_samples_bootstrap is None:
        n_samples_bootstrap = n_samples
    random_instance = check_random_state(random_state)
    return random_instance.randint(0, n_samples, n_samples_bootstrap)


# Function to generate indices for samples that are not selected (out-of-bag samples)
def generate_unsampled_indices(random_state, n_samples, n_samples_bootstrap=None):
    """
    Return the row indices that the matching bootstrap draw never selected.

    Called with the same arguments as generate_sample_indices, this repeats
    the same draw and returns its complement.

    Parameters:
    - random_state: seed, RandomState instance or None (see generate_sample_indices).
      Pass a seed rather than a live RandomState if the result must match an
      earlier generate_sample_indices call, since drawing advances a RandomState.
    - n_samples: size of the dataset.
    - n_samples_bootstrap (optional): size of the bootstrap draw. Defaults to n_samples.

    Returns:
    - unsampled_indices: sorted 1-D integer array of the rows with zero draws.
    """
    sample_indices = generate_sample_indices(random_state, n_samples, n_samples_bootstrap)
    # minlength keeps one counter per row even when the highest rows were never drawn.
    sample_counts = np.bincount(sample_indices, minlength=n_samples)
    return np.arange(n_samples)[sample_counts == 0]

########################
#       Classifier     #
########################

# Custom RandomForestClassifier class
class CustomRandomForestClassifier(RandomForestClassifier):
    """
    A RandomForestClassifier whose trees can be weighted by their out-of-bag (OOB) error.

    Inherits from sklearn.ensemble.RandomForestClassifier. After the regular fit,
    each tree's bootstrap draw is re-created from the tree's `random_state`, the
    tree is scored on the rows it did not draw, and the scores are turned into
    normalized weights that `predict` / `predict_proba` can use.

    Only single-output targets are handled by the OOB weighting.

    NOTE(review): re-creating the bags relies on scikit-learn drawing them with
    `randint(0, n_samples, n_draws)` from each tree's `random_state`, which is
    not public API — confirm when upgrading scikit-learn.

    Attributes set by `fit`:
    - in_bag_indices_: list with one index array per tree (rows drawn for that tree).
    - oob_indices_: list with one index array per tree (rows never drawn for that tree).
    - tree_weights_: array with one weight per tree; non-negative, sums to 1.
    """

    def fit(self, X, y, sample_weight=None):
        """
        Fit the forest, then derive one OOB-based weight per tree.

        Parameters:
        - X: training features (numpy array or pandas DataFrame).
        - y: training labels (numpy array or pandas Series).
        - sample_weight (optional): forwarded unchanged to RandomForestClassifier.fit.
          The OOB error itself is not sample-weighted.

        Returns:
        - self
        """
        super().fit(X, y, sample_weight=sample_weight)

        # Plain arrays make the integer indexing below positional, even for
        # pandas inputs whose index was shuffled by a train/test split.
        X_rows = np.asarray(X)
        y_true = np.asarray(y)
        if y_true.ndim == 2 and y_true.shape[1] == 1:
            # Column vector: flatten so the element-wise comparison below does not broadcast.
            y_true = y_true.ravel()
        n_samples = X_rows.shape[0]

        self.in_bag_indices_ = []
        self.oob_indices_ = []
        oob_losses = []

        for estimator in self.estimators_:
            in_bag_indices, oob_indices = self._bootstrap_split(estimator.random_state, n_samples)
            self.in_bag_indices_.append(in_bag_indices)
            self.oob_indices_.append(oob_indices)

            if len(oob_indices) > 0:
                # The trees work on encoded class indices, so go through the
                # probability columns and map back to the forest's own labels.
                best_column = np.argmax(estimator.predict_proba(X_rows[oob_indices]), axis=1)
                oob_labels = self.classes_[best_column]
                # Misclassification rate; for 0/1 labels this equals the MSE of the labels.
                oob_losses.append(np.mean(oob_labels != y_true[oob_indices]))
            else:
                # No unseen rows: this tree cannot be scored.
                oob_losses.append(np.inf)

        self.tree_weights_ = self._weights_from_losses(oob_losses)
        return self

    def _bootstrap_split(self, seed, n_samples):
        """Re-create one tree's bootstrap draw; return (in-bag, out-of-bag) row indices."""
        if not self.bootstrap:
            # Every tree was trained on all rows, so nothing is out-of-bag.
            return np.arange(n_samples), np.arange(0)

        max_samples = getattr(self, "max_samples", None)
        if max_samples is None:
            n_draws = n_samples
        elif isinstance(max_samples, (int, np.integer)):
            n_draws = int(max_samples)
        else:
            # Fraction of the dataset, following the rule in scikit-learn's max_samples docs.
            n_draws = max(int(round(n_samples * max_samples)), 1)

        in_bag = check_random_state(seed).randint(0, n_samples, n_draws)
        drawn_counts = np.bincount(in_bag, minlength=n_samples)
        return in_bag, np.arange(n_samples)[drawn_counts == 0]

    @staticmethod
    def _weights_from_losses(losses):
        """Turn per-tree OOB losses into weights proportional to exp(-loss) that sum to 1."""
        losses = np.asarray(losses, dtype=float)
        scored = np.isfinite(losses)
        if not scored.any():
            # No tree could be scored (e.g. bootstrap=False): weight all trees equally.
            return np.full(losses.shape, 1.0 / max(losses.size, 1))

        weights = np.zeros_like(losses)
        # Subtracting the smallest loss leaves the normalized weights unchanged
        # but guarantees at least one weight equals 1, so the sum is never zero.
        weights[scored] = np.exp(-(losses[scored] - losses[scored].min()))
        return weights / weights.sum()

    def predict(self, X, weights=None):
        """
        Make predictions using the fitted custom random forest model.

        Parameters:
        - X: Input features for making predictions (numpy array or pandas DataFrame).
        - weights (optional): The weighting scheme to use for aggregating predictions. Supported values:
          "expOOB" (weights based on the exponential of the negative out-of-bag error) and "uniform"
          (equal weighting). Defaults to "uniform" if not specified or if an unknown value is passed.

        Returns:
        - final_preds: An array of predicted class labels.

        Raises:
        - ValueError: if the forest has not been fitted.
        """
        class_scores = self.predict_proba(X, weights=weights)
        return self.classes_[np.argmax(class_scores, axis=1)]

    def predict_proba(self, X, weights=None):
        """
        Aggregate the per-tree class probabilities of the fitted forest.

        Parameters:
        - X: Input features (numpy array or pandas DataFrame).
        - weights (optional): "expOOB" to weight every tree by `tree_weights_`, or "uniform"
          for a plain average. Defaults to "uniform" if not specified or if an unknown value is passed.

        Returns:
        - weighted_preds: array of shape (n_samples, n_classes); columns follow `self.classes_`.

        Raises:
        - ValueError: if the forest has not been fitted.
        """
        if not hasattr(self, "estimators_"):
            raise ValueError("The forest is not fitted yet!")

        use_oob_weights = weights == "expOOB"  # anything else falls back to "uniform"
        n_trees = len(self.estimators_)
        weighted_preds = np.zeros((np.shape(X)[0], len(self.classes_)))

        for position, tree in enumerate(self.estimators_):
            tree_proba = tree.predict_proba(X)
            if use_oob_weights:
                weighted_preds += self.tree_weights_[position] * tree_proba
            else:
                weighted_preds += tree_proba / n_trees

        return weighted_preds

########################
#       Regressor      #
########################

class CustomRandomForestRegressor(RandomForestRegressor):
    """
    A custom RandomForestRegressor that allows for weighting trees based on their out-of-bag error.

    Inherits from sklearn.ensemble.RandomForestRegressor. After the regular fit,
    each tree's bootstrap draw is re-created from the tree's `random_state`, the
    tree is scored (mean squared error) on the rows it did not draw, and the
    scores are turned into normalized weights proportional to exp(-MSE).

    The weights depend on the scale of the target: with large MSE differences
    between trees almost all weight goes to the best-scoring tree.

    NOTE(review): re-creating the bags relies on scikit-learn drawing them with
    `randint(0, n_samples, n_draws)` from each tree's `random_state`, which is
    not public API — confirm when upgrading scikit-learn.

    Attributes set by `fit`:
    - in_bag_indices_: list with one index array per tree (rows drawn for that tree).
    - oob_indices_: list with one index array per tree (rows never drawn for that tree).
    - tree_weights_: array with one weight per tree; non-negative, sums to 1.
    """

    def fit(self, X, y, sample_weight=None):
        """
        Fit the forest, then derive one OOB-based weight per tree.

        Parameters:
        - X: training features (numpy array or pandas DataFrame).
        - y: training targets (numpy array or pandas Series).
        - sample_weight (optional): forwarded unchanged to RandomForestRegressor.fit.
          The OOB error itself is not sample-weighted.

        Returns:
        - self
        """
        super().fit(X, y, sample_weight=sample_weight)

        # Plain arrays make the integer indexing below positional, even for
        # pandas inputs whose index was shuffled by a train/test split.
        X_rows = np.asarray(X)
        y_true = np.asarray(y)
        n_samples = X_rows.shape[0]

        self.in_bag_indices_ = []
        self.oob_indices_ = []
        oob_losses = []

        for estimator in self.estimators_:
            in_bag_indices, oob_indices = self._bootstrap_split(estimator.random_state, n_samples)
            self.in_bag_indices_.append(in_bag_indices)
            self.oob_indices_.append(oob_indices)

            if len(oob_indices) > 0:
                oob_predictions = estimator.predict(X_rows[oob_indices])
                oob_losses.append(mean_squared_error(y_true[oob_indices], oob_predictions))
            else:
                # No unseen rows: this tree cannot be scored.
                oob_losses.append(np.inf)

        self.tree_weights_ = self._weights_from_losses(oob_losses)
        return self

    def _bootstrap_split(self, seed, n_samples):
        """Re-create one tree's bootstrap draw; return (in-bag, out-of-bag) row indices."""
        if not self.bootstrap:
            # Every tree was trained on all rows, so nothing is out-of-bag.
            return np.arange(n_samples), np.arange(0)

        max_samples = getattr(self, "max_samples", None)
        if max_samples is None:
            n_draws = n_samples
        elif isinstance(max_samples, (int, np.integer)):
            n_draws = int(max_samples)
        else:
            # Fraction of the dataset, following the rule in scikit-learn's max_samples docs.
            n_draws = max(int(round(n_samples * max_samples)), 1)

        in_bag = check_random_state(seed).randint(0, n_samples, n_draws)
        drawn_counts = np.bincount(in_bag, minlength=n_samples)
        return in_bag, np.arange(n_samples)[drawn_counts == 0]

    @staticmethod
    def _weights_from_losses(losses):
        """Turn per-tree OOB losses into weights proportional to exp(-loss) that sum to 1."""
        losses = np.asarray(losses, dtype=float)
        scored = np.isfinite(losses)
        if not scored.any():
            # No tree could be scored (e.g. bootstrap=False): weight all trees equally.
            return np.full(losses.shape, 1.0 / max(losses.size, 1))

        weights = np.zeros_like(losses)
        # exp(-MSE) underflows to 0 for large-scale targets, which would leave
        # nothing to normalize. Subtracting the smallest loss gives the same
        # normalized weights while keeping the best tree's weight at exactly 1.
        weights[scored] = np.exp(-(losses[scored] - losses[scored].min()))
        return weights / weights.sum()

    def predict(self, X, weights=None):
        """
        Predict using the forest of trees.

        Parameters:
        - X: array-like of shape = [n_samples, n_features]
        The input samples.

        - weights: {'uniform', 'expOOB'} weighting scheme to use for aggregating predictions.
          'uniform' will treat all trees equally, 'expOOB' will weight trees based on the exponential of the negative out-of-bag error.
          Defaults to 'uniform' if not specified or if an unknown value is passed.

        Returns:
        - y: array of shape = [n_samples]
        The predicted values.

        Raises:
        - ValueError: if the forest has not been fitted.
        """
        if not hasattr(self, "estimators_"):
            raise ValueError("The forest is not fitted yet!")

        # One row of predictions per tree.
        all_preds = np.array([tree.predict(X) for tree in self.estimators_])

        if weights == "expOOB":
            # Weighted mean over the tree axis using the normalized OOB weights.
            return np.average(all_preds, axis=0, weights=self.tree_weights_)

        # "uniform", None or any unknown value: plain mean over the trees.
        return np.mean(all_preds, axis=0)

######################
#     Evaluation     #
######################

def evaluate_datasets(datasets, task_type='classification', random_state=42):
    """
    Evaluate datasets using the custom random forest model (classifier or regressor).

    For every dataset an 80/20 train/test split is made, one forest is fitted on
    the training part, and the test part is scored twice: once with uniform tree
    weights and once with the "expOOB" tree weights.

    Parameters:
    - datasets: iterable of dataset names understood by imodels' get_clean_dataset.
    - task_type: 'classification' or 'regression', specifies the type of task.
    - random_state: int, random state used for both the split and the forest.

    Returns:
    - df_scores: DataFrame with columns 'Dataset', 'Default' and 'expOOB' containing the
      scores (ROC AUC for classification, RMSE for regression) for each dataset.

    Raises:
    - ValueError: if task_type is neither 'classification' nor 'regression'.
    """
    # Validate up front so a bad task_type fails before any dataset is fetched.
    if task_type not in ('classification', 'regression'):
        raise ValueError("task_type must be 'classification' or 'regression'")
    is_classification = task_type == 'classification'

    # Materialize once: the names are iterated here and reused for the result table.
    datasets = list(datasets)
    scores_default = []
    scores_expOOB = []

    for dataset_name in datasets:
        # Fetch the dataset
        X, y, _ = get_clean_dataset(dataset_name)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=random_state)

        # Initialize and train the model
        if is_classification:
            model = CustomRandomForestClassifier(oob_score=True, random_state=random_state)
        else:
            model = CustomRandomForestRegressor(oob_score=True, random_state=random_state)
        model.fit(X_train, y_train)

        # Make predictions and evaluate
        if is_classification:
            # ROC AUC on the second probability column, so this assumes a binary target.
            score_default = roc_auc_score(y_test, model.predict_proba(X_test, weights="uniform")[:, 1])
            score_expOOB = roc_auc_score(y_test, model.predict_proba(X_test, weights="expOOB")[:, 1])
        else:
            # Using RMSE for regression
            score_default = sqrt(mean_squared_error(y_test, model.predict(X_test, weights="uniform")))
            score_expOOB = sqrt(mean_squared_error(y_test, model.predict(X_test, weights="expOOB")))

        scores_default.append(score_default)
        scores_expOOB.append(score_expOOB)

    # Create a DataFrame
    df_scores = pd.DataFrame({
        'Dataset': datasets,
        'Default': scores_default,
        'expOOB': scores_expOOB
    })

    return df_scores

# Benchmark datasets, grouped by the kind of model they are evaluated with.
classification_datasets = ["diabetes", "breast_cancer", "heart", "haberman"]
# NOTE(review): every RMSE recorded for these datasets is below 0.5, which
# suggests binary 0/1 targets — confirm they are meant as regression benchmarks.
regression_datasets = ["fico", "enhancer", "credit_g", "juvenile_clean"]

# One score table per task: uniform ("Default") vs. OOB-weighted ("expOOB") trees.
df_classification_scores = evaluate_datasets(classification_datasets, task_type='classification')
df_regression_scores = evaluate_datasets(regression_datasets, task_type='regression')

# Report both tables, separated by a ruler line.
print("Classification Scores:", df_classification_scores, "=" * 100, sep="\n")
print("\nRegression Scores:", df_regression_scores, sep="\n")

fetching diabetes from pmlb
fetching heart from imodels
fetching fico from imodels
fetching credit_g from imodels
Classification Scores:
         Dataset   Default    expOOB
0       diabetes  0.830926  0.830370
1  breast_cancer  0.828231  0.828231
2          heart  0.915584  0.916306
3       haberman  0.624369  0.623737

Regression Scores:
          Dataset   Default    expOOB
0            fico  0.448790  0.448789
1        enhancer  0.245904  0.245917
2        credit_g  0.398569  0.398691
3  juvenile_clean  0.262467  0.262477
