In [1]:
from __future__ import annotations

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold

from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier

from sklearn.metrics import accuracy_score, precision_score, \
                            recall_score, f1_score

### クラスの下に使用例があります

In [2]:
class PipeLine(object):
    """
    __init__ :
    アトリビュートで訓練データ、正解データを管理する
    （引数）
    df: callメソッドでオリジナルのデータが格納される
    df_num: callで指定した数値データが格納される。クラスメソッドで上書きされる
    df_cat: callで指定したカテゴリデータが格納される
    viewer: bool, viewer_row :int 更新後のデータを表示する

    __call__ :
    インスタンスの生成時に引数でオリジナルデータを渡す
    （引数）
    data: 使用するオリジナルデータ  (pd.DataFrame
    numerical: 数値データのカラム名  (list[str])
    categorical: カテゴリデータのカラム名  (list[str])
    target: 

    standard_scaler:
    アトリビュートのdf_numを標準化する
    （引数）
    view: 標準化したdf_numを確認できる

    one_hot: 
    指定したカラムのdf_catをワンホット化してdf_numにconcatする
    （引数）
    columns: ワンホット化したいカラム名_ 
    concat: Trueの場合はアトリビュートのself.df_numに連結し更新する
    view: ワンホットされたデータがdf_numにconcatされているのを確認できる
    return: x_train, x_test, y_train, y_test
    """

    def __init__(self):
        self.df: pd.DataFrame = None
        self.df_num: pd.DataFrame = None
        self.df_cat: pd.DataFrame = None
        self.df_target: pd.DataFrame = None
        self.viewer = True  # 更新したカラムの表示を切り替え
        self.viewer_row = 3  # 表示カラムの行数
        self.random_seed = 42  # 乱数シード値

    def __call__(self,
                 data: pd.DataFrame,
                 numerical=['Age', 'Sex', 'RestingBP', 'Cholesterol', \
                 'FastingBS', 'MaxHR', 'ExerciseAngina', 'Oldpeak'],
                 categorical=['ChestPainType', 'RestingECG', 'ST_Slope'],
                 target=['HeartDisease'],
                 train_flg=True
                 ) -> pd.DataFrame:

        self.df = data
        self.df_num = data[numerical]
        self.df_cat = data[categorical]
        # 正解ラベルが与えられない本番環境では引数からFalseにすること
        if train_flg:
            self.df_target = data[target]
        return self.df_num

    def standard_scaler(self):
        columns = self.df_num.columns
        scaler = StandardScaler()
        scaler.fit(self.df_num)
        self.df_num = scaler.transform(self.df_num)
        self.df_num = pd.DataFrame(self.df_num, columns=columns)
        if self.viewer:
            print('-'*20, '標準化されたdf_num', '-'*20)
            display(self.df_num.head(self.viewer_row))
        return None

    def one_hot(self, columns: list[str], concat=True) -> pd.DataFrame:
        one_hotted = pd.get_dummies(self.df_cat[columns])
        self.df_num = pd.concat((self.df_num, one_hotted), axis=1)
        if self.viewer:
            print('-'*20, f'ワンホットされたカラム{columns}', '-'*20)
            display(self.df_num.head(self.viewer_row))
        return None

    def fold_out_split(self, test_size=0.3, to_array=True) -> np.ndarray:
        pack = train_test_split(self.df_num,  self.df_target,
                                test_size=test_size,
                                random_state=self.random_seed)
        x_tr, x_te, y_tr, y_te = pack
        if to_array:
            x_tr, x_te, y_tr, y_te = [i.values for i in pack]
            y_tr, y_te = y_tr.reshape(-1), y_te.reshape(-1)
        if self.viewer:
            print('-'*20, '分割されたデータShape', '-'*20)
            print(f'x_train: {x_tr.shape} x_test: {x_te.shape}')
            print(f'y_train: {y_tr.shape} y_test: {y_te.shape}')
        return x_tr, x_te, y_tr, y_te

    def k_fold(self, n_splits=5, to_array=True) -> list[list[np.ndarray]]:
        kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
        packs = []
        for train_index, test_index in kf.split(self.df_num):
            x_tr, x_te = self.df_num.iloc[train_index], self.df_num.iloc[test_index]
            y_tr, y_te = self.df_target.iloc[train_index], self.df_target.iloc[test_index]
            pack = (x_tr, x_te,  y_tr, y_te)
            if to_array:
                pack = [unpack.values for unpack in pack]
            packs.append(pack)
        if self.viewer: 
            print(kf.get_n_splits)
        return packs

    def training(self, valid, Model, valid_args={}, params={}):
        if valid == 'fold_out_split':
            packs = self.fold_out_split(**valid_args)
            model = Model(**params)
            model.fit(packs[0], packs[2])
            evaluations(model, *packs)
            
            return model

        if valid == 'k_fold':
            packs = self.k_fold(**valid_args)
            models = []
            for i, pack in enumerate(packs):
                model = Model(**params)
                model.fit(pack[0], pack[2].reshape(-1))
                print('-'*20, f'model{i} predict', '-'*20)
                evaluations(model, *pack)
                models.append(model)
            return models



# evaluation
def evaluations(model, x_train, x_test, y_train, y_test):
    evaluate = [accuracy_score, precision_score, recall_score, f1_score]
    # 訓練データの評価
    train_pred = model.predict(x_train)
    train_val = {func.__name__: func(y_train, train_pred) for func in evaluate}
    # 検証データの評価
    test_pred = model.predict(x_test)
    test_val = {func.__name__: func(y_test, test_pred) for func in evaluate}
    evals = pd.DataFrame((train_val, test_val), index=['train', 'test'])
    display(evals)
    return evals


def ensemble_prediction(models, x, y):
    try:
        predict = [model.predict_proba(x) for model in models]
        predict_sum = np.sum(predict, axis=0)
        ensemble_prediction = np.array(
            [np.where(pre[0]<pre[1], 1, 0) for pre in predict_sum]
            )
    except AttributeError:
        print('########## 確率で出力するようパラメータもしくはモデルを設定することを推奨 ############')
        predict = [model.predict(x) for model in models]
        predict_sum = np.sum(predict, axis=0)
        ensemble_prediction = np.array(
                [np.where(len(models)//2<=pre, 1, 0) for pre in predict_sum]
                )
    return ensemble_prediction


### ホールドアウト法を使用する場合
PipeLineクラスのtrainingメソッドで valid = 'fold_out_split' を指定する<br>
valid_args = {'test_size': テストサイズ比率} を指定する

In [4]:
df = pd.read_csv('../data/train.csv')  # パスは確認してください

############################# ここを指定する ###################################
valid, model = ['fold_out_split', DecisionTreeClassifier]  # 分割数とモデルを指定
valid_args = {'test_size': 0.2}  # ホールドアウト法なのでテストサイズの割合を指定
params = {'max_depth': 3, }    # 選択したモデルパラメータを入力
##############################################################################

pipe = PipeLine()
pipe.viewer = False
pipe(df)
# kayano_data = func(pipe.df_num)
# pipe.df_num = kayano_data
pipe.standard_scaler()
pipe.one_hot(['ChestPainType'])
models = pipe.training(valid=valid, Model=model, valid_args=valid_args ,params=params)

Unnamed: 0,accuracy_score,precision_score,recall_score,f1_score
train,0.82846,0.888889,0.779783,0.830769
test,0.775194,0.884058,0.743902,0.807947


>
### 交差検証法を使用する場合
PipeLineクラスのtrainingメソッドで valid = 'k_fold' を指定する.<br>
valid_args = {'n_splits': 分割数} を指定する

In [98]:
df = pd.read_csv('../data/train.csv')

############################# ここを指定する ###################################
valid, model = ['fold_out_split', SVC]  # 分割数とモデルを指定
valid_args = {'test_size': 0.2}  # ホールドアウト法なのでテストサイズの割合を指定
params = {'kernel': 'rbf' ,'probability': True}  # 選択したモデルパラメータを入力
##############################################################################

pipe = PipeLine()
pipe.viewer = False
pipe(df)
pipe.standard_scaler()
pipe.one_hot(['ChestPainType'])
models = pipe.training(valid=valid, Model=model, valid_args=valid_args ,params=params)

Unnamed: 0,accuracy_score,precision_score,recall_score,f1_score
train,0.875244,0.895911,0.870036,0.882784
test,0.844961,0.897436,0.853659,0.875
