# In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime as pydatetime
import os

from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, roc_auc_score, confusion_matrix,f1_score,roc_curve
from sklearn import tree
import joblib



def _tpr_fpr(y_true, y_pred):
    """Return ``(TPR, FPR)`` for binary labels where 1 is the positive class.

    A rate is ``nan`` (with a NumPy runtime warning) when ``y_true`` contains
    no sample of the class that forms its denominator.
    """
    # labels=[1, 0] puts the positive class first, so row/column 0 refers to
    # class 1 and the matrix is laid out as [[TP, FN], [FP, TN]].
    (tp, fn), (fp, tn) = confusion_matrix(y_true, y_pred, labels=[1, 0])
    return tp / (tp + fn), fp / (fp + tn)


def classification_decisiontree(trainX, testX, trainY, testY, max_depth=None, min_samples_leaf=1, decision_boundary=0.5, model_path='DT.pkl'):
    """Train a binary decision-tree classifier, save it, and score the test set.

    Labels must be encoded as 1 (positive / defective) and 0 (negative /
    normal); the positive-class probability is read from column 1 of
    ``predict_proba``.

    Parameters
    ----------
    trainX : array or DataFrame
        Training features.
    testX : array or DataFrame
        Test features.
    trainY : array or DataFrame
        Training labels (1 / 0).
    testY : array or DataFrame
        Test labels (1 / 0).
    max_depth : int, optional, default = None
        Maximum depth of the tree; the tree stops splitting once it is reached.
    min_samples_leaf : int or float, optional, default = 1
        Minimum number of samples required in a leaf node.
    decision_boundary : float, optional, default = 0.5
        A sample is predicted as class 1 when its positive-class probability
        is strictly greater than this value.
    model_path : str, optional, default = 'DT.pkl'
        File the fitted model is written to with ``joblib.dump``.

    Returns
    -------
    dict
        "AnomalyScore" : positive-class probability per test sample, rounded
            to 3 decimals.
        "Prediction" : 0/1 prediction per test sample at ``decision_boundary``.
        "Aucscore" : ROC AUC of the positive-class probabilities, as a string.
        "Fpr", "Tpr" : false / true positive rate at ``decision_boundary``.
        "Fprarray", "Tprarray" : false / true positive rates at the thresholds
            0.01, 0.02, ..., 0.99 (in that order).
    """
    # Work on plain arrays so the model is fitted without feature names.
    trainX, testX, trainY, testY = (
        data.values if isinstance(data, pd.DataFrame) else data
        for data in (trainX, testX, trainY, testY)
    )

    # Decision Tree Model
    model = DecisionTreeClassifier(criterion='gini', max_depth=max_depth, min_samples_leaf=min_samples_leaf)
    model.fit(trainX, trainY)

    # Probability of class 1 for every test sample.
    scores = model.predict_proba(testX)[:, 1]
    pred = (scores > decision_boundary).astype(int)

    # Persist the fitted model as a pickle file.
    joblib.dump(model, model_path)

    # Sweep thresholds 0.01 .. 0.99 to trace the ROC curve. A separate loop
    # variable is used so the caller's decision_boundary is never overwritten.
    sweep = [
        _tpr_fpr(testY, (scores > step / 100).astype(int))
        for step in range(1, 100)
    ]
    tpr_array = np.array([rates[0] for rates in sweep])
    fpr_array = np.array([rates[1] for rates in sweep])

    # AUC is threshold-independent, so it is computed from the scores rather
    # than from the thresholded 0/1 predictions.
    auc = roc_auc_score(testY, scores)
    tpr, fpr = _tpr_fpr(testY, pred)

    return {"AnomalyScore": np.round(scores, 3), "Prediction": pred, "Aucscore": str(auc), "Fpr": fpr, "Tpr": tpr, "Fprarray": fpr_array, "Tprarray": tpr_array}
         
def classification_model_load_DT(model, testX, decision_boundary=0.5, model_path='DT.pkl'):
    """Score ``testX`` with a fitted binary classifier.

    Parameters
    ----------
    model : fitted estimator or any other value
        If the object exposes ``predict_proba`` it is used directly.
        NOTE: for backward compatibility any other value (for example a file
        name) is ignored and the model is loaded from ``model_path`` instead.
    testX : array or DataFrame
        Samples to score.
    decision_boundary : float, optional, default = 0.5
        A sample is predicted as class 1 when its positive-class probability
        is strictly greater than this value.
    model_path : str, optional, default = 'DT.pkl'
        Pickle file read with ``joblib.load`` when ``model`` is not an
        estimator.

    Returns
    -------
    dict
        "AnomalyScore" : positive-class probability per sample, rounded to
            3 decimals.
        "Prediction" : 0/1 prediction per sample.
    """
    if not hasattr(model, "predict_proba"):
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code; only load model files from a trusted source.
        model = joblib.load(model_path)

    # Pass a plain array, matching how classification_decisiontree fits.
    if isinstance(testX, pd.DataFrame):
        testX = testX.values

    # Column 1 of predict_proba is the probability of class 1.
    scores = model.predict_proba(testX)[:, 1]
    pred = (scores > decision_boundary).astype(int)

    return {"AnomalyScore": np.round(scores, 3), "Prediction": pred}


# In [None]:
# Classes must always be encoded as 1 / 0 (binary classification only).
df = pd.read_csv('/workspace/xnsolution/data/1_Iris_virginica.csv', encoding='euc-kr')

# Assign through .loc with a boolean mask: chained indexing
# (df['label'][rows] = value) may write to a temporary copy and is a no-op
# when pandas copy-on-write is enabled.
df.loc[df['label'] == 'outlier', 'label'] = 1
df.loc[df['label'] == 'target', 'label'] = 0

# First four columns are the features, the fifth column is the label.
X = df.iloc[:, 0:4]
y = df.iloc[:, 4]
y = y.astype('category')

trainX, testX, trainY, testY = train_test_split(X, y, test_size=0.3)

# Train, save the tree to DT.pkl, and collect the evaluation results.
model = classification_decisiontree(trainX, testX, trainY, testY, max_depth=None, decision_boundary=0.5)
# Re-score the test set with the decision tree that was just saved.
load_model = classification_model_load_DT('DT.pkl', testX, decision_boundary=0.5)