In [1]:
import keras
from keras.models import load_model
from keras.utils.generic_utils import CustomObjectScope
from keras.utils import to_categorical
from keras import backend as K
import tensorflow as tf
from keras import applications
from keras.layers import Input, GlobalAveragePooling2D, Activation, Dense
from keras.models import Model
from keras import callbacks
from keras.optimizers import Adam, SGD
from sklearn import metrics
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import os
import glob
import cv2
from PIL import Image
import matplotlib.pyplot as plt

%matplotlib inline

os.environ["CUDA_VISIBLE_DEVICES"]="0"#specify GPU

image_size_width, image_size_height = (299, 299)

#train_normal
folder = ["NORMAL_train"]
x_train_normal = []
y_train_normal  = []
for index, name in enumerate(folder):
    dir = "./" + name
    files = glob.glob(dir + "/*.jpeg")
    for i, file in enumerate(files):
        image = Image.open(file)
        image = image.convert("RGB")#("L")
        image = image.resize((image_size_width, image_size_height))
        data = np.asarray(image)
        x_train_normal.append(data)
        y_train_normal.append(index)

x_train_normal  = np.array(x_train_normal )
y_train_normal  = np.array(y_train_normal )
print('x_train_normal.shape', x_train_normal.shape)
print('y_train_normal.shape', y_train_normal.shape)

Using TensorFlow backend.


kaggle_x_train_normal.shape (40912, 299, 299, 3)
kaggle_y_train_normal.shape (40912,)


In [2]:
#test_normal
folder = ["NORMAL_test"]
x_test_normal = []
y_test_normal  = []
for index, name in enumerate(folder):
    dir = "./" + name
    files = glob.glob(dir + "/*.jpeg")
    for i, file in enumerate(files):
        image = Image.open(file)
        image = image.convert("RGB")#("L")
        image = image.resize((image_size_width, image_size_height))
        data = np.asarray(image)
        x_test_normal .append(data)
        y_test_normal .append(index)

x_test_normal  = np.array(x_test_normal )
y_test_normal  = np.array(y_test_normal )
print('x_test_normal.shape', x_test_normal.shape)
print('y_test_normal.shape', y_test_normal.shape)

#test_anomaly
folder = ["CNV_test", "DME_test", "DRUSEN_test"]
x_test_anomaly = []
y_test_anomaly  = []
for index, name in enumerate(folder):
    dir = "./" + name
    files = glob.glob(dir + "/*.jpeg")
    for i, file in enumerate(files):
        image = Image.open(file)
        image = image.convert("RGB")#("L")
        image = image.resize((image_size_width, image_size_height))
        data = np.asarray(image)
        x_test_anomaly.append(data)
        y_test_anomaly.append(index)

x_test_anomaly  = np.array(x_test_anomaly)
y_test_anomaly  = np.array(y_test_anomaly)
print('x_test_anomaly.shape', x_test_anomaly.shape)
print('y_test_anomaly.shape', y_test_anomaly.shape)

x_test_normal.shape (250, 299, 299, 3)
y_test_normal.shape (250,)
x_test_anomaly.shape (750, 299, 299, 3)
y_test_anomaly.shape (750,)


In [None]:
#Select the best model
for j in range(1, 11):
    model = load_model(f'model.DenseNet201.ep{j:02}.h5', compile=False)
    modelA = Model(inputs=model.inputs, outputs=model.layers[-2].output)
    print('validating model %2d ...' %j)

    train_a = modelA.predict(x_train_normal)
    test_normalA = modelA.predict(x_test_normal)
    test_anomalyA = modelA.predict(x_test_anomaly)
    #print('train_a_predict', train_a)
    #print('test_normalA', test_normalA)
    #print('test_anomalyA', test_anomalyA)

    train_a = train_a.reshape((len(train_a),-1))
    test_normalA = test_normalA.reshape((len(test_normalA),-1))
    test_anomalyA = test_anomalyA.reshape((len(test_anomalyA),-1))
    #print('train_a.len', train_a.shape)
    #print('test_normalA.len', test_normalA.shape)
    #print('test_anomalyA.len', test_anomalyA.shape)

    ms = MinMaxScaler()
    train_a = ms.fit_transform(train_a)
    test_normalA = ms.transform(test_normalA)
    test_anomalyA = ms.transform(test_anomalyA)
    #print('train_a after ms', train_a)
    #print('test_normalA after ms', test_normalA)
    #print('test_anomalyA after ms', test_anomalyA)

    clf_a = LocalOutlierFactor(n_neighbors=20, novelty = False)
    a = clf_a.fit(train_a[:5000])

    Z1_A_a = []
    for i in range(len(x_test_normal)):
        Z1_a = -clf_a._decision_function(test_normalA[[i]]) 
        Z1_A_a.append(Z1_a)
        

    Z2_A_a = []
    for i in range(len(x_test_anomaly)):
        Z2_a = -clf_a._decision_function(test_anomalyA[[i]]) 
        Z2_A_a.append(Z2_a)
        
    Z1_A = np.reshape(Z1_A_a, -1)
    Z2_A = np.reshape(Z2_A_a, -1)
       
    y_true = np.zeros(len(Z1_A)+len(Z2_A))
    y_true[len(Z1_A):] = 1#0:normal、1：abnormal


    # calculate FPR, TPR(, threshold) 
    fpr, tpr, _ = metrics.roc_curve(y_true, np.hstack((Z1_A, Z2_A)), pos_label=1)

    # AUC
    auc = metrics.auc(fpr, tpr)
    
    #FPR＠TPR = 1.0
    alpha = fpr[np.argmax(tpr)]

    max_tnr, max_tpr = 0, 0
    for k in range(len(fpr)):
        if max_tnr + max_tpr < 1-fpr[k] + tpr[k]:
            max_tnr = 1-fpr[k]
            max_tpr = tpr[k]
            
    plt.plot(fpr, tpr, label='(AUC = %.11f)'%(auc))
    plt.legend()
    plt.title(name + '(ROC)')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.grid(True)
        
    if auc < 1.0:
        print('current AUC is %.11f' %auc)
        print('current alpha is %.11f' %alpha)
            
    else:
        print('finishing the validation since AUC reached 1.0')
        print('alpha is %.11f' %alpha)
        break