In [None]:
# !pip install numpy pandas matplotlib sklearn xgboost lightgbm catboost 
# !pip freeze > requirements.txt
# !pip install xgboost lightgbm catboost 

In [None]:
import os
import numpy as np
import pandas as pd
import sklearn
import matplotlib.pyplot as plt
# import seaborn as sns

from sklearn.model_selection import train_test_split

# Build Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.pipeline import FeatureUnion

# Imputation and Scaling
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import SimpleImputer, KNNImputer, IterativeImputer
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler, Normalizer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder

# Feature Selection 
from sklearn.feature_selection import chi2, f_classif, mutual_info_classif
from sklearn.feature_selection import SelectKBest, SelectFromModel, SelectPercentile, RFE

# Feature Decomposition and Extraction
from sklearn.decomposition import PCA

# Model 
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import SGDClassifier
from xgboost import XGBClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.ensemble import GradientBoostingClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
from sklearn.neural_network import MLPClassifier

# Results
from sklearn.inspection import permutation_importance
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import roc_auc_score, roc_curve, auc

# GridSearchCV 
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score

# Save model weights ---- > pickle package or joblib package
import joblib

# Features

In [None]:
# Internal cohort: source of the train/test split.
# low_memory=False makes pandas parse the file in a single pass, so every column gets
# one consistently inferred dtype (at the cost of higher peak memory).
df = pd.read_csv(
    'PF_AI_Internal.csv',
    encoding='utf-8',
    low_memory=False,
)
# Alternative input: 'test_01.csv' (optionally forcing dtype=object for 'Gender',
# 'hx_cancer' and 'Pf_Color_bloody').

# External cohort: only used for evaluation further down.
external_df = pd.read_csv(
    'PF_AI_External.csv',
    encoding='utf-8',
    low_memory=False,
)
# Alternative input: 'test_external_01.csv'.

# Feature groups. The clinical group is fixed; the other two are discovered from the
# column names of the internal cohort.
clinical_feature = ['Gender', 'Age', 'BMI', 'BT', 'hx_cancer']
blood_serum_feature = [
    column for column in df.columns
    if 'b_' in column and '_date' not in column   # blood/serum values, excluding date columns
]
pleural_fluid_feature = [
    column for column in df.columns
    if 'pf_' in column or 'Pf_' in column
]

print(f'Clinical: {len(clinical_feature)}\nBlood/Serum: {len(blood_serum_feature)}\nPleural Fluid: {len(pleural_fluid_feature)}')

In [None]:
""" SELECT CATEGORICAL FEATURES AND NUMERICAL FEATURES """
# Which feature set the experiment uses:
#   'pre_test'  -> Gender, hx_cancer, Age, BMI, BT
#   'post_test' -> the above plus Pf_Color_bloody, the listed pleural-fluid
#                  measurements and every blood/serum column found in the data.
test_type = 'post_test'  # 'pre_test' | 'post_test'

if test_type == 'pre_test':
    categorical_features = ['Gender', 'hx_cancer']
    numerical_features = ['Age', 'BMI', 'BT']
elif test_type == 'post_test':
    categorical_features = ['Gender', 'hx_cancer', 'Pf_Color_bloody']
    numerical_features = ['Age', 'BMI', 'BT',
                          'pf_pH', 'pf_RBC', 'pf_WBC', 'pf_PMN_p', 'pf_Lympho_p', 'pf_other_p',
                          'pf_protein', 'pf_glucose', 'pf_chloride', 'pf_LD', 'pf_amylase',
                          'pf_albumin', 'pf_ADA', 'pf_CEA'] + blood_serum_feature
else:
    # Fail fast: without this branch a typo in test_type either hits a NameError on a
    # fresh kernel or silently reuses the lists left over from a previous run.
    raise ValueError(f" >>> Can only use these test types: ['pre_test', 'post_test'], got test_type = {test_type}")

print(f'Categorical Data: {len(categorical_features)}\nNumerical Data: {len(numerical_features)}')

In [None]:
# categorical_features = ['hx_cancer']
# numerical_features = ['BT', 'pf_pH', 'pf_RBC', 'pf_WBC', 'pf_PMN_p', 'pf_Lympho_p', 'pf_other_p', 'pf_protein', 'pf_LD', 'pf_amylase', 'pf_albumin', 'pf_ADA', 'pf_CEA', 'b_Lympho', 'b_plt', 'b_CRP', 'b_Protein', 'b_TB', 'b_BUN', 'b_Calcium', 'b_Cholesterol', 'b_Creatinine', 'b_Uric_Acid', 'b_albumin']
# print(f'Categorical Data: {len(categorical_features)}\nNumerical Data: {len(numerical_features)}')

In [None]:
# categorical_features = ['hx_cancer']
# numerical_features = ['BT', 'pf_pH', 'pf_RBC', 'pf_WBC', 'pf_PMN_p', 'pf_Lympho_p', 'pf_other_p', 'pf_protein', 'pf_LD', 'pf_amylase', 'pf_albumin', 'pf_ADA', 'pf_CEA', 'b_WBC', 'b_Lympho', 'b_plt', 'b_CRP', 'b_TB', 'b_Calcium', 'b_Cholesterol', 'b_total_CO2', 'b_GGT', 'b_K', 'b_Uric_Acid']
# print(f'Categorical Data: {len(categorical_features)}\nNumerical Data: {len(numerical_features)}')

In [None]:
""" Setting Variables """
# Seed reused by the data split, the iterative imputer and the models.
random_state = 42

# Plot styling.
fontsize, title_font, small_font = 15, 16, 12
mean_color = '#1a50b6'

# Root directory under which results are saved.
data_dir = 'external'

# Integer class code -> display name.
label_dict = {
    0: 'Transudative',
    1: 'Malignant',
    2: 'Parapneumonic',
    3: 'Tuberculous',
    4: 'Others',
}

# Model inputs (categorical columns first, then numerical) and the target column.
# Alternative: clinical_feature + blood_serum_feature + pleural_fluid_feature
X_features = [*categorical_features, *numerical_features]
Y_target = 'final_dx_1'
print(f'Number of Features Uses: {len(X_features)}')

# Dataset split

In [None]:
""" Set X(Data) and Y(Target) """
# Feature matrix and target vector of the internal cohort.
X = df[X_features]
Y = df[Y_target]
feature_col = X.columns  # column names, reused later for plots and selected-feature names
print(f"Feature: {X.shape} | Target: {Y.shape}")

In [None]:
""" TRAIN AND TEST DATASET SPLIT """
# Stratified hold-out split of the internal cohort.
test_size = 0.2
x_train, x_test, y_train, y_test = train_test_split(X, Y,
                                                    test_size=test_size, shuffle=True,
                                                    stratify=Y, random_state=random_state)

print(f'Train: {len(x_train)} | Test: {len(x_test)}\n')

# Count each split once; .get(..., 0) reports 0 for a class that is absent from a
# split instead of raising KeyError.
train_counts = y_train.value_counts()
test_counts = y_test.value_counts()
for idx, v in label_dict.items():
    print(f'{v} | Train: {train_counts.get(idx, 0)} | Test: {test_counts.get(idx, 0)}')

# Features and target of the external cohort (same columns as the internal one).
x_external = external_df.loc[:, X_features]
y_external = external_df.loc[:, Y_target]
print(f"\nExternal Feature: {x_external.shape} | Target: {y_external.shape}")

external_counts = y_external.value_counts()
for idx, v in label_dict.items():
    print(f'{v} | External: {external_counts.get(idx, 0)}')

In [None]:
# Per-class ratio of train to test sample counts (aligned on the class label).
y_train.value_counts().div(y_test.value_counts())

# Pipeline

- sklearn.base의 TransformerMixin을 상속하면 fit, transform 메서드만 만들어도 fit_transform() 메서드를 자동으로 생성해준다.
- sklearn.base의 BaseEstimator를 상속하면 하이퍼파라미터 튜닝에 필요한 두 메서드 get_params()와 set_params()를 추가로 얻게 된다. (생성자에 `*args`나 `**kwargs`를 사용하면 안 됨)

In [None]:
class FeatureSpliter(BaseEstimator, TransformerMixin):
    """Column selector.

    Picks the columns listed in ``feature_names`` (for example the numerical or
    the categorical ones) out of the incoming data.
    """

    def __init__(self, feature_names):
        self.feature_names = feature_names

    def fit(self, X, y=None):
        # Stateless: nothing is learned from the data.
        return self

    def transform(self, X, y=None):
        selected = X[self.feature_names]
        return selected

In [None]:
""" 결측치 대체 Class """
class MissingTransformer(BaseEstimator, TransformerMixin):
    """Missing-value imputation step.

    Parameters
    ----------
    imputer : {'single', 'multiple'} or imputer object, default='single'
        'single' builds ``SimpleImputer(strategy=strategy)``; 'multiple' builds a
        ``KNNImputer`` (strategy='knn') or an ``IterativeImputer``
        (strategy='iterative'). A non-string object is used as the imputer itself
        and is fitted in place.
    strategy : str, default='mean'
        For 'single': 'mean', 'median', 'most_frequent' or 'constant'.
        For 'multiple': 'knn' or 'iterative'.

    Attributes
    ----------
    imputer_ : fitted imputer created by ``fit``.

    Notes
    -----
    ``fit`` leaves the constructor parameters untouched and stores the fitted object
    in ``imputer_``. Overwriting ``self.imputer`` (as was done before) made a later
    ``set_params(strategy=...)`` silently ineffective on refit.
    """

    def __init__(self, imputer='single', strategy='mean'):
        self.imputer = imputer
        self.strategy = strategy

    def _build_imputer(self):
        """Return an unfitted imputer matching ``imputer`` / ``strategy``."""
        if not isinstance(self.imputer, str):
            # An imputer instance was supplied (or restored from an older pickle).
            return self.imputer

        if self.imputer == 'single':
            if self.strategy not in ['mean', 'median', 'most_frequent', 'constant']:
                raise ValueError(f" >>> Can only use these strategies: ['mean', 'median', 'most_frequent', 'constant'], got strategy = {self.strategy}")
            return SimpleImputer(missing_values=np.nan, strategy=self.strategy)

        if self.imputer == 'multiple':
            if self.strategy == 'knn':
                return KNNImputer(missing_values=np.nan, n_neighbors=2)
            if self.strategy == 'iterative':
                # random_state is the notebook-level seed.
                return IterativeImputer(estimator=LinearRegression(), missing_values=np.nan, max_iter=10, verbose=2,
                                        imputation_order='roman', random_state=random_state, n_nearest_features=5)
            raise ValueError(f" >>> Can only use these strategies: ['knn', 'iterative'], got strategy = {self.strategy}")

        raise ValueError(f" >>> Can only use these imputers: ['single', 'multiple'], got imputer = {self.imputer}")

    def fit(self, X, y=None):
        """Build the configured imputer and fit it on ``X``; returns ``self``."""
        self.imputer_ = self._build_imputer().fit(X)
        return self

    def transform(self, X, y=None):
        """Return ``X`` with missing values filled in by the fitted imputer."""
        # Fallback keeps objects pickled before `imputer_` existed usable: there the
        # fitted imputer lives in `self.imputer`.
        fitted = getattr(self, 'imputer_', self.imputer)
        return fitted.transform(X)

In [None]:
class CategoricalTransformer(BaseEstimator, TransformerMixin):
    """Encoding step for categorical variables.

    Parameters
    ----------
    encoder : 'ordinal' or OrdinalEncoder instance, default='ordinal'
        'ordinal' builds a fresh ``OrdinalEncoder``; an ``OrdinalEncoder`` instance
        is used as given and fitted in place.

    Attributes
    ----------
    encoder_ : fitted encoder created by ``fit``.

    Notes
    -----
    The fitted encoder is stored in ``encoder_`` and ``self.encoder`` is left alone.
    Previously ``fit`` replaced ``self.encoder`` with the fitted object; because
    estimator ``==`` is an identity comparison, any second ``fit`` (re-running the
    training cell, or fitting a clone of a fitted pipeline) raised ValueError.
    """

    def __init__(self, encoder ='ordinal'):
        self.encoder = encoder

    def _build_encoder(self):
        """Return an unfitted encoder for the configured ``encoder`` option."""
        if isinstance(self.encoder, OrdinalEncoder):
            return self.encoder
        if isinstance(self.encoder, str) and self.encoder == 'ordinal':
            return OrdinalEncoder()
        # 'onehot' and 'label' are not supported.
        raise ValueError(f" >>> Can only use these encoders : ['ordinal'], got encoder = {self.encoder}")

    def fit(self, X, y=None):
        """Build the configured encoder and fit it on ``X``; returns ``self``."""
        self.encoder_ = self._build_encoder().fit(X)
        return self

    def transform(self, X, y=None):
        """Return the encoded version of ``X``."""
        # Fallback for objects pickled before `encoder_` existed.
        fitted = getattr(self, 'encoder_', self.encoder)
        return fitted.transform(X)

In [None]:
class NumericalTransformer(BaseEstimator, TransformerMixin):
    """Scaling step for continuous variables.

    Parameters
    ----------
    scaler : {'minmax', 'standard', 'normalize', 'robust'} or scaler instance, default='minmax'
        A name builds the matching scikit-learn scaler; an instance of one of those
        four classes is used as given and fitted in place.

    Attributes
    ----------
    scaler_ : fitted scaler created by ``fit``.

    Notes
    -----
    The fitted scaler is stored in ``scaler_`` and ``self.scaler`` is left alone.
    Previously ``fit`` replaced ``self.scaler`` with the fitted object, after which
    every branch test failed (estimator ``==`` compares identity) and a second
    ``fit`` raised ValueError.
    """

    # Option name -> scaler class.
    _SCALERS = {
        'minmax': MinMaxScaler,
        'standard': StandardScaler,
        'normalize': Normalizer,
        'robust': RobustScaler,
    }

    def __init__(self, scaler='minmax'):
        self.scaler = scaler

    def _build_scaler(self):
        """Return an unfitted scaler for the configured ``scaler`` option."""
        if isinstance(self.scaler, str):
            if self.scaler in self._SCALERS:
                return self._SCALERS[self.scaler]()
        elif isinstance(self.scaler, tuple(self._SCALERS.values())):
            return self.scaler
        raise ValueError(f" >>> Can only use these scalers : ['minmax', 'standard', 'normalize', 'robust'], got scaler = {self.scaler}")

    def fit(self, X, y=None):
        """Build the configured scaler and fit it on ``X``; returns ``self``."""
        self.scaler_ = self._build_scaler().fit(X)
        return self

    def transform(self, X, y=None):
        """Return the scaled version of ``X``."""
        # Fallback for objects pickled before `scaler_` existed.
        fitted = getattr(self, 'scaler_', self.scaler)
        return fitted.transform(X)

**Feature Selection**
- 단일 변수 선택법은 각각의 독립변수를 하나만 사용한 예측모형의 성능을 이용하여 가장 분류성능 혹은 상관관계가 높은 변수만 선택하는 방법
    - chi2: 카이제곱 검정 통계값
    - f_classif: 분산분석(ANOVA) F검정 통계값
    - mutual_info_classif: 상호정보량(mutual information)
- feature_selection 서브패키지는 성능이 좋은 변수만 사용하는 전처리기인 SelectKBest 클래스도 제공
- **For regression: f_regression, mutual_info_regression**
- **For classification: chi2, f_classif, mutual_info_classif**

In [None]:
class FeatureSelector(BaseEstimator, TransformerMixin):
    """Feature-selection step.

    Parameters
    ----------
    selector : {'chi2', 'anova', 'mutualinfo', 'model-based'} or selector object, default='chi2'
        'chi2', 'anova' and 'mutualinfo' are filter methods implemented with
        ``SelectKBest`` (score functions ``chi2``, ``f_classif``,
        ``mutual_info_classif``). 'model-based' is an embedded method:
        ``SelectFromModel`` around a 100-tree random forest. An object exposing
        ``fit`` and ``transform`` is used as given and fitted in place.
    k : int, default=10
        ``k`` for ``SelectKBest`` / ``max_features`` for ``SelectFromModel``.

    Attributes
    ----------
    selector_ : fitted selector created by ``fit``.

    Notes
    -----
    * The fitted selector is stored in ``selector_`` instead of overwriting the
      ``selector`` parameter; overwriting it made every second ``fit`` raise
      ValueError.
    * The model-based selector is now built with ``prefit=False`` so that
      ``SelectFromModel.fit`` trains the forest itself. Calling ``fit`` on a
      ``prefit=True`` selector raises on older scikit-learn releases.
    """

    def __init__(self, selector='chi2', k=10):
        self.selector = selector
        self.k = k

    def _build_selector(self):
        """Return an unfitted selector for the configured option."""
        score_funcs = {'chi2': chi2, 'anova': f_classif, 'mutualinfo': mutual_info_classif}
        if isinstance(self.selector, str):
            if self.selector in score_funcs:
                return SelectKBest(score_func=score_funcs[self.selector], k=self.k)
            if self.selector == 'model-based':
                forest = RandomForestClassifier(n_estimators=100, random_state=random_state)
                return SelectFromModel(forest, max_features=self.k)
        elif hasattr(self.selector, 'fit') and hasattr(self.selector, 'transform'):
            return self.selector
        raise ValueError(f" >>> Can only use these selectors : ['chi2', 'anova', 'mutualinfo','model-based'], got selector = {self.selector}")

    def _fitted_selector(self):
        """Fitted selector; falls back to ``selector`` for objects pickled before ``selector_`` existed."""
        return getattr(self, 'selector_', self.selector)

    def fit(self, X, y=None):
        """Build the configured selector and fit it on ``(X, y)``; returns ``self``."""
        self.selector_ = self._build_selector().fit(X, y)
        return self

    def transform(self, X, y=None):
        """Return only the selected columns of ``X``."""
        return self._fitted_selector().transform(X)

    def get_feature_name(self):
        """Names of the selected features, looked up in the notebook-level ``feature_col``."""
        return self._fitted_selector().get_feature_names_out(input_features=feature_col)

    def get_support(self):
        """Boolean mask of the selected features."""
        return self._fitted_selector().get_support()

# Results Visualization

In [None]:
# # from sklearn.metrics import confusion_matrix, plot_confusion_matrix

# def show_confusion_matrix(y_test, y_pred, save_path=None, dataset=None):
#     conf_matrix = confusion_matrix(y_test, y_pred)
    
#     cmap = plt.cm.Blues
#     plt.figure(figsize=(6,6))
#     plt.imshow(conf_matrix, cmap=cmap, interpolation='nearest')

#     thresh = conf_matrix.max() / 1.5 
#     for i in range(conf_matrix.shape[0]):
#         for j in range(conf_matrix.shape[1]):
#             plt.text(x=j, y=i,s=conf_matrix[i, j], va='center', ha='center', color="white" if conf_matrix[i, j] > thresh else "black",fontsize=fontsize) # , size='large'

#     plt.xticks(np.arange(len(label_dict)), label_dict.values(), fontsize=fontsize, rotation=45)
#     plt.yticks(np.arange(len(label_dict)), label_dict.values(), fontsize=fontsize, rotation=45)
#     plt.xlabel('Predictions', fontsize=fontsize)
#     plt.ylabel('Actuals', fontsize=fontsize)
#     plt.title(f'Confusion matrix: {dataset}', fontsize=title_font)
#     plt.colorbar(fraction=0.05, pad=0.05)
#     if save_path:
#         plt.savefig(f'{save_path}/Confusion_Matrix_{dataset}.png', bbox_inches='tight')
#     plt.show()
#     plt.close()

In [None]:
def show_confusion_matrix(y_test, y_pred, save_path=None, dataset=None):
    """Plot the row-normalised confusion matrix for the classes in ``label_dict``.

    Parameters
    ----------
    y_test : array-like
        True class codes.
    y_pred : array-like
        Predicted class codes.
    save_path : str, optional
        Directory in which ``Confusion_Matrix_<dataset>.png`` is written; nothing is
        saved when omitted.
    dataset : str, optional
        Name shown in the title and used in the file name.

    Notes
    -----
    ``labels`` is passed explicitly so the matrix always has one row/column per entry
    of ``label_dict``, in that order, and therefore lines up with the tick labels even
    when a class is missing from the data. ``normalize='true'`` divides each row by
    its sum and yields 0 (not NaN) for a class without true samples.
    """
    class_ids = list(label_dict.keys())
    class_names = list(label_dict.values())
    conf_matrix = confusion_matrix(y_test, y_pred, labels=class_ids, normalize='true')

    cmap = plt.cm.Blues
    plt.figure(figsize=(6,6))
    plt.imshow(conf_matrix, cmap=cmap, interpolation='nearest')

    # Cells darker than this threshold get white text for contrast.
    thresh = conf_matrix.max() / 1.5
    for i in range(conf_matrix.shape[0]):
        for j in range(conf_matrix.shape[1]):
            plt.text(x=j, y=i, s=round(conf_matrix[i, j], 2), va='center', ha='center',
                     color="white" if conf_matrix[i, j] > thresh else "black", fontsize=fontsize)

    tick_positions = np.arange(len(class_names))
    plt.xticks(tick_positions, class_names, fontsize=fontsize, rotation=45)
    plt.yticks(tick_positions, class_names, fontsize=fontsize, rotation=45)
    plt.xlabel('Predictions', fontsize=fontsize)
    plt.ylabel('Actuals', fontsize=fontsize)
    plt.title(f'Confusion matrix: {dataset}', fontsize=title_font)
    plt.colorbar(fraction=0.05, pad=0.05)
    if save_path:
        plt.savefig(f'{save_path}/Confusion_Matrix_{dataset}.png', bbox_inches='tight')
    plt.show()
    plt.close()

In [None]:
# from sklearn.inspection import permutation_importance

def show_permutation_importance(model, x, y, save_path=None, dataset=None):
    """Box-plot the 20 largest permutation importances of a fitted model.

    Parameters
    ----------
    model : fitted estimator
        Passed to ``sklearn.inspection.permutation_importance``.
    x : DataFrame or array-like
        Features to permute.
    y : array-like
        Targets matching ``x``.
    save_path : str, optional
        Directory in which ``Permutation_Importance_<dataset>.png`` is written.
    dataset : str, optional
        Name shown in the title and used in the file name.

    Notes
    -----
    Each feature is shuffled 10 times (``n_repeats=10``) with the notebook-level
    ``random_state``; scoring uses the model's own ``score`` because no ``scoring``
    argument is given.
    """
    result = permutation_importance(model, x, y, n_repeats=10,
                                    random_state=random_state, n_jobs=2)

    # Importances are indexed by the columns of `x`, so take the names from `x` itself
    # when it has them; fall back to the notebook-level feature_col for plain arrays.
    names = pd.Index(x.columns if hasattr(x, 'columns') else feature_col)

    # argsort is ascending, so the last 20 positions are the most important features.
    order = result.importances_mean.argsort()
    top_index = order[-20:]

    plt.figure(figsize=(9,6))
    # NOTE(review): matplotlib >= 3.9 renames `labels` to `tick_labels`; switch once
    # the minimum supported matplotlib version allows it.
    plt.boxplot(result.importances[top_index].T, vert=False, labels=names[top_index])
    plt.title(f"Permutation Importances: TOP 20 ({dataset})", fontsize=title_font)
    plt.yticks(fontsize=fontsize)
    plt.xticks(fontsize=fontsize)
    plt.tight_layout()
    if save_path:
        plt.savefig(f'{save_path}/Permutation_Importance_{dataset}.png', bbox_inches='tight')
    plt.show()
    plt.close()

In [None]:
def show_roc_curve(model, x_test, y_test, save_path=None, dataset=None):
    """Plot one-vs-rest ROC curves per class plus their mean curve.

    Parameters
    ----------
    model : fitted estimator exposing ``predict_proba``
    x_test : features to score
    y_test : array-like of true class codes (keys of ``label_dict``)
    save_path : str, optional
        Directory in which ``ROC_Curve_<dataset>.png`` is written.
    dataset : str, optional
        Name shown in the title and used in the file name.

    Notes
    -----
    * The one-hot target is built against the full ``label_dict`` key list, so column
      ``i`` always belongs to the ``i``-th class even when that class is absent from
      ``y_test`` (``pd.get_dummies`` would drop the column and shift the others).
    * Classes without both positive and negative samples are skipped because their
      ROC curve is undefined.
    """
    class_ids = np.array(list(label_dict.keys()))
    y_true = (np.asarray(y_test)[:, None] == class_ids[None, :]).astype(int)
    # assumes predict_proba columns follow the order of label_dict - TODO confirm
    # against model.classes_ if the label codes ever change.
    y_proba = model.predict_proba(x_test)

    tprs = []
    aucs = []

    fig, ax = plt.subplots(figsize=(10, 7))
    mean_fpr = np.linspace(0, 1, 100)
    for col, (k, v) in enumerate(label_dict.items()):
        if np.unique(y_true[:, col]).size < 2:
            continue
        fpr, tpr, _ = roc_curve(y_true[:, col], y_proba[:, col])
        auc_score = auc(fpr, tpr)
        # Resample onto a common FPR grid so the curves can be averaged.
        interp_tpr = np.interp(mean_fpr, fpr, tpr)
        interp_tpr[0] = 0.0
        tprs.append(interp_tpr)
        aucs.append(auc_score)

        ax.plot(fpr, tpr, label=f'{v} (AUC={auc_score:0.2f})', alpha=0.3)

    ax.plot([0, 1], [0, 1], 'r--', label='Chance', alpha=0.8)
    if tprs:
        mean_tpr = np.mean(tprs, axis=0)
        mean_tpr[-1] = 1.0
        mean_auc = auc(mean_fpr, mean_tpr)
        std_auc = np.std(aucs)
        # Raw string: '\p' is not a valid escape sequence in a normal string literal.
        ax.plot(mean_fpr, mean_tpr, color=mean_color,
                label=r'Mean ROC (AUC=%0.2f $\pm$ %0.2f)' % (mean_auc, std_auc), alpha=0.8)

    ax.legend(fontsize=small_font, ncol=1, loc='lower right')
    plt.xlabel('False Positive Rate', fontsize=fontsize)
    plt.ylabel('True Positive Rate', fontsize=fontsize)
    plt.xticks(fontsize=fontsize)
    plt.yticks(fontsize=fontsize)
    plt.title(f'Receiver Operating Characteristic (ROC) Curve: {dataset}', fontsize=title_font)
    plt.tight_layout()
    if save_path:
        plt.savefig(f'{save_path}/ROC_Curve_{dataset}.png', bbox_inches='tight')
    plt.show()
    plt.close()

In [None]:
def set_preprocessing(cat_feature=None, cat_encoder = 'ordinal', cat_imputer = 'single', cat_strategy = 'most_frequent',
                      num_feature=None, num_scaler = 'minmax', num_imputer = 'single', num_strategy = 'mean'):
    """Build the preprocessing stage as a FeatureUnion of two branches.

    Categorical branch: select ``cat_feature`` columns -> encode -> impute.
    Numerical branch:   select ``num_feature`` columns -> impute -> scale.

    The two branch outputs are concatenated horizontally, categorical columns first.
    """
    # Categorical branch.
    categorical_pipeline = Pipeline([
        ('cat_selector', FeatureSpliter(cat_feature)),
        ('cat_encoder', CategoricalTransformer(encoder=cat_encoder)),
        ('cat_imputer', MissingTransformer(imputer=cat_imputer, strategy=cat_strategy)),
    ])

    # Numerical branch.
    numerical_pipeline = Pipeline([
        ('num_selector', FeatureSpliter(num_feature)),
        ('num_imputer', MissingTransformer(imputer=num_imputer, strategy=num_strategy)),
        ('num_scaler', NumericalTransformer(scaler=num_scaler)),
    ])

    # Side-by-side combination of both branches.
    return FeatureUnion(transformer_list=[
        ('categorical_pipeline', categorical_pipeline),
        ('numerical_pipeline', numerical_pipeline),
    ])

In [None]:
def set_model(network):
    """Return a new, unfitted classifier for the short name ``network``.

    Raises ValueError for any name outside the supported list. All seeded models use
    the notebook-level ``random_state``.
    """
    # Name -> zero-argument factory; only the requested model is ever constructed.
    factories = {
        'Multinomial': lambda: LogisticRegression(multi_class='multinomial', solver='lbfgs', random_state = random_state),
        'Decision': lambda: DecisionTreeClassifier(criterion='entropy', random_state=random_state),
        'SVC': lambda: SVC(random_state = random_state),
        'RF': lambda: RandomForestClassifier(n_estimators = 1000, random_state = random_state),
        'SGD': lambda: SGDClassifier(random_state = random_state),
        'XGB': lambda: XGBClassifier(random_state = random_state),                  # Good
        'LGB': lambda: LGBMClassifier(random_state = random_state),                 # Best
        'KNN': lambda: KNeighborsClassifier(n_neighbors = 7),
        'Naive': lambda: GaussianNB(),
        'Gradient': lambda: GradientBoostingClassifier(random_state = random_state),  # Good
        'Cat': lambda: CatBoostClassifier(random_state = random_state),              # Good
        'MLP': lambda: MLPClassifier(hidden_layer_sizes=(100,), activation='relu',solver='adam',batch_size=100,random_state=random_state),
    }

    build = factories.get(network) if isinstance(network, str) else None
    if build is None:
        raise ValueError(f" >>> Can only use these estimators : ['Multinomial', 'Decision', 'SVC', 'RF', 'SGD', 'XGB', 'LGB', 'KNN', 'Naive', 'Gradient', 'Cat', 'MLP'], got estimator = {network}")

    model = build()
    return model

In [None]:
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import euclidean_distances

class EstimatorSelector( BaseEstimator, TransformerMixin ):
    """Pipeline step that builds and wraps the classifier chosen by name.

    Parameters
    ----------
    estimator : str or estimator object, default='Multinomial'
        One of the short names accepted by ``set_model`` ('Multinomial', 'Decision',
        'SVC', 'RF', 'SGD', 'XGB', 'LGB', 'KNN', 'Naive', 'Gradient', 'Cat', 'MLP'),
        or an object exposing ``fit`` and ``predict`` (used as given, fitted in place).

    Attributes
    ----------
    estimator_ : fitted model created by ``fit``.

    Notes
    -----
    * The fitted model is stored in ``estimator_`` and ``self.estimator`` is left
      alone. Previously ``fit`` replaced ``self.estimator`` with the fitted model,
      and since estimator ``==`` compares identity, a second ``fit`` raised ValueError.
    * Name resolution is delegated to ``set_model`` so both stay in sync, and no
      throwaway estimators are constructed just for comparison.
    """

    def __init__(self, estimator='Multinomial'):
        self.estimator = estimator

    def _fitted(self):
        """Fitted model; falls back to ``estimator`` for objects pickled before ``estimator_`` existed."""
        return getattr(self, 'estimator_', self.estimator)

    def fit(self, X, y):
        """Resolve the configured estimator, fit it on ``(X, y)`` and return ``self``."""
        if isinstance(self.estimator, str):
            model = set_model(self.estimator)  # raises ValueError for unknown names
        elif hasattr(self.estimator, 'fit') and hasattr(self.estimator, 'predict'):
            model = self.estimator
        else:
            raise ValueError(f" >>> Can only use these estimators : ['Multinomial', 'Decision', 'SVC', 'RF', 'SGD', 'XGB', 'LGB', 'KNN', 'Naive', 'Gradient', 'Cat', 'MLP'], got estimator = {self.estimator}")

        model.fit(X, y)
        self.estimator_ = model
        return self

    def predict(self, X):
        """Predicted class codes from the wrapped model."""
        return self._fitted().predict(X)

    def score(self, X, y):
        """Score of the wrapped model on ``(X, y)`` (its own ``score`` method)."""
        return self._fitted().score(X, y)

    def predict_proba(self, X):
        """Class probabilities from the wrapped model."""
        return self._fitted().predict_proba(X)

# Model 

In [None]:
""" SELECT VARIABLES AND CREATE PIPELINE """
# --- Preprocessing options ---------------------------------------------------
cat_encoder = 'ordinal'
cat_imputer = 'single'          # 'single' | 'multiple'
cat_strategy = 'most_frequent'  # 'most_frequent' | 'knn'
num_scaler = 'robust'           # 'minmax' | 'standard' | 'normalize' | 'robust'
num_imputer = 'single'          # 'single' | 'multiple'
num_strategy = 'mean'           # 'mean' | 'iterative'
preprocessing_pipe = set_preprocessing(
    cat_feature=categorical_features,
    cat_encoder=cat_encoder,
    cat_imputer=cat_imputer,
    cat_strategy=cat_strategy,
    num_feature=numerical_features,
    num_scaler=num_scaler,
    num_imputer=num_imputer,
    num_strategy=num_strategy,
)

# --- Optional feature selection ----------------------------------------------
select_func = 'mutualinfo'  # 'chi2' | 'anova' | 'mutualinfo'
select_k = 35
selector_pipe = FeatureSelector(selector=select_func, k=select_k)

# --- Optional feature extraction (disabled) -----------------------------------
# Feature extraction does not pick features; it maps them to a lower-dimensional
# space (PCA, LDA, SVD, NMF).
# n_components = 40
# pca_pipe = PCA(n_components=n_components, random_state=random_state)

# --- Classifier ---------------------------------------------------------------
network = 'LGB'
# Alternative: model_pipe = set_model(network)
model_pipe = EstimatorSelector(estimator=network)

In [None]:
# Set model pipeline
# Preprocessing followed by the classifier. Optional steps, currently disabled:
#   ('Selector', selector_pipe) and ('PCA', pca_pipe), both between the two below.
pipeline = Pipeline(steps=[
    ('Processing', preprocessing_pipe),
    ('Model', model_pipe),
])
pipeline

In [None]:
""" Model Train """
# Fit on the training split, then predict the hold-out and external cohorts.
pipeline.fit(x_train, y_train)
y_pred, y_pred_external = pipeline.predict(x_test), pipeline.predict(x_external)

In [None]:
# Accuracy and weighted F1 on the hold-out and external cohorts.
for report_line, truth, predicted in (
    (">>> Test Accuracy / Weighted F1-Score: {:.4f} / {:.4f}", y_test, y_pred),
    (">>> External Accuracy / Weighted F1-Score: {:.4f} / {:.4f}", y_external, y_pred_external),
):
    print(report_line.format(accuracy_score(truth, predicted), f1_score(truth, predicted, average='weighted')))

In [None]:
""" 실험한 모델들을 저장 """
# Output directory: <data_dir>/<test_type>/<network>/<num_imputer>_<num_scaler>
# (alternative layout keyed by feature count: f'{data_dir}/{len(X_features)}/{network}/{num_imputer}_{num_scaler}')
SAVE_RESULT_PATH = f'{data_dir}/{test_type}/{network}/{num_imputer}_{num_scaler}'
os.makedirs(SAVE_RESULT_PATH, exist_ok=True)

# Persist the whole fitted pipeline (preprocessing + model).
joblib.dump(pipeline, os.path.join(SAVE_RESULT_PATH, "best_model.pkl"))
# m_load = joblib.load(os.path.join(SAVE_RESULT_PATH, f"model_{network}.pkl"))
# print(">>> Test Accuracy / Weighted F1-Score: {:.2f} / {:.2f}".format(m_load.score(x_test, y_test), f1_score(y_test, y_pred, average='weighted')))

In [None]:
# Run summary: configuration, split sizes, and accuracy / weighted F1 per split.
# `labels` is passed to classification_report so that its rows always match
# `target_names`; without it the call raises ValueError whenever a class is missing
# from both the true and the predicted labels of a split.
class_ids = list(label_dict.keys())
class_names = list(label_dict.values())

print(f'>>> Current Model: {network} <<<')
print(f'>>> Current Scaler: {num_scaler} <<<')
print(f'>>> Current Imputer: {num_imputer} <<<')
print(f'Number of Train Dataset: {len(x_train)}')
print(f'Number of Test Dataset: {len(x_test)}')
print(">>> Train Accuracy / Weighted F1-Score: {:.4f} / {:.4f}".format(pipeline.score(x_train, y_train), f1_score(y_train, pipeline.predict(x_train), average='weighted')))
print(">>> Test Accuracy / Weighted F1-Score: {:.4f} / {:.4f}".format(pipeline.score(x_test, y_test), f1_score(y_test, y_pred, average='weighted')))
print(classification_report(y_test, y_pred, labels=class_ids, target_names=class_names))

print(">>> External Accuracy / Weighted F1-Score: {:.4f} / {:.4f}".format(pipeline.score(x_external, y_external), f1_score(y_external, y_pred_external, average='weighted')))
print(classification_report(y_external, y_pred_external, labels=class_ids, target_names=class_names))

# # View Selected Features
# print('-'*30)
# print(pipeline.named_steps['Selector'].get_feature_name())
# print("Number of Selected Feature: {}".format(len(pipeline.named_steps['Selector'].get_feature_name())))

In [None]:
""" Model Results Save """
# Collect the run configuration and metrics, then write them to a text file.
# `labels` keeps classification_report aligned with `target_names` even when a class
# is absent from a split (otherwise it raises ValueError).
class_ids = list(label_dict.keys())
class_names = list(label_dict.values())

results_dict = dict()
results_dict['Model'] = network
results_dict['Scaler'] = num_scaler
results_dict['Imputer'] = num_imputer
results_dict['<<< TRAIN >>>'] = ''
results_dict['Train Set'] = len(x_train)
results_dict['Train Accuracy'] = round(pipeline.score(x_train, y_train), 4)
results_dict['Train Weighted F1-Score'] = round(f1_score(y_train, pipeline.predict(x_train), average='weighted'), 4)
results_dict['<<< TEST >>>'] = ''
results_dict['Test Set'] = len(x_test)
results_dict['Test Accuracy'] = round(pipeline.score(x_test, y_test), 4)
results_dict['Test Weighted F1-Score'] = round(f1_score(y_test, y_pred, average='weighted'), 4)
results_dict['Results Table: Test'] = f'\n{classification_report(y_test, y_pred, labels=class_ids, target_names=class_names)}'
results_dict['<<< EXTERNAL >>>'] = ''
results_dict['External Set'] = len(x_external)
results_dict['External Accuracy'] = round(pipeline.score(x_external, y_external), 4)
results_dict['External Weighted F1-Score'] = round(f1_score(y_external, y_pred_external, average='weighted'), 4)
results_dict['Results Table: External'] = f'\n{classification_report(y_external, y_pred_external, labels=class_ids, target_names=class_names)}'
# results_dict['Selected Features'] = pipeline.named_steps['Selector'].get_feature_name()
# results_dict['Number of Selected Features'] = len(pipeline.named_steps['Selector'].get_feature_name())

# The file name previously started with a space (" results_info.txt"), which looks
# accidental and makes the file awkward to reference; it is written without it now.
# `with` guarantees the handle is closed even if a write fails.
with open(os.path.join(SAVE_RESULT_PATH, "results_info.txt"), "w", encoding="utf-8") as f:
    f.write(' --- Results --- \n')
    for k, v in results_dict.items():
        f.write(f'[{k}]: {v}\n')

In [None]:
# Confusion matrices for the hold-out and external cohorts.
for split_name, truth, predicted in (
    ('Test', y_test, y_pred),
    ('External', y_external, y_pred_external),
):
    show_confusion_matrix(truth, predicted, save_path=SAVE_RESULT_PATH, dataset=split_name)

In [None]:
# ROC curves for the hold-out and external cohorts.
for split_name, features, target in (
    ('Test', x_test, y_test),
    ('External', x_external, y_external),
):
    show_roc_curve(pipeline, features, target, save_path=SAVE_RESULT_PATH, dataset=split_name)

In [None]:
# Permutation importances for the hold-out and external cohorts.
for split_name, features, target in (
    ('Test', x_test, y_test),
    ('External', x_external, y_external),
):
    show_permutation_importance(pipeline, features, target, save_path=SAVE_RESULT_PATH, dataset=split_name)

# Wrapper Method

In [None]:
""" SELECT VARIABLES AND CREATE PIPELINE """
# --- Preprocessing options (fresh, unfitted preprocessing stage) ---------------
cat_encoder = 'ordinal'
cat_imputer = 'single'          # 'single' | 'multiple'
cat_strategy = 'most_frequent'  # 'most_frequent' | 'knn'
num_scaler = 'robust'           # 'minmax' | 'standard' | 'normalize' | 'robust'
num_imputer = 'single'          # 'single' | 'multiple'
num_strategy = 'mean'           # 'mean' | 'iterative'
preprocessing_pipe = set_preprocessing(
    cat_feature=categorical_features,
    cat_encoder=cat_encoder,
    cat_imputer=cat_imputer,
    cat_strategy=cat_strategy,
    num_feature=numerical_features,
    num_scaler=num_scaler,
    num_imputer=num_imputer,
    num_strategy=num_strategy,
)

# --- Classifier: the bare model returned by set_model --------------------------
network = 'LGB'
model_pipe = set_model(network)
# model_pipe  = EstimatorSelector(estimator=network)

In [None]:
# Fit the preprocessing on the training split only, then apply it to every split.
preprocessing_pipe.fit(x_train)
x_train_trans, x_test_trans, x_external_trans = (
    preprocessing_pipe.transform(split) for split in (x_train, x_test, x_external)
)

In [None]:
# """ Model Train """
# step = 1
# min_features_to_select = 5
# rfe_selector = RFE(estimator=model_pipe, n_features_to_select=min_features_to_select, step=step, verbose=5)
# rfe_selector.fit(x_train_trans, y_train)
# y_pred = rfe_selector.predict(x_test_trans)
# y_pred_external = rfe_selector.predict(x_external_trans)

In [None]:
from sklearn.feature_selection import RFECV
from sklearn.model_selection import RepeatedStratifiedKFold

# Recursive feature elimination with cross-validation on the preprocessed
# training data, scored with weighted F1.
step = 1
min_features_to_select = 5
cv = 5 #RepeatedStratifiedKFold(n_splits=5, n_repeats=3, random_state=random_state)

rfe_selector = RFECV(
    estimator=model_pipe,
    cv=cv,
    scoring='f1_weighted',
    step=step,
    min_features_to_select=min_features_to_select,
)
rfe_selector.fit(x_train_trans, y_train)

# Predictions for the test split first, then for the external set.
y_pred, y_pred_external = (
    rfe_selector.predict(features) for features in (x_test_trans, x_external_trans)
)

In [None]:
# Display how many features RFECV kept (sum over the selector's support_ mask).
rfe_selector.support_.sum()

In [None]:
# Plot the RFECV cross-validation score against the number of retained features.
# Scores are read from `cv_results_` because `grid_scores_` was deprecated in
# scikit-learn 1.0 and removed in 1.2.
rfe_cv_results = rfe_selector.cv_results_
rfe_mean_scores = np.asarray(rfe_cv_results['mean_test_score'])

# Per-fold curves: one column per "split<i>_test_score" entry, ordered by fold index.
rfe_split_keys = sorted(
    (key for key in rfe_cv_results if key.startswith('split') and key.endswith('_test_score')),
    key=lambda key: int(key[len('split'):-len('_test_score')]),
)
rfe_split_scores = np.column_stack([rfe_cv_results[key] for key in rfe_split_keys])

# x positions, computed once and shared by every artist below.
# NOTE(review): this even spacing from min_features_to_select mirrors the original
# axis and matches the evaluated feature counts when step == 1 -- re-check the
# axis if a larger step is ever used.
rfe_n_features_axis = min_features_to_select + step * np.arange(len(rfe_mean_scores))
rfe_best_idx = int(np.argmax(rfe_mean_scores))

plt.figure()
plt.title("Performance per number of features")
plt.xlabel("Number of features selected")
plt.ylabel("Cross validation score (f1_weighted)")
# Dashed, semi-transparent line per CV fold.
plt.plot(rfe_n_features_axis, rfe_split_scores, '--', alpha=0.5)
# Solid red line for the mean over folds.
plt.plot(rfe_n_features_axis, rfe_mean_scores, color='red', label='Mean')
# Star on the best mean score; indexing the shared axis keeps the marker on the
# curve for any step (the old `argmax + min_features_to_select` assumed step == 1).
plt.scatter(
    rfe_n_features_axis[rfe_best_idx], rfe_mean_scores[rfe_best_idx],
    marker='*', color='red', s=100,
)
plt.legend()
# if save_path:
#     plt.savefig(f'{save_path}/ROC_Curve_{dataset}.png', bbox_inches='tight')
plt.show()
plt.close()

In [None]:
# rfe_selector.estimator_
# rfe_selector.n_features_
# rfe_selector.support_
# rfe_selector.ranking_
# rfe_selector.cv_results_

In [None]:
# Display the per-feature ranking produced by RFECV (selected features are ranked 1).
rfe_selector.ranking_

In [None]:
# Console summary of the RFECV run: configuration, split sizes, then
# accuracy / weighted F1 for the train, test and external sets.
print(
    f'>>> Current Model: {network} <<<',
    f'>>> Current Scaler: {num_scaler} <<<',
    f'>>> Current Imputer: {num_imputer} <<<',
    f'Number of Train Dataset: {len(x_train_trans)}',
    f'Number of Test Dataset: {len(x_test_trans)}',
    sep='\n',
)

train_accuracy = rfe_selector.score(x_train_trans, y_train)
train_f1 = f1_score(y_train, rfe_selector.predict(x_train_trans), average='weighted')
print(">>> Train Accuracy / Weighted F1-Score: {:.4f} / {:.4f}".format(train_accuracy, train_f1))

test_accuracy = rfe_selector.score(x_test_trans, y_test)
test_f1 = f1_score(y_test, y_pred, average='weighted')
print(">>> Test Accuracy / Weighted F1-Score: {:.4f} / {:.4f}".format(test_accuracy, test_f1))
print(classification_report(y_test, y_pred, target_names=list(label_dict.values())))

external_accuracy = rfe_selector.score(x_external_trans, y_external)
external_f1 = f1_score(y_external, y_pred_external, average='weighted')
print(">>> External Accuracy / Weighted F1-Score: {:.4f} / {:.4f}".format(external_accuracy, external_f1))
print(classification_report(y_external, y_pred_external, target_names=list(label_dict.values())))

# Names of the features RFECV kept.
# NOTE(review): the support mask describes the columns of x_train_trans but is
# applied to x_train's columns here -- this assumes preprocessing keeps column
# count and order; confirm against set_preprocessing.
rfe_support = rfe_selector.get_support()
rfe_feature = x_train.loc[:,rfe_support].columns.tolist()
print(rfe_feature)
print(f'Number of Features: {len(rfe_feature)}')

In [None]:
""" Save the models from this experiment """
# Output directory for this run: feature count / method / model / imputer_scaler.
SAVE_RESULT_PATH = f'{data_dir}/{len(X_features)}/RFECV/{network}/{num_imputer}_{num_scaler}'
# SAVE_RESULT_PATH = f'results/Test'
os.makedirs(SAVE_RESULT_PATH, exist_ok=True)

# Persist the fitted RFECV selector (it also serves as the predictor above).
best_model_file = os.path.join(SAVE_RESULT_PATH, "best_model.pkl")
joblib.dump(rfe_selector, best_model_file)
# m_load = joblib.load(os.path.join(SAVE_RESULT_PATH, f"model_{network}.pkl"))
# print(">>> Test Accuracy / Weighted F1-Score: {:.2f} / {:.2f}".format(m_load.score(x_test, y_test), f1_score(y_test, y_pred, average='weighted')))

In [None]:
""" Model Results Save """
# Collect the RFECV run's settings, metrics and selected features. Insertion
# order is the order of the lines in the report; the '<<< ... >>>' keys with
# empty values act as section headers.
results_dict = dict()
results_dict['Model'] = network
results_dict['Scaler'] = num_scaler
results_dict['Imputer'] = num_imputer
results_dict['<<< TRAIN >>>'] = ''
results_dict['Train Set'] = len(x_train_trans)
results_dict['Train Accuracy'] = round(rfe_selector.score(x_train_trans, y_train), 4)
results_dict['Train Weighted F1-Score'] = round(f1_score(y_train, rfe_selector.predict(x_train_trans), average='weighted'), 4)
results_dict['<<< TEST >>>'] = ''
results_dict['Test Set'] = len(x_test_trans)
results_dict['Test Accuracy'] = round(rfe_selector.score(x_test_trans, y_test), 4)
results_dict['Test Weighted F1-Score'] = round(f1_score(y_test, y_pred, average='weighted'), 4)
results_dict['Results Table: Test'] = f'\n{classification_report(y_test, y_pred, target_names=list(label_dict.values()))}'
results_dict['<<< EXTERNAL >>>'] = ''
results_dict['External Set'] = len(x_external_trans)
results_dict['External Accuracy'] = round(rfe_selector.score(x_external_trans, y_external), 4)
results_dict['External Weighted F1-Score'] = round(f1_score(y_external, y_pred_external, average='weighted'), 4)
results_dict['Results Table: External'] = f'\n{classification_report(y_external, y_pred_external, target_names=list(label_dict.values()))}'
results_dict['<<< SELECTED FEATURE >>>'] = ''
results_dict['Selected Features'] = rfe_feature
results_dict['Number of Selected Features'] = len(rfe_feature)

# Write one "[key]: value" line per entry. The context manager closes the
# handle even if one of the writes raises.
# NOTE(review): the file name starts with a space (" results_info.txt"); it is kept
# unchanged in case something reads that exact name -- confirm whether it is intended.
with open(os.path.join(SAVE_RESULT_PATH, " results_info.txt"), "w") as f:
    f.write(' --- Results --- \n')
    for k, v in results_dict.items():
        f.write(f'[{k}]: {v}\n')

In [None]:
# Confusion matrices for the RFECV predictions on the test split and on the
# external set; both calls pass save_path=SAVE_RESULT_PATH.
show_confusion_matrix(y_test, y_pred, save_path=SAVE_RESULT_PATH, dataset='Test')
show_confusion_matrix(y_external, y_pred_external, save_path=SAVE_RESULT_PATH, dataset='External')

In [None]:
# show_permutation_importance(rfe_selector, x_test_trans, y_test, save_path=SAVE_RESULT_PATH, dataset='Test')
# show_permutation_importance(rfe_selector, x_external, y_external, save_path=SAVE_RESULT_PATH, dataset='External')

In [None]:
# ROC curves for the fitted RFECV selector. It was fitted on preprocessed data,
# so the already-transformed test and external matrices are passed in.
show_roc_curve(rfe_selector, x_test_trans, y_test, save_path=SAVE_RESULT_PATH, dataset='Test')
show_roc_curve(rfe_selector, x_external_trans, y_external, save_path=SAVE_RESULT_PATH, dataset='External')

# GridSearchCV

In [None]:
""" SELECT VARIABLES AND CREATE PIPELINE """
# Categorical branch -- imputer: 'single' | 'multiple'; strategy: 'most_frequent' | 'knn'
cat_encoder, cat_imputer, cat_strategy = 'ordinal', 'single', 'most_frequent'
# Numerical branch -- scaler: 'minmax' | 'standard' | 'normalize' | 'robust';
# imputer: 'single' | 'multiple'; strategy: 'mean' | 'iterative'
num_scaler, num_imputer, num_strategy = 'standard', 'single', 'mean'

# Build the preprocessing object from the option strings chosen above.
preprocessing_pipe = set_preprocessing(
    categorical_features, cat_encoder, cat_imputer, cat_strategy,
    numerical_features, num_scaler, num_imputer, num_strategy,
)

# Feature-selection step -- scoring function: 'chi2' | 'anova' | 'mutualinfo'.
# select_k is only the initial k; the parameter grid further down lists 'Selector__k' values.
select_func, select_k = 'mutualinfo', 49
selector_pipe = FeatureSelector(selector=select_func, k=select_k)

# Feature extraction: rather than selecting features, it maps them onto a lower-dimensional space.
# PCA, LDA, SVD, NMF
# n_components = 40
# pca_pipe = PCA(n_components=n_components, random_state
# =random_state)

# Estimator key for the final pipeline step.
network = 'LGB'
# model_pipe = set_model(network)
model_pipe = EstimatorSelector(estimator=network)

In [None]:
# Assemble the end-to-end pipeline: preprocessing -> feature selection -> model.
# Variant with a PCA step, kept for reference:
# steps=[('Processing', preprocessing_pipe), ('Selector', selector_pipe), ('PCA', pca_pipe), ('Model', model_pipe)]
steps = [
    ('Processing', preprocessing_pipe),
    ('Selector', selector_pipe),
    ('Model', model_pipe),
]

pipeline_grid = Pipeline(steps)
pipeline_grid

In [None]:
# List the pipeline's parameter names; these are the keys a GridSearchCV
# param_grid for this pipeline can use (e.g. 'Selector__k').
pipeline_grid.get_params().keys()

In [None]:
# pipeline_grid.get_params().keys()

# Hyper-parameter grid for GridSearchCV. Keys follow the '<step>__<parameter>'
# convention of the pipeline; only the number of selected features is searched
# at the moment. The commented entries are alternative search dimensions.
params = {
    # 'Processing__numerical_pipeline__num_scaler__scaler': ['minmax','standard','robust'],
    # 'Processing__numerical_pipeline__num_imputer__':[],
    # Candidate k values; previous candidate list: [3, 4, 5,6,7,8,9, 10,11, 12,13,14, 15]
    'Selector__k': [3, 5, 10, 12, 15, 18, *range(20, 26)],
    # 'Selector__selector':['chi2','anova','mutualinfo'], # 'chi2',-> input X must be non-negative
    # 'PCA__n_components': [min(5, ), 12],
    # 'Model__estimator':['Multinomial', 'Decision', 'SVC', 'RF', 'SGD', 'XGB', 'LGB', 'KNN', 'Naive', 'Gradient', 'Cat'],
    # 'Model__C':[0.1,1, 10, 100],
    # 'Model__gamma':['scale','auto'], #[1,0.1,0.01,0.001],
    # 'Model__kernel':['linear','rbf', 'poly', 'sigmoid'],
    # ### MLP
    # # 'Model__activation':['identity','logistic','tanh','relu'],
    # 'Model__alpha':[0.0001,0.001,0.01,0.1],
    # 'Model__hidden_layer_sizes':[(2,),(5,),(10,)], # ,(100,)
    # # 'Model__solver':['lbfgs','sgd','adam'],
    # # 'Model__learning_rate':['constant','invscaling','adaptive'],
    # 'Model__learning_rate_init':[0.0001,0.001, 0.01, 0.1],
    # 'Model__max_iter':[25, 50, 75, 100],
    # 'Model__momentum':[0.1,0.3,0.5,0.9],
    # ### LGB
    # 'Model__learning_rate':[0.01, 0.1, 0.2, 0.3, 0.4, 0.5],
    # 'Model__max_depth':[25, 50, 75],
    # 'Model__num_leaves':[100,300,500,900,1200],
    # 'Model__n_estimators':[100,200,300,500,800,1000],
    # ### RF
    # 'Model__n_estimators':[100,200,300,500],
    # 'Model__max_depth':[25, 50, 75],
    # 'Model__max_leaf_nodes':[25, 50, 100],
}

In [None]:
# sklearn.metrics.get_scorer_names()

# StratifiedKFold: cross-validation that preserves the label proportions in every fold.
kf = 5 #StratifiedKFold(n_splits=5)

# Exhaustive search over `params`, scored with weighted F1. refit=True refits the
# best configuration, and error_score='raise' makes a failing candidate raise.
grid_search = GridSearchCV(
    estimator=pipeline_grid,
    param_grid=params,
    cv=kf,
    scoring='f1_weighted',
    refit=True,
    error_score='raise',
    n_jobs=-1,
    verbose=2,
)
grid_search

In [None]:
# Run the grid search on the raw training data; preprocessing and feature
# selection are steps of the searched pipeline, so no pre-transformed input is used.
grid_search.fit(x_train, y_train)

In [None]:
# Report the winning parameter combination and its cross-validated score.
best_params_found = grid_search.best_params_
print('best parameters: ', best_params_found)
# print('best estimator: ', grid_search.best_estimator_)
best_cv_score = grid_search.best_score_
print('best score: ', best_cv_score)

In [None]:
# Evaluate the refit best pipeline on the train, test and external sets.
em = grid_search.best_estimator_
y_pred = em.predict(x_test)
y_pred_external = em.predict(x_external)

# Each line prints accuracy (em.score) and weighted F1. The test line reuses
# y_pred instead of predicting on x_test a second time, matching how the
# external line already reuses y_pred_external.
print(">>> Train Accuracy / Weighted F1-Score: {:.4f} / {:.4f}".format(em.score(x_train, y_train), f1_score(y_train, em.predict(x_train), average='weighted')))
print(">>> Test Accuracy / Weighted F1-Score: {:.4f} / {:.4f}".format(em.score(x_test, y_test), f1_score(y_test, y_pred, average='weighted')))
print(">>> External Accuracy / Weighted F1-Score: {:.4f} / {:.4f}".format(em.score(x_external, y_external), f1_score(y_external, y_pred_external, average='weighted')))

In [None]:
# Display what the best pipeline's 'Selector' step returns from get_feature_name()
# (presumably the names of the selected features -- FeatureSelector is defined elsewhere).
em.named_steps['Selector'].get_feature_name()

In [None]:
# Per-class precision/recall/F1 table for the test split, labelled with the values of label_dict.
print(classification_report(y_test, y_pred, target_names=list(label_dict.values())))

In [None]:
# Set the save path: model / selector function / imputer_scaler_best-k.
SAVE_RESULT_PATH = f"{data_dir}/BestGridCV/{network}/{select_func}/{num_imputer}_{num_scaler}_{grid_search.best_params_['Selector__k']}"
# SAVE_RESULT_PATH = f'results/Test'
os.makedirs(SAVE_RESULT_PATH, exist_ok=True)

# Persist the refit best pipeline.
best_model_file = os.path.join(SAVE_RESULT_PATH, "best_model.pkl")
joblib.dump(em, best_model_file)

In [None]:
""" Model Results Save """
# Collect the grid-search run's settings, metrics, best parameters and selected
# features. Insertion order is the order of the lines in the report.
results_dict = dict()
results_dict['Model'] = network
results_dict['Scaler'] = num_scaler
results_dict['Imputer'] = num_imputer
results_dict['Train Set'] = len(x_train)
results_dict['Train Accuracy'] = round(em.score(x_train, y_train), 4)
results_dict['Train Weighted F1-Score'] = round(f1_score(y_train, em.predict(x_train), average='weighted'), 4)
results_dict['Test Set'] = len(x_test)
results_dict['Test Accuracy'] = round(em.score(x_test, y_test), 4)
results_dict['Test Weighted F1-Score'] = round(f1_score(y_test, y_pred, average='weighted'), 4)
results_dict['Results Table: Test'] = f'\n{classification_report(y_test, y_pred, target_names=list(label_dict.values()))}'
results_dict['External Set'] = len(x_external)
results_dict['External Accuracy'] = round(em.score(x_external, y_external), 4)
results_dict['External Weighted F1-Score'] = round(f1_score(y_external, y_pred_external, average='weighted'), 4)
results_dict['Results Table: External'] = f'\n{classification_report(y_external, y_pred_external, target_names=list(label_dict.values()))}'
results_dict['Best Parameters'] = grid_search.best_params_
results_dict['Selected Features'] = em.named_steps['Selector'].get_feature_name()
results_dict['Number of Selected Features'] = len(em.named_steps['Selector'].get_feature_name())

# Write one "[key]: value" line per entry. The context manager closes the
# handle even if one of the writes raises.
# NOTE(review): the file name starts with a space (" results_info.txt"); it is kept
# unchanged in case something reads that exact name -- confirm whether it is intended.
with open(os.path.join(SAVE_RESULT_PATH, " results_info.txt"), "w") as f:
    f.write(' --- Results --- \n')
    for k, v in results_dict.items():
        f.write(f'[{k}]: {v}\n')

In [None]:
# Confusion matrices for the best grid-search pipeline's predictions on the test
# split and on the external set; both calls pass save_path=SAVE_RESULT_PATH.
show_confusion_matrix(y_test, y_pred, save_path=SAVE_RESULT_PATH, dataset='Test')
show_confusion_matrix(y_external, y_pred_external, save_path=SAVE_RESULT_PATH, dataset='External')

In [None]:
# show_permutation_importance(em, x_test, y_test, save_path=SAVE_RESULT_PATH, dataset='Test') # 
# show_permutation_importance(em, x_external, y_external, save_path=SAVE_RESULT_PATH, dataset='External') # 

In [None]:
# ROC curves for the best grid-search pipeline. The raw feature frames are passed
# because preprocessing is a step inside the pipeline.
show_roc_curve(em, x_test, y_test, save_path=SAVE_RESULT_PATH, dataset='Test')
show_roc_curve(em, x_external, y_external, save_path=SAVE_RESULT_PATH, dataset='External')