In [33]:
# Heterogeneous pooling
import pandas as pd
import numpy as np
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

from scipy import stats


from sklearn.discriminant_analysis import StandardScaler
from sklearn.pipeline import Pipeline
from tqdm import tqdm





In [34]:
# all the data will be stored in this dataframe, with the method name, mean accuracy, standard deviation, lower and upper bound
df = pd.DataFrame(columns=['method', 'mean', 'std', 'lower', 'upper'])




In [35]:
X = pd.read_csv('X.csv')
y = pd.read_csv('y.csv')




In [36]:
# Zero Rule Baseline
from sklearn.dummy import DummyClassifier

clf = DummyClassifier(strategy='most_frequent')

cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=36851234)

scoresZeroR = cross_val_score(clf, X, y, scoring='accuracy', cv=cv, n_jobs=-1)

inf, sup = stats.norm.interval(0.95, loc=scoresZeroR.mean(), scale=scoresZeroR.std()/np.sqrt(len(scoresZeroR)))


df = df.append({'method': 'ZR', 'mean': scoresZeroR.mean(), 'std': scoresZeroR.std(), 'lower':inf, 'upper': sup}, ignore_index=True)


  df = df.append({'method': 'ZR', 'mean': scoresZeroR.mean(), 'std': scoresZeroR.std(), 'lower':inf, 'upper': sup}, ignore_index=True)


In [37]:


def train_model(model,params_grid,name, df): 
    scalar = StandardScaler()
    pipe = Pipeline(steps=[('s',scalar), ('m', model)])

    outer = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=36851234)
    inner = RepeatedStratifiedKFold(n_splits=4, n_repeats=3, random_state=36851234)

    gs = GridSearchCV(pipe, param_grid=params_grid, scoring='accuracy', cv=inner, n_jobs=-1)

    scores = cross_val_score(gs, X, y.values.ravel(), scoring='accuracy', cv=outer, n_jobs=-1)
    

    inf, sup = stats.norm.interval(0.95, loc=scores.mean(), scale=scores.std()/np.sqrt(len(scores)))

    df_awnser = pd.concat([df, pd.DataFrame({'method': [name], 'mean': [np.mean(scores)], 'std': inf, 'upper': sup})], ignore_index=True)
    
    return df_awnser, scores

In [38]:
from sklearn.ensemble import BaggingClassifier


bg = BaggingClassifier(random_state=11)

name = 'BA'

params_grid = {
    'm__n_estimators': [3,9,15,21]
    } 

df, scoresBagging = train_model(bg,params_grid,name,df)


In [39]:
from sklearn.ensemble import AdaBoostClassifier

ada = AdaBoostClassifier(random_state=11)

name = 'AB'

params_grid = {
    'm__n_estimators': [3,9,15,21]
    }

df,scoresAda = train_model(ada,params_grid,name,df)

In [40]:
from sklearn.ensemble import RandomForestClassifier


rf = RandomForestClassifier(random_state=11)

name = 'RF'

params_grid = {
    'm__n_estimators': [3,9,15,21]
} 


df,scoresRandomForest = train_model(rf,params_grid,name,df)

### HP

In [41]:
# save df 
df.to_csv('df.csv', index=False)

In [42]:
# import base estimators
from sklearn.base import BaseEstimator
# import classifiers
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB


class HeterogeneousEnsemble(BaseEstimator):
    # define o construtor para o classificador
    def __init__(self,n_samples=3):
        
        self.classifiers  =  [DecisionTreeClassifier(random_state=11), KNeighborsClassifier(), GaussianNB()]
        self.n_samples = n_samples
        self.trained_classifiers = []


    def train_classifiers(self, X, y):

        # converter para numpy array
        X = X.to_numpy()
        y = y.to_numpy()

        # faz o loop sobre os classificadores individuais
        for clf in self.classifiers:
            # treina o classificador no conjunto de treinamento atual
            clf.fit(X, y.ravel())
            # adiciona o classificador treinado à lista
            self.trained_classifiers.append(clf)
        # retorna a lista de classificadores treinados
        return self.trained_classifiers

    def sample_data(self,X, y, random_state):
        # amostra as características com reposição e obtém os rótulos correspondentes
        X_sampled = X.sample(frac=1, replace=True, random_state=random_state)
        y_sampled = y.loc[X_sampled.index]
        # retorna o conjunto de dados amostrado
        return X_sampled, y_sampled


    def predict_hp(self, X_test, class_order):
        # cria um dicionário para armazenar as predições de cada classificador
        votes = {}
        X_test = X_test.to_numpy().reshape(1, -1)
        # faz o loop sobre os classificadores individuais
        for clf in self.trained_classifiers:
            # prediz a classe do exemplo de teste usando o classificador atual
            pred = clf.predict(X_test)
            # armazena a predição no dicionário
            if pred[0] in votes:
                votes[pred[0]] += 1
            else:
                votes[pred[0]] = 1
                

        # obtém a(s) classe(s) mais votada(s) e as armazena em uma lista
        max_votes = max(votes.values())
        most_voted_classes = [k for k,v in votes.items() if v == max_votes]
        
        hp_pred = None

        # se houver mais de uma classe mais votada, quebra o empate usando a ordem das classes do conjunto de treinamento
        if len(most_voted_classes) > 1:
            for c in class_order:
                if c in most_voted_classes:
                    hp_pred = c
                    break
            if hp_pred is None:
                hp_pred = most_voted_classes[0]
        # caso contrário, retorna a classe mais votada como a predição do conjunto HP
        else:
            hp_pred = most_voted_classes[0]

        # retorna a predição
        return hp_pred



    def fit(self,X,y): 

        # # if data is numpy array, convert to pandas dataframe
        if isinstance(X, np.ndarray):
            X = pd.DataFrame(X)
        if isinstance(y, np.ndarray):
            y = pd.Series(y)

        # reset index
        X.reset_index(drop=True, inplace=True)
        y.reset_index(drop=True, inplace=True)

        classifiers = [] # ciclo para treinar os classificadores individuais
        for i in range(self.n_samples):
            # se for a primeira iteração, use os dados de treinamento originais
            if i == 0:
                X_current = X.copy()
                y_current = y.copy()
            # caso contrário, crie um novo conjunto de treinamento amostrando com reposição os dados originais usando a função sample_data
            else:
                X_current, y_current = self.sample_data(X, y, i)
            
            # treina os classificadores individuais nos dados de treinamento atuais usando a função train_classifiers e os estende à lista
            classifiers.extend(self.train_classifiers(X_current, y_current))

    def predict(self,X_test): 
        if(isinstance(X_test, np.ndarray)):
            X_test = pd.DataFrame(X_test)
        
        class_order = y.value_counts().index.tolist()
        # cria uma lista vazia para armazenar as predições do conjunto HP
        hp_predictions = []
        # faz o loop sobre os exemplos de teste
        for index, row in X_test.iterrows():
            # predict the class of the test example using the predict_hp function and append it to the list # prediz a classe do exemplo de teste usando a função predict_hp e a adiciona à lista
            hp_pred = self.predict_hp(row, class_order)
            hp_predictions.append(hp_pred)
        return hp_predictions

    

In [43]:
hp2 = HeterogeneousEnsemble(n_samples=3)

hp2.fit(X,y)

y_pred = hp2.predict(X_test)

from sklearn.metrics import accuracy_score

print(accuracy_score(y_test, y_pred))


0.8166666666666667


In [46]:
df

Unnamed: 0,method,mean,std,lower,upper
0,ZR,0.165057,0.010883,0.161163,0.168952
1,BA,0.511073,0.472997,,0.549148
2,AB,0.271648,0.259689,,0.283606
3,RF,0.515517,0.48276,,0.548274


hp_Test = HeterogeneousEnsemble(n_samples=9)

hp_Test.fit(X,y)


hp_predictions = hp_Test.predict(X_test)

# avalia a acurácia do conjunto HP no conjunto de teste
hp_accuracy = accuracy_score(y_test, hp_predictions)
print(f'The accuracy of HP ensemble is {hp_accuracy:.4f}')

In [44]:

def train_model(model,params_grid,name, df): 
    scalar = StandardScaler()
    pipe = Pipeline(steps=[('s',scalar), ('m', model)])

    outer = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=36851234)
    inner = RepeatedStratifiedKFold(n_splits=4, n_repeats=3, random_state=36851234)

    gs = GridSearchCV(pipe, param_grid=params_grid, scoring='accuracy', cv=inner, n_jobs=-1)

    scores = cross_val_score(gs, X, y.values.ravel(), scoring='accuracy', cv=outer, n_jobs=-1)


    print(np.mean(scores))

    inf, sup = stats.norm.interval(0.95, loc=scores.mean(), scale=scores.std()/np.sqrt(len(scores)))

    df_awnser = pd.concat([df, pd.DataFrame({'method': [name], 'mean': [np.mean(scores)], 'std': [inf], 'upper': [sup]})], ignore_index=True)
    # df_awnser = []
    return df_awnser, scores

# KNN gera um warning em relacao ao parametro keepdims, nao consigo dar surpress nele
hp = HeterogeneousEnsemble()

name = 'HP'

params_grid = {
    'm__n_samples': [1,3,5,7]
    }

df_2, scoresHeteros = train_model(hp,params_grid,name,df)



0.42352490421455946


In [45]:
    np.mean(scores)

NameError: name 'scores' is not defined

In [None]:
df_2


[]

In [None]:
df.to_csv('df.csv', index=False)

AttributeError: 'list' object has no attribute 'to_csv'