Import

In [2]:
import tensorflow as tf
import tensorflow.keras.backend as K
import os
import time
import argparse
from tensorflow.keras import Model, Sequential, metrics, optimizers
import numpy as np
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn import metrics
from tensorflow.keras.applications import VGG19
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import InputLayer, Conv2D, Flatten, Dense, Conv2DTranspose, Reshape
from tensorflow.keras.layers import Input, Dense, BatchNormalization, Conv2D,\
                                    MaxPooling2D, UpSampling2D, GlobalAveragePooling2D,\
                                    Layer, Lambda, Flatten, Reshape, Conv2DTranspose,\
                                    Activation, LeakyReLU, Dropout, InputLayer, ReLU

from tensorflow.keras import initializers
from tqdm import tqdm_notebook, tqdm
import pandas as pd
import matplotlib.pyplot as plt
import math
from glob import glob
from PIL import Image
import csv
import shutil
tf.random.set_seed(1234)

Dataset 설정

In [None]:
## Training-time preprocessing: random contrast jitter, then 1/255 scaling
def random_aug(img) :
    """Randomly change the contrast of ``img`` and divide the result by 255.

    The contrast factor is drawn by ``tf.image.random_contrast`` from the
    interval [0.8, 1.2].  The scaled result of that call is returned.
    """
    contrast_lower, contrast_upper = 0.8, 1.2
    jittered = tf.image.random_contrast(img, contrast_lower, contrast_upper)
    return jittered / 255

## normalization for validation and test
def test_norm(img) :
    img /= 255
    return img

## Dataset iterators
def load_dataset(batch_size=256, train_dir="path/to/train", val_dir="path/to/val",
                 test_dir="path/to/test", img_size=(64, 64)):
    """Create the train / validation / test directory iterators.

    Expected layout below each directory:
    ``normal/<in_distribution_class_name>/<img_name>`` and
    ``novel/<out_of_distribution_class_name>/<img_name>``, i.e. two top-level
    classes (normal, novel).

    Args:
        batch_size: number of images per batch for all three iterators.
        train_dir, val_dir, test_dir: root directories of the three splits.
        img_size: ``(height, width)`` every image is resized to.

    Returns:
        ``(train_dataset, val_dataset, test_dataset)``.  Only the training
        iterator shuffles and augments; validation and test are read in a
        fixed order and only scaled by 1/255.
    """
    ## Training: several augmentations; validation/test: plain normalization only.
    ## NOTE(review): per the Keras docs a float shift range >= 1 is read as a
    ## number of pixels, so 1.0 here means "at most one pixel" - confirm intended.
    train_image_generator = ImageDataGenerator(preprocessing_function=random_aug, rotation_range=2, width_shift_range=1.0,\
                                   height_shift_range=1.0, brightness_range=(0.8,1.2), zoom_range=0.02,\
                                   fill_mode='nearest', horizontal_flip=True,\
                                   vertical_flip=True)

    test_image_generator = ImageDataGenerator(preprocessing_function=test_norm)

    def _flow(generator, directory, shuffle):
        # One place for the settings shared by all three splits.
        return generator.flow_from_directory(batch_size=batch_size,
                                             directory=directory,
                                             shuffle=shuffle,
                                             target_size=tuple(img_size),
                                             class_mode='categorical',
                                             interpolation='bicubic')

    train_dataset = _flow(train_image_generator, train_dir, True)
    val_dataset = _flow(test_image_generator, val_dir, False)
    test_dataset = _flow(test_image_generator, test_dir, False)

    return train_dataset, val_dataset, test_dataset

Display function

In [None]:
def display_image(image_set, recon_set) :
    """Plot every image next to its reconstruction, five pairs per row.

    Args:
        image_set: sequence of original images.
        recon_set: sequence of reconstructions, indexed like ``image_set``.

    The grid has 10 columns and at least 10 rows; more rows are added when the
    batch holds more than 50 pairs, which a fixed 10x10 grid cannot hold.
    The figure is closed after being shown so repeated calls (once per epoch
    and per evaluation) do not accumulate open figures.
    """
    n_cols = 10
    n_rows = max(10, math.ceil(2 * len(image_set) / n_cols))
    # 2.5 inches per row keeps the original 25x25 size for the 10-row case.
    fig = plt.figure(figsize=(25, 2.5 * n_rows))
    for i, a in enumerate(image_set) :
        fig.add_subplot(n_rows, n_cols, 2*i+1)
        plt.imshow(a)
        fig.add_subplot(n_rows, n_cols, 2*i+2)
        plt.imshow(recon_set[i])
    plt.show()
    plt.close(fig)

Loss 정의

In [None]:
## KL-divergence loss object (the training loop uses it for the latent-code term)
kl = tf.keras.losses.KLDivergence()

## Contextual loss
def contextual_loss(x1, y1, loss_type, h=0.1, train=True):
    """Contextual loss between two lists of feature maps.

    Args:
        x1, y1: iterables of feature tensors that are compared pair by pair.
        loss_type: ``'cl_cosine'`` selects the cosine distance, ``'cl_l2'`` the
            L2 distance; any other value falls back to the L1 distance.
        h: bandwidth used when turning distances into similarities.
        train: if true, return the sum over pairs of the batch-mean loss;
            otherwise return a list holding one per-sample loss tensor per pair.
    """
    if loss_type == 'cl_cosine':
        dist_fn = cosine_dist
    elif loss_type == 'cl_l2':
        dist_fn = l2_dist
    else:
        dist_fn = l1_dist

    per_pair = []
    for feat_x, feat_y in zip(x1, y1):
        dist = dist_fn(feat_x, feat_y)
        # Eq (2): distances relative to the smallest one along the last axis
        rel_dist = dist / (K.min(dist, axis=2, keepdims=True) + 1e-5)
        # Eq (3): relative distance -> similarity weight
        weight = K.exp((1 - rel_dist) / h)
        # Eq (4): normalise the weights along the last axis
        cx_ij = weight / K.sum(weight, axis=2, keepdims=True)
        # Eq (1): best match along axis 1, averaged over the remaining axis
        cx = K.mean(K.max(cx_ij, axis=1), axis=1)
        per_pair.append(-K.log(cx))

    if not train:
        return per_pair
    return sum(K.mean(nll) for nll in per_pair)

def cosine_dist(x, y):
    """Pairwise cosine distance between the spatial feature vectors of x and y.

    Args:
        x, y: rank-4 tensors unpacked as ``(N, H, W, C)``.

    Returns:
        ``1 - cosine_similarity`` with shape ``(N, H*W, H*W)``; entry
        ``[n, i, j]`` compares position ``i`` of ``x`` with position ``j`` of
        ``y``.  Both inputs are centred with the per-channel mean of ``y``.
    """
    N, H, W, C = x.shape

    x_vec = tf.transpose(x, perm=[0,3,1,2])
    y_vec = tf.transpose(y, perm=[0,3,1,2])

    y_mu = K.mean(y_vec, axis=[0, 2, 3], keepdims=True)

    x_centered = x_vec - y_mu
    y_centered = y_vec - y_mu

    # l2_normalize clamps the squared norm from below, so an all-zero
    # (centred) feature vector yields zeros instead of NaN from a 0/0 division.
    x_normalized = tf.math.l2_normalize(x_centered, axis=1)
    y_normalized = tf.math.l2_normalize(y_centered, axis=1)

    x_normalized = tf.reshape(x_normalized, [N, C, -1])                                # (N, C, H*W)
    y_normalized = tf.reshape(y_normalized, [N, C, -1])                                # (N, C, H*W)

    x_normalized = tf.transpose(x_normalized, perm=[0,2,1])                            # (N, H*W, C)

    cosine_sim = tf.matmul(x_normalized, y_normalized)                                 # (N, H*W, H*W)

    d = 1 - cosine_sim

    return d

def l2_dist(x, y):
    """Pairwise squared L2 distance between the spatial feature vectors of x and y.

    Args:
        x, y: rank-4 tensors unpacked as ``(N, H, W, C)``.

    Returns:
        Tensor of shape ``(N, H*W, H*W)`` whose entry ``[n, i, j]`` is
        ``||x_i - y_j||^2`` clipped to ``[0, 100000]``.  The orientation
        (x along axis 1, y along axis 2) matches ``cosine_dist`` and ``l1_dist``.
    """
    N, H, W, C = x.shape

    x_vec = tf.transpose(x, perm=[0,3,1,2])
    y_vec = tf.transpose(y, perm=[0,3,1,2])

    x_vec = tf.reshape(x_vec, [N, C, -1])                     # (N, C, H*W)
    y_vec = tf.reshape(y_vec, [N, C, -1])                     # (N, C, H*W)

    x_sq = K.sum(x_vec ** 2, axis=1, keepdims=True)           # (N, 1, H*W): ||x_i||^2
    y_sq = K.sum(y_vec ** 2, axis=1, keepdims=True)           # (N, 1, H*W): ||y_j||^2

    # cross[n, i, j] = <x_i, y_j>
    cross = tf.transpose(x_vec, perm=[0,2,1]) @ y_vec         # (N, H*W, H*W)

    # ||x_i||^2 varies along axis 1 and ||y_j||^2 along axis 2, so every term
    # of ||x_i||^2 - 2<x_i, y_j> + ||y_j||^2 refers to the same (i, j) pair.
    dist = tf.transpose(x_sq, perm=[0,2,1]) - 2 * cross + y_sq

    # Rounding can push tiny distances below zero; the upper bound is kept as is.
    dist = K.clip(dist, min_value=0., max_value=100000)
    return dist

def l1_dist(x, y):
    """Pairwise L1 distance between the spatial feature vectors of x and y.

    Args:
        x, y: rank-4 tensors unpacked as ``(N, H, W, C)``.

    Returns:
        Tensor of shape ``(N, H*W, H*W)`` clipped to ``[0, 100000]``; entry
        ``[n, i, j]`` is the L1 distance between position ``i`` of ``x`` and
        position ``j`` of ``y``.
    """
    batch, _, _, channels = x.shape
    flat_shape = [batch, channels, -1]

    x_flat = tf.reshape(tf.transpose(x, perm=[0,3,1,2]), flat_shape)   # (N, C, H*W)
    y_flat = tf.reshape(tf.transpose(y, perm=[0,3,1,2]), flat_shape)   # (N, C, H*W)

    # Broadcast to every (y position, x position) pair, then sum |.| over channels.
    pairwise = tf.expand_dims(x_flat, axis=2) - tf.expand_dims(y_flat, axis=3)
    l1 = K.sum(K.abs(pairwise), axis=1)

    # Swap the two position axes so x runs along axis 1 and y along axis 2.
    l1 = tf.transpose(l1, perm=[0,2,1])

    return K.clip(l1, min_value=0., max_value=100000)

Test 함수 정의

In [None]:
def diff(test_dataset, model, loss_type, selected_pm_layers):
    """Score every sample of ``test_dataset`` with the contextual loss.

    Args:
        test_dataset: batch iterator exposing ``filepaths``, ``reset()`` and
            ``len()``; each batch is an ``(images, one_hot_labels)`` pair.
        model: model returning ``(mean, log_var, z, reconstruction)`` and
            exposing a ``classifier`` feature extractor.
        loss_type: distance type forwarded to ``contextual_loss``.
        selected_pm_layers: feature-layer names; only their number is used
            here (one score list per layer).

    Returns:
        ``(diffs, labels, file_path)``: one concatenated per-sample score
        tensor per feature layer, the integer class labels, and the
        iterator's file paths.

    Raises:
        ValueError: if the iterator yields no batch.
    """
    diffs_total = [[] for _ in selected_pm_layers]
    labels = []
    file_path = test_dataset.filepaths
    crit = contextual_loss

    # Always start at the first batch: scores are later matched to
    # ``file_path`` by position, which only holds for a full pass from the start.
    test_dataset.reset()

    image_batch, recon = None, None
    for _ in tqdm(range(len(test_dataset))):
        image_batch, label_batch = next(test_dataset)

        _, _, _, recon = model(image_batch, training=False)

        targets = model.classifier(image_batch)
        recon_features = model.classifier(recon)

        layer_scores = crit(recon_features, targets, train=False, loss_type=loss_type)

        for layer_bucket, score in zip(diffs_total, layer_scores):
            layer_bucket.append(score)
        labels.extend(np.argmax(label_batch, axis=1).tolist())

    if image_batch is None:
        raise ValueError("test_dataset yielded no batches")

    # Visual sanity check on the last batch only.
    display_image(image_batch, recon)

    diffs = [K.concatenate(i, axis=0) for i in diffs_total]

    return diffs, labels, file_path
    
def testing(test_diff, label, file_path):
    """Turn per-layer scores into detection metrics and print error counts.

    Args:
        test_diff: iterable of per-sample score tensors/arrays (one per
            feature layer); they are summed into a single score per sample.
        label: integer class labels; 1 is treated as the positive (novel)
            class and 0 as normal.
        file_path: file paths aligned with the scores.  The third-from-last
            path component is expected to be ``normal`` or ``novel`` and the
            second-from-last the class name.

    Returns:
        ``(auc_ori, auc, tnr_at_tpr98, auroc_odin, aupr_in, aupr_out, val_loss)``.
        All metrics are 0 when ``label`` holds a single class; ``val_loss`` is
        always the mean score.
    """
    from collections import defaultdict
    loss = 0
    for layer_score in test_diff:
        loss = loss + layer_score
    # Work on a plain numpy array from here on.
    loss = np.asarray(loss)
    if len(set(label)) > 1 :
        fprs, tprs, threshold = metrics.roc_curve(label, loss, pos_label=1)
        auc, tnr_at_tpr98, th_loss = get_curve(loss,label,normal_label=0)
        print('threshold : ', th_loss)
        missed_by_class = defaultdict(list)
        over_detected = 0
        for i_path, i_loss in zip(file_path, loss):
            # Split on the platform separator so Windows-style paths work too.
            parts = os.path.normpath(i_path).split(os.sep)
            if i_loss >= th_loss and parts[-3]=='normal' :
                # normal sample scored at/above the threshold (over-detection)
                over_detected += 1
            elif i_loss < th_loss and parts[-3]=='novel' :
                # novel sample scored below the threshold (missed detection)
                missed_by_class[parts[-2]].append(parts[-1])
        print('과검 :',over_detected)
        for ng_name in missed_by_class.keys() :
            print('미검 '+ng_name, len(missed_by_class[ng_name]) )
        auc_ori = metrics.auc(fprs, tprs)
        auroc_odin = metrics.roc_auc_score(label, loss)
        aupr_in = metrics.average_precision_score(label,loss)
        # Flip labels and score direction so that "normal" becomes the positive class.
        aupr_out = metrics.average_precision_score(-1 * np.array(label) + 1, 1. - loss)
    else :
        auc_ori, auc, tnr_at_tpr98, auroc_odin, aupr_in, aupr_out = 0, 0, 0, 0, 0, 0
    val_loss = np.mean(loss)
    return auc_ori, auc, tnr_at_tpr98, auroc_odin, aupr_in, aupr_out, val_loss

## Split the losses into the normal group and the novel group
def normal_novel(loss, label, normal_label=1):
    """Return ``(normal_loss, novel_loss)`` as numpy arrays.

    A sample belongs to the normal group when its label equals
    ``normal_label``; every other sample goes to the novel group.
    """
    scores = np.array(loss)
    is_normal = np.array(label) == normal_label
    return scores[is_normal], scores[np.logical_not(is_normal)]

## AUROC and related detection metrics
def get_curve(loss, label, normal_label=1):
    """Compute AUROC, TNR at 98% TPR and a loss threshold.

    Samples with a *low* loss are treated as normal (positive for the TPR).

    Args:
        loss: per-sample scores.
        label: per-sample labels, same length as ``loss``.
        normal_label: label value that marks a normal sample.

    Returns:
        ``(auroc, tnr_at_tpr98, threshold_loss)``.  ``threshold_loss`` lies
        between the ``ceil(0.98 * n_normal)``-th smallest normal loss and the
        next larger one; when there is no larger one (fewer than 50 normal
        samples) it is the smallest float above the largest normal loss, so
        that every normal sample stays strictly below it.

    Raises:
        ValueError: if either the normal or the novel group is empty.
    """
    # Imported locally so the function does not depend on which ``metrics``
    # module happens to be bound at module level.
    from sklearn import metrics

    normal, novel = normal_novel(loss, label, normal_label)
    num_k, num_n = normal.shape[0], novel.shape[0]
    if num_k == 0 or num_n == 0:
        raise ValueError("get_curve needs at least one normal and one novel sample")

    # Sort descending, then flip the sign convention (end - loss) so that the
    # transformed scores are ascending and normal samples get high scores.
    normal_desc = np.sort(normal)[::-1]
    novel_desc = np.sort(novel)[::-1]
    end = np.max([np.max(normal_desc), np.max(novel_desc)])
    normal = end - normal_desc
    novel = end - novel_desc

    recall_k = 0.98
    pred_k_num = math.ceil(num_k*recall_k)
    if pred_k_num < num_k:
        # Midpoint between the pred_k_num-th smallest normal loss and its successor.
        threshold_loss = (end-normal[-pred_k_num] + end-normal[-pred_k_num-1])/2
    else:
        # No successor exists: place the threshold just above the largest normal loss.
        threshold_loss = np.nextafter(normal_desc[0], np.inf)

    # tp[l] / fp[l]: normal / novel samples still accepted after the l lowest
    # transformed scores have been rejected.
    tp = -np.ones([num_k+num_n+1], dtype=int)
    fp = -np.ones([num_k+num_n+1], dtype=int)
    tp[0], fp[0] = num_k, num_n
    k, n = 0, 0
    for l in range(num_k+num_n):
        if k == num_k:
            # all normal samples consumed: only novel ones remain
            tp[l+1:] = tp[l]
            fp[l+1:] = np.arange(fp[l]-1, -1, -1)
            break
        elif n == num_n:
            # all novel samples consumed: only normal ones remain
            tp[l+1:] = np.arange(tp[l]-1, -1, -1)
            fp[l+1:] = fp[l]
            break
        else:
            if novel[n] < normal[k]:
                n += 1
                tp[l+1] = tp[l]
                fp[l+1] = fp[l] - 1
            else:
                k += 1
                tp[l+1] = tp[l] - 1
                fp[l+1] = fp[l]

    # NOTE(review): argmin returns the first index whose TPR is closest to the
    # target; on a plateau of equal TPR that is the entry with the most false
    # positives, so the reported TNR is the most pessimistic one - confirm intended.
    tpr_target_pos = np.abs(tp / num_k - recall_k).argmin()
    tnr_at_tpr98 = 1. - fp[tpr_target_pos] / num_n
    tpr = tp/num_k
    fpr = fp/num_n
    auroc = metrics.auc(fpr, tpr)
    return auroc, tnr_at_tpr98, threshold_loss

모델 정의

In [None]:
class DFCVAE(tf.keras.Model):
    """Convolutional VAE paired with a VGG19 feature extractor.

    The encoder is four stride-2 convolutions followed by two dense heads
    (mean and log-variance of the latent code).  The decoder maps a latent
    sample to a 4x4x256 map and upsamples it four times, so the
    reconstruction is always 64x64x3 with sigmoid-activated values.
    ``self.classifier`` exposes intermediate VGG19 activations.

    Relies on a module-level ``init1`` kernel initializer being defined
    before the model is instantiated.
    """

    def __init__(self, latent_dim: int, net_type: str='conv', selected_pm_layers=None):
        """
        Args:
            latent_dim: size of the latent code.
            net_type: must be ``'simple'`` or ``'conv'``; only validated here.
            selected_pm_layers: names of the VGG19 layers whose outputs the
                classifier returns.  ``None`` is treated as an empty list
                (replaces a mutable ``[]`` default).
        """
        super(DFCVAE, self).__init__()
        self.latent_dim = latent_dim
        assert net_type in ['simple', 'conv']
        if selected_pm_layers is None:
            selected_pm_layers = []

        # Encoder: 4 x (conv stride 2 -> LeakyReLU -> batch norm)
        self.encoder_conv1 = Conv2D(32, 4, strides=(2, 2), padding="same", kernel_initializer=init1,bias_initializer=initializers.Zeros())
        self.encoder_bn1 = BatchNormalization(momentum=0.9, epsilon=1e-5)

        self.encoder_conv2 = Conv2D(64, 4, strides=(2, 2), padding="same", kernel_initializer=init1,bias_initializer=initializers.Zeros())
        self.encoder_bn2 = BatchNormalization(momentum=0.9, epsilon=1e-5)

        self.encoder_conv3 = Conv2D(128, 4, strides=(2, 2), padding="same", kernel_initializer=init1,bias_initializer=initializers.Zeros())
        self.encoder_bn3 = BatchNormalization(momentum=0.9, epsilon=1e-5)

        self.encoder_conv4 = Conv2D(256, 4, strides=(2, 2), padding="same", kernel_initializer=init1,bias_initializer=initializers.Zeros())
        self.encoder_bn4 = BatchNormalization(momentum=0.9, epsilon=1e-5)

        # Latent heads
        self.z_mean = Dense(self.latent_dim, name="z_mean")
        self.z_log_var = Dense(self.latent_dim, name="z_log_var")

        # Decoder: dense -> 4x4x256, then 3 x (upsample -> conv -> LeakyReLU -> BN)
        # and a final upsample + sigmoid conv.
        self.decoder_dense = Dense(4096)
        self.decoder_conv1 = Conv2D(128, 3, strides=1, padding='same', kernel_initializer=init1,bias_initializer=initializers.Zeros())
        self.decoder_bn1 = BatchNormalization(momentum=0.9, epsilon=1e-3)

        self.decoder_conv2 = Conv2D(64, 3, strides=1, padding='same', kernel_initializer=init1,bias_initializer=initializers.Zeros())
        self.decoder_bn2 = BatchNormalization(momentum=0.9, epsilon=1e-3)

        self.decoder_conv3 = Conv2D(32, 3, strides=1, padding='same', kernel_initializer=init1,bias_initializer=initializers.Zeros())
        self.decoder_bn3 = BatchNormalization(momentum=0.9, epsilon=1e-3)

        self.decoder_conv4 = Conv2D(3, 3, strides=1, padding='same', activation='sigmoid', kernel_initializer=init1,bias_initializer=initializers.Zeros())

        # Weight-free layers are created once here and reused in ``call``
        # instead of being re-instantiated on every forward pass.
        self.act_leaky = LeakyReLU(0.2)
        self.act_relu = ReLU()
        self.flatten_layer = Flatten()
        self.reshape_layer = Reshape((4, 4, 256))
        self.upsample_layer = UpSampling2D((2, 2), interpolation='nearest')

        self.classifier = self.make_classifier(selected_pm_layers)

    def call(self, input_tensor, training=False):
        """Encode, sample and decode.

        Returns:
            ``(z_mean, z_log_var, z, reconstruction)``.
        """
        x = self.encoder_conv1(input_tensor)
        x = self.act_leaky(x)
        x = self.encoder_bn1(x, training=training)

        x = self.encoder_conv2(x)
        x = self.act_leaky(x)
        x = self.encoder_bn2(x, training=training)

        x = self.encoder_conv3(x)
        x = self.act_leaky(x)
        x = self.encoder_bn3(x, training=training)

        x = self.encoder_conv4(x)
        x = self.act_leaky(x)
        x = self.encoder_bn4(x, training=training)

        x = self.flatten_layer(x)
        z_mean = self.z_mean(x)
        z_log_var = self.z_log_var(x)
        z = self.reparameterize(z_mean, z_log_var, training)

        x = self.decoder_dense(z)
        x = self.act_relu(x)

        x = self.reshape_layer(x)

        x = self.upsample_layer(x)
        x = self.decoder_conv1(x)
        x = self.act_leaky(x)
        x = self.decoder_bn1(x, training=training)

        x = self.upsample_layer(x)
        x = self.decoder_conv2(x)
        x = self.act_leaky(x)
        x = self.decoder_bn2(x, training=training)

        x = self.upsample_layer(x)
        x = self.decoder_conv3(x)
        x = self.act_leaky(x)
        x = self.decoder_bn3(x, training=training)

        x = self.upsample_layer(x)
        x = self.decoder_conv4(x)

        return z_mean, z_log_var, z, x


    def make_classifier(self, selected_pm_layers):
        """Build a model mapping an image to the selected VGG19 layer outputs.

        VGG19 is created without its top and with ImageNet weights.
        """
        pm = VGG19(include_top=False, weights='imagenet')

        outputs = [pm.get_layer(l).output for l in selected_pm_layers]
        classifier = Model(pm.input, outputs)

        return classifier

    def reparameterize(self, mean, logvar, training):
        """Draw ``z = mean + eps * exp(logvar / 2)`` with standard-normal ``eps``.

        Outside training the noise op is created with ``seed=0``.
        NOTE(review): noise is still added at inference time; presumably the
        seed is meant to make evaluation repeatable - confirm.
        """
        if training :
            eps = tf.random.normal(shape=tf.shape(mean))
        else :
            eps = tf.random.normal(shape=tf.shape(mean), seed=0)
        return eps * tf.exp(logvar * .5) + mean

Hyper-parameter 설정

In [None]:
## TensorFlow reads this variable when it first initialises the GPU devices,
## so it has to be set before the model (and its VGG19 weights) is created.
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

loss_type = "cl_cosine" ## loss type in ['cl_cosine', 'cl_l1', 'cl_l2']
latent_dim = 100 ## num of latent dim
num_epochs = 200 ## num of epoch
lr = 0.001 ## learning rate
batch_size = 32 ## batch size
log_dir = 'test_model' ## model file

## parameter initializers (init1 is read by DFCVAE when its layers are built)
init1 = initializers.he_normal()
init2 = initializers.RandomNormal(stddev=0.015)

## feature extractor: VGG19 layers whose activations are compared;
## it is frozen so only the VAE weights are trained
selected_pm_layers = ['block4_conv1', 'block5_conv1']
model = DFCVAE(latent_dim, net_type='conv', selected_pm_layers=selected_pm_layers)
model.classifier.trainable = False
optimizer = tf.keras.optimizers.Adam(lr)
crit = contextual_loss

Data Load

In [None]:
## Build the train / validation / test iterators with the configured batch size
train_dataset, val_dataset, test_dataset = load_dataset(batch_size=batch_size)

Training

In [None]:
best_tnr_tpr98 = 0
## Start from +inf so the first validation loss always counts as an improvement,
## whatever its magnitude.
min_val_loss = float('inf')
best_epoch = 0
## Make sure the checkpoint directory exists before the first save.
os.makedirs("./results", exist_ok=True)
for epoch in range(1, num_epochs + 1):
    t = time.time()
    kl_loss_list = []
    rc_loss_list = []

    ## one pass over the training iterator
    for _ in tqdm(range(len(train_dataset))):
        train_x = next(train_dataset)[0]
        with tf.GradientTape() as tape:
            mean, log_var, z, reconstruction = model(train_x, training=True)
            h1_list = model.classifier(train_x)
            h2_list = model.classifier(reconstruction)

            ## NOTE(review): the KL term compares an all-ones target with
            ## softmax(z) and does not use mean / log_var - confirm intended.
            kl_loss = kl(tf.fill([z.shape[0], z.shape[1]],1.0),tf.nn.softmax(z,axis=1)) / z.shape[1]
            rc_loss = crit(h2_list, h1_list, train=True, loss_type=loss_type)
            total_loss = kl_loss + rc_loss
        grad = tape.gradient(total_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grad, model.trainable_variables))

        kl_loss_list.append(kl_loss.numpy())
        rc_loss_list.append(rc_loss.numpy())
    kl_last_loss = np.mean(kl_loss_list)
    rc_last_loss = np.mean(rc_loss_list)
    ## show the last training batch next to its reconstruction
    display_image(train_x, reconstruction)

    ## the time printed here is the wall-clock duration of this epoch's training pass
    print('Epoch {}, Loss1: {}, Loss2: {}, Remaining Time at Epochs: {:.2f}'.format(
            epoch, kl_last_loss, rc_last_loss, time.time() - t
        ))

    ## validation
    test_diff, label, file_path = diff(val_dataset, model, loss_type, selected_pm_layers)
    auc_ori, auc, tnr_at_tpr98, auroc_odin, aupr_in, aupr_out, val_loss = testing(test_diff, label, file_path)
    if val_loss < min_val_loss :
        min_val_loss = val_loss
        best_epoch = epoch
    print(best_epoch)
    ## a checkpoint is written every epoch, not only for the best one
    model.save("./results/"+log_dir+"_oodd-{:03d}".format(epoch))
    print('-------------validation_result--------------')
    print('auc_ori',auc_ori)
    print('auc',auc)
    print('tnr_at_tpr98',tnr_at_tpr98)
    print('aupr_in',aupr_in)
    print('aupr_out',aupr_out)
    print('val_loss',val_loss)

    ## test
    test_diff, label, file_path = diff(test_dataset, model, loss_type, selected_pm_layers)
    auc_ori, auc, tnr_at_tpr98, auroc_odin, aupr_in, aupr_out, test_loss = testing(test_diff, label, file_path)
    print('-------------test_result--------------')
    print('auc_ori',auc_ori)
    print('auc',auc)
    print('tnr_at_tpr98',tnr_at_tpr98)
    print('aupr_in',aupr_in)
    print('aupr_out',aupr_out)
    print('test_loss',test_loss)
    print(best_epoch, min_val_loss)
    if best_tnr_tpr98 < tnr_at_tpr98 :
        best_tnr_tpr98 = tnr_at_tpr98

model.save("./results/"+log_dir+"_oodd_last")

Test

In [None]:
## Re-load a saved model from disk ('path/to/model' is a placeholder).
## NOTE(review): the evaluation cell calls `loaded_model.classifier`; confirm the
## re-loaded object exposes that attribute.
loaded_model = tf.keras.models.load_model('path/to/model')

In [None]:
## Evaluate the re-loaded model on the test set, starting from the first batch
## and with the random seeds re-initialised.
test_dataset.reset()
tf.random.set_seed(1234)
np.random.seed(0)
test_diff, label, file_path = diff(test_dataset, loaded_model, loss_type, selected_pm_layers)
results = testing(test_diff, label, file_path)
auc_ori, auc, tnr_at_tpr98, auroc_odin, aupr_in, aupr_out, test_loss = results
for metric_name, metric_value in (
    ('auc_ori', auc_ori),
    ('auc', auc),
    ('tnr_at_tpr98', tnr_at_tpr98),
    ('aupr_in', aupr_in),
    ('aupr_out', aupr_out),
    ('test_loss', test_loss),
):
    print(metric_name, metric_value)