# 이미지 세그멘테이션 (Image segmentation)

In [None]:
!pip install git+https://github.com/tensorflow/examples.git
!pip install -U tfds-nightly

### 텐서플로와 다른 라이브러리 임포트

In [None]:
import tensorflow as tf

In [None]:
from tensorflow_examples.models.pix2pix import pix2pix

import tensorflow_datasets as tfds
tfds.disable_progress_bar()

from IPython.display import clear_output
import matplotlib.pyplot as plt

#### GPU 메모리 설정 (프로그램에서 필요한만큼만 할당하는 방식)

In [None]:
# 런타임에서 할당하는데 필요한 양만큼의 GPU 메모리를 할당
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
  try:
    tf.config.experimental.set_memory_growth(gpus[0], True)
  except RuntimeError as e:
    # 프로그램 시작시에 메모리 증가가 설정되어야만 합니다
    print(e)

## Oxford-IIIT Pets 데이터셋 다운로드

In [None]:
dataset, info = tfds.load('oxford_iiit_pet:3.*.*', with_info=True)

#### 정규화 (Normalization)
* 이미지 : [0,1] 범위로 정규화
* 세그멘테이션 마스크 : {1, 2, 3}에서 {0, 1, 2}로 변경

In [None]:
def normalize(input_image, input_mask):
  input_image = tf.cast(input_image, tf.float32) / 255.0
  input_mask -= 1
  return input_image, input_mask

#### 이미지 전처리
* 이미지 크기 (128, 128)로 resize
* 이미지 확장 (Data Augmentation) : 좌우 뒤집기 (50% 확률)
* 이미지/마스크 정규화

In [None]:
@tf.function
def load_image_train(datapoint):
  input_image = tf.image.resize(datapoint['image'], (128, 128))
  input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128))

  if tf.random.uniform(()) > 0.5:
    input_image = tf.image.flip_left_right(input_image)
    input_mask = tf.image.flip_left_right(input_mask)

  input_image, input_mask = normalize(input_image, input_mask)

  return input_image, input_mask

In [None]:
def load_image_test(datapoint):
  input_image = tf.image.resize(datapoint['image'], (128, 128))
  input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128))

  input_image, input_mask = normalize(input_image, input_mask)

  return input_image, input_mask

#### 데이터셋에 훈련 데이터셋과 테스트 데이터셋이 분리되어 있음

In [None]:
TRAIN_LENGTH = info.splits['train'].num_examples
BATCH_SIZE = 64
BUFFER_SIZE = 1000
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE

#### 훈련/테스트 이미지 파일 전처리

In [None]:
train = dataset['train'].map(load_image_train, num_parallel_calls=tf.data.experimental.AUTOTUNE)
test = dataset['test'].map(load_image_test)

#### 훈련/테스트 데이터셋 생성

In [None]:
train_dataset = train.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()
train_dataset = train_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
test_dataset = test.batch(BATCH_SIZE)

#### 이미지, 마스크, 예측 결과를 순서대로 화면에 보여줌

In [None]:
def display(display_list):
  plt.figure(figsize=(15, 15))

  title = ['Input Image', 'True Mask', 'Predicted Mask']

  for i in range(len(display_list)):
    plt.subplot(1, len(display_list), i+1)
    plt.title(title[i])
    plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
    plt.axis('off')
  plt.show()

In [None]:
for image, mask in train.take(1):
  sample_image, sample_mask = image, mask
display([sample_image, sample_mask])

## 모델 정의 (U-Net 모델을 변형해서 정의)
* Encoder는 pre-trained MobileNetV2로 구성
* Decoder는 Pix2Pix Upsample Block을 이용해서 구성
(https://github.com/tensorflow/examples/blob/master/tensorflow_examples/models/pix2pix/pix2pix.py 참고) 

In [None]:
# 픽셀의 각 클래스에 대한 확률이 채널 별로 정의됨
OUTPUT_CHANNELS = 3

#### Encoder 정의 (MobileNetV2)
* Encoder는 Pre-train 되어 있어서 학습을 하지 않음

In [None]:
base_model = tf.keras.applications.MobileNetV2(input_shape=[128, 128, 3], include_top=False)

# Skip Connection을 만들 계층 선정
layer_names = [
    'block_1_expand_relu',   # 64x64
    'block_3_expand_relu',   # 32x32
    'block_6_expand_relu',   # 16x16
    'block_13_expand_relu',  # 8x8
    'block_16_project',      # 4x4
]
layers = [base_model.get_layer(name).output for name in layer_names]

# 특징을 추출할 수 있도록 Encoder를 모델로 생성
down_stack = tf.keras.Model(inputs=base_model.input, outputs=layers)

down_stack.trainable = False

#### Decoder 정의 (Pix2Pix의 upsample block 사용)

In [None]:
up_stack = [
    pix2pix.upsample(512, 3),  # 4x4 -> 8x8
    pix2pix.upsample(256, 3),  # 8x8 -> 16x16
    pix2pix.upsample(128, 3),  # 16x16 -> 32x32
    pix2pix.upsample(64, 3),   # 32x32 -> 64x64
]

#### UNet 모델 정의 (Encoder와 Decoder에 Skip Connection 연결)

In [None]:
def unet_model(output_channels):
  inputs = tf.keras.layers.Input(shape=[128, 128, 3])
  x = inputs

  # Encoder에서 특징 추출
  skips = down_stack(x)
  x = skips[-1]
  skips = reversed(skips[:-1])

  # Encoder와 Decoder 간에 skip connections 연결
  for up, skip in zip(up_stack, skips):
    x = up(x)
    concat = tf.keras.layers.Concatenate()
    x = concat([x, skip])

  # 모델의 마지막 계층 추가
  last = tf.keras.layers.Conv2DTranspose(
      output_channels, 3, strides=2,
      padding='same')  #64x64 -> 128x128

  x = last(x)

  return tf.keras.Model(inputs=inputs, outputs=x)

## 모델 훈련

In [None]:
model = unet_model(OUTPUT_CHANNELS)
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

#### 모델 구조 확인

In [None]:
tf.keras.utils.plot_model(model, show_shapes=True)

#### 훈련 하기 전에 예측을 어떻게 하는지 확인

In [None]:
def create_mask(pred_mask):
  pred_mask = tf.argmax(pred_mask, axis=-1)
  pred_mask = pred_mask[..., tf.newaxis]
  return pred_mask[0]

In [None]:
def show_predictions(dataset=None, num=1):
  if dataset:
    for image, mask in dataset.take(num):
      pred_mask = model.predict(image)
      display([image[0], mask[0], create_mask(pred_mask)])
  else:
    display([sample_image, sample_mask,
             create_mask(model.predict(sample_image[tf.newaxis, ...]))])

In [None]:
show_predictions()

#### 훈련을 하는 동안 진행 상황을 화면에 보여줌

In [None]:
class DisplayCallback(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    clear_output(wait=True)
    show_predictions()
    print ('\nSample Prediction after epoch {}\n'.format(epoch+1))

In [None]:
EPOCHS = 20
VAL_SUBSPLITS = 5
VALIDATION_STEPS = info.splits['test'].num_examples//BATCH_SIZE//VAL_SUBSPLITS

model_history = model.fit(train_dataset, epochs=EPOCHS,
                          steps_per_epoch=STEPS_PER_EPOCH,
                          validation_steps=VALIDATION_STEPS,
                          validation_data=test_dataset,
                          callbacks=[DisplayCallback()])

In [None]:
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']

epochs = range(EPOCHS)

plt.figure()
plt.plot(epochs, loss, 'r', label='Training loss')
plt.plot(epochs, val_loss, 'bo', label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Value')
plt.ylim([0, 1])
plt.legend()
plt.show()

## 예측

#### 적은 epoch으로 훈련을 해도 정확히 예측을 하는 모습을 확인해보자.

In [None]:
show_predictions(test_dataset, 3)