<a href="https://colab.research.google.com/github/dowrave/RoadToImageSeg_GAN/blob/main/220429_ImageSegmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q git+https://github.com/tensorflow/examples.git
!pip install -q -U tfds-nightly

In [None]:
import tensorflow as tf
from tensorflow_examples.models.pix2pix import pix2pix

import tensorflow_datasets as tfds
tfds.disable_progress_bar()

from IPython.display import clear_output
import matplotlib.pyplot as plt

In [None]:
# Oxford-IIIT Pets Datasets 다운

In [None]:
dataset, info = tfds.load('oxford_iiit_pet:3.*.*', with_info = True)

In [None]:
def normalize(input_image, input_mask):
  input_image = tf.cast(input_image, tf.float32) / 255.0
  input_mask -= 1 # 분할마스크 - 1 ==> 레이블 {0, 1, 2}
  return input_image, input_mask 

In [None]:
@tf.function
def load_image_train(datapoint):
  input_image = tf.image.resize(datapoint['image'], (128, 128))
  input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128))

  if tf.random.uniform(()) > 0.5:
    input_image = tf.image.flip_left_right(input_image)
    input_mask = tf.image.flip_left_right(input_mask)

  input_image, input_mask = normalize(input_image, input_mask)
  return input_image, input_mask

In [None]:
def load_image_test(datapoint):
    input_image = tf.image.resize(datapoint['image'], (128, 128))
    input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128))

    input_image, input_mask = normalize(input_image, input_mask)
    return input_image, input_mask

In [None]:
TRAIN_LENGTH = info.splits['train'].num_examples
BATCH_SIZE = 64
BUFFER_SIZE = 1000
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE

In [None]:
train = dataset['train'].map(load_image_train, num_parallel_calls = tf.data.experimental.AUTOTUNE)
test = dataset['test'].map(load_image_test)

In [None]:
train_dataset = train.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()
train_dataset = train_dataset.prefetch(buffer_size = tf.data.experimental.AUTOTUNE)
test_dataset = test.batch(BATCH_SIZE)

In [None]:
def display(display_list):
  plt.figure(figsize = (15, 15))

  title = ['Input Image', 'True Mask', "Predicted Mask"]

  for i in range(len(display_list)):
    plt.subplot(1, len(display_list), i + 1)
    plt.title(title[i])
    plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
    plt.axis('off')
  plt.show()

In [None]:
for image, mask in train.take(1):
  sample_image, sample_mask = image, mask

In [None]:
display([sample_image, sample_mask])

### 모델 정의하기
- 사용하는 모델 : U-Net
- U-Net의 구성 : 인코더(다운샘플러) & 디코더(업샘플러)
- 인코더 : 미리 훈련된 MobileNETV2
- 디코더 : pix2pix의 업샘플 블록(GAN할 때 다룰 예정)

In [None]:
# 출력 채널 : 3개 (픽셀 당 라벨은 3개임)
OUTPUT_CHANNELS = 3

In [None]:
base_model = tf.keras.applications.MobileNetV2(input_shape = [128, 128, 3], include_top = False)

layer_names = [
               'block_1_expand_relu', # 64 * 64
               'block_3_expand_relu', # 32
               'block_6_expand_relu', # 16
               'block_13_expand_relu', # 8
               'block_16_project' # 4
]
layers = [base_model.get_layer(name).output for name in layer_names]

# 특징 추출 모델
down_stack = tf.keras.Model(inputs = base_model.input, outputs = layers)

down_stack.trainable = False

In [None]:
# 디코더
up_stack = [
            pix2pix.upsample(512, 3), # 4,4 -> 8,8
            pix2pix.upsample(256, 3), # 8,8 -> 16, 16
            pix2pix.upsample(128, 3), # 16, 16 -> 32, 32
            pix2pix.upsample(64, 3), # 32, 32 -> 64, 64
]

In [None]:
def unet_model(output_channels):
  inputs = tf.keras.layers.Input(shape=[128, 128, 3])
  x = inputs

  # 다운샘플링
  skips = down_stack(x)
  x = skips[-1]
  skips = reversed(skips[:-1])

  # 건너뛰기 연결을 업샘플링하여 설정(?)
  for up, skip in zip(up_stack, skips):
    x = up(x)
    concat = tf.keras.layers.Concatenate()
    x = concat([x, skip])

  # 마지막 층
  last = tf.keras.layers.Conv2DTranspose(
      output_channels, 3, strides = 2,
      padding = 'same'
  )

  x = last(x)

  return tf.keras.Model(inputs = inputs, outputs = x)

In [None]:
# 모델 훈련
model = unet_model(OUTPUT_CHANNELS)
model.compile(optimizer = 'adam',
              loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True), # 픽셀 별로 레이블 할당(다중 분류) -> 클래스 예측 시 권장(가장 높은 값이 레이블)
              metrics = ['accuracy'])

In [None]:
tf.keras.utils.plot_model(model, show_shapes = True)

In [None]:
# 너무 설명이 없긴 하다..
def create_mask(pred_mask):
  pred_mask = tf.argmax(pred_mask, axis = -1)
  pred_mask = pred_mask[..., tf.newaxis]
  return pred_mask[0]


In [None]:
def show_predictions(dataset = None, num = 1):
  if dataset:
    for image, mask in dataset.take(num):
      pred_mask = model.predict(image)
      display([image[0], mask[0], create_mask(pred_mask)])
  else:
    display([sample_image, sample_mask, create_mask(model.predict(sample_image[tf.newaxis, ...]))])

In [None]:
show_predictions()

In [None]:
# 훈련 동안 모델의 향상 관찰
class DisplayCallback(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs= None):
    clear_output(wait = True)
    show_predictions()
    print('\n 에포크 이후 예측 예시 {}\n'.format(epoch + 1))

In [None]:
EPOCHS = 20
VAL_SUBSPLITS = 5
VALIDATION_STEPS = info.splits['test'].num_examples//BATCH_SIZE//VAL_SUBSPLITS # 이게 뭐지?

model_history = model.fit(train_dataset, epochs = EPOCHS,
                          steps_per_epoch = STEPS_PER_EPOCH,
                          validation_steps = VALIDATION_STEPS,
                          validation_data = test_dataset,
                          callbacks = [DisplayCallback()])

In [None]:
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']

epochs = range(EPOCHS)

plt.figure()
plt.plot(epochs, loss, 'r', label = 'Training Loss')
plt.plot(epochs, val_loss, 'bo', label = 'Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss Value')
plt.ylim([0, 1])
plt.legend()
plt.show()

In [None]:
# 예측하기
show_predictions(test_dataset, 3)