# Random Patches Definition

In [1]:
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.exceptions import NotFittedError
from sklearn.tree import DecisionTreeClassifier
class RandomPatches(ClassifierMixin, BaseEstimator):
    """Random Patches ensemble of decision trees.

    Every base learner is a ``DecisionTreeClassifier`` trained on a "patch":
    a bootstrap sample of the rows (drawn with replacement) restricted to a
    random subset of ``max_features`` columns (drawn without replacement).
    Rows that were not drawn for a learner form its out-of-bag (OOB) set, on
    which that learner's accuracy is measured.

    The mixin is listed before ``BaseEstimator`` because scikit-learn
    recommends placing mixins on the left of ``BaseEstimator``.

    Parameters
    ----------
    n_estimators : int, default=10
        Number of base learners to train.
    max_features : int, default=2
        Number of feature columns given to each base learner.
    custom_voting : str, default="majority"
        ``"weighted"`` weights each learner's vote by its OOB accuracy; any
        other value gives every learner one equal vote.
    random_state : int or None, default=None
        Seed for patch sampling and for the base trees. ``None`` keeps the
        previous behaviour of drawing from NumPy's global random state.

    Attributes
    ----------
    learners : list
        Fitted base classifiers, one per ensemble member.
    subspaces : list of ndarray
        Column indices used by each learner.
    oob_scores : list of float
        OOB accuracy of each learner (``nan`` when its OOB set was empty).
    oob_sets_size : list of int
        Number of OOB rows of each learner.
    classes_ : ndarray
        Sorted unique labels seen in ``fit``; only set after fitting.
    """

    def __init__(self, n_estimators=10, max_features=2, custom_voting="majority", random_state=None):
        self.n_estimators = n_estimators
        self.max_features = max_features
        self.custom_voting = custom_voting
        self.random_state = random_state
        self.learners = []  # stores the trained base classifiers
        self.subspaces = []  # stores the feature subsets used by each learner
        self.oob_scores = []  # out-of-bag (accuracy) scores for each learner
        self.oob_sets_size = []  # number of elements in out-of-bag samples

    def fit(self, X, y):
        """Train the ensemble on ``X`` (2-D, samples x features) and labels ``y``.

        Any state left over from a previous ``fit`` call is discarded first.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``max_features`` is not between 1 and the number of columns.
        """
        # np.asarray handles DataFrames/Series as well as plain arrays, so the
        # class does not depend on pandas being imported.
        X = np.asarray(X)
        y = np.asarray(y)

        # total number of instances and total number of features
        n_samples, n_features = X.shape
        if not 1 <= self.max_features <= n_features:
            raise ValueError(
                f"max_features must be between 1 and {n_features}, got {self.max_features}"
            )

        # Start from a clean slate so that calling fit() again does not keep
        # appending to the learners of an earlier fit.
        self.learners = []
        self.subspaces = []
        self.oob_scores = []
        self.oob_sets_size = []

        # Encode labels as 0..n_classes-1 so voting can index a vote matrix
        # whatever the original label values are.
        self.classes_, y_encoded = np.unique(y, return_inverse=True)

        seeded = self.random_state is not None
        # The numpy.random module and a RandomState instance both expose
        # choice()/randint(), so the unseeded path still uses the global state.
        rng = np.random.RandomState(self.random_state) if seeded else np.random

        all_rows = np.arange(n_samples)
        for _ in range(self.n_estimators):
            # Bootstrap sampling (with replacement) of instances
            bootstrap_rows = rng.choice(n_samples, size=n_samples, replace=True)
            # Instances never drawn for the bootstrap sample are out-of-bag
            oob_rows = np.setdiff1d(all_rows, bootstrap_rows)
            # Random sample of features without replacement
            feature_indices = rng.choice(n_features, self.max_features, replace=False)

            # Only derive a per-tree seed when the ensemble itself is seeded.
            tree_seed = int(rng.randint(np.iinfo(np.int32).max)) if seeded else None
            tree = DecisionTreeClassifier(random_state=tree_seed)
            tree.fit(X[np.ix_(bootstrap_rows, feature_indices)], y_encoded[bootstrap_rows])

            if oob_rows.size:
                oob_predictions = tree.predict(X[np.ix_(oob_rows, feature_indices)])
                # mean of a boolean array = proportion of correct predictions
                oob_accuracy = float(np.mean(oob_predictions == y_encoded[oob_rows]))
            else:
                # The bootstrap sample covered every row: there is nothing to
                # score on, so record "unknown" instead of failing.
                oob_accuracy = float("nan")

            self.learners.append(tree)
            self.subspaces.append(feature_indices)
            self.oob_sets_size.append(len(oob_rows))
            self.oob_scores.append(oob_accuracy)

        return self

    def predict(self, X):
        """Predict a class label for every row of ``X``.

        Returns
        -------
        ndarray of shape (n_samples,)
            Winning label per sample, taken from ``classes_``. Ties go to the
            class that comes first in ``classes_``.

        Raises
        ------
        NotFittedError
            If no learner has been trained yet.
        """
        if not self.learners:
            raise NotFittedError(
                "This RandomPatches instance is not fitted yet. Call 'fit' before 'predict'."
            )

        X = np.asarray(X)
        n_samples = X.shape[0]

        if self.custom_voting == "weighted":
            # Weighted voting based on OOB accuracy
            weights = np.asarray(self.oob_scores, dtype=float)
            known = ~np.isnan(weights)
            # Learners without an OOB estimate get the average known weight
            # (or 1.0 if none is known) so a nan cannot poison the vote totals.
            neutral = weights[known].mean() if known.any() else 1.0
            weights = np.where(known, weights, neutral)
        else:
            # default voting scheme is majority vote: one equal vote per learner
            weights = np.ones(len(self.learners))

        # votes[s, c] accumulates the total weight cast for class c on sample s
        votes = np.zeros((n_samples, len(self.classes_)))
        sample_rows = np.arange(n_samples)
        for tree, feature_indices, weight in zip(self.learners, self.subspaces, weights):
            # Trees were trained on encoded labels, so their predictions are
            # valid column indices into the vote matrix.
            predicted = tree.predict(X[:, feature_indices]).astype(int)
            votes[sample_rows, predicted] += weight

        # Map the winning column back to the original label values
        return self.classes_[np.argmax(votes, axis=1)]

# Data Reading and Evaluation Functions

In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
def load_dataset(dataset_path='electricity2.csv'):
    """Read a CSV file and separate its features from its class label.

    The class label is expected in the last column; all preceding columns
    are treated as features.

    Returns
    -------
    tuple
        ``(X, y)`` where ``X`` is a DataFrame with every column except the
        last one and ``y`` is the last column as a Series.
    """
    frame = pd.read_csv(dataset_path)
    features, target = frame.iloc[:, :-1], frame.iloc[:, -1]
    return features, target

# train-test split and return accuracy
def evaluate_classifier(classifier, X, y, test_size=0.3):
    """Fit ``classifier`` on a train split and score it on the held-out split.

    The split is produced by ``train_test_split`` with a fixed
    ``random_state=42``, so the same rows are held out on every call.
    The classifier passed in is fitted in place.

    Returns
    -------
    tuple
        ``(accuracy, X_train, X_test, y_train, y_test)``.
    """
    split = train_test_split(X, y, test_size=test_size, random_state=42)
    X_train, X_test, y_train, y_test = split

    classifier.fit(X_train, y_train)
    accuracy = accuracy_score(y_test, classifier.predict(X_test))

    return accuracy, X_train, X_test, y_train, y_test


# run the experiments on one dataset for several classifiers
def run_experiments(classifiers, show_oob=True):
    """Evaluate every classifier on the Electricity dataset.

    Parameters
    ----------
    classifiers : dict
        Maps a display name to an estimator; each one is passed to
        ``evaluate_classifier``.
    show_oob : bool, default=True
        When true, print the subspace, OOB set size and OOB accuracy of each
        base learner of every ``RandomPatches`` classifier.

    Returns
    -------
    pandas.DataFrame
        One row per (dataset, classifier) with columns ``Dataset``,
        ``Classifier`` and ``Accuracy``.
    """
    datasets = {
        'Electricity': load_dataset(dataset_path='electricity2.csv')
    }

    records = []
    for dataset_name, (X, y) in datasets.items():
        for clf_name, clf in classifiers.items():
            print(f"running {clf_name}")
            # Only the accuracy is needed here; the returned splits are ignored.
            accuracy = evaluate_classifier(clf, X, y)[0]
            records.append({
                'Dataset': dataset_name,
                'Classifier': clf_name,
                'Accuracy': accuracy
            })

            if not (show_oob and isinstance(clf, RandomPatches)):
                continue

            per_learner = zip(clf.subspaces, clf.oob_sets_size, clf.oob_scores)
            for i, (subspace, oob_set_size, oob_accuracy) in enumerate(per_learner):
                print(f"Base Learner {i+1} | Subspace (features): {subspace} | OOB Instances: {oob_set_size} | OOB Accuracy: {oob_accuracy:.4f}")

    return pd.DataFrame(records)

# Evaluation with Printed Results

In [3]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import BaggingClassifier, RandomForestClassifier, AdaBoostClassifier
#!pip install xgboost  # NOTE: if the import below fails because 'xgboost' is not installed, uncomment this line and rerun the cell
from xgboost import XGBClassifier

# Run the experiments and display results.
# Seed NumPy's global random state first: RandomPatches draws its patches from
# it, and scikit-learn estimators created without an explicit random_state use
# it as well, so a fixed seed makes the reported accuracies repeatable on a
# fresh "Restart & Run All".
SEED = 42
np.random.seed(SEED)

classifiers = {
    'DecisionTree': DecisionTreeClassifier(),
    'Bagging': BaggingClassifier(n_estimators=20),
    'RandomForest': RandomForestClassifier(),
    'AdaBoost': AdaBoostClassifier(algorithm="SAMME"),
    'XGBoost': XGBClassifier(eval_metric='logloss'),
    'RandomPatches(11, 50) (Weighted Vote)': RandomPatches(max_features=11, n_estimators=50, custom_voting="weighted")
}

results_df = run_experiments(classifiers, show_oob=False)
display(results_df)

running DecisionTree
running Bagging
running RandomForest
running AdaBoost
running XGBoost
running RandomPatches(11, 50) (Weighted Vote)


Unnamed: 0,Dataset,Classifier,Accuracy
0,Electricity,DecisionTree,0.753053
1,Electricity,Bagging,0.800942
2,Electricity,RandomForest,0.808004
3,Electricity,AdaBoost,0.762174
4,Electricity,XGBoost,0.804031
5,Electricity,"RandomPatches(11, 50) (Weighted Vote)",0.808077
