In [1]:
import numpy as np
import pandas as pd
import random
import warnings
warnings.filterwarnings('ignore')
from decision_tree_tool import DecisionTree

In [26]:
class AdaboostModel():
    def __init__(self, model_type='classification'):   #변수 초기화
        model_types = ['classification', 'regression']
        assert model_type in model_types, f'model_type must be the one of the {model_types}' #조건이 True임을 보증
        self.model_type = model_type       #문제 타입
        self.fitting_models = None        #개별 모형 리스트
        self.coef_list = None             #가중치 알파
        self.weight_lists = None          #업데이트된 가중치
        self.selected_idx_lists = None    #샘플링된 인덱스
        self.error_lists = None           #오분류 관측치
        self.estimator_predicts = None    #개별 모형의 예측결과
        self.uniq_labels = None           #클래스 라벨
        self.num_class = None             #클래수 개수
        self.num_data = None              #데이터 개수
        self.loss = None                  #회귀문제에서의 로스 타임
        
    def get_loss(self, y1, y2, sample_weights, loss):  #Weight Loss Functhion
        D = np.max(np.abs(y1-y2))
        if loss == 'linear':
            L_array = np.abs(y1-y2)/D
        elif loss == 'square':
            L_array = np.square(y1-y2)/(D**2)
        else:
            L_array = 1 - np.exp(-np.abs(y1-y2)/D)
            
        L_bar = np.dot(L_array, sample_weights)
        return L_bar, L_array
    
    def get_weighted_median(self, x, weights):       #가중 중앙값 계산
        sum_val = 0.5 * np.sum(weights)
        cum_sum = weights[0]
        i = 0
        while cum_sum < sum_val:
            i += 1
            cum_sum += weights[i]
        return x[i]
    
    #Adaboost 알고리즘
    def fit(self, X, y, type_of_col, n_estimators=10, random_state=100, loss='linear'):
        self.uniq_labels = np.unique(y)   #클래스 라벨은 y의 고윳값들
        
        #초기화
        fitting_models = []
        coef_list = []            #가중치 알파
        weight_lists = []         #업데이트된 가중치
        selected_idx_lists = []   #샘플링된 인덱스
        error_lists = []          #오분류 관측치
        estimator_predicts = []     #별 모형의 예측결과
        n = X.shape[0]            #관측치 개수
        self.num_data = n         #데이터 개수
        w = np.array([1/n] * len(y))
        col_name = X.columns
        epsilon = 1e-10
        random.seed(random_state)
        
        if self.model_type == 'classification':  #모델 타입이 분류이면
            K = len(self.uniq_labels)
            self.num_class = K           #클래수 개수는 클래스 라벨의 길이
            
            for ne in range(n_estimators):       #초기 가중치 W
                clf = DecisionTree(tree_type = self.model_type)
                clf.fit(X , y, max_depth = 2, type_of_col = type_of_col, auto_determine_type_of_col = False)
                
                #개별 모형의 예측값과 오류 계산
                y_predict = np.array(clf.predict(X))         #개별 모형의 예측값
                incorrect = y != y_predict
                error = np.sum(w[incorrect]) / np.sum(w)     #오류율
                
                if error == 0:
                    break
                    
                estimator_predicts.append(y_predict)
                error_lists.append(error)
                alpha = np.log((1-error) / (error + epsilon)) + np.log(K-1)   #알파값, classifier의 가중치
                fitting_models.append(clf)
                coef_list.append(alpha)
                
                if ne == n_estimators - 1:
                    break
                
                else:
                    #가중치 업데이트 및 새로운 학습데이터 복원 추출
                    new_weight = w.copy()
                    new_weight = new_weight * np.exp(alpha * incorrect)
                    new_weight = new_weight / np.sum(new_weight)
                    weight_lists.append(new_weight)
                    sampling_idx = random.choices(range(n), weights = new_weight, k = n)
                    selected_idx_lists.append(sampling_idx)
                    
                    X = X.iloc[sampling_idx, :].reset_index(drop=True)
                    y = y[sampling_idx]
                    w = np.array([1/n]*len(y))
                    
                    
        self.fitting_models = fitting_models
        self.coef_list = np.array(coef_list)
        self.weight_lists = weight_lists
        self.selected_idx_lists = selected_idx_lists
        self.error_lists = np.array(error_lists)
        self.estimator_predicts = estimator_predicts
        
    def predict(self, X):      #예측수행함수
        n = self.num_data      #관측치 개수
        pred = []
        temp_pred_list = [model.predict(X) for model in self.fitting_models]
        temp_pred_tuple = tuple(zip(*temp_pred_list))
        
        if self.model_type == 'classification':
            alpha_list = self.coef_list
            K = self.num_class
            
            for i in range(n):
                temp_labels = [0]*K
                temp_pred = np.array(temp_pred_tuple[i])
                for k in range(K):
                    temp_labels[k] = np.sum(alpha_list[temp_pred == K])
                pred.append(np.argmax(temp_labels))
        return pred

사이킷런 붓꽃 데이터에 적용

In [30]:
from sklearn.datasets import load_iris

iris = load_iris()
df = pd.DataFrame(iris.data, columns = iris.feature_names)
df['species'] = [iris.target_names[x] for x in iris.target]

species_to_labels = dict(zip(df['species'].unique(), range(len(df['species'].unique()))))
df['label'] = df['species'].map(species_to_labels)

#M = 10으로 모형 적합
X, y = df.iloc[:,:4], df['label'].values
clf = AdaboostModel(model_type = 'classification')
clf.fit(X, y, type_of_col = ['continuous'] * 4, n_estimators = 10)
np.mean(y == clf.predict(X)).round(2)

0.33