# Demonstration of Ensemble generalization score vs an Individual Classifier

In [None]:
from scipy.special import comb
import math

def ensemble_error(n_classifier : int, error : float) -> float | int :

    ''' 
        Calculates the error of an ensemble classifier. An assumption
        is made that the ensemble's classifiers have equal error rates,
        are independent of one another, and the errors are not correlated.

        This function demonstrates the total error of the model when 
        n_classifer / 2 to n_classifiers miscalculate the class label.
        In otherwords, the worst case scenario when an ensemble would
        misclassify a lable using majority voting.

        @param n_classifier: number of classifiers in the ensemble model
        @param error       : error rate of each classifier in the model
    '''

    def instance_error(k : int, n_classifiers : int, error : float) -> float | int :
        ''' 
            Calculate the probability of an ensemble model producing an error
            given the total classifiers that produced misclassifications, the 
            error rate of each classifier, and the error for each classifer.

            This is calculated as the binomial mass function of a binomial distribution.
            
            @param k: total classfiers that misclassified the data
            @param n_classifers: total classifiers in the ensemble
            @param error       : error rate of each classifier
        '''

        return comb(n_classifier,k) * (error**k) * ((1-error)**(n_classifier - k))
    

    k_start = int(math.ceil(n_classifier / 2.))
    probs = [instance_error(k,n_classifier,error) for k in range(k_start,n_classifier+1)]
    return sum(probs)

ensemble_error(n_classifier=11, error= .25)

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Claim being illustrated: the error probability of an ensemble is always
# better than that of an individual classifier as long as the base
# classifiers perform better than random guessing.
error_range = np.arange(0., 1.01, .01)
ensemble_errors = [
    ensemble_error(n_classifier=11, error=base_err)
    for base_err in error_range
]

# Solid curve: error of an 11-member ensemble for each base error rate.
plt.plot(error_range, ensemble_errors, label='Ensemble Error', linewidth=2)

# Dashed diagonal: error of a single classifier (y = x), for comparison.
plt.plot(error_range, error_range, label='Base Error', linestyle='--', linewidth=2)

plt.xlabel('Base Error')
plt.ylabel('Base/Ensemble Error')
plt.legend(loc='upper left')
plt.grid(alpha=.5)
plt.show()

# Demonstration of Majority Vote by label and probability

In [None]:
def majority_vote_by_label(labels : list[int], weights : list[int | float] = None) -> int :
    ''' 
        Calculates the predicted label based on the mode, with consideration to weights if defined
        @param labels             : list of class labels as predicted by the ensemble classifer
        @param weights [optional] : weights to assign to predictions by each classifier
    '''
    return np.argmax(np.bincount(labels, weights=weights))

majority_vote_by_label(labels = [0,0,1], weights=[.2,.2,.6])

In [None]:
def majority_vote_by_probability(probabilities : list[list[int]], weights : list[int | float] = None) -> int :
    '''
        Calculates the predicted label of an ensemble based on class membership probabilities, with 
        consideration to weights if defined, by using the max weighted average for each class membership
        along axis 0 and taking the max average.

        Each probability list in probabilities should be [class 0 proba, class 1 proba]
        and should add up to 1.

        @param probabilites       : class membership probabilites, 2D array
        @param weights [optional] : weights to assign to probabilities by each classifier
    '''
    weighted_avg = np.average(probabilities, weights=weights, axis=0)
    return np.argmax(weighted_avg)

majority_vote_by_probability(
    probabilities= [
        [.9,.1],
        [.8,.2],
        [.4,.6]
    ],
    weights=[.2,.2,.6]
)

# Building a Majority Vote Classifier

In [None]:
import operator
from typing import  Any, Self
from sklearn.pipeline import _name_estimators
from sklearn.preprocessing import LabelEncoder
from sklearn.base import BaseEstimator,ClassifierMixin,clone


class MajorityVoteClassifier(BaseEstimator, ClassifierMixin):

    '''
        Majority Vote Classification Ensemble Model

        Fits a clone of every supplied classifier and combines their outputs,
        either by a (weighted) vote over predicted class labels or by taking the
        argmax of the (weighted) average of predicted class probabilities.
    '''

    def __init__(self, classifiers : list[Any], vote : str = 'classlabel', weights : list[int | float] | None = None) -> None :

        '''
            @param classifiers: list of individual classifiers to use in the ensemble
            @param vote       : voting strategy to use ['classlabel' or 'probability']
            @param weights    : weights to use on predictions from each classifier when voting.
        '''

        self.classifiers = classifiers
        self.named_classifiers = dict(_name_estimators(classifiers))
        self.vote = vote
        self.weights = weights

    def fit(self, X : list[list[Any]], y : list[Any]) -> Self :

        '''
            Fit our ensemble classifiers to the training data
            @param X : training data to train with
            @param y : training data labels
            @return  : the fitted ensemble (self)
            @raises ValueError : if vote is not a supported strategy, or if the
                                 number of weights differs from the number of classifiers
        '''
        if self.vote not in ('probability','classlabel'):
            raise ValueError(f'vote must be \'probability\' or \'classlabel\', got {self.vote}.')

        # Compare against None explicitly: the truth value of a NumPy array of
        # weights is ambiguous and would raise instead of being validated.
        if self.weights is not None and len(self.weights) != len(self.classifiers):
            raise ValueError(f'Number of classifiers and weights must be equal. Got {len(self.weights)} weights and {len(self.classifiers)} classifiers.')

        # Encode class labels as 0..n_classes-1 so that np.argmax / np.bincount
        # results map straight back to the original labels in predict().
        self.labelenc_ = LabelEncoder()
        self.labelenc_.fit(y)
        self.classes_ = self.labelenc_.classes_

        # Fit clones so the estimators passed in by the caller are left untouched.
        encoded_y = self.labelenc_.transform(y)
        self.classifiers_ = [clone(clf).fit(X, encoded_y) for clf in self.classifiers]

        return self

    def _majority_vote_probas(self, X) -> np.ndarray :
        ''' Calculate the majority vote (encoded) class label using class membership probabilities '''
        return np.argmax(self.predict_proba(X),axis=1)

    def __instance_vote(self, x) -> int :
        ''' Calculate the weighted majority (encoded) label for one sample, given every classifier's prediction for it '''
        return np.argmax(np.bincount(x,weights=self.weights))

    def _majority_vote_cls(self, X) -> np.ndarray :
        ''' Calculate the majority vote (encoded) class label using predicted class labels '''
        # Transpose so each row holds all classifiers' predictions for one sample.
        preds = np.asarray([clf.predict(X) for clf in self.classifiers_]).T
        return np.apply_along_axis(self.__instance_vote,axis=1,arr=preds)

    def predict(self, X) -> np.ndarray :

        '''
            Predict class labels for testing data X using our fitted classifiers and majority vote
            @param X : testing data
            @return  : predicted labels, decoded back to the original label values
        '''

        if self.vote == 'probability':
            maj_vote = self._majority_vote_probas(X)
        else:
            maj_vote = self._majority_vote_cls(X)

        return self.labelenc_.inverse_transform(maj_vote)

    def predict_proba(self, X) -> np.ndarray :

        '''
            Predict the class membership probabilities for our testing data
            @param X : testing data
            @return  : (weighted) average of the fitted classifiers' predict_proba outputs
        '''
        # Stacking puts the classifiers on the leading axis, so averaging over
        # axis 0 combines the classifiers and keeps one row per sample.
        probas = np.asarray([clf.predict_proba(X) for clf in self.classifiers_])
        return np.average(probas,axis=0,weights=self.weights)

    def get_params(self, deep: bool = True) -> dict:

        '''
            Get the parameters of the ensemble.
            @param deep : when True, also expose each named classifier and its
                          parameters as '<name>__<param>' (double underscore,
                          the separator scikit-learn uses for nested parameters)
            @return     : mapping of parameter names to values
        '''
        out = super().get_params(deep=False)
        if not deep:
            return out

        out.update(self.named_classifiers)
        for name,step in self.named_classifiers.items():
            for k,v in step.get_params(deep=True).items():
                out[f'{name}__{k}'] = v
        return out

In [None]:
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder

iris = datasets.load_iris()

# Keep the samples from index 50 onward and only feature columns 1 and 2.
X = iris.data[50:, [1, 2]]
y = iris.target[50:]

# Re-encode the remaining target values.
le = LabelEncoder()
y = le.fit_transform(y)

# 50/50 stratified split with a fixed seed for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=.5,
    stratify=y,
    random_state=1,
)

In [None]:
from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline

# The three base learners compared in this cell.
clf1 = LogisticRegression(
    penalty='l2',
    C=.001,
    solver='lbfgs',
    random_state=1,
)
clf2 = DecisionTreeClassifier(
    max_depth=1,
    criterion='entropy',
    random_state=0,
)
clf3 = KNeighborsClassifier(
    n_neighbors=1,
    p=2,
    metric='minkowski',
)

# Logistic regression and KNN get a StandardScaler step in front of them;
# the decision tree is used as-is.
pipe1 = Pipeline([['sc', StandardScaler()], ['clf', clf1]])
pipe3 = Pipeline([['sc', StandardScaler()], ['clf', clf3]])

clf_labels = ['Logistic Regression', 'Decision Tree', 'KNN']

# Predictive performance (ROC AUC over 10 folds) of each individual classifier.
print('10-fold cross validation:\n')
for label, clf in zip(clf_labels, [pipe1, clf2, pipe3]):
    scores = cross_val_score(
        estimator=clf,
        X=X_train,
        y=y_train,
        scoring='roc_auc',
        cv=10,
    )
    print(f'ROC AUC: {scores.mean():<4.2f} ( +/- {scores.std():<4.2f} ) [{label:^24}]')

In [None]:
# Ensemble built from the same three base learners evaluated above.
mv_clf = MajorityVoteClassifier(classifiers=[pipe1, clf2, pipe3])

clf_labels.append('Majority Voting')
all_clf = [pipe1, clf2, pipe3, mv_clf]

# Re-run the 10-fold ROC AUC comparison, now including the ensemble.
for label, clf in zip(clf_labels, all_clf):
    scores = cross_val_score(
        estimator=clf,
        X=X_train,
        y=y_train,
        scoring='roc_auc',
        cv=10,
    )
    print(f'ROC AUC: {scores.mean():<4.2f} ( +/- {scores.std():<4.2f} ) [{label:^24}]')