### Convolutional AutoEncoder를 활용한 이미지 이상탐지

이미지의 경우 모두 가로 세로 256px의 산 이미지이며, 산 이미지 = 정상, 구름이 낀 이미지 = 비정상으로 탐지하는 모델을 학습시켜보자.

In [None]:
import os
import numpy as np
import matplotlib as mlp
import matplotlib.pyplot as plt
import plotly.express as px
import tensorflow as tf

from glob import glob
from PIL import Image
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, MaxPooling2D, BatchNormalization, Activation, GlobalAveragePooling2D
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

In [None]:
normal_dataset = "../data/normal_data/"  # 정상 데이터셋 (구름 없음) 경로
anomaly_dataset = "../data/anomaly_data/"  # 비정상 데이터셋 (구름 있음) 경로

target_width = 256  # input shape의 width
target_height = 256  # input shape의 height
target_channels = 3  # input shape의 channel
epochs = 300  # 학습 횟수
batch_size = 16  # batch size

### ImageDataGenerator.flow_from_directory

`ImageDataGenerator.flow_from_directory`의 class_mode를 활용하면 Auto Encoder에 활용하는 데이터셋의 형태로 format을 변경할 수 있다.

그간 사용했던 옵션들의 경우 아래와 같다.
- `class_mode="binary"` : 이미지 분류 문제 중, 이진 분류 문제를 다룰 때 사용한다.
- `class_mode="categorical"` : 이미지 분류 문제 중, 다중 분류 문제를 다룰 때 사용한다. multi-labeled이며, one-hot encoding된 상태를 의미한다.

나머지 옵션들은 아래와 같다.
- `class_mode='input'` : Auto Encoder를 사용해야 할 때 사용한다. 데이터셋의 input과 output을 동일하게 만들어준다고 한다.
- `class_mode='sparse'` : 이미지 분류 문제 중, 다중 분류 문제를 다룰 때 사용하는데, multi-label된 형태이며, label encoding된 형태라고 한다.
- `class_mode='None'` : 말 그대로 클래스 모드가 없음을 의미한다.

In [None]:
train_generator = ImageDataGenerator(
    rescale=1./255,  # 정규화
    validation_split=0.3  # validation split factor
)
test_generator = ImageDataGenerator(
    rescale=1./255,  # test셋은 정규화만, 
)

train_dataset = train_generator.flow_from_directory(
    normal_dataset,  # 데이터셋 경로
    classes=None,  # 단일 클래스를 기반으로 학습을 진행하기에 None으로,
    class_mode='input',  # Auto Encoder에 자주 활용되는 input mode로,
    color_mode="rgb",  # 컬러 이미지
    batch_size=batch_size,  # batch size
    target_size=(target_width, target_height),  # target size
    subset="training",
    seed=1337  # validation split을 사용하고 있기에, 반드시 필요하다. 또한, validation set과 동일하게 설정되어야 한다.
)

valid_dataset = train_generator.flow_from_directory(
    normal_dataset,
    classes=None,
    class_mode='input',
    color_mode="rgb",
    batch_size=batch_size,
    target_size=(target_height, target_width),
    subset="validation",
    seed=1337
)

test_dataset = test_generator.flow_from_directory(
    anomaly_dataset,
    classes=None,
    class_mode='input',
    color_mode="rgb",
    batch_size=batch_size,
    target_size=(target_height, target_width),
    shuffle=False
)


## Convolutional Auto Encoder

AutoEncoder의 경우 이상탐지에 자주 사용되는 모델인데, 이미지 처리에 탁월한 Convolutional layer와 결합하면 이미지에 대해서 정상/비정상 검출이 가능하지 않을까? 하는 마음에 시도해봤다.

### Encoder
Encoder의 경우 Convolution연산 후 Pooling을 수행하면서 이미지로부터 Feature를 추출하며 다운 샘플링을 진행한다. 다운 샘플링이 완료된 시점에서는 input되는 이미지들에 대한 저차원 특성 벡터가 생성된다.

### Decoder
Decoder의 경우 Encoder와는 반대로 업 샘플링(역 합성곱 연산)을 진행하는데, 이 때 사용되는 Conv2DTranspose는 up-sampling을 진행함과 동시에 <span style="background-color: yellow;">학습 가능한 필터</span>를 사용하여 보다 원활한 이미지 복구가 가능하다.

In [None]:
# Encoder
input_layer = Input(shape=(target_height, target_width, target_channels))
x = Conv2D(64, (3, 3), padding="same")(input_layer)  # (256, 256, 64)
x = BatchNormalization()(x)  # 상황에 따라 다르지만, BatchNorm은 항상 Activation을 거치기 전에.
x = Activation("relu")(x)
x = MaxPooling2D((2, 2),padding="valid")(x)  # (128, 128, 64)
x = Conv2D(128, (3, 3), padding="same")(x)  # (128, 128, 128)
x = BatchNormalization()(x)
x = Activation("relu")(x)
x = MaxPooling2D((2, 2),padding="valid")(x)  # (64, 64, 128)
x = Conv2D(256, (3, 3), padding="same")(x)  # (64, 64, 256)
x = BatchNormalization()(x)
x = Activation("relu")(x)
x = MaxPooling2D((2, 2),padding="valid")(x)  # (32, 32, 256)
x = Conv2D(512, (3, 3), padding="same")(x)
x = BatchNormalization()(x)
encoded = Activation("relu")(x)

# Decoder
decoded = Conv2DTranspose(512, (3, 3), padding="same")(encoded)  # Conv2DTranspose 사용
x = BatchNormalization()(decoded)
x = Activation("relu")(x)
x = Conv2DTranspose(256, (3, 3), padding="same", strides=(2, 2))(x)
x = BatchNormalization()(x)
x = Activation("relu")(x)
x = Conv2DTranspose(128, (3, 3), padding="same", strides=(2, 2))(x)
x = BatchNormalization()(x)
x = Activation("relu")(x)
x = Conv2DTranspose(64, (3, 3), padding="same", strides=(2, 2))(x)
x = BatchNormalization()(x)
x = Activation("relu")(x)
output_layer = Conv2D(3, (3, 3), activation="sigmoid", padding="same")(x)

model = Model(input_layer, output_layer)
model.compile(optimizer = "adam", loss="mse")

early_stop = EarlyStopping(monitor="val_loss", patience=5, mode="min")
reduce_lr = ReduceLROnPlateau(monitor="val_loss", patience=5, factor=0.1, mode="min")

model.fit(train_dataset, epochs=epochs, validation_data=valid_dataset, callbacks=[early_stop, reduce_lr])
model.save("./models/weights/my_autoencoder.h5")


### Reconstruction Error
현재 학습된 모델의 경우 데이터셋은 오로지 **정상 데이터셋**으로만 학습되었다. 어떻게 특정 기준을 기반으로 정상/비정상을 탐지할까?

그건 바로 <span style="background-color: yellow;">재구성 오류(Reconstruction Error)</span>를 기반으로 임계치를 설정하여, 임계치보다 큰 이미지의 경우 비정상으로 분류한다.

여기서 Reconstruction error란, input되는 image vector와 decoder의 output으로 나온 재구성된 이미지 벡터(reconstructed image vector)간의 차이를 의미한다. 

따라서 임계치에 대한 기준을 잡기 위해서 training에 사용된 모든 이미지의 reconstruction error의 histogram과 비정상 이미지의 reconstruction error histogram을 기반으로 임계치를 정해보자.

### 정상 데이터의 재구성 오류값의 히스토그램

In [None]:
reconstruction_errors = []
normal_image_filenames = glob("../data/normal_data/*/*")

for filename in normal_image_filenames:  # 이미지 경로 하나하나를 순회하면서
    image = Image.open(filename).convert("RGB")  # 이미지를 open하되, rgba가 아니라 rgb모드로 열어서
    nd_image = np.expand_dims((np.array(image)/255), axis=0)  # batch 단위 표현을 위해 차원을 확장하고
    image_array = tf.cast(nd_image, dtype=tf.float32)  # float32형으로 변환하여
    reconstructed_image = model.predict(image_array, verbose=0)  # 예측을 수행한다.
    
    reconstructed_error = np.mean(np.square(image_array - reconstructed_image))  # mse를 활용하여 reconstruction error를 계산하고
    reconstruction_errors.append(reconstructed_error)  # 배열에 담아서

In [None]:
fig = px.histogram(reconstruction_errors)  # 히스토그램으로 시각화한다.
fig.update_layout(width=800, height=600)  # layout 크기 조절
fig.show()

### 비정상 데이터의 재구성 오류값의 히스토그램

In [None]:
anomaly_reconstruction_errors = []
anomaly_image_filenames = glob("../data/anomaly_data/*/*")

for filename in anomaly_image_filenames:
    image = Image.open(filename).convert("RGB")
    nd_image = np.expand_dims((np.array(image)/255), axis=0)
    image_array = tf.cast(nd_image/255, dtype=tf.float32)
    reconstructed_image = model.predict(image_array, verbose=0)
    
    reconstructed_error = np.mean(np.square(image_array - reconstructed_image))
    anomaly_reconstruction_errors.append(reconstructed_error)

In [None]:
fig = px.histogram(anomaly_reconstruction_errors)
fig.update_layout(width=800, height=600)
fig.show()

training image들의 reconstruction error histogram을 통해서 학습에 사용된 이미지들의 대다수 이상이 0.005 이내로 재구성 오류값이 분포된 것을 확인할 수 있다.

따라서 임계치를 0.0045로 잡고, 비정상 이미지 탐지 결과에 대한 정확도를 표시해보자.

In [None]:
true_negative = len([i for i in anomaly_reconstruction_errors if i > 0.005])  # 재구성 오류값이 임계치, 즉 0.005보다 큰 것들(비정상)
all_data = len(anomaly_reconstruction_errors)  # 비정상 데이터의 전체 개수

anomaly_detection_acc = true_negative / all_data  # acc 계산
anomaly_detection_acc  # 와 ! 100퍼센트 !

이제 재구성된 이미지와, 원본 이미지의 차이를 정상/비정상 분류 결과와 함께 나타내보자.

In [None]:
mlp.rcParams['font.family'] = "Malgun Gothic"  # 그래프 내에서 한글 깨짐 현상 방지
anomaly_image_filenames = glob("../data/anomaly_data/*/*")  # 비정상 데이터의 전체 파일 이름들

def predict_results(filepath, threshold):
    image = Image.open(filepath).convert("RGB")
    image_arr = tf.cast((np.array(image)/255), dtype=tf.float32)
    img = np.expand_dims(image_arr, axis=0)

    reconstructed_image = model.predict(img, verbose=0)
    reconstructed_error = np.mean(np.square(img - reconstructed_image))

    if reconstructed_error > threshold:
        return ["비정상", img, reconstructed_image]
    else:
        return ["정상", img, reconstructed_image]

def plot_results(filepath, threshold):
    message, origin, reconstructed_image = predict_results(filepath, threshold)

    plt.subplot(1, 2, 1)
    plt.imshow(origin[0])
    plt.gca().set_title("원본 이미지")

    plt.subplot(1, 2, 2)
    plt.imshow(reconstructed_image[0])
    plt.gca().set_title(f"예측 이미지 \n 예측된 결과 : {message}")

    plt.show()

In [None]:
plot_results(anomaly_image_filenames[8], 0.004)

In [None]:
plot_results(normal_image_filenames[0], 0.004)