<a href="https://colab.research.google.com/github/hukim1112/MLDL/blob/master/segmentation/semantic_segmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Image segmentation

이번 튜토리얼은 영상 분할(image segmentation)을 다루기 위해, <a href="https://lmb.informatik.uni-freiburg.de/people/ronneber/u-net/" class="external">U-Net</a>의 수정된 모델을 다뤄보겠습니다.

## 영상 분할이란?

영상 분류는 각 입력 영상에 레이블(또는 클래스)를 부여합니다. 하지만 만약 영상의 물체의 형상을 알기 위해, 어떤 픽셀들이 물체에 속하는 지 알기 원할 수 있습니다. 이런 경우 영상의 각 픽셀에 레이블(또는 클래스)를 부여합니다. 이런 테스크가 영상 분할이라 알려져있습니다. 분할 모델은 분류 모델보다 영상에 관한 더 상세한 정보를 반환합니다. 영상 분할은 의료용사진분석, 자율주행, 위성사진분석 등의 많은 응용에서 사용됩니다.

이 튜토리얼은 [Oxford-IIIT Pet Dataset](https://www.robots.ox.ac.uk/~vgg/data/pets/)을 사용합니다. 이 데이터는 37 종의 반려동물에 대한 사진이 종별 200장으로 구성됩니다. 그리고 학습과 테스트로 100/100장으로 나눴습니다. 각각의 이미지는 픽셀 수준 마스크(pixel-wise mask)를 상응하는 레이블로 가지고 있습니다. 마스크들은 픽셀 별로 클래스-레이블을 표현합니다. 마스크의 픽셀은 다음 카테고리 중 하나를 표현합니다.

*   Class 1 : Pixel belonging to the pet.
*   Class 2 : Pixel bordering the pet.
*   Class 3 : None of the above/ Surrounding pixel.

In [None]:
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing 

import tensorflow_datasets as tfds

In [None]:
#from tensorflow_examples.models.pix2pix import pix2pix
from IPython.display import clear_output
import matplotlib.pyplot as plt

## Download the Oxford-IIIT Pets dataset

The dataset is [available from TensorFlow Datasets](https://www.tensorflow.org/datasets/catalog/oxford_iiit_pet). The segmentation masks are included in version 3+.

In [None]:
dataset, info = tfds.load('oxford_iiit_pet:3.*.*', with_info=True)

추가적으로, 이미지의 픽셀들은 `[0,1]` 범위로 정규화합니다. 마지막으로 마스크의 픽셀값에 각각 {1, 2, 3} 중의 하나의 클래스를 부여한다. 편의를 위해 1을 차감해서 이번 학습에서는 정답 마스크로 {0, 1, 2} 중 하나의 레이블을 부여합니다.

In [None]:
def normalize(input_image, input_mask):
  input_image = tf.cast(input_image, tf.float32) / 255.0
  input_mask -= 1
  return input_image, input_mask

In [None]:
def load_image(datapoint):
  input_image = tf.image.resize(datapoint['image'], (128, 128))
  input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128))

  input_image, input_mask = normalize(input_image, input_mask)

  return input_image, input_mask

데이터셋은 이미 학습, 테스트가 나뉘어져 있습니다.

In [None]:
TRAIN_LENGTH = info.splits['train'].num_examples
BATCH_SIZE = 64
BUFFER_SIZE = 1000
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE

In [None]:
train_images = dataset['train'].map(load_image, num_parallel_calls=tf.data.AUTOTUNE)
test_images = dataset['test'].map(load_image, num_parallel_calls=tf.data.AUTOTUNE)

데이터셋에는 랜덤 플립을 사용하는 간단한 증강을 적용했습니다. Augment 클래스는 입력 영상과 정답 마스크를 함께 뒤집거나 뒤집지 않습니다.
영상 증강은 다음 튜토리얼을 참고하세요. [image augmentation tutorial](https://www.tensorflow.org/tutorials/images/data_augmentation)


In [None]:
class Augment(tf.keras.layers.Layer):
  def __init__(self, seed=42):
    super().__init__()
    # both use the same seed, so they'll make the same randomn changes.
    self.augment_inputs = preprocessing.RandomFlip(mode="horizontal", seed=seed)
    self.augment_labels = preprocessing.RandomFlip(mode="horizontal", seed=seed)
  
  def call(self, inputs, labels):
    inputs = self.augment_inputs(inputs)
    labels = self.augment_labels(labels)
    return inputs, labels

입력 파이프라인을 구축하고, 배치 다음으로 증강을 적용합니다.

In [None]:
train_batches = (
    train_images
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .repeat()
    .map(Augment())
    .prefetch(buffer_size=tf.data.AUTOTUNE))

test_batches = test_images.batch(BATCH_SIZE)

몇 개의 샘플을 확인해봅니다.



In [None]:
def display(display_list):
  plt.figure(figsize=(15, 15))

  title = ['Input Image', 'True Mask', 'Predicted Mask']

  for i in range(len(display_list)):
    plt.subplot(1, len(display_list), i+1)
    plt.title(title[i])
    plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
    plt.axis('off')
  plt.show()

In [None]:
for images, masks in train_batches.take(2):
  sample_image, sample_mask = images[0], masks[0]
  display([sample_image, sample_mask])

## Define the model
여기서는 수정된 [U-Net](https://arxiv.org/abs/1505.04597)을 사용합니다. U-Net은 인코더(downsampler)와 디코더(upsampler)로 구성됩니다. 더 강건한 특징들을 학습하고 학습 파라미터의 수를 줄이기 위해, 사전학습된 MobileNet-V2를 인코더로 사용합니다. 디코더는 upsample 블록을 사용할 것입니다.

앞서 언급한대로, 인코더로 tf.keras.applications의 MobileNetV2를 사용할 것입니다. 이때 인코더는 학습 과정에서 학습하지 않을 것입니다.

In [None]:
def upsample(filters, size):
  """Upsamples an input.
  Conv2DTranspose => Batchnorm => Dropout => Relu
  """
  initializer = tf.random_normal_initializer(0., 0.02)
  result = tf.keras.Sequential()
  result.add(
      tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                      padding='same',
                                      kernel_initializer=initializer,
                                      use_bias=False))

  result.add(tf.keras.layers.BatchNormalization())
  result.add(tf.keras.layers.ReLU())
  return result

In [None]:
base_model = tf.keras.applications.MobileNetV2(input_shape=[128, 128, 3], include_top=False)

# Use the activations of these layers
layer_names = [
    'block_1_expand_relu',   # 64x64
    'block_3_expand_relu',   # 32x32
    'block_6_expand_relu',   # 16x16
    'block_13_expand_relu',  # 8x8
    'block_16_project',      # 4x4
]
base_model_outputs = [base_model.get_layer(name).output for name in layer_names]

# Create the feature extraction model
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)

down_stack.trainable = False

The decoder/upsampler is simply a series of upsample blocks implemented in TensorFlow examples.

In [None]:
up_stack = [
    upsample(512, 3),  # 4x4 -> 8x8
    upsample(256, 3),  # 8x8 -> 16x16
    upsample(128, 3),  # 16x16 -> 32x32
    upsample(64, 3),   # 32x32 -> 64x64
]

In [None]:
def unet_model(output_channels:int):
  inputs = tf.keras.layers.Input(shape=[128, 128, 3])

  # Downsampling through the model
  skips = down_stack(inputs)
  x = skips[-1]
  skips = reversed(skips[:-1])

  # Upsampling and establishing the skip connections
  for up, skip in zip(up_stack, skips):
    x = up(x)
    concat = tf.keras.layers.Concatenate()
    x = concat([x, skip])

  # This is the last layer of the model
  last = tf.keras.layers.Conv2DTranspose(
      filters=output_channels, kernel_size=3, strides=2,
      padding='same')  #64x64 -> 128x128

  x = last(x)

  return tf.keras.Model(inputs=inputs, outputs=x)

모델 구조를 빠르게 살펴보도록 하겠습니다.

---

In [None]:
OUTPUT_CLASSES = 3

model = unet_model(output_channels=OUTPUT_CLASSES)

tf.keras.utils.plot_model(model, show_shapes=True)

마지막 레이어에서 몇 개의 필터를 사용하 지가 채널 수를 결정합니다. 채널은 클래스 수와 동일하도록 설계합니다.

## Train the model

이제, 남은 것은 모델을 compile, train(fit) 하는 것입니다.

픽셀을 multiclass classification을 해야하기 때문에 `CategoricalCrossentropy` with `from_logits=True`가 표준적인 손실함수입니다.

`losses.SparseCategoricalCrossentropy(from_logits=True)`을 사용하면 레이블로 원핫벡터의 형태대신 스칼라로 사용할 수 있습니다.

In [None]:
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

학습전에 무엇을 예측하는 지 확인합니다.

In [None]:
def create_mask(pred_mask):
  pred_mask = tf.argmax(pred_mask, axis=-1)
  pred_mask = pred_mask[..., tf.newaxis]
  return pred_mask[0]

In [None]:
def show_predictions(dataset=None, num=1):
  if dataset:
    for image, mask in dataset.take(num):
      pred_mask = model.predict(image)
      display([image[0], mask[0], create_mask(pred_mask)])
  else:
    display([sample_image, sample_mask,
             create_mask(model.predict(sample_image[tf.newaxis, ...]))])

In [None]:
show_predictions()

학습과정에서 모델이 향상되는 것을 확인하기 위해 callback 함수를 사용합니다.

In [None]:
class DisplayCallback(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    clear_output(wait=True)
    show_predictions()
    print ('\nSample Prediction after epoch {}\n'.format(epoch+1))

In [None]:
EPOCHS = 20
VAL_SUBSPLITS = 5
VALIDATION_STEPS = info.splits['test'].num_examples//BATCH_SIZE//VAL_SUBSPLITS

model_history = model.fit(train_batches, epochs=EPOCHS,
                          steps_per_epoch=STEPS_PER_EPOCH,
                          validation_steps=VALIDATION_STEPS,
                          validation_data=test_batches,
                          callbacks=[DisplayCallback()])

In [None]:
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']

plt.figure()
plt.plot(model_history.epoch, loss, 'r', label='Training loss')
plt.plot(model_history.epoch, val_loss, 'bo', label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Value')
plt.ylim([0, 1])
plt.legend()
plt.show()

## Make predictions

In [None]:
show_predictions(test_batches, 3)

## Optional: Imbalanced classes and class weights

Semantic segmentation datasets can be highly imbalanced meaning that particular class pixels can be present more inside images than that of other classes. Since segmentation problems can be treated as per-pixel classification problems we can deal with the imbalance problem by weighing the loss function to account for this. It's a simple and elegant way to deal with this problem. See the [imbalanced classes tutorial](https://www.tensorflow.org/tutorials/structured_data/imbalanced_data).

To [avoid ambiguity](https://github.com/keras-team/keras/issues/3653#issuecomment-243939748), `Model.fit` does not support the `class_weight` argument for inputs with 3+ dimensions.

In [None]:
try:
  model_history = model.fit(train_batches, epochs=EPOCHS,
                            steps_per_epoch=STEPS_PER_EPOCH,
                            class_weight = {0:2.0, 1:2.0, 2:1.0})
  assert False
except Exception as e:
  print(f"{type(e).__name__}: {e}")

So in this case you need to implement the weighting yourself. You'll do this using sample weights: In addition to `(data, label)` pairs, `Model.fit` also accepts `(data, label, sample_weight)` triples.

`Model.fit` propagates the `sample_weight` to the losses and metrics, which also accept a `sample_weight` argument. The sample weight is multiplied by the sample's value before the reduction step. For example:

In [None]:
label = [0,0]
prediction = [[-3., 0], [-3, 0]] 
sample_weight = [1, 10] 

loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True,
                                               reduction=tf.losses.Reduction.NONE)
loss(label, prediction, sample_weight).numpy()

So to make sample weights for this tutorial you need a function that takes a `(data, label)` pair and returns a `(data, label, sample_weight)` triple. Where the `sample_weight` is a 1-channel image containing the class weight for each pixel. 

The simplest possible implementation is to use the label as an index into a `class_weight` list:

In [None]:
def add_sample_weights(image, label):
  # The weights for each class, with the constraint that:
  #     sum(class_weights) == 1.0
  class_weights = tf.constant([2.0, 2.0, 1.0])
  class_weights = class_weights/tf.reduce_sum(class_weights)

  # Create an image of `sample_weights` by using the label at each pixel as an 
  # index into the `class weights` .
  sample_weights = tf.gather(class_weights, indices=tf.cast(label, tf.int32))

  return image, label, sample_weights

The resulting dataset elements contain 3 images each:

In [None]:
train_batches.map(add_sample_weights).element_spec

Now you can train a model on this weighted dataset:

In [None]:
weighted_model = unet_model(OUTPUT_CLASSES)
weighted_model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'])

In [None]:
weighted_model.fit(
    train_batches.map(add_sample_weights),
    epochs=1,
    steps_per_epoch=10)