In [None]:
from keras.layers import Input,Dense,Lambda,BatchNormalization,Activation,Flatten,Reshape
from keras.layers.convolutional import Conv2D,Conv2DTranspose
from keras.models import Model
from keras import backend as K
from keras import objectives,metrics

import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing import image
import matplotlib.pyplot as plt
import matplotlib.colors as colors 
from sklearn import metrics
import cv2
import os

# 画像の切り出し
元の画像から一部を切り取って学習データを作成します。
ここでは、256×256サイズから11×11サイズを切り取り、1万枚を学習データとして用意しました。

In [None]:
def cut_img(x,number,height=11,width=11):
    x_out=[]
    for i in range(number):
        shape_0=np.random.randint(0,x.shape[0])
        shape_1=np.random.randint(0,x.shape[1]-height)
        shape_2=np.random.randint(0,x.shape[2]-width)
        temp=x[shape_0,shape_1:shape_1+height,shape_2:shape_2+width,0]
        x_out.append(temp.reshape(11,11,1))
    x_out=np.array(x_out)
    return x_out

# データの作成
学習用データ：391枚
正常のテスト用データ：40枚
異常(crack,cut,hole,print)のテスト用データ：62枚

In [None]:
path_1=os.listdir(".../anomaly_detection/hazelnut/train/good")
path_2=os.listdir(".../anomaly_detection/hazelnut/test/good")
path_3=os.listdir(".../anomaly_detection/hazelnut/test/bad")

file_1=".../anomaly_detection/hazelnut/train/good/"
file_2=".../anomaly_detection/hazelnut/test/good/"
file_3=".../anomaly_detection/hazelnut/test/bad/"

x_train_normal=[]
x_test_normal=[]
x_test_anomaly=[]

#学習用データの抽出
for i in path_1:
    img_1=image.load_img(file_1+i,grayscale=True,target_size=(256,256))
    x_train=image.img_to_array(img_1)
    x_train=x_train.astype("float32")/255
    x_train_normal.append(x_train)
x_train_normal=np.array(x_train_normal)
x_train_normal=cut_img(x_train_normal,10000)
#print(x_train_normal.shape)

#正常のテスト用データの抽出
for i in path_2:
    img_2=image.load_img(file_2+i,grayscale=True,target_size=(256,256))
    x_test_good=image.img_to_array(img_2)
    x_test_good=x_test_good.astype("float32")/255
    x_test_normal.append(x_test_good)
x_test_normal=np.array(x_test_normal)
test_normal=x_test_normal[np.random.randint(len(x_test_normal))]
test_normal=test_normal.reshape(1,256,256,1)
#print(test_normal.shape)

#異常のテスト用データの抽出
for i in path_3:
    img_3=image.load_img(file_3+i,grayscale=True,target_size=(256,256))
    x_test_bad=image.img_to_array(img_3)
    x_test_bad=x_test_bad.astype("float32")/255
    x_test_anomaly.append(x_test_bad)
x_test_anomaly=np.array(x_test_anomaly)
test_anomaly=x_test_anomaly[np.random.randint(len(x_test_anomaly))]
test_anomaly=test_anomaly.reshape(1,256,256,1)
#print(test_anomaly.shape)

# パラメータの設定

In [None]:
batch_size=100
input_shape=(11,11,1)
latent_dim=2
Nc=16
epochs=30
epsilon_std=1.0

# VAEモデルの構築

In [None]:
#サンプリング関数の作成
def sampling(args):
    z_mean,z_log_var=args
    epsilon=K.random_normal(shape=(K.shape(z_mean)[0],latent_dim),mean=0,stddev=epsilon_std)
    return z_mean+K.exp(0.5*z_log_var)*epsilon

In [None]:
#エンコーダの作成
inputs=Input(shape=input_shape,name="encoder_input")
x=Conv2D(Nc,kernel_size=2,strides=2)(inputs)
x=BatchNormalization()(x)
x=Activation("relu")(x)
x=Conv2D(Nc*2,kernel_size=2,strides=2)(x)
x=BatchNormalization()(x)
x=Activation("relu")(x)
x=Flatten()(x)

z_mean = Dense(latent_dim, name='z_mean')(x)
z_log_var = Dense(latent_dim, name='z_log_var')(x)
z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])
 
encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')
encoder.summary()

In [None]:
#デコーダの作成
latent_inputs=Input(shape=(latent_dim,),name="z_sampling")
x=Dense(2*2)(latent_inputs)
x=BatchNormalization()(x)
x=Activation("relu")(x)
x=Reshape((2,2,1))(x)
x=Conv2DTranspose(2*Nc,kernel_size=2,strides=2)(x)
x=BatchNormalization()(x)
x=Activation("relu")(x)
x=Conv2DTranspose(Nc,kernel_size=2,strides=2)(x)
x=BatchNormalization()(x)
x=Activation("relu")(x)

x1=Conv2DTranspose(1,kernel_size=4)(x)
x1=BatchNormalization()(x1)
out1=Activation("sigmoid")(x1)

x2=Conv2DTranspose(1,kernel_size=4)(x)
x2=BatchNormalization()(x2)
out2=Activation("sigmoid")(x2)

decoder=Model(latent_inputs,[out1,out2],name="decoder")
decoder.summary()

In [None]:
#モデルをまとめる
outputs_mu,outputs_sigma2=decoder(encoder(inputs)[2])
vae=Model(inputs,[outputs_mu,outputs_sigma2],name="vae_mlp")
vae.summary()

In [None]:
#損失関数の定義
m_vae_loss=(K.flatten(outputs_mu)-K.flatten(inputs))**2/K.flatten(outputs_sigma2)
m_vae_loss=0.5*K.sum(m_vae_loss)

a_vae_loss=K.log(2*3.14*K.flatten(outputs_sigma2)+1e-7)
a_vae_loss=0.5*K.sum(a_vae_loss)

kl_loss=1+z_log_var-K.square(z_mean)-K.exp(z_log_var)
kl_loss=-0.5*K.sum(kl_loss,axis=-1)

vae_loss=K.mean(kl_loss+a_vae_loss+m_vae_loss)

vae.add_loss(vae_loss)
vae.compile(optimizer="adam")

vae.fit(x_train_normal,epochs=epochs,batch_size=batch_size)

# ヒートマップの作成
256×256サイズの画像に11×11サイズの小窓を上下左右に走らせ、異常スコアを累積させていく。

In [None]:
#ヒートマップの計算
def evaluate_img(model, x_normal, x_anomaly, name, height=11, width=11, move=5):  
    img_normal = np.zeros((x_normal.shape))
    img_anomaly = np.zeros((x_normal.shape))
    
    for i in range(int((x_normal.shape[1]-height)/move)+1):
        for j in range(int((x_normal.shape[2]-width)/move)+1):
            x_sub_normal = x_normal[0, i*move:i*move+height, j*move:j*move+width, 0]
            x_sub_anomaly = x_anomaly[0, i*move:i*move+height, j*move:j*move+width, 0]
            x_sub_normal = x_sub_normal.reshape(1, height, width, 1)
            x_sub_anomaly = x_sub_anomaly.reshape(1, height, width, 1)
            
            #従来手法
            if name == "old_":
                normal_score = model.evaluate(x_sub_normal, batch_size=1, verbose=0)
                img_normal[0, i*move:i*move+height, j*move:j*move+width, 0] +=  normal_score
                
                anomaly_score = model.evaluate(x_sub_anomaly, batch_size=1, verbose=0)
                img_anomaly[0, i*move:i*move+height, j*move:j*move+width, 0] +=  anomaly_score
                
            #提案手法
            else:
                mu,sigma=model.predict(x_sub_normal,batch_size=1,verbose=0)
                loss=0
                for k in range(height):
                    for l in range(width):
                        loss+=0.5*(mu[0,k,l,0]-x_sub_normal[0,k,l,0])**2/sigma[0,k,l,0]
                img_normal[0,i*move:i*move+height,j*move:j*move+width,0]+=loss
                
                mu,sigma=model.predict(x_sub_anomaly,batch_size=1,verbose=0)
                loss=0
                for k in range(height):
                    for l in range(width):
                        loss+=0.5*(mu[0,k,l,0]-x_sub_anomaly[0,k,l,0])**2/sigma[0,k,l,0]
                img_anomaly[0,i*move:i*move+height,j*move:j*move+width,0]+=loss
    
    save_img(x_normal,x_anomaly,img_normal,img_anomaly,name)

In [None]:
#ヒートマップの描画
def save_img(x_normal,x_anomaly,img_normal,img_anomaly,name):
    path=".../anomaly_detection/vae/hazelnut/images/"
    if not os.path.exists(path):
        os.mkdir(path)
    
    #計算したヒートマップを1～10に正規化
    img_max=np.max([img_normal,img_anomaly])
    img_min=np.min([img_normal,img_anomaly])
    img_normal=(img_normal-img_min)/(img_max-img_min)*9+1
    img_anomaly=(img_anomaly-img_min)/(img_max-img_min)*9+1
    
    plt.figure()
    plt.subplot(2,2,1)
    plt.imshow(x_normal[0,:,:,0],cmap="gray")
    plt.axis("off") 
    plt.colorbar()
    
    plt.subplot(2,2,2)
    plt.imshow(img_normal[0,:,:,0],cmap="Blues",norm=colors.LogNorm())
    plt.axis("off")
    plt.colorbar()
    plt.clim(1,10)
    plt.title(name+"normal")
    
    plt.subplot(2,2,3)
    plt.imshow(x_anomaly[0,:,:,0],cmap="gray")
    plt.axis("off")
    plt.colorbar()
    
    plt.subplot(2,2,4)
    plt.imshow(img_anomaly[0,:,:,0],cmap="Blues",norm=colors.LogNorm())
    plt.axis("off")
    plt.colorbar()
    plt.clim(1,10)
    plt.title(name+"anomaly")
    
    plt.savefig(path+name+".png")
    plt.show()
    plt.close()
    
evaluate_img(vae,test_normal,test_anomaly,"old_")
evaluate_img(vae,test_normal,test_anomaly,"new_")

# 最大異常スコアの算出
テストデータ1枚1枚に11×11サイズの小窓を走らせ、算出されたスコアの中からその最大値をその画像の異常スコアとした。
そして閾値を定め、閾値より異常スコアが高いものを「異常画像」、低いものを「正常画像」と判定しました。

In [None]:
def result_score(model,x,name,height=11,width=11,move=5):
    score=[]
    for k in range(len(x)):
        max_score=-100000000000000
        if k%10==0:
            print(k)
            
        for i in range(int((x.shape[1]-height)/move)+1):
            for j in range(int((x.shape[2]-width)/move)+1):
                x_sub=x[k,i*move:i*move+height,j*move:j*move+width,0]
                x_sub=x_sub.reshape(1,height,width,1)
                
                if name=="old_":
                    temp_score=model.evaluate(x_sub,batch_size=1,verbose=0)
                    if temp_score>max_score:
                        max_score=temp_score
                
                else:
                    mu,sigma=model.predict(x_sub,batch_size=1,verbose=0)
                    loss=0
                    for o in range(height):
                        for l in range(width):
                            loss+=0.5*(mu[0,o,l,0]-x_sub[0,o,l,0])**2/sigma[0,o,l,0]
                    if loss>max_score:
                        max_score=loss
        
        score.append(max_score)
    
    return score

In [None]:
test_normal=x_test_normal
test_anomaly=x_test_anomaly

In [None]:
print("normal test data:",len(test_normal))
old_score_normal=result_score(vae,test_normal,"old_")
#print(old_score_normal)

print("anomaly test data:",len(test_anomaly))
old_score_anomaly=result_score(vae,test_anomaly,"old_")
#print(old_score_anomaly)

In [None]:
print("normal test data:",len(test_normal))
new_score_normal=result_score(vae,test_normal,"new_")
#print(new_score_normal)

print("anomaly test data:",len(test_anomaly))
new_score_anomaly=result_score(vae,test_anomaly,"new_")
#print(new_score_anomaly)

# ROC曲線の描画

In [None]:
y_true=np.zeros(len(test_normal)+len(test_anomaly))
y_true[len(test_normal):]=1　#0:正常、1:異常
old_score=np.array(old_score_normal)
old_score=np.hstack((old_score,np.array(old_score_anomaly)))
new_score=np.array(new_score_normal)
new_score=np.hstack((new_score,np.array(new_score_anomaly)))

#FPR,TPR,閾値を算出
fpr_old,tpr_old,thr_old=metrics.roc_curve(y_true,old_score)
fpr_new,tpr_new,thr_new=metrics.roc_curve(y_true,new_score)

#AUC
auc_old=metrics.auc(fpr_old,tpr_old)
auc_new=metrics.auc(fpr_new,tpr_new)

#ROC曲線をプロット
plt.figure()
plt.plot(fpr_old,tpr_old,label="Old method (area=%.2f)"%auc_old)
plt.plot(fpr_new,tpr_new,label="New method (area=%.2f)"%auc_new)
plt.legend()
plt.title("ROC curve")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.grid(True)
plt.show()