Slides: https://cs229.stanford.edu/notes2021fall/lecture11-boosting.pdf 

In [20]:
from sklearn.ensemble import AdaBoostClassifier
from sklearn.datasets import make_classification
import numpy as np
import pandas as pd

In [21]:
from typing import Tuple


y_column = 'target'
sample_weights_col = 'sample_weight'
def shuffle_data(x,y):
    data  = list(zip(x,y))
    np.random.shuffle(data)
    return list(zip(*data))

# prepare pandas from sklearn classification data
def sklearn_dataset_to_pandas(X, y)-> Tuple[pd.DataFrame,list[str]]:
    x_columns = [f'feature{i}' for i in range(len(X[0]))]
    df = pd.DataFrame(X, columns=x_columns)
    df[y_column] = y
    return df, x_columns
    

In [22]:
split = 0.8
sample = 125
n_estimators = 10
train_size = int(sample * split)
X, y = make_classification(n_samples=sample, n_features=4,
                           n_informative=4, n_redundant=0,
                           random_state=0, shuffle=False)
X,y = shuffle_data(X,y)
Xtrain, Xtest = X[:train_size], X[train_size:]
ytrain, ytest = y[:train_size], y[train_size:]

clf = AdaBoostClassifier(n_estimators=n_estimators, algorithm="SAMME", random_state=0)
clf.fit(Xtrain, ytrain)
clf.score(Xtest, ytest) # 93.5% mean accuracy on the whole datset

0.72

In [23]:
len(Xtrain), len(Xtrain[0]), len(ytrain), ytrain[0]

(100, 4, 100, 1)

In [24]:
train_df,x_columns = sklearn_dataset_to_pandas(Xtrain,ytrain)
train_df[sample_weights_col] = [1. for _ in range(len(train_df))]
test_df, _ = sklearn_dataset_to_pandas(Xtest,ytest)

In [25]:
import pandas as pd
from typing import Tuple

from typing import  Tuple


def node_entropy(node_df: pd.DataFrame) -> float:

    # Initialize to uniform sample weights if it is not defined
    if sample_weights_col not in node_df.columns:
        node_df[sample_weights_col] = [1/len(node_df)] * len(node_df)
    count = sum(node_df[sample_weights_col])

    class_labels = node_df[y_column].unique()

    impurity = 0

    for c in class_labels:
        pc = sum(node_df[node_df[y_column]==c][sample_weights_col])/count
        if pc > 0 :
            impurity -= pc * np.log2(pc)
    
    return impurity

def get_class(df:pd.DataFrame) -> str:
    counts = df[y_column].value_counts()
    return counts.idxmax()

def binary_split(node_df:pd.DataFrame,col_name:str) -> Tuple[float,float]:
    all_values = list(set(node_df[col_name]))
    all_values.sort()
    min_impurity = float('inf')
    best_split_val = None
    n = len(node_df)
    class_labels = []
    for val in all_values[:-1]: # no need to check last element
        left = node_df[node_df[col_name]<=val]
        left_class = get_class(left)
        right = node_df[node_df[col_name]>val]
        right_class = get_class(right)
        impurity = (node_entropy(left) * len(left) + node_entropy(right) * len(right)) / n
        if impurity < min_impurity:
            min_impurity = impurity
            best_split_val = val
            class_labels = [left_class, right_class]
    return min_impurity, best_split_val, class_labels

def find_best_split(node_df:pd.DataFrame) -> Tuple[float,str, float,list[int]]:
    best_col = ''
    best_impurity = float('inf')
    best_split_val = None
    best_class_labels = None
    for col in x_columns:
        impurity, split_val,class_labels = binary_split(node_df,col)
        # print("Best impurity for feature {} is {} at split value {}".format(col, impurity, split_val))
        if impurity < best_impurity:
            best_impurity = impurity
            best_col = col
            best_split_val = split_val
            best_class_labels = class_labels

    return best_impurity, best_col, best_split_val, best_class_labels
    

In [26]:
from typing import Literal


def weight_coefficient(weighted_error:float)->float:
    weighted_error = min(weighted_error,0.9999)
    return max(0.0001,np.log((1- weighted_error)/weighted_error) / 2)

def update_sample_weights(train_df, weight_coefficient, predictions)-> pd.DataFrame:
    sample_weights = list(train_df[sample_weights_col].copy())
    for index, row in train_df.iterrows():
        pred = predictions[index]
        if pred != row[y_column]:
            sample_weights[index] *= np.exp(weight_coefficient)
        else:
            sample_weights[index] /= np.exp(weight_coefficient)
    total = sum(sample_weights)
    normalized_sample_weights = [w/total for w in sample_weights]
    return normalized_sample_weights

class DecisionNode:
    def __init__(self,data: pd.DataFrame, 
                      node_type: Literal['numerical']= 'numerical'):
        self.node_type = node_type # only support numerical for now
        self.impurity, self.col, self.decision_boundary, self.class_labels = find_best_split(data)

    def __str__(self):
        return f"Decision node with impurity {self.impurity} at column {self.col} with decision boundary {self.decision_boundary}"
    
    
    def predict_single(self,sample): 
        return self.class_labels[(sample[self.col] >= self.decision_boundary).astype(int)]
    
    def predict(self,data: pd.DataFrame)-> pd.Series:
        return data.apply(self.predict_single,axis=1)
    
    def error(self,data: pd.DataFrame)-> float:
        predictions = self.predict(data)
        errors = (predictions != data["target"]).astype(int)
        return np.average(errors, weights=data[sample_weights_col])
    
class EnsembleModel:
    def __init__(self,estimators: list[DecisionNode], estimator_weights:list[float]):
        self.estimators = estimators
        self.estimator_weights = estimator_weights

    def predict(self,X):
        predictions = np.array([est.predict(X).apply(lambda x: x if x==1. else -1.) for est in self.estimators])
        return (np.average(predictions, weights=self.estimator_weights, axis=0) > 0 ).astype(int)
    
    

In [27]:
# AdaBoost algorithm

"""
1- Initialize uniform sample importance weights
2- for t in range(T): 
    - Train classifier using sample weights. 
    - Compute weight coefficient for the classifier. 
    - Update sample weights based on classifier errors.
    - Normalize the sample weights.
3- Return final classifier as a weighted sum of the base classifiers.
"""
def adaboost(train_df,number_of_estimators:int):
    train_df[sample_weights_col] = [1. for _ in range(len(train_df))]
    estimators = []
    estimator_weights = []
    for t in range(number_of_estimators):
        
        estimator_t = DecisionNode(train_df)
        error = estimator_t.error(train_df)
        coeff = weight_coefficient(error)
        print(f"Iteration {t}, Estimator: {estimator_t}, weighted_error: {error}, coefficient: {coeff}")

        new_sample_weights = update_sample_weights(train_df,coeff,estimator_t.predict(train_df))
        print("New sample weights:", new_sample_weights)
        train_df[sample_weights_col] = new_sample_weights
        estimators.append(estimator_t)
        estimator_weights.append(coeff)
    return EnsembleModel(estimators=estimators,estimator_weights=estimator_weights)
    
    


In [28]:
ensemble_model = adaboost(train_df=train_df,number_of_estimators=n_estimators)

Iteration 0, Estimator: Decision node with impurity 0.8278055584802597 at column feature1 with decision boundary 0.08807427900766229, weighted_error: 0.29, coefficient: 0.44769202352742077
New sample weights: [0.007042253521126748, 0.0172413793103448, 0.0172413793103448, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.0172413793103448, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.0172413793103448, 0.0172413793103448, 0.0172413793103448, 0.007042253521126748, 0.0172413793103448, 0.007042253521126748, 0.0172413793103448, 0.0172413793103448, 0.0172413793103448, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.007042253521126748, 0.0172413793103448, 0.0172413793103448, 0.007042253521126748, 

In [29]:
np.mean(ensemble_model.predict(test_df) == np.array(ytest))

0.64

In [32]:
np.mean(ensemble_model.estimators[2].predict(test_df) == np.array(ytest))

0.6