In [None]:
import numpy as np
from sklearn.linear_model import LinearRegression
from tqdm.auto import tqdm
from sklearn.metrics import mean_squared_error

In [None]:
class SimplifiedBaggingRegressor:
    '''
    Minimal bagging ensemble for regression with optional out-of-bag (OOB) scoring.

    Every bag is a bootstrap sample (drawn with replacement, same size as the
    training set) on which an independent model is fitted. Predictions of the
    ensemble are the plain average of the predictions of all fitted models.

    Parameters:
        num_bags: number of bootstrap samples / models in the ensemble.
        oob: when True, `fit` keeps references to the training data and target
            so that `OOB_score` can be computed afterwards.
    '''
    def __init__(self, num_bags, oob=False):
        self.num_bags = num_bags
        self.oob = oob

    def _generate_splits(self, data: np.ndarray):
        '''
        Generate indices for every bag and store in self.indices_list list.

        Each entry is an array of `len(data)` row indices sampled with
        replacement from `range(len(data))` using the global NumPy RNG.
        '''
        self.indices_list = []
        data_length = len(data)
        for _ in range(self.num_bags):
            indices = np.random.choice(data_length, data_length, replace=True)
            self.indices_list.append(indices)

    def fit(self, model_constructor, data, target):
        '''
        Fit model on every bag.
        Model constructor with no parameters (and with no ()) is passed to this function.

        `data` and `target` may be any array-like; they are converted with
        `np.asarray` so that integer-array indexing works.

        Raises:
            ValueError: if `data` and `target` have different lengths.

        example:

        bagging_regressor = SimplifiedBaggingRegressor(num_bags=10, oob=True)
        bagging_regressor.fit(LinearRegression, X, y)
        '''
        data = np.asarray(data)
        target = np.asarray(target)
        if len(data) != len(target):
            raise ValueError(
                'data and target must have the same length, got {} and {}'.format(len(data), len(target))
            )
        # Drop anything remembered from a previous fit; re-populated below if oob is on.
        self.data = None
        self.target = None
        self._generate_splits(data)
        assert len(set(list(map(len, self.indices_list)))) == 1, 'All bags should be of the same length!'
        assert list(map(len, self.indices_list))[0] == len(data), 'All bags should contain `len(data)` number of elements!'
        self.models_list = []
        for bag_indices in self.indices_list:
            model = model_constructor()
            fitted = model.fit(data[bag_indices], target[bag_indices])
            # sklearn estimators return `self` from fit, but other estimators may
            # return None; in that case keep the estimator object itself.
            self.models_list.append(model if fitted is None else fitted)
        if self.oob:
            self.data = data
            self.target = target

    def predict(self, data):
        '''
        Get average prediction for every object from passed dataset
        '''
        predictions = np.zeros((len(data), self.num_bags))
        for i, model in enumerate(self.models_list):
            predictions[:, i] = model.predict(data)
        return np.mean(predictions, axis=1)

    def _get_oob_predictions_from_every_model(self):
        '''
        Generates list of lists, where list i contains predictions for self.data[i] object
        from all models, which have not seen this object during the training phase.

        The result is stored in self.list_of_predictions_lists (object-dtype array).
        '''
        n_objects = len(self.data)
        list_of_predictions_lists = [[] for _ in range(n_objects)]
        for bag_indices, model in zip(self.indices_list, self.models_list):
            # Rows that never appear in this bag are out-of-bag for this model.
            unseen = np.ones(n_objects, dtype=bool)
            unseen[bag_indices] = False
            oob_indices = np.flatnonzero(unseen)
            if oob_indices.size == 0:
                # The bag happened to cover every row: nothing to predict, and
                # many estimators refuse to predict on an empty array.
                continue
            predictions = model.predict(self.data[oob_indices])
            for idx, prediction in zip(oob_indices, predictions):
                list_of_predictions_lists[idx].append(prediction)
        self.list_of_predictions_lists = np.array(list_of_predictions_lists, dtype=object)

    def _get_averaged_oob_predictions(self):
        '''
        Compute average prediction for every object from the training set.
        If an object has been used in all bags during the training phase, return None instead of prediction.

        The result is stored in self.oob_predictions; the method itself returns None.
        '''
        self._get_oob_predictions_from_every_model()
        self.oob_predictions = np.array([np.mean(pred) if len(pred) > 0 else None for pred in self.list_of_predictions_lists])

    def OOB_score(self):
        '''
        Compute mean square error for all objects, which have at least one prediction.

        Returns nan when no object has an out-of-bag prediction.

        Raises:
            RuntimeError: if the regressor was not fitted with `oob=True`, so no
                training data is available to score against.
        '''
        if getattr(self, 'data', None) is None or getattr(self, 'target', None) is None:
            raise RuntimeError('OOB score is unavailable: call fit() on a regressor created with oob=True first')
        self._get_averaged_oob_predictions()
        has_prediction = np.array([pred is not None for pred in self.oob_predictions], dtype=bool)
        if not has_prediction.any():
            return np.nan
        # oob_predictions is object-dtype whenever it contains None; cast the
        # valid entries to float so the arithmetic runs on a numeric array.
        valid_predictions = self.oob_predictions[has_prediction].astype(float)
        mse = np.mean((self.target[has_prediction] - valid_predictions) ** 2)
        return mse

In [None]:
# Smoke check: fit the ensemble on a perfectly linear target and look at the
# averaged out-of-bag predictions.
X = np.random.randn(2000, 10)
y = np.mean(X, axis=1)
bagging_regressor = SimplifiedBaggingRegressor(num_bags=10, oob=True)
bagging_regressor.fit(LinearRegression, X, y)
predictions = bagging_regressor.predict(X)
# `_get_averaged_oob_predictions` returns None and stores its result in
# `oob_predictions`, so print the stored attribute (only the first few
# entries) rather than the return value of the call.
bagging_regressor._get_averaged_oob_predictions()
print(bagging_regressor.oob_predictions[:10])

None


In [None]:
# Simple tests: a noiseless linear target must be recovered (almost) exactly,
# both on the training set and on the out-of-bag objects.
oob_scores = []  # collected so a single summary line is printed instead of one line per run
for _ in tqdm(range(100)):
    X = np.random.randn(2000, 10)
    y = np.mean(X, axis=1)
    bagging_regressor = SimplifiedBaggingRegressor(num_bags=10, oob=True)
    bagging_regressor.fit(LinearRegression, X, y)
    predictions = bagging_regressor.predict(X)
    assert np.mean((predictions - y)**2) < 1e-6, 'Linear dependency should be fitted with almost zero error!'
    assert bagging_regressor.oob, 'OOB feature must be turned on'
    # Compute the OOB score once per run; it also refreshes list_of_predictions_lists used below.
    oob_score = bagging_regressor.OOB_score()
    oob_scores.append(oob_score)
    assert oob_score < 1e-6, 'OOB error for linear dependency should be also close to zero!'
    # With bootstrap samples of size n, an object is left out of a bag with
    # probability (1 - 1/n)^n, which tends to 1/e.
    assert abs(
        np.mean(
            list(map(len, bagging_regressor.list_of_predictions_lists))
        ) / bagging_regressor.num_bags - 1/np.exp(1)) < 0.1, 'Probability of missing a bag should be close to theoretical value!'

print(f'Largest OOB error over all runs: {max(oob_scores):.3e}')
print('Simple tests done!')

  0%|          | 0/100 [00:00<?, ?it/s]

1.5292776768881393e-31
1.5292776768881393e-31
1.1237429224682053e-31
1.1237429224682053e-31
8.007218949426502e-32
8.007218949426502e-32
4.694994216745221e-32
4.694994216745221e-32
4.087433137642909e-32
4.087433137642909e-32
1.5608614148334044e-31
1.5608614148334044e-31
6.509074653864143e-32
6.509074653864143e-32
1.0281668789890574e-31
1.0281668789890574e-31
3.391384086771199e-32
3.391384086771199e-32
3.490278695763356e-32
3.490278695763356e-32
8.003734784444593e-32
8.003734784444593e-32
4.3173800766194776e-32
4.3173800766194776e-32
1.169206709703596e-31
1.169206709703596e-31
3.199196759907351e-32
3.199196759907351e-32
3.8925199973291225e-32
3.8925199973291225e-32
5.201820125726335e-32
5.201820125726335e-32
1.260171122650207e-31
1.260171122650207e-31
1.3365336175030242e-31
1.3365336175030242e-31
1.2312808861455053e-31
1.2312808861455053e-31
4.863749321003864e-32
4.863749321003864e-32
6.194508951382433e-32
6.194508951382433e-32
1.0004210689807913e-31
1.0004210689807913e-31
1.076670764128

In [None]:
# Medium tests: pure-noise target with many features relative to the sample
# size, so the OOB error is expected to exceed the training error.
for _ in tqdm(range(10)):
    X = np.random.randn(200, 150)
    y = np.random.randn(len(X))

    bagging_regressor = SimplifiedBaggingRegressor(num_bags=20, oob=True)
    bagging_regressor.fit(LinearRegression, X, y)

    predictions = bagging_regressor.predict(X)
    average_train_error = np.mean(np.square(predictions - y))

    assert bagging_regressor.oob, 'OOB feature must be turned on'
    oob_score = bagging_regressor.OOB_score()
    assert oob_score > average_train_error, 'OOB error must be higher than train error due to overfitting!'

    # Average share of bags in which an object was left out, compared with 1/e.
    oob_counts = [len(object_predictions) for object_predictions in bagging_regressor.list_of_predictions_lists]
    oob_fraction = np.mean(oob_counts) / bagging_regressor.num_bags
    assert abs(oob_fraction - 1/np.exp(1)) < 0.1, 'Probability of missing a bag should be close to theoretical value!'

print('Medium tests done!')

  0%|          | 0/10 [00:00<?, ?it/s]

Medium tests done!


In [None]:
# Complex tests: with 100 bags the observed out-of-bag share must match 1/e
# to within 1e-2.
for _ in tqdm(range(10)):
    X = np.random.randn(2000, 15)
    y = np.random.randn(len(X))

    bagging_regressor = SimplifiedBaggingRegressor(num_bags=100, oob=True)
    bagging_regressor.fit(LinearRegression, X, y)
    predictions = bagging_regressor.predict(X)

    # OOB_score() is what populates list_of_predictions_lists checked below.
    oob_score = bagging_regressor.OOB_score()

    oob_counts = [len(object_predictions) for object_predictions in bagging_regressor.list_of_predictions_lists]
    oob_fraction = np.mean(oob_counts) / bagging_regressor.num_bags
    assert abs(oob_fraction - 1/np.exp(1)) < 1e-2, 'Probability of missing a bag should be close to theoretical value!'

print('Complex tests done!')

  0%|          | 0/10 [00:00<?, ?it/s]

Complex tests done!
