<a href="https://colab.research.google.com/github/MinsooKwak/segmentation/blob/main/human_segmentation/human_segmentation_with_transfer_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

- kaggle의 Human Segmentation Dataset을 활용
  - 링크 : https://www.kaggle.com/datasets/tapakah68/supervisely-filtered-segmentation-person-dataset/data
  - 사람에 대한 마스크 기반 segmentation

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import cv2

## 1. 데이터셋

In [3]:
data_dir = '/content/drive/MyDrive/DataSet/human_segmentation/'
data_df = pd.read_csv(os.path.join(data_dir, "df.csv"))
data_df.head()

Unnamed: 0.1,Unnamed: 0,images,masks,collages
0,0,images/ds10_pexels-photo-687782.png,masks/ds10_pexels-photo-687782.png,collage/ds10_pexels-photo-687782.jpg
1,1,images/ds10_pexels-photo-835971.png,masks/ds10_pexels-photo-835971.png,collage/ds10_pexels-photo-835971.jpg
2,2,images/ds10_pexels-photo-850708.png,masks/ds10_pexels-photo-850708.png,collage/ds10_pexels-photo-850708.jpg
3,3,images/ds10_pexels-photo-864937.png,masks/ds10_pexels-photo-864937.png,collage/ds10_pexels-photo-864937.jpg
4,4,images/ds10_pexels-photo-865908.png,masks/ds10_pexels-photo-865908.png,collage/ds10_pexels-photo-865908.jpg


In [4]:
df = data_df[['images','masks']]
df.head(3)

Unnamed: 0,images,masks
0,images/ds10_pexels-photo-687782.png,masks/ds10_pexels-photo-687782.png
1,images/ds10_pexels-photo-835971.png,masks/ds10_pexels-photo-835971.png
2,images/ds10_pexels-photo-850708.png,masks/ds10_pexels-photo-850708.png


### 데이터셋 구축 및 텐서 변환 모듈 작성

In [5]:
import torch

In [6]:
data_dir = '/content/drive/MyDrive/DataSet/human_segmentation/'
data_dir2 = "/content/drive/MyDrive/DataSet/human_segmentation_split"

- Transfer learning을 위해 일부 데이터셋만 가져와 구성
  - Image, Mask에 대해 train, val set 250씩 구성

In [7]:
!pip install split-folders[full]

Collecting split-folders[full]
  Downloading split_folders-0.5.1-py3-none-any.whl (8.4 kB)
Installing collected packages: split-folders
Successfully installed split-folders-0.5.1


In [8]:
import splitfolders

In [9]:
# 고정된 개수 데이터로 나눔 (train 250, val 250)
splitfolders.fixed(data_dir, output=data_dir2, seed=2024, fixed=(250,250))

Copying files: 8001 files [20:03,  6.65 files/s]


## Model

In [10]:
import torch
import torch.nn as nn

IMAGE_SIZE = 224

In [11]:
class human_dataset():
  def __init__(self,data_dir, phase, transformer=None):
    self.phase = phase
    self.images_dir = os.path.join(data_dir, phase, "images")
    self.masks_dir = os.path.join(data_dir, phase, "masks")
    self.image_files = [filename for filename in os.listdir(self.images_dir) if filename.endswith('png')]
    self.mask_files = [filename for filename in os.listdir(self.masks_dir) if filename.endswith('png')]
    assert len(self.image_files) == len(self.mask_files)

    self.transformer = transformer

  def __len__(self):
    return len(self.image_files)

  def __getitem__(self, index):
    image = cv2.imread(os.path.join(self.images_dir, self.image_files[index]))
    image = cv2.resize(image, dsize=(IMAGE_SIZE, IMAGE_SIZE), interpolation=cv2.INTER_LINEAR)
    mask = cv2.imread(os.path.join(self.masks_dir, self.mask_files[index]))
    mask = cv2.resize(mask, dsize=(IMAGE_SIZE, IMAGE_SIZE), interpolation=cv2.INTER_NEAREST)

    # 정리 (JPEG 같은 경우)
    mask[mask < 240] = 0
    mask[mask >=240] = 255
    mask = mask/255.

    mask_H, mask_W, mask_C = mask.shape
    background = np.ones(shape = (mask_H, mask_W))
    background[mask[...,0]!= 0] = 0
    background[mask[...,1]!= 0] = 0
    background[mask[...,2]!= 0] = 0

    mask = np.concatenate([np.expand_dims(background, axis=-1), mask], axis=-1)
    mask = np.argmax(mask, axis=-1, keepdims=False)

    if self.transformer:
      image = self.transformer(image)

    target = torch.from_numpy(mask).long()
    return image, target

In [12]:
data_dir2

'/content/drive/MyDrive/DataSet/human_segmentation_split'

In [14]:
dset = human_dataset(data_dir2, "train")

In [15]:
dset[0] # numpy 타입의 image data / tensor형의 target 값 전달

(array([[[ 40,  28,  28],
         [ 43,  30,  32],
         [ 37,  24,  26],
         ...,
         [180, 183, 168],
         [186, 189, 174],
         [186, 189, 174]],
 
        [[ 41,  28,  30],
         [ 39,  27,  28],
         [ 38,  25,  27],
         ...,
         [178, 181, 165],
         [183, 186, 171],
         [188, 191, 176]],
 
        [[ 44,  31,  33],
         [ 40,  27,  29],
         [ 43,  30,  32],
         ...,
         [177, 180, 164],
         [184, 187, 171],
         [189, 192, 177]],
 
        ...,
 
        [[103, 112, 125],
         [103, 114, 128],
         [102, 113, 127],
         ...,
         [132, 156, 154],
         [175, 187, 193],
         [156, 175, 188]],
 
        [[ 99, 108, 121],
         [102, 111, 124],
         [ 90,  99, 113],
         ...,
         [118, 138, 133],
         [162, 174, 184],
         [145, 164, 178]],
 
        [[101, 107, 120],
         [ 97, 107, 120],
         [ 97, 106, 120],
         ...,
         [ 74,  88,  87],
  

- transformer 작성

In [16]:
from torchvision import transforms

In [17]:
def build_transformer():
  transformer = transforms.Compose([
      transforms.ToTensor(),
      transforms.Normalize(mean=[0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225])
  ])
  return transformer

In [18]:
transformer = build_transformer()
dset = human_dataset(data_dir = data_dir2, phase='train', transformer = transformer)

In [19]:
dset[0]

(tensor([[[-1.4329, -1.3815, -1.4843,  ...,  0.9646,  1.0673,  1.0673],
          [-1.4158, -1.4500, -1.4672,  ...,  0.9303,  1.0159,  1.1015],
          [-1.3644, -1.4329, -1.3815,  ...,  0.9132,  1.0331,  1.1187],
          ...,
          [-0.3541, -0.3541, -0.3712,  ...,  0.1426,  0.8789,  0.5536],
          [-0.4226, -0.3712, -0.5767,  ..., -0.0972,  0.6563,  0.3652],
          [-0.3883, -0.4568, -0.4568,  ..., -0.8507, -0.2171, -0.5082]],
 
         [[-1.5455, -1.5105, -1.6155,  ...,  1.1681,  1.2731,  1.2731],
          [-1.5455, -1.5630, -1.5980,  ...,  1.1331,  1.2206,  1.3081],
          [-1.4930, -1.5630, -1.5105,  ...,  1.1155,  1.2381,  1.3256],
          ...,
          [-0.0749, -0.0399, -0.0574,  ...,  0.6954,  1.2381,  1.0280],
          [-0.1450, -0.0924, -0.3025,  ...,  0.3803,  1.0105,  0.8354],
          [-0.1625, -0.1625, -0.1800,  ..., -0.4951,  0.1702, -0.0574]],
 
         [[-1.3164, -1.2467, -1.3513,  ...,  1.1237,  1.2282,  1.2282],
          [-1.2816, -1.3164,

- dataloader에서 데이터를 반복적으로 불러오기 위한 함수

In [20]:
def collate_fn(batch):
  images = []
  targets = []
  for a, b in batch:
    images.append(a)
    targets.append(b)
  images = torch.stack(images, dim=0)
  targets = torch.stack(targets, dim=0)
  return images, targets

In [21]:
from torch.utils.data import DataLoader

In [22]:
dloader = DataLoader(dset, batch_size=4, shuffle=True, collate_fn = collate_fn)

- 데이터로더 동작 확인

In [23]:
for index, batch in enumerate(dloader):
  images = batch[0]
  targets = batch[1]

  print(f'images shape : {images.shape}')
  print(f'targets shape : {targets.shape}')
  if index ==0:
    break

images shape : torch.Size([4, 3, 224, 224])
targets shape : torch.Size([4, 224, 224])


- 데이터로더 빌드

In [24]:
def build_dataloader(data_dir, batch_size=4):
  transformer = build_transformer()

  dataloaders = {}
  train_dataset = human_dataset(data_dir=data_dir2, phase='train', transformer=transformer)
  dataloaders['train'] = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

  val_dataset = human_dataset(data_dir=data_dir2, phase='val', transformer=transformer)
  dataloaders['val'] = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
  return dataloaders

- 데이터로더 확인

In [25]:
dataloaders = build_dataloader(data_dir=data_dir2, batch_size=4)

for phase in ['train','val']:
  for index, batch in enumerate(dataloaders[phase]):
    images = batch[0]
    targets = batch[1]
    print(f'images shape : {images.shape}')
    print(f'masks shape : {targets.shape}')

    if index ==0:
      break

images shape : torch.Size([4, 3, 224, 224])
masks shape : torch.Size([4, 224, 224])
images shape : torch.Size([4, 3, 224, 224])
masks shape : torch.Size([4, 224, 224])


## VGG16 Backbone UNet architecture

In [26]:
import torch.nn as nn

In [27]:
def ConvLayer(in_channels, out_channels, kernel_size=3, padding=1):
  layers = nn.Sequential(
      nn.Conv2d(in_channels, out_channels, kernel_size= kernel_size, padding=padding),
      nn.BatchNorm2d(out_channels),
      nn.ReLU(inplace=True),

      nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, padding=padding),
      nn.BatchNorm2d(out_channels),
      nn.ReLU(inplace=True),
  )
  return layers

In [28]:
def UpConvLayer(in_channels, out_channels):
  layers = nn.Sequential(
      nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2),
      nn.BatchNorm2d(out_channels),
      nn.ReLU(inplace=True)
  )
  return layers

In [29]:
from torchvision import models

- 모델 구조 보고 어디까지 쓸지 먼저 파악

In [30]:
vgg16 = models.vgg16_bn(pretrained=False)



In [31]:
vgg16

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (9): ReLU(inplace=True)
    (10): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (12): ReLU(inplace=True)
    (13): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (14): Conv2d(128, 256

- 0~43 까지가 feature extract 부분
- vgg16은 1~512까지 => 1024까지는 없어 구현 필요
  - 512까지 가져와 1024로 변환 필요 (0~34)

In [34]:
class Encoder(nn.Module):
  def __init__(self, pretrained):
    super().__init__()

    backbone = models.vgg16_bn(pretrained=pretrained).features
    self.conv_block1 = nn.Sequential(*backbone[:6])
    self.conv_block2 = nn.Sequential(*backbone[6:13])
    self.conv_block3 = nn.Sequential(*backbone[13:20])
    self.conv_block4 = nn.Sequential(*backbone[20:27])
    self.conv_block5 = nn.Sequential(*backbone[27:34],
                                     ConvLayer(512, 1024, kernel_size=1, padding=0)) # 채널 변환만 (kernel=1)

  def forward(self,x):
    encode_features = []        # skip-connection
    out = self.conv_block1(x)
    encode_features.append(out)

    out = self.conv_block2(out)
    encode_features.append(out)

    out = self.conv_block3(out)
    encode_features.append(out)

    out = self.conv_block4(out)
    encode_features.append(out)

    out = self.conv_block5(out)
    return out, encode_features

- encoder 잘 구현되었는지 확인

In [35]:
encoder = Encoder(pretrained=False)
x = torch.randn(1, 3, 224, 224)
out, ftrs = encoder(x)

In [36]:
for ftr in ftrs:
  print(ftr.shape)

print(out.shape)

torch.Size([1, 64, 224, 224])
torch.Size([1, 128, 112, 112])
torch.Size([1, 256, 56, 56])
torch.Size([1, 512, 28, 28])
torch.Size([1, 1024, 14, 14])


- 과정 설명
  - 224 크기의 input image가 첫번째 conv block 통과하면서 3->64 channel로 변환
  - maxpooling 거쳐서 resolution이 반으로 줄고 (112,112), feature map은 2배로 커짐 (64 -> 128)
  - maxpooling 거쳐서 resolution 또 반으로 줄고, feature map은 2배 증가
  - maxpooling 거쳐서 resolution 또 반으로 줄고, feature map은 2배 증가
- 맨 위 4가지가 skip-connection으로 전달되는 부분
- 마지막 feature는 decoder의 input으로 들어가는 부분  

- Decoder

In [40]:
class Decoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.upconv_layer1 = UpConvLayer(in_channels=1024, out_channels=512)
        self.conv_block1 = ConvLayer(in_channels=512+512, out_channels=512)

        self.upconv_layer2 = UpConvLayer(in_channels=512, out_channels=256)
        self.conv_block2 = ConvLayer(in_channels=256+256, out_channels=256)

        self.upconv_layer3 = UpConvLayer(in_channels=256, out_channels=128)
        self.conv_block3 = ConvLayer(in_channels=128+128, out_channels=128)

        self.upconv_layer4 = UpConvLayer(in_channels=128, out_channels=64)
        self.conv_block4 = ConvLayer(in_channels=64+64, out_channels=64)

    def forward(self, x, encoder_features):
        out = self.upconv_layer1(x)
        out = torch.cat([out, encoder_features[-1]], dim=1)
        out = self.conv_block1(out)

        out = self.upconv_layer2(out)
        out = torch.cat([out, encoder_features[-2]], dim=1)
        out = self.conv_block2(out)

        out = self.upconv_layer3(out)
        out = torch.cat([out, encoder_features[-3]], dim=1)
        out = self.conv_block3(out)

        out = self.upconv_layer4(out)
        out = torch.cat([out, encoder_features[-4]], dim=1)
        out = self.conv_block4(out)
        return out

In [41]:
encoder = Encoder(pretrained=False)
decoder = Decoder()
x = torch.randn(1,3,224,224)
out, ftrs = encoder(x)
out = decoder(out, ftrs)

In [42]:
print(out.shape)

torch.Size([1, 64, 224, 224])


In [43]:
class UNet(nn.Module):
  def __init__(self, num_classes, pretrained):
    super().__init__()
    self.encoder = Encoder(pretrained=pretrained)
    self.decoder = Decoder()
    self.head = nn.Conv2d(64, num_classes, kernel_size=1, padding=0)  # out.shape = 64였음

  def forward(self, x):
    out, encode_features = self.encoder(x)
    out = self.decoder(out, encode_features)
    out = self.head(out)
    return out

In [44]:
model = UNet(num_classes=2, pretrained=False)
x = torch.randn(1,3,224,224)
out = model(x)



In [45]:
print(out.shape)

torch.Size([1, 2, 224, 224])


## Loss

- Dice coefficient는 segmentation에 많이 사용되는 지표

In [46]:
import torch.nn.functional as F

In [58]:
class UNet_metric():
    def __init__(self, num_classes):
        self.num_classes = num_classes
        self.CE_loss = nn.CrossEntropyLoss(reduction="mean")

    def __call__(self, pred, target):
        loss1 = self.CE_loss(pred, target)
        onehot_pred = F.one_hot(torch.argmax(pred, dim=1), num_classes=self.num_classes).permute(0, 3, 1, 2)
        onehot_target = F.one_hot(target, num_classes=self.num_classes).permute(0, 3, 1, 2)
        loss2 = self._get_dice_loss(onehot_pred, onehot_target)
        loss = loss1 + loss2

        dice_coefficient = self._get_batch_dice_coefficient(onehot_pred, onehot_target)
        return loss, dice_coefficient

    def _get_dice_coeffient(self, pred, target):
        set_inter = torch.dot(pred.reshape(-1).float(), target.reshape(-1).float())
        set_sum = pred.sum() + target.sum()
        if set_sum.item() == 0:
            set_sum = 2 * set_inter
        dice_coeff = (2 * set_inter) / (set_sum + 1e-9)
        return dice_coeff

    def _get_multiclass_dice_coefficient(self, pred, target):
        dice = 0
        for class_index in range(1, self.num_classes):
            dice += self._get_dice_coeffient(pred[class_index], target[class_index])
        return dice / (self.num_classes - 1)

    def _get_batch_dice_coefficient(self, pred, target):
        num_batch = pred.shape[0]
        dice = 0
        for batch_index in range(num_batch):
            dice += self._get_multiclass_dice_coefficient(pred[batch_index], target[batch_index])
        return dice / num_batch

    def _get_dice_loss(self, pred, target):
        return 1 - self._get_batch_dice_coefficient(pred, target)

In [59]:
dataloaders = build_dataloader(data_dir=data_dir2, batch_size=4)

for index, batch in enumerate(dataloaders['train']):
    images = batch[0]
    targets = batch[1]
    predictions = model(images)

    if index ==0:
      break

In [60]:
criterion = UNet_metric(num_classes=2)

- 학습 코드

In [61]:
def train_one_epoch(dataloaders, model, criterion, optimizer, device):
    losses = {}
    dice_coefficients = {}

    for phase in ["train", "val"]:
        running_loss = 0.0
        running_dice_coeff = 0.0

        if phase == "train":
            model.train()
        else:
            model.eval()

        for index, batch in enumerate(dataloaders[phase]):
            images = batch[0].to(device)
            targets = batch[1].to(device)

            with torch.set_grad_enabled(phase == "train"):
                predictions = model(images)
                loss, dice_coefficient = criterion(predictions, targets)

            if phase == "train":
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            running_loss += loss.item()
            running_dice_coeff += dice_coefficient.item()

            if index == 10: # 10 index * mini_batch 데이터수 만큼 데이터를 한정
                break

        losses[phase] = running_loss / index
        dice_coefficients[phase] = running_dice_coeff / index

    return losses, dice_coefficients

## Weight Initialization과 Transfer Learning 비교

1.He initialize

In [62]:
def He_initialization(module):
    if isinstance(module, torch.nn.Conv2d):
        torch.nn.init.kaiming_normal_(module.weight) # He initialization
    elif isinstance(module, torch.nn.BatchNorm2d):
        module.weight.data.fill_(1.0)

In [63]:
NUM_CLASSES = 2
BATCH_SIZE = 12
DEVICE = torch.device('cuda' if torch.cuda.is_available() and is_cuda else 'cpu')

dataloaders = build_dataloader(data_dir, batch_size=BATCH_SIZE)
model = UNet(num_classes=NUM_CLASSES, pretrained=False)
model.apply(He_initialization)
model = model.to(DEVICE)
criterion = UNet_metric(num_classes=NUM_CLASSES)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)



In [None]:
num_epochs = 30

train_loss_def, train_dice_coefficient_def = [], []
val_loss_def, val_dice_coefficient_def = [], []

for epoch in range(num_epochs):
    losses, dice_coefficients = train_one_epoch(dataloaders, model, criterion, optimizer, DEVICE)
    train_loss_def.append(losses["train"])
    val_loss_def.append(losses["val"])
    train_dice_coefficient_def.append(dice_coefficients["train"])
    val_dice_coefficient_def.append(dice_coefficients["val"])

    print(f"{epoch}/{num_epochs} - Train loss: {losses['train']:.4f}, Val loss: {losses['val']:.4f}," + \
          f" Train dice: {dice_coefficients['train']:.4f}, Val dice: {dice_coefficients['val']:.4f}")

2. Weight Transfer pretrained on ImageNet

- 학습이 처음부터 선형적으로 개선

In [None]:
dataloaders = build_dataloader(data_dir, batch_size=BATCH_SIZE)
model = UNet(num_classes=NUM_CLASSES, pretrained=True)
model = model.to(DEVICE)
criterion = UNet_metric(num_classes=NUM_CLASSES)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

In [None]:
num_epochs = 30

train_loss_prt, train_dice_coefficient_prt = [], []
val_loss_prt, val_dice_coefficient_prt = [], []

for epoch in range(num_epochs):
    losses, dice_coefficients = train_one_epoch(dataloaders, model, criterion, optimizer, DEVICE)
    train_loss_prt.append(losses["train"])
    val_loss_prt.append(losses["val"])
    train_dice_coefficient_prt.append(dice_coefficients["train"])
    val_dice_coefficient_prt.append(dice_coefficients["val"])

    print(f"{epoch}/{num_epochs} - Train loss: {losses['train']:.4f}, Val loss: {losses['val']:.4f}," + \
          f" Train dice: {dice_coefficients['train']:.4f}, Val dice: {dice_coefficients['val']:.4f}")

3. Weight Transfer with freezing encoder layer

In [None]:
dataloaders = build_dataloader(data_dir, batch_size=BATCH_SIZE)
model = UNet(num_classes=NUM_CLASSES, pretrained=True)
model.encoder.reques_grad_ = False # gradient update 꺼버림
model = model.to(DEVICE)
criterion = UNet_metric(num_classes=NUM_CLASSES)
# gradient update 켜진 wieght만 학습
optimizer = torch.optim.SGD(filter(lambda p: p.requires_grad, model.parameters()), lr=0.001, momentum=0.9)

In [None]:
num_epochs = 30

train_loss_frz, train_dice_coefficient_frz = [], []
val_loss_frz, val_dice_coefficient_frz = [], []

for epoch in range(num_epochs):
    losses, dice_coefficients = train_one_epoch(dataloaders, model, criterion, optimizer, DEVICE)
    train_loss_frz.append(losses["train"])
    val_loss_frz.append(losses["val"])
    train_dice_coefficient_frz.append(dice_coefficients["train"])
    val_dice_coefficient_frz.append(dice_coefficients["val"])

    print(f"{epoch}/{num_epochs} - Train loss: {losses['train']:.4f}, Val loss: {losses['val']:.4f}," + \
          f" Train dice: {dice_coefficients['train']:.4f}, Val dice: {dice_coefficients['val']:.4f}")