이번 파트에서는 지금까지 학습한 내용을 바탕으로 머신러닝 자동화 시스템을 구축해보겠습니다.
|시스템명|과제 유형|범위|활용 방법론|
|---|---|---|---|
|MyAutoML1|분류|모델 선택과 하이퍼파라미터 튜닝|그리드 서치|
|MyAutoML2|회귀|모델 선택과 하이퍼파라미터 튜닝|실험 <br> 랜덤 서치|
|MyAutoML3|이진 분류|모델 선택과 하이퍼파라미터 튜닝 <br> 스케일링 <br> 재샘플링 <br> 스태킹 앙상블|메타 모델 <br> 베이지안 최적화|

# 1. 문제 정의 및 클래스 설계

## 탐색 공간
MyAutoml_1은 탐색 공간 내 모든 분류 모델과 하이퍼파라미터를 비교함으로써 최적의 분류 모델과 하이퍼파라미터를 탐색합니다. 여기서 사용자가 탐색하지 않을 모델을 고를 수 있습니다.

<table>
  <tr>
    <td>모델</td>
    <td>하이퍼 파라미터</td>
    <td>탐색 공간</td>
  </tr>
  <tr>
    <td rowspan="2">k-최근접 이웃<br>(KNeighborsClassfier)</td>
    <td>n_neighbors</td>
    <td>{3, 5, 7, 9, 11}</td>
  </tr>
  <tr>
    <td>metric</td>
    <td>{'euclidean', 'manhattan'}</td>
  </tr>
  <tr>
  <tr>
    <td rowspan="2">결정 나무<br>(DecisionTreeClassifier)</td>
    <td>max_depth</td>
    <td>{3, 5, 7, 9}</td>
  </tr>
  <tr>
    <td>min_samples_split</td>
    <td>{2, 5, 10}</td>
  </tr>
  <tr>
  <tr>
    <td rowspan="3">랜덤 포레스트<br>(RandomForestClassfier)</td>
    <td>n_estimators</td>
    <td>{50, 100, 200}</td>
  </tr>
  <tr>
    <td>max_depth</td>
    <td>{2, 3, 4}</td>
  </tr>
  <tr>
    <td>max_features</td>
    <td>{0.2, 0.4, 0.6, 0.8, 1.0}</td>
  </tr>
  <tr>
  <tr>
    <td rowspan="2">신경망<br>(MLPClassfier)</td>
    <td>hidden_layer_sizes</td>
    <td>{(10,), (10, 10), (20, 20), (15, 15, 15), (20, 20, 20), (15, 15, 15, 15), (30, 30, 30, 30)}</td>
  </tr>
  <tr>
    <td>max_iter</td>
    <td>{2000}</td>
  </tr>
  <tr>
</table>

## 클래스 설계: 인자 정의
MyAutoml_1은 다음과 같은 인자를 갖는 파이썬 클래스로 개발합니다.
|인자|설명|기본값|
|---|---|---|
|exclude_modlels|탐색에서 제외할 모델 목록<br>['KNN', 'DT', 'RF', 'MLP']의 부분 집합|[ ]|
|seed|k-최근접 이웃을 제외한 각 모델의 시드|2022|
|cv|폴드 개수|5|
|scoring|분류 평가 척도|'accracy|
|summarize_score|평가 결과 요약 방법|'mean'|
|early_stopping|교차 검증에서의 조기 종료 여부|False|
|early_stopping_criteria|조기 종료 기준으로 다음을 만족하면 조기 종료를 수행<br><br>1 - (현재 폴드에서의 성능 / 현재까지 가장 좋은 모델의 성능) > early_stopping_criteria|0.1|

## 클래스 설계: 메서드 정의
MyAutoml_1은 다음과 같은 메서드를 갖습니다.
|메서드|설명|
|---|---|
|fit|특징 X와 라벨 y를 입력받아 그리드 서치를 사용해 모델과 하이퍼파라미터를 탐색합니다. 한, 최고 점수의 모델 인스턴스를 전체 데이터로 재학습해 model 속성에 저장합니다.|
|show_leaderboard|모델 및 파라미터 탐색 결과인 리더보드를 출력합니다. 이때 리더보드는 모델, 하이퍼파라미터, 성능 지표로 구성된 데이터프레임 입니다.|
|predict|fit 메서드에서 저장한 model로 새로 입력된 특징의 라벨을 예측합니다.|

# 2. 시스템 구현

## 모듈 불러오기
시스템 구현에 필요한 모듈과 함수를 다음고 같이 불러옵니다.

In [2]:
import pandas as pd
from sklearn.model_selection import ParameterGrid, KFold
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import *
from functools import partial
import numpy as np

- 라인 8: functools.partial은 인자가 기본값이 아닌 값으로 설정된 함수를 변수에 저장하는 데 사용합니다. 여기서는 average 인자가 'macro'와 'weighted'로 설정해야 하는 다중 쿨래스 분류 평가 지표를 정의하는데 사용합니다.

In [8]:
class MyAutoML1:
    ## 생성자 : 클래스의 생성자의 인자를 정의 합니다.
    def __init__(
        self,
        exclude_models = [],
        seed = None,
        cv = 5,
        scoring = 'accuracy',
        summarize_scoring = 'mean',
        early_stopping = False,
        early_stopping_criteria = 0.1,
    ):
        # self.exclude_models 정의
        model_set = {'KNN', 'DT', 'RF', 'MLP'} # 입력할 수 있는 전체 모델 목록인 model_set를 집합 자료형으로 정의
        if set(exclude_models) == model_set:
            raise ValueError('모든 모델을 제외할 수 없습니다.') # exclude_models와 model_set이 같으면 모든 모델이 탐색 대상에서 제외 / raise는 의도적으로 오류를 발생시키는 데 사용
        improper_models = set(exclude_models) - model_set # exclude_models에는 포햄됐으나, model_set에는 포함되지 않은 집합을 improper_models라고 정의
        if len(improper_models) >= 1:
            raise ValueError(
                '{}는 exclude_models에 포함할 수 없습니다.'.format(improper_models)
            ) # improper_models에 한 개의 요소라도 포함되면 오류를 발생
        self.exclude_models = exclude_models # exclude_models를 self.exclude_models에 저장합니다.

        # self.seed 정의
        if(type(seed) != int) and (seed is not None):
            raise ValueError('seed는 int형 혹은 None 이어야 합니다.') # seed는 int 자료형 혹은 None 이어야 하므로 seed가 int 자료형이나 None이 아니라면 오류를 발생 시킨다.
        self.seed = seed

        # self.cv 정의
        if type(cv) != int:
            raise ValueError('cv는 int 형이어야 합니다.')
        if cv < 2:
            raise ValueError('cv는 2보다는 커야 합니다.') # k-겹 교차 검증에서 폴드의 개수를 나타내는 cv는 2 이상의 자연수여야 한다. 따라서 int 자료형이 아니거나 2보다 작은 값이라면 오류를 발생 시킨다.
        self.cv = cv

        # self.scoring 정의
        scoring_dict = {
            'accuracy': accuracy_score,
            'precision': precision_score,
            'weighted-precision': partial(precision_score, average = 'weighted'),
            'macro-precision': partial(precision_score, average = 'macro'),
            'recall': recall_score,
            'weighted-recall': partial(recall_score, average = 'weighted'),
            'macro-recall': partial(recall_score, average = 'macro'),
            'f1': f1_score,
            'weighted-f1': partial(f1_score, average = 'weighted'),
            'macro-f1': partial(f1_score, average = 'macro')
        } # 사용자가 입력한 문자열을 대응되는 함수로 바꾸기 위한 사전 자료형인 scoring_dict를 정의

        if scoring not in scoring_dict.keys():
            msg = 'scoring은 {} 중 하나여야 합니다.'.format(scoring_dict.keys())
            raise ValueError(msg) # 입력한 scoring이 scoring_dict의 키가 아니면 오류를 발생시킵니다.
        self.scoring = scoring_dict[scoring] # scoring_dict를 scoring으로 인덱싱한 함수를 self.scoring에 저장합니다.

        #self.summarize 정의
        summarize_scoring_dict = {'mean': np.mean, 'max': np.max, 'min': np.min} # 사용자가 입력한 문자열을 대응되는 함수로 바꾸기 위한 사전 자료형 / 최소, 평균, 최대 중 하나로 k-겹 교차 검증 결과 요약

        if summarize_scoring not in ['mean', 'max', 'min']:
            msg = 'summarize_scoring는 {"mean", "max", "min"}중 하나여야 합니다.'
            raise ValueError(msg)
        self.summarize_scoring = summarize_scoring_dict[summarize_scoring]

        #self.early_stopping 정의
        if type(early_stopping) is not bool:
            raise ValueError('early_stopping은 True 혹은 False여야 합니다.') # early_stopping은 부율 자료형 / 다른 자료형 입력 시 오류 발생
        self.early_stopping = early_stopping

        #early_stopping_criteria 정의
        if type(early_stopping_criteria) is not float:
            raise ValueError('early_stopping_criteria 자료형은 float이어야 합니다.')
        if early_stopping_criteria <= 0 or early_stopping_criteria >= 1:
            raise ValueError('early_stopping_criteria는 0과 1 사이의 값이어야 합니다.') # early_stopping_criteria는 0과 1 사이의 float 자료형 / 그 외의 값 오류 발생
        self.early_stopping_criteria = early_stopping_criteria

    ## fit 메서드
    def fit(self, X, y):
        # X, y 포맷 변경
        if isinstance(X, pd.DataFrame):
            X = X.values # X가 데이터프레임이라면 values 속성을 사용해 X를 ndarray로 변환 / 여기서 instance(instance, class)는 instance가 class의 객체이면 True를, 그렇지 않으면 False 반환
        elif isinstance(X, list) or isinstance(X, tuple):
            X = np.array(X) # X가 리스트 혹은 튜플이라면 np.array 함수를 사용해 ndarray로 변환
        if isinstance(y, pd.Series):
            y = y.values
        elif isinstance(y, list) or isinstance(y, tuple):
            y = np.array(y)
        # K최근접 이웃 그리드 정의
        kNN_grid = ParameterGrid(
            {'n_neighbors': [3, 5, 7, 9, 11], 'metric': ['euclidean', 'manhattan']}
        )
        #결정 나무 그리드 정의
        DT_grid = ParameterGrid(
            {'max_depth': [3, 5, 7, 9], 'min_samples_split': [2, 5, 10]}
        )
        # 랜덤 포레스트 그리드 정의
        RFR_grid = ParameterGrid(
            {
                'n_estimators': [50, 100, 200],
                'max_depth': [2, 3, 4],
                'max_features': [0.2, 0.4, 0.6, 0.8, 1.0],
            }
        )
        # 신경망 그리드 정의
        MLP_grid = ParameterGrid(
            {
                'hidden_layer_sizes': [
                    (10,),
                    (10, 10),
                    (20, 20),
                    (15, 15, 15),
                    (20, 20, 20),
                    (15, 15, 15, 15),
                    (30, 30, 30, 30),
                ],
                'max_iter': [2000]
            }
        )

        # 전체 그리드 정의
        grid = {
            KNeighborsClassifier : kNN_grid,
            DecisionTreeClassifier: DT_grid,
            RandomForestClassifier: RFR_grid,
            MLPClassifier: MLP_grid
        }

        # 그리드 서치 시작
        best_score = 0 # 현재 까지 찾은 최고의 점수를 0으로 초기화
        self.leaderboard = [] # 리더보드 속성을 빈 리스트로 초기화 / 그리드 서치를 수행하면서 이 리스트에 탐색 결과 추가 / 마지막에 데이터 프레임으로 변환 예정
        for model_func in grid.keys(): # 분류 모델 클래스를 model_func으로 순회합니다.
            if model_func in self.exclude_models:
                continue # model_func이 exclude_models에 속하면 탐색하지 않습니다.
            for params in grid[model_func]: # model_func의 그리드를 params로 순회합니다.
                if model_func != KNeighborsClassifier:
                    params['random_state'] = self.seed # model_func이 kNeighborsClassifier가 아니라면 사전인 params에 {'random_state': self.seed}를 추가 / k-최근접 이웃은 random_state 인자가 없으므로 추가 x
                kf = KFold(n_splits = self.cv, shuffle = True, random_state = self.seed) # self.cv와 self.seed를 이용해 KFold 인스턴스를 정의
                fold_score_list = [] # 각 모델과 하이퍼 파라미터 교차 검증 결과 담을 fold_score_list를 빈 리스트로 초기화
                # 조기 종료를 하는 경우
                if self.early_stopping:
                    for train_index, test_index in kf.split(X):
                        X_train, X_test = X[train_index], X[test_index]
                        y_train, y_test = y[train_index], y[test_index]
                        model = model_func(**params).fit(X_train, y_train)
                        y_pred = model.predict(X_test) # kf로 구분한 train_index, test_index를 사용해 학습 데이터와 평가 데이터 정의, 학습, 예측 수행
                        fold_score = self.scoring(y_test, y_pred)
                        fold_score_list.append(fold_score) # self.scoring을 사용해 y_test와 y_pred를 비교한 결과를 fold_score에 저장하고 이를 fold_score_list에 추가합니다.
                        if fold_score < best_score * (1 - self.early_stopping_criteria):
                            break # fold_score가 현재까지 최고 점수보다 self.early_stopping_criteria * 100% 이상 낮다면 조기 종료 합니다.
                # 조기 종료를 하지 않는 경우
                else:
                    for train_index, test_index in kf.split(X):
                        X_train, X_test = X[train_index], X[test_index]
                        y_train, y_test = y[train_index], y[test_index]
                        model = model_func(**params).fit(X_train, y_train)
                        y_pred = model.predict(X_test)
                        fold_score = self.scoring(y_test, y_pred)
                        fold_score_list.append(fold_score) # self.early_stoopping이 False라면 조기 종료하지 않고 모델과 하이퍼 파라미터를 평가합니다.
                # 현재까지 찾은 최고의 해 및 리더보드 업데이트
                score = self.summarize_scoring(fold_score_list) # self.summarizing_scoring을 이용해 fold_score_list를 대푯값으로 요약
                if score > best_score:
                    best_score = score
                    best_model_func = model_func
                    best_params = params # 현재까지 찾은 해보다 현재 찾은 해가 더 좋은 해라면 best_score, best_model_func, best_params를 업데이트
                self.leaderboard.append([model_func, params, score]) # 현재 해 정보를 self.leaderboard에 추가합니다.
        self.model = best_model_func(**best_params).fit(X, y) # 최종 모델과 하이퍼 파라미터를 갖는 모델을 전체 데잍에 대해 재학습합니다.
        self.leaderboard = pd.DataFrame(self.leaderboard,
                                            columns = ['모델', '파라미터', '점수']) # self.leaderboard를 모델, 파라미터, 점수라는 칼럼을 갖는 데이터프레임으로 변환

    ## predict 메서드
    def predict(self, X):
        return self.model.predict(X)

    ## show_leaderboard 메서드
    def show_leaderboard(self):
        return self.leaderboard

# 3. 시스템 활용

## 활용 예제 1

데이터 불러오기

In [4]:
# 데이터 불러오기
df = pd.read_csv('../data/classification/winequality-red.csv')
X = df.drop('y', axis = 1)
y = df['y']

머신러닝 자동화

In [5]:
aml = MyAutoML1()
aml.fit(X, y)
result = aml.show_leaderboard()
display(result.sort_values(by = '점수', ascending = False))

Unnamed: 0,모델,파라미터,점수
18,<class 'sklearn.tree._classes.DecisionTreeClas...,"{'max_depth': 7, 'min_samples_split': 10, 'ran...",0.609753
63,<class 'sklearn.ensemble._forest.RandomForestC...,"{'max_depth': 4, 'max_features': 0.8, 'n_estim...",0.604761
55,<class 'sklearn.ensemble._forest.RandomForestC...,"{'max_depth': 4, 'max_features': 0.4, 'n_estim...",0.604753
53,<class 'sklearn.ensemble._forest.RandomForestC...,"{'max_depth': 4, 'max_features': 0.2, 'n_estim...",0.604120
65,<class 'sklearn.ensemble._forest.RandomForestC...,"{'max_depth': 4, 'max_features': 1.0, 'n_estim...",0.603482
...,...,...,...
0,<class 'sklearn.neighbors._classification.KNei...,"{'metric': 'euclidean', 'n_neighbors': 3}",0.500915
3,<class 'sklearn.neighbors._classification.KNei...,"{'metric': 'euclidean', 'n_neighbors': 9}",0.500315
1,<class 'sklearn.neighbors._classification.KNei...,"{'metric': 'euclidean', 'n_neighbors': 5}",0.499052
2,<class 'sklearn.neighbors._classification.KNei...,"{'metric': 'euclidean', 'n_neighbors': 7}",0.496589


## 활용 예제 2

데이터 불러오기

In [6]:
# 데이터 불러오기
df = pd.read_csv('../data/classification/bupa.csv')
X = df.drop('y', axis = 1)
y = df['y']

머신러닝 자동화

In [9]:
aml = MyAutoML1(scoring = 'f1', early_stopping = True, early_stopping_criteria = 0.05)
aml.fit(X, y)
result = aml.show_leaderboard()
display(result.sort_values(by = '점수', ascending = False))
display(aml.predict(X)[:5])

Unnamed: 0,모델,파라미터,점수
54,<class 'sklearn.ensemble._forest.RandomForestC...,"{'max_depth': 4, 'max_features': 0.2, 'n_estim...",0.803900
58,<class 'sklearn.ensemble._forest.RandomForestC...,"{'max_depth': 4, 'max_features': 0.6, 'n_estim...",0.799603
59,<class 'sklearn.ensemble._forest.RandomForestC...,"{'max_depth': 4, 'max_features': 0.6, 'n_estim...",0.794762
37,<class 'sklearn.ensemble._forest.RandomForestC...,"{'max_depth': 3, 'max_features': 0.2, 'n_estim...",0.788789
55,<class 'sklearn.ensemble._forest.RandomForestC...,"{'max_depth': 4, 'max_features': 0.4, 'n_estim...",0.787954
...,...,...,...
15,<class 'sklearn.tree._classes.DecisionTreeClas...,"{'max_depth': 5, 'min_samples_split': 10, 'ran...",0.641026
70,<class 'sklearn.neural_network._multilayer_per...,"{'hidden_layer_sizes': (15, 15, 15), 'max_iter...",0.617647
4,<class 'sklearn.neighbors._classification.KNei...,"{'metric': 'euclidean', 'n_neighbors': 11}",0.615385
72,<class 'sklearn.neural_network._multilayer_per...,"{'hidden_layer_sizes': (15, 15, 15, 15), 'max_...",0.605263




array([1, 1, 1, 1, 1], dtype=int64)