In [251]:
import pandas as pd
import random as rnd
import numpy as np
import matplotlib.pyplot as plt
import math
from sklearn import datasets
from sklearn.metrics import r2_score, f1_score, mean_absolute_error, mean_squared_error, roc_auc_score
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
from sklearn.linear_model import Ridge
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Стекинг

In [252]:
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_predict

class MyStacking(BaseEstimator, ClassifierMixin):  
    """Стэкинг моделей scikit-learn"""

    def __init__(self, models, ens_model):
        """
        Инициализация инстанса
        models - базовые модели для стекинга
        ens_model - мета-модель
        """
        self.models = models
        self.ens_model = ens_model
        self.n = len(models)
        self.X_valid = None
        
    def fit(self, X, y=None, p=0.25, cv=3, err=0.001, random_state=None):
        """
        Обучение стекинга
        p - в каком отношении делить на обучение / тест
            если p = 0 - используем всё обучение!
        cv  (при p=0) - сколько фолдов использовать
        err (при p=0) - величина случайной добавки к метапризнакам
        random_state - инициализация генератора
            
        """
        if p > 0: # делим на обучение и тест
            # разбиение на обучение моделей и метамодели
            X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=p, random_state=random_state)
            
            # заполнение матрицы для обучения метамодели
            self.X_valid = np.zeros((X_valid.shape[0], self.n))
            for i, mdl in enumerate(self.models):
                mdl.fit(X_train, y_train)
                self.X_valid[:, i] = mdl.predict(X_valid)
                
            # обучение метамодели
            self.ens_model.fit(self.X_valid, y_valid)
            
        else: # используем всё обучение
            
            # для регуляризации - берём случайные добавки
            self.X_valid = err*np.random.randn(X.shape[0], self.n)
            
            for i, mdl in enumerate(self.models):
                # это oob-ответы алгоритмов
                self.X_valid[:, i] += cross_val_predict(mdl, X, y, cv=cv, n_jobs=-1, method='predict')
                # но сам алгоритм надо настроить
                mdl.fit(X, y)
            
            # обучение метамодели
            self.ens_model.fit(self.X_valid, y)  
            
        return self


    def predict(self, X, y=None):
        """
        Предсказание стекингом
        """
        # заполение матрицы для мета-классификатора
        X_meta = np.zeros((X.shape[0], self.n))
        
        for i, mdl in enumerate(self.models):
            X_meta[:, i] = mdl.predict(X)
        
        return self.ens_model.predict(X_meta)


# Решение задачи стекингом

In [500]:
data = datasets.load_breast_cancer()
X = data["data"]
X = StandardScaler().fit_transform(X)
y = data["target"]


train_X, test_X, train_y, test_y = train_test_split(X, y, train_size=0.8, random_state=4)

In [501]:
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC

In [502]:
mdls = [
    GaussianNB(var_smoothing=0.08),
    SVC(kernel="rbf", C=0.5),
    KNeighborsClassifier(n_neighbors=9, metric="minkowski", algorithm="kd_tree")    
]

final_mdl = LogisticRegression(penalty="l2", C=0.01)

In [503]:
stacking = MyStacking(mdls, final_mdl)
stacking.fit(train_X, train_y, p=0.8, random_state=2021)

MyStacking(ens_model=LogisticRegression(C=0.01, class_weight=None, dual=False,
                                        fit_intercept=True, intercept_scaling=1,
                                        l1_ratio=None, max_iter=100,
                                        multi_class='auto', n_jobs=None,
                                        penalty='l2', random_state=None,
                                        solver='lbfgs', tol=0.0001, verbose=0,
                                        warm_start=False),
           models=[GaussianNB(priors=None, var_smoothing=0.08),
                   SVC(C=0.5, break_ties=False, cache_size=200,
                       class_weight=None, coef0=0.0,
                       decision_function_shape='ovr', degree=3, gamma='scale',
                       kernel='rbf', max_iter=-1, probability=False,
                       random_state=None, shrinking=True, tol=0.001,
                       verbose=False),
                   KNeighborsClassifier(algorithm=

In [504]:
pred_y = stacking.predict(test_X)

In [505]:
f1_score(test_y, pred_y)

0.9813664596273292

In [506]:
for mdl in stacking.models:
    print(f1_score(test_y, mdl.predict(test_X)))

0.9685534591194969
0.9620253164556962
0.975


# Беггинг

In [507]:
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.base import clone

class MyBagging(BaseEstimator, ClassifierMixin):  
    """Беггинг моделей scikit-learn"""

    def __init__(self, 
                 model=LinearRegression(), 
                 n_estimators=100, 
                 type_of_task="regression", 
                 thershold=0.5, 
                 object_selection=1.0, 
                 feature_selection=1.0, 
                 random_state=None):
        """
        Инициализация инстанса
        model - базовая модель
        n_estimators - число моделей в беггинге
        type_of_task - тип решаемой задачи
            regression - регрессия; прогноз - среднее
            classification_binary_class - классификация; прогноз - класс {0, 1} среднее алгоритмов по порогу threshold
            classification_binary_proba - классификация; прогноз - вероятность 0...1 среднее алгоритмов
            
        """
        self.model = model
        self.models = []
        self.n_estimators = n_estimators
        if type_of_task not in ["regression", "classification_binary_class", "classification_binary_proba"]:
            raise ValueError(f"wrong type of task {type_of_task}\n expected: regression, " + 
                             "classification_binary_class, " +  
                             "classification_binary_proba")
        self.column_indexes_of_models = []
        self.type_of_task = type_of_task
        self.object_selection = object_selection
        self.feature_selection = feature_selection
        self.random_state = random_state
        
    def fit(self, X, y=None):
        """
        Обучение стекинга
        object_selection - доля объектов, используемая при обучении каждой модели
        feature_selection - доля признаков, используемая при обучении каждой модели
        random_state - инициализация генератора
        """
        row_index = np.array(range(X.shape[0]))
        col_index = np.array(range(X.shape[1]))
        
        for num_of_mdl in range(self.n_estimators):
            
            if not np.allclose(self.object_selection, 1.0):
                np.random.shuffle(row_index)
                current_row_index = row_index[:int(len(row_index) * self.object_selection)]
                current_row_index = sorted(current_row_index)
            else:
                current_row_index = row_index
            if not np.allclose(self.feature_selection, 1.0):
                np.random.shuffle(col_index)
                current_col_index = col_index[:int(len(col_index) * self.feature_selection)]
                current_col_index = sorted(current_col_index)
            else:
                current_col_index = col_index
            
            self.column_indexes_of_models.append(current_col_index)
            mdl = clone(self.model)
            self.models.append(mdl.fit(X[current_row_index, :][:,current_col_index], y[current_row_index]))
            
        return self


    def predict(self, X, y=None):
        """
        Предсказание стекингом
        """
        res = []
        for i in range(len(self.models)):
            mdl = self.models[i]
            res.append(mdl.predict(X[:, self.column_indexes_of_models[i]]))
            
        if self.type_of_task == "regression":
            return np.mean(res, axis=0)
        elif self.type_of_task == "classification_binary_class":
            return np.mean(res, axis=0) > self.thershold
        elif self.type_of_task == "classification_binary_proba":
            return np.mean(res, axis=0)
        else:
            raise ValueError(f"wrong type of task {type_of_task}\n expected: regression, " +
                             "classification_binary_class, " +
                             "classification_binary_proba")

# Решение задачи беггингом

In [541]:
num_of_mdls = 1000

In [542]:
data = datasets.load_boston()
X = data["data"]
X = StandardScaler().fit_transform(X)
y = data["target"]
train_X, test_X, train_y, test_y = train_test_split(X, y, train_size=0.7, random_state=4)

In [543]:
bagging = MyBagging(
    DecisionTreeRegressor(max_depth=10),                     
    n_estimators=num_of_mdls,
    object_selection=0.7,
    feature_selection=0.9
)
bagging.fit(train_X, train_y)

mean_absolute_error(test_y, bagging.predict(test_X))

2.1095239247853943

In [544]:
from sklearn.ensemble import BaggingRegressor

regr = BaggingRegressor(
    base_estimator=DecisionTreeRegressor(max_depth=10),                    
    n_estimators=num_of_mdls,
    max_features=0.9, 
    max_samples=0.7
).fit(train_X, train_y)

mean_absolute_error(test_y, regr.predict(test_X))

2.123737248206677

# Бустинг

In [545]:
data = datasets.load_boston()
X = data["data"]
X = StandardScaler().fit_transform(X)
y = data["target"]
train_X, test_X, train_y, test_y = train_test_split(X, y, train_size=0.7, random_state=4)

In [546]:
import xgboost as xgb

In [633]:
num_round = 1000
param = {
    'booster': 'dart',
    'max_depth':4,
    'eta':0.15,
    'sampling_method': 'uniform',
    'subsample': 0.8,
    'objective':'reg:squarederror',
    'sample_type': 'weighted',
    'eval_metric': 'mae'
}
dtrain = xgb.DMatrix(data=train_X, label=train_y)
dtest = xgb.DMatrix(data=test_X)
bst = xgb.train(param, dtrain, num_round)

In [634]:
mean_absolute_error(test_y, bst.predict(dtest))

2.2075276117575795