In [None]:
%pip install silence_tensorflow

In [None]:
import silence_tensorflow.auto
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping,ReduceLROnPlateau

In [None]:
# Load the pre-built image / label arrays from the ./npy folder of the working directory

path_save = os.path.join(os.path.abspath('.'), 'npy')

# Same four files in the same order: train images, test images, train labels, test labels
train_x, test_x, train_y, test_y = (
    np.load(os.path.join(path_save, npy_name))
    for npy_name in ('train_img.npy', 'test_img.npy', 'train_lab.npy', 'test_lab.npy')
)

# Report the array shapes as a quick sanity check
print('Data shape')
print(f'Train img: {train_x.shape} | Train lab: {train_y.shape}')
print(f'Test img: {test_x.shape} | Test lab: {test_y.shape}')


In [None]:
# Check sample: display roughly n_sample evenly spaced training images with their class name
n_sample = 5
class_dict = {'normal': 0,
                'abnormal': 1}

# Resolve names through the index stored in class_dict instead of relying on the
# insertion order of its keys.
index_to_name = {class_index: class_name for class_name, class_index in class_dict.items()}

# len(train_x)//n_sample is 0 when there are fewer than n_sample images, and range()
# raises ValueError on a zero step, so clamp the stride to at least 1.
sample_step = max(1, len(train_x) // n_sample)

for n in range(1, len(train_x), sample_step):

    plt.subplot(1,2,1)
    # Labels are indexed per sample and reduced with argmax
    # (assumes one-hot encoded labels - TODO confirm against the npy export).
    plt.title(f'{index_to_name[int(train_y[n].argmax())]}')
    plt.imshow(train_x[n],cmap='gray')
    plt.axis('off')
    plt.show()

In [None]:
def Simple_CNNmodel(input_img, base=32, scale=2, n_layers=6, num_classes=None):
    """Build a simple stacked-convolution classifier on top of ``input_img``.

    Each of the ``n_layers`` blocks applies two 3x3 convolutions (each followed by
    batch normalization and ReLU). Every block except the last one is followed by
    2x2 max pooling. The feature map is then globally average-pooled and passed
    through Dense(128) -> Dense(64) -> Dropout(0.3) -> softmax.

    Args:
        input_img: Keras input tensor the network is built on; it is also used as
            the ``inputs`` of the returned model.
        base: Number of convolution filters in the first block.
        scale: Growth factor; block ``n`` uses ``scale**n * base`` filters.
        n_layers: Number of convolution blocks.
        num_classes: Width of the softmax output. When omitted, the notebook-level
            ``n_class`` variable is used, so existing calls keep working.

    Returns:
        An uncompiled ``Model`` mapping ``input_img`` to class probabilities.
    """
    # Fall back to the notebook-level setting when the caller does not say otherwise.
    n_outputs = n_class if num_classes is None else num_classes

    x = input_img

    for n in range(n_layers):
        # Filter count grows geometrically with the block index.
        n_filters = (scale ** n) * base

        # Two identical conv -> batch norm -> ReLU stages per block.
        for _ in range(2):
            x = Conv2D(n_filters, 3, activation=None, padding='same', kernel_initializer='he_normal')(x)
            x = BatchNormalization()(x)
            x = Activation(activation='relu')(x)

        # Halve the spatial size after every block except the last one.
        # n runs from 0 to n_layers - 1, so the last block is n_layers - 1.
        if n != n_layers - 1:
            x = MaxPooling2D(pool_size=(2, 2))(x)

    # Collapse the final feature map into one vector per image.
    out = GlobalAveragePooling2D(name="avg_pool")(x)

    # Fully connected head: 128 -> 64 units, each followed by batch normalization.
    out = Dense(128, activation="relu")(out)
    out = BatchNormalization()(out)

    out = Dense(64, activation="relu")(out)
    out = BatchNormalization()(out)

    # Dropout to reduce overfitting.
    out = Dropout(0.3)(out)

    # Softmax over the classes.
    out = Dense(n_outputs, activation="softmax")(out)

    model = Model(inputs=input_img, outputs=out)

    return model

In [None]:
# Training classification model using Simple CNN model

# All training artefacts (weights, figures) go to ./Result/model_simple
path_save = os.path.join(os.path.abspath('.'), 'Result', 'model_simple')
os.makedirs(path_save, exist_ok=True)

## parameter setting
n_class = 2         # width of the softmax output, read by Simple_CNNmodel
imageSize = 512     # network input is imageSize x imageSize with a single channel
lr = 0.0001
epochs = 50
batch = 10
loss_function = "categorical_crossentropy"

## model build
input_img = Input(shape=(imageSize,imageSize,1))

# The second positional parameter of Simple_CNNmodel is `base` (filters of the first
# block), not the class count, so n_class must not be passed positionally; the class
# count is picked up from the notebook-level n_class. This builds the network with the
# function's default base=32 - pass base=<smaller value> explicitly if memory is tight.
model = Simple_CNNmodel(input_img)
model.summary()         # Check model structure

## model compile
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
model.compile(
        optimizer=optimizer, loss=loss_function, metrics=["accuracy"]
    )

In [None]:
## Callback
# Write the weights to best_model.h5 only when val_loss improves, checked once per epoch.
checkpointer = ModelCheckpoint(filepath=os.path.join(path_save, 'best_model.h5'),
                                verbose=1,
                                save_weights_only=True,
                                save_best_only=True,
                                monitor='val_loss',
                                save_freq='epoch')

# ReduceLROnPlateau: Keras callback that adjusts the learning rate during training.
# It is mainly used to lower the learning rate once the validation loss stops improving.
#   monitor:   metric to watch
#   factor:    ratio applied to the learning rate (factor=0.1 -> 10% of the current value)
#   patience:  number of epochs to wait before deciding to reduce
#   min_lr:    lower bound the learning rate can be reduced to
#   min_delta: minimum change that counts; smaller improvements are treated as none
#   verbose:   whether to print a message
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, min_lr=0, min_delta=0.001, verbose=1)
callbacks_list = [reduce_lr, checkpointer]


## training model
# Pass the configured batch size to fit() and let Keras derive the number of steps from
# it. A hand-computed steps_per_epoch based on len(train_x) overcounts, because
# validation_split holds out 20% of the samples, and it does not set the batch size.
model.fit(train_x, train_y,
            batch_size=batch, epochs=epochs,
            validation_split=0.2,
            callbacks=callbacks_list,
            shuffle=True
                        )

## saving last model weight
model.save_weights(os.path.join(path_save, 'last_model.h5'))

In [None]:
# Evaluation of the model as it is after the last training epoch
## Train data is scored first for reference.
## Test data => separate data that was not used for training, kept for verification.
for split_name, eval_x, eval_y in (('train', train_x, train_y), ('test', test_x, test_y)):
    _loss, _acc = model.evaluate(eval_x, eval_y, batch_size=10, verbose=1)
    # Name the split in the header so the two result lines can be told apart.
    print(f'Last model accuracy ({split_name})')
    print(f'loss: {_loss:0.3f} | accuracy: {_acc:0.3f}')

In [None]:
## Using best model
### Apply the weights saved during training to the existing model structure, then score the test set
best_weights_file = os.path.join(path_save, 'best_model.h5')
model.load_weights(best_weights_file)

best_scores = model.evaluate(test_x, test_y, batch_size=10, verbose=1)
_loss, _acc = best_scores
print('Best model accuracy', f'loss: {_loss:0.3f} | accuracy: {_acc:0.3f}', sep='\n')

In [None]:
# Checking fail data: show every test image whose predicted class differs from its label
test_result = model.predict(test_x, batch_size=1)

label_names = list(class_dict.keys())
predicted_idx = test_result.argmax(axis=1)
truth_idx = test_y.argmax(axis=1)

# Visit only the mismatching samples, in ascending index order
for a in np.flatnonzero(predicted_idx != truth_idx):
    plt.title(f'GT: {label_names[truth_idx[a]]} | Result: {label_names[predicted_idx[a]]}')
    plt.imshow(test_x[a], cmap='gray')
    plt.show()

 <img src="fig_source/confusion_matrix.jpg" alt="Confusion matrix">

In [None]:
# Confusion matrix on the test-set predictions
from sklearn.metrics import roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay

# Thresholded (0/1) copy of the predicted scores; the matrix below uses argmax instead.
test_th = np.where(test_result > 0.5, 1, 0)

# Take class indices and display names from class_dict in the same order, and pass
# labels= explicitly so the matrix keeps one row/column per class even when a class
# never appears in the test labels or the predictions.
class_items = sorted(class_dict.items(), key=lambda item: item[1])
class_indices = [class_index for _, class_index in class_items]
class_names = [class_name for class_name, _ in class_items]

cm = confusion_matrix(test_y.argmax(axis=1), test_result.argmax(axis=1), labels=class_indices)
disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                              display_labels=class_names)
disp.plot()

In [None]:
from sklearn.metrics import classification_report

# Reduce labels and predictions to class indices, then print the per-class report
true_classes = test_y.argmax(axis=1)
predicted_classes = test_result.argmax(axis=1)
report_text = classification_report(true_classes, predicted_classes)
print(report_text)

In [None]:
def drawing_ROCcurve(test_y, test_result, path_save):
    """Plot per-class, micro-averaged and macro-averaged ROC curves and save the figure.

    Args:
        test_y: Ground-truth array indexed as [sample, class]; each column is passed
            to ``roc_curve`` as the true labels of one class (assumed to be a
            one-hot / binary indicator matrix - TODO confirm).
        test_result: Predicted scores with the same [sample, class] layout.
        path_save: Directory that receives ``ROC_curve.png``; created if missing.

    Returns:
        None. The figure is drawn with pyplot and written to disk.
    """
    # Imported here so the function does not rely on another notebook cell
    # having imported these names first.
    from sklearn.metrics import roc_curve, auc

    def compute_roc_auc(y_true, y_score):
        """Return (fpr, tpr, auc) for one set of binary labels and scores."""
        fpr, tpr, _ = roc_curve(y_true, y_score)
        return fpr, tpr, auc(fpr, tpr)

    n_classes = test_y.shape[-1]
    fpr, tpr, roc_auc = {}, {}, {}

    # One-vs-rest curve for every class column.
    for i in range(n_classes):
        fpr[i], tpr[i], roc_auc[i] = compute_roc_auc(test_y[:, i], test_result[:, i])

    # Micro average: all (label, score) pairs pooled into a single curve.
    fpr["micro"], tpr["micro"], roc_auc["micro"] = compute_roc_auc(test_y.ravel(), test_result.ravel())

    # Macro average: interpolate every class curve on the union of FPR points, then average.
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))
    mean_tpr = np.mean([np.interp(all_fpr, fpr[i], tpr[i]) for i in range(n_classes)], axis=0)

    fpr["macro"], tpr["macro"], roc_auc["macro"] = all_fpr, mean_tpr, auc(all_fpr, mean_tpr)

    plt.figure(figsize=(10, 10))
    plt.plot(fpr["micro"], tpr["micro"], label=f"micro-average (auc = {roc_auc['micro']:.2f})", color="maroon", linestyle=":", linewidth=4)
    plt.plot(fpr["macro"], tpr["macro"], label=f"macro-average (auc = {roc_auc['macro']:.2f})", color="navy", linestyle=":", linewidth=4)

    colors = ['blue', 'yellow', 'green', 'red', 'purple', 'orange', 'pink', 'brown', 'gray', 'cyan']
    for i in range(n_classes):
        # Cycle through the palette so classes beyond the tenth are still drawn,
        # and name the class in the legend so the curves can be told apart.
        plt.plot(fpr[i], tpr[i], lw=2, color=colors[i % len(colors)],
                 label=f"ROC curve of class {i} (auc = {roc_auc[i]:.2f})")

    # Chance-level diagonal for reference.
    plt.plot([0, 1], [0, 1], "k--", lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xticks(fontsize=16)
    plt.yticks(fontsize=16)
    plt.xlabel("False Positive Rate", fontsize=20)
    plt.ylabel("True Positive Rate", fontsize=20)
    plt.title("Receiver operating characteristic curves", fontsize=20)
    plt.legend(loc="lower right", fontsize=14)

    # Make sure the target directory exists before writing the figure.
    os.makedirs(path_save, exist_ok=True)
    plt.savefig(os.path.join(path_save, 'ROC_curve.png'))

In [None]:
# Draw the ROC curves for the test-set predictions; the figure is saved as ROC_curve.png under path_save
drawing_ROCcurve(test_y=test_y, test_result=test_result, path_save=path_save)