In [1]:
import numpy as np
from sklearn.base import BaseEstimator
import matplotlib.pyplot as plt
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import VotingClassifier

from sklearn.metrics.pairwise import pairwise_kernels
import joblib
from sklearn import metrics

from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.model_selection import cross_val_score

In [2]:
#Toy dataset
from sklearn.datasets import load_iris
iris = load_iris()
X = iris.data
y = iris.target

In [15]:
#Serious dataset
from sklearn.datasets import make_classification
X, y = make_classification(n_classes=3, n_samples=1000, n_features=6, n_informative=4, n_redundant=2, random_state=2)

In [9]:
#Hard dataset

X, y = make_classification(n_classes=3, n_samples=1000, n_features=20, n_informative=15, n_redundant=5, random_state=2)

In [5]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X,y, test_size=0.2)

In [44]:
from sklearn.metrics import pairwise_distances
from random import randint
import random

def custom_bootstrap(X, y, similarity_function, n_estimators):
    
    #Initializing list of random training sets
    X_sets = []
    y_sets = []
    
    for i in range(0,n_estimators):
        instance_X = np.empty(np.shape(X))
        instance_y = np.empty(np.shape(y))
        
        #creating seed for specific training set
        seed = randint(0,len(X)-1)
        
        instance_X[0] = X[seed]
        instance_y[0] = y[seed]       
        
        n_entries = 1
        
        #populating training set
        
        
        if 1-(len(X_sets)+1)/n_estimators>0.8:
            p_thresh = 0.8
        else:
            p_thresh = np.round(1-(len(X_sets)+1)/n_estimators,1)
        
        #print("populating set", i, "with probability acceptance",p_thresh)
        
        
        while n_entries < len(X):
            
            rand = randint(0,len(X)-1)
            
            if accept_entry(similarity_function[seed,rand],n_entries/(len(X)-1),p_thresh):
                
                instance_X[n_entries] = X[rand]
                instance_y[n_entries] = y[rand]
                n_entries+=1
                
        X_sets.append(instance_X)
        y_sets.append(instance_y)     
    
    return X_sets,y_sets

def accept_entry(similarity_score, p_ratio, prob_threshold = 0.7):
    #p_ratio gradually shifts the decisional weight from similarity_score to randomness as the test set gets filled up.
    #Specifically, at least least a small number of entries are accepted only through randomness (p_ratio=1)
    #this way it's unlikely for the method to come up with sets with only one class (if not impossible, but I need to check)
    
    #It is also possible to modify the prob_threshold in order to get more or less random values, assuring more variance.
    #I think this tweakability could prove useful to adapt the model to specific instances.
    
    #PLEASE NOTE THAT THIS CODE MAKES SENSE WITH NORMALIZED SIMILARITY SCORES
    #NORMALIZATION FOR FUNCTION THAT DO NOT RETURN [0,1] VALUES STILL NEEDS TO BE IMPLEMENTED.
    #It's not like it doesn't work, but prob calculations take the high road
    
    prob = similarity_score*(1-p_ratio)+ random.random()*p_ratio
    if prob > prob_threshold:
        return True
    else:
        return False
    
def normalize(X):
    #todo, in case the class needs to accept non-normalized similarity functions
    return X


#Test
X_sets, y_sets = custom_bootstrap(X_train, y_train,pairwise_kernels(X_train, metric='rbf'),50)

for i in range(len(X_sets)):
    for j in range (0,len(X_sets[0][:,0])):
        if j==0:
            s = y_sets[i][j]
            g=0
        else:
            if y_sets[i][j] == s:
                g+=1
    #used to measure the effects of p_thresh
    print ("set",i,":",np.round(g/j*100,2),"% of ",y_sets[i][j],"class elements, as per seed")
    


set 0 : 65.33 % of  0.0 class elements, as per seed
set 1 : 61.45 % of  0.0 class elements, as per seed
set 2 : 54.69 % of  1.0 class elements, as per seed
set 3 : 83.48 % of  1.0 class elements, as per seed
set 4 : 77.85 % of  0.0 class elements, as per seed
set 5 : 85.73 % of  1.0 class elements, as per seed
set 6 : 84.48 % of  0.0 class elements, as per seed
set 7 : 84.36 % of  2.0 class elements, as per seed
set 8 : 77.22 % of  2.0 class elements, as per seed
set 9 : 82.98 % of  0.0 class elements, as per seed
set 10 : 62.83 % of  0.0 class elements, as per seed
set 11 : 17.65 % of  1.0 class elements, as per seed
set 12 : 51.56 % of  1.0 class elements, as per seed
set 13 : 58.45 % of  0.0 class elements, as per seed
set 14 : 44.18 % of  2.0 class elements, as per seed
set 15 : 71.59 % of  0.0 class elements, as per seed
set 16 : 73.72 % of  2.0 class elements, as per seed
set 17 : 71.34 % of  2.0 class elements, as per seed
set 18 : 54.44 % of  0.0 class elements, as per seed
set

In [7]:
class CustomEstimator(BaseEstimator):
    
    def __init__(self, n_estimators,
                base_estimator=DecisionTreeClassifier(),
                similarity_function=pairwise_kernels(X, X, metric='rbf'),
                debug = False):
        
        self.n_estimators = n_estimators
        self.base_estimator = base_estimator
        self.similarity_function = similarity_function
        self.debug=debug
        
    
    def fit(self,X,y):
        
        self.n_classes = np.max(y)+1
        
        if self.debug:
            print("Bootstrapping...")
        self.X_sets, self.y_sets = custom_bootstrap(X, y, self.similarity_function, self.n_estimators)
        #dummy_value = self.base_estimator.fit(X,y)
        
        if self.debug:
            print("Fitting...")
        
        for i in range(self.n_estimators):        
            model = self.base_estimator.fit(self.X_sets[i], self.y_sets[i])
            model_name = "model"+str(i)+".pkl"
            joblib.dump(model, model_name)
            
            
        #self.value_ = dummy_value
        
        self.X_ = X
        self.y_ = y

        if self.debug:
            print("Done!")
        #return self #For some reason this produces an error
        
            
    
    def predict(self, X):
        
        
        
        out = np.zeros(len(X))
        probs = self.predict_proba(X)
        
        if self.debug:
            print("Predict...")
        
        for i in range (len(X)):
            maximum = np.max(probs[i])
            for j in range (self.n_classes):
                if probs[i][j] == maximum:
                    out[i] = j
                    
                
        return out
    
    def predict_proba(self, X):
             
        out = np.zeros(shape=(len(X),self.n_classes))
        
        #compute similarity between instance and each training set
        
        if self.debug:
            print("Computing models similarity to instance...")
        
        Z=[]
        for i in range (0,self.n_estimators):
            Z.append(pairwise_kernels(X, self.X_sets[i], metric='rbf'))
        
        if self.debug:
            print("Making predictions with each trained model...")
            
        for i in range(0,len(X)): #for each sample to predict
            #print("evaluating sample",i)
            
            votes = []
            sim_means = np.zeros(self.n_estimators)
            
            voting_power = np.zeros(self.n_estimators)
            
            for j in range (0,self.n_estimators): #for each estimator trained
               
                #load estimator model
                model_name = "model"+str(j)+".pkl"
                model = joblib.load(model_name)
                
                votes.append(model.predict_proba(X[i].reshape(1, -1)))#*np.mean(Z[j][i]) #not sure about using the mean
                sim_means[j] = np.mean(Z[j][i])
            
                if self.debug:
                    #print("votes:",np.round(votes[j],2),"similarity:",np.round(sim_means[j],2))
                    pass
           
                voting_power[j] = (100*sim_means[j]/np.sum(sim_means))/100
                votes[j]=votes[j]*voting_power[j]
        
            
            #print("voting power:",voting_power)
            out[i]=(np.sum(votes,axis=0))
        
        if self.debug:
            print("Done!")

        return out

In [48]:
# Test instance 
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X,y, test_size=0.2)

custom_clf = CustomEstimator(n_estimators=50,
                             base_estimator=DecisionTreeClassifier(),
                             similarity_function=pairwise_kernels(X, metric='rbf'),debug=True)

custom_clf.fit(X_train, y_train)
y_pred=custom_clf.predict(X_test)
print("Accuracy =",metrics.accuracy_score(y_test, y_pred))

#Commenting this here as I won't do it again: 
#tested 7/18/22 version with make_classification with 4 classes, 500 estimators
#Accuracy score ~0.635
#Somehow each estimator predicts the same thing for each sample

Bootstrapping...
Fitting...
Done!
Computing models similarity to instance...
Making predictions with each trained model...
Done!
Predict...
Accuracy = 0.8


In [36]:
# Comparative tests

from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import BaggingClassifier
from sklearn.ensemble import AdaBoostClassifier

X_train, X_test, y_train, y_test = train_test_split(X,y, test_size=0.2)

rnd_clf = RandomForestClassifier(n_estimators=50, max_leaf_nodes=16, n_jobs=-1)

svc_clf = SVC(probability=True)

tree_clf = DecisionTreeClassifier(max_depth=16)

bag_clf = BaggingClassifier(DecisionTreeClassifier(), n_estimators=50, max_samples=100, bootstrap=True, n_jobs=-1, oob_score=True)

custom_clf = CustomEstimator(n_estimators=50,base_estimator=DecisionTreeClassifier())

print("Accuracy scores:")
for clf in (rnd_clf, svc_clf, tree_clf, bag_clf, custom_clf):
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)

    print(clf.__class__.__name__,":",metrics.accuracy_score(y_test, y_pred))
#     results, names = list(), list()
#     scores = evaluate_model(clf, X, y)
#     results.append(scores)
#     names.append(clf.__class__.__name__)
#     print('%s %.3f (%.3f)' % ("Cross validation score:", np.mean(scores), np.std(scores)))



Accuracy scores:
RandomForestClassifier : 0.8
SVC : 0.86
DecisionTreeClassifier : 0.78
BaggingClassifier : 0.795
Bootstrapping...
Fitting...
Done!
Computing models similarity to instance...
Making predictions with each trained model...
Done!
Predict...
CustomEstimator : 0.785
