<a href="https://colab.research.google.com/github/eungbean/pytorch_segmentation_tutorials/blob/main/1_unet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Semantic Segmentation Demo

The code for this notebook is available here
https://github.com/eungbean/pytorch_segmentation_tutorials

### Imports and utility functions
먼저 라이브러리를 불러옵니다.

In [None]:
# System libs
import os, csv
import numpy as np
import scipy.io
from collections import defaultdict

# Deep learning libs
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms
import torchsummary

# visualize libs
import matplotlib.pyplot as plt
import PIL.Image

### Enabling GPU on Colab
GPU를 활성화 해 줍니다.
먼저 ```런타임```-```런타임 유형변경``` 메뉴에서 'GPU'를 선택하세요.

In [None]:
if not torch.cuda.is_available():
  raise Exception("GPU not availalbe. CPU training will be too slow.")

print("device name", torch.cuda.get_device_name(0))

Github에서 코드를 복사해옵니다.

In [None]:
if not os.path.exists("helper.py"):
  if not os.path.exists("pytorch_segmentation_tutorials"):
    !git clone https://github.com/eungbean/pytorch_segmentation_tutorials.git
  %cd pytorch_segmentation_tutorials
!ls

## Data
학습과 테스트에 사용될 데이터를 준비합니다.  
이 노트북에서는 대용량의 데이터셋을 사용하기가 어렵기 때문에,
Synthesize Data를 사용하겠습니다.  

In [None]:
import helper
import simulation

# Generate some random images
input_images, target_masks = simulation.generate_random_data(192, 192, count=3)

print(f"Input images: shape: {input_images.shape} | min: {input_images.min()}   | max: {input_images.max()}")
print(f"Target masks: shape: {target_masks.shape} | min: {target_masks.min()} | max: {target_masks.max()}")

# Change channel-order and make 3 channels for matplot
input_images_rgb = [x.astype(np.uint8) for x in input_images]

# Map each channel (i.e. class) to each color
target_masks_rgb = [helper.masks_to_colorimg(x) for x in target_masks]

# Left: Input image, Right: Target mask (Ground-truth)
helper.plot_side_by_side([input_images_rgb, target_masks_rgb])

## Define Datamodule

![](https://www.oreilly.com/library/view/deep-learning-with/9781789534092/assets/e03c0f94-a8ed-42fe-96a0-1eb2956445be.png)

Pytorch는 데이터를 받기 위해,   
```데이터 -> Dataset -> Dataloader``` 파이프라인을 가집니다.

* ```Dataloader```: batch, train/val/test등에 따라 필요한 만큼 Dataset에 데이터를 요청.
* ```Dataset```: 요청에 따라 파일로부터 데이터 파일을 읽어오는 역할.
* ```Data```: 저장된 데이터 파일.

In [None]:
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets, models

class SimDataset(Dataset):
  def __init__(self, count, transform=None):
    self.input_images, self.target_masks = simulation.generate_random_data(192, 192, count=count)
    self.transform = transform

  def __len__(self):
    return len(self.input_images)

  def __getitem__(self, idx):
    image = self.input_images[idx]
    mask = self.target_masks[idx]
    if self.transform:
      image = self.transform(image)
    return [image, mask]


Image Augmentation을 위한 파이프라인을 작성합니다.   
* 유용한 라이브러리로 [albumentations](https://github.com/albumentations-team/albumentations)를 많이 사용합니다.   


In [None]:
transform = transforms.Compose([
  transforms.ToTensor(),
  transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # imagenet
])

학습에 사용될 데이터셋을 생성합니다.   
(시간이 좀 걸립니다.)

In [None]:
train_set = SimDataset(2000, transform = transform)
val_set = SimDataset(200, transform = transform)
test_set = SimDataset(3, transform = transform)

In [None]:
image_datasets = {'train': train_set, 'val': val_set, 'test': test_set}

# Train할 때 한 iteration당 이미지 수
batch_size = 25

# Dataloader 정의
dataloaders = {
  'train': DataLoader(train_set, batch_size=25, shuffle=True, num_workers=0),
  'val': DataLoader(val_set, batch_size=25, shuffle=True, num_workers=0),
  'test': DataLoader(test_set, batch_size=3, shuffle=False, num_workers=0)
}

In [None]:
import torchvision.utils

# 데이터는 Train을 위해 Normalize 된 후 Torch.tensor형태로 준비되므로 
# 시각화하기 위해 원래대로 되돌려야 합니다.
def reverse_transform(inp):
  inp = inp.numpy().transpose((1, 2, 0)) # 이미지의 차원을 C,H,W 에서 H,W,C로 바꾸어 줍니다.
  mean = np.array([0.485, 0.456, 0.406])
  std = np.array([0.229, 0.224, 0.225])
  inp = std * inp + mean
  inp = np.clip(inp, 0, 1)
  inp = (inp * 255).astype(np.uint8)
  return inp

# 데이터로더로부터 직접 데이터를 꺼내고 싶으면 다음과 같이 사용합니다.
inputs, masks = next(iter(dataloaders['train']))

print(f"Input shape : {inputs.shape}")
print(f"Mask shape: {masks.shape}")
plt.imshow(reverse_transform(inputs[3]))

## Define the segmentation model

![](https://theaisummer.com/static/3995761ad87f8909f5dec5925d182e80/4ff83/The-3D-Unet-model.png)

> 그림 가장 아래쪽 channel의 수는 256이 아니라 512입니다.

본격적으로 학습에 사용될 U-Net 모델을 정의합니다.

### 1. ```Double_conv```

```python
def double_conv(in_channels, out_channels):
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, 3, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(out_channels, out_channels, 3, padding=1),
        nn.ReLU(inplace=True)
    )   
```

공통적으로 반복하여 사용되는 2번의 conv가 반복되는 블록입니다. ```Conv-ReLU-Conv-ReLU```의 구조를 가지는 블록입니다. 함수로 미리 정의했습니다.

각 블록의 ```input 채널의 수```와 ```output 채널의 수```를 변수로 받습니다.


### 2. U-Net

> #### ```def __init__```  

* 이곳에서 클래스에서 사용될 여러가지 변수를 정의합니다.   
* 기본적인 구조는 미리 정의해드렸습니다.  
* ```--``` 로 비워져있는 Channel number를 채워주세요.   

* ```dconv_down```은 Encoder에 사용됩니다. 
* ```dconv_up```은 Decoder에 사용됩니다.
채널의 수를 정의할 때, concat이 되는 것에 유의해주세요.  

> #### ```def forward``` 

* 이곳에서는 네트워크를 정의합니다.  
위 네트워크 그림을 참고해서, 모델을 만들어주세요.

* Encoder와 Decoder의 첫 블락은 미리 정의해드렸습니다.



In [None]:
def double_conv(in_channels, out_channels):
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, 3, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(out_channels, out_channels, 3, padding=1),
        nn.ReLU(inplace=True)
    )   

class UNet(nn.Module):
    def __init__(self, n_class):

        super().__init__()
        self.dconv_down1 = double_conv(--, --)
        self.dconv_down2 = double_conv(--, --)
        self.dconv_down3 = double_conv(--, --)
        self.dconv_down4 = double_conv(--, --)

        self.maxpool = nn.MaxPool2d(2)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)        
        
        self.dconv_up3 = double_conv(--, --)
        self.dconv_up2 = double_conv(--, --)
        self.dconv_up1 = double_conv(--, --)
        
        self.conv_last = nn.Conv2d(--, n_class, 1)
        
        
    def forward(self, x):

        ## Encoder
        # Layer 1: input_channel=3, output_channel = 64
        conv1 = self.dconv_down1(x)
        x = self.maxpool(conv1)

        # Layer 2: input_channel=64, output_channel = 128
        conv2 = 
        x = 
        
        # Layer 3: input_channel=128, output_channel = 256
        conv3 = 
        x = 
        
        # Layer 4: input_channel=256, output_channel = 512
        x = 
        
        ## Decoder
        # Layer 3
        x = self.upsample(x)        
        x = torch.cat([x, conv3], dim=1)
        x = self.dconv_up3(x)

        # Layer 2
        ..
        
        # Layer 1
        ..
        
        # Out head
        out = self.conv_last(x)
        
        return out

클래스로 정의된 U-Net을 객체화해줍니다.

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = UNet(n_class=6)
model = model.to(device)

# check keras-like model summary using torchsummary
from torchsummary import summary
summary(model, input_size=(3, 224, 224))

## Define the loss

학습에 사용될 Loss를 정합니다.   
Loss는 BCE, Dice를 함께 사용합니다.

In [None]:
def dice_loss(pred, target, smooth = 1.):
    pred = pred.contiguous()
    target = target.contiguous()    

    intersection = (pred * target).sum(dim=2).sum(dim=2)
    loss = (1 - ((2. * intersection + smooth) / (pred.sum(dim=2).sum(dim=2) + target.sum(dim=2).sum(dim=2) + smooth)))
    
    return loss.mean()

def calc_loss(pred, target, metrics, bce_weight=0.5):
    bce = F.binary_cross_entropy_with_logits(pred, target)

    pred = torch.sigmoid(pred)
    dice = dice_loss(pred, target)

    loss = bce * bce_weight + dice * (1 - bce_weight)

    metrics['bce'] += bce.data.cpu().numpy() * target.size(0)
    metrics['dice'] += dice.data.cpu().numpy() * target.size(0)
    metrics['loss'] += loss.data.cpu().numpy() * target.size(0)

    return loss

## Define the main training loop

Train을 하기 위한 루프를 정의합니다.     
def train_model 함수를 상세히 보면서 하나하나 따라가보세요.

In [None]:
def test_and_vis(model, dataloaders):
    # Get the first batch
    model.eval()
    inputs, labels = next(iter(dataloaders['test']))
    inputs, labels = inputs.to(device), labels.to(device)

    # predict
    preds = model(inputs)
    preds = torch.sigmoid(preds)
    preds = preds.data.cpu().numpy()

    # Change channel-order and make 3 channels for matplot
    input_images_rgb = [reverse_transform(x) for x in inputs.cpu()]
    # Map each channel (i.e. class) to each color
    target_masks_rgb = [helper.masks_to_colorimg(x) for x in labels.cpu().numpy()]
    pred_rgb = [helper.masks_to_colorimg(x) for x in preds]

    helper.plot_side_by_side([input_images_rgb[0], target_masks_rgb[0], pred_rgb[0]])

In [None]:
def print_metrics(metrics, epoch_samples, phase):
    outputs = []
    for k in metrics.keys():
        outputs.append("{}: {:4f}".format(k, metrics[k] / epoch_samples))

    print("{}: {}".format(phase, ", ".join(outputs)))

def train_model(model, optimizer, scheduler, num_epochs=25):
    
    # 학습을 시작하기 전, 초기화를 해줍니다.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = 1e10

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        since = time.time()

        # 각 epoch은 training and validation phase가 존재합니다.
        for phase in ['train', 'val']:
            # Train Phase
            if phase == 'train':
                model.train()  # model을 training mode로 설정합니다.
                scheduler.step()
            else:
                model.eval()   # 을model to evaluate mode로 설정합니다.

            metrics = defaultdict(float)
            epoch_samples = 0

            # 본격적인 학습이 시작됩니다.
            for inputs, labels in dataloaders[phase]:
              # 각 데이터를 불러온 후 GPU로 이동해야 합니다.
              inputs, labels = inputs.to(device), labels.to(device)

              # gradient를 0으로 초기화합니다.
              optimizer.zero_grad()

                # forward
                # training phase 때만 history를 추적합니다.
              with torch.set_grad_enabled(phase == 'train'):
                  outputs = model(inputs)
                  loss = calc_loss(outputs, labels, metrics)

                  # training phase 때만 backward + optimize
                  if phase == 'train':
                      loss.backward()
                      optimizer.step()

                # statistics
              epoch_samples += inputs.size(0)

            print_metrics(metrics, epoch_samples, phase)
            epoch_loss = metrics['loss'] / epoch_samples
            
            # model을 deep copy 합니다.
            if phase == 'val' and epoch_loss < best_loss:
                print("saving best model")
                best_loss = epoch_loss
                best_model_wts = copy.deepcopy(model.state_dict())

        time_elapsed = time.time() - since
        print('{:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))

    print('Best val loss: {:4f}'.format(best_loss))

    # best model weights를 load 합니다.
    model.load_state_dict(best_model_wts)
    return model


## Training


In [None]:
import torch
import torch.optim as optim
from torch.optim import lr_scheduler
import time
import copy

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

num_class = 6

model = UNet(num_class).to(device)

# 모든 parameter들이 Optimize되는걸 관측합니다.
optimizer_ft = optim.Adam(model.parameters(), lr=1e-4)

# Learning Rate를 스케줄링해주는 스케줄러입니다. (학습초반에는 크게, 점점 작게.)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=25, gamma=0.1)

# Train을 시작합니다.
model = train_model(model, optimizer_ft, exp_lr_scheduler, num_epochs=40)

## Run the Model
Training 후 학습된 모델을 이용해서 prediction을 진행합니다.

In [None]:
import math

model.eval()   # Set model to the evaluation mode

# Get the first batch
inputs, labels = next(iter(dataloaders['test']))
inputs, labels = inputs.to(device),labels.to(device) 

# Predict
pred = model(inputs)
# The loss functions include the sigmoid function.
pred = torch.sigmoid(pred)
pred = pred.data.cpu().numpy()

# Change channel-order and make 3 channels for matplot
input_images_rgb = [reverse_transform(x) for x in inputs.cpu()]

# Map each channel (i.e. class) to each color
target_masks_rgb = [helper.masks_to_colorimg(x) for x in labels.cpu().numpy()]
pred_rgb = [helper.masks_to_colorimg(x) for x in pred]

In [None]:
helper.plot_side_by_side([input_images_rgb, target_masks_rgb, pred_rgb])

> # 수고하셨습니다!