# Image Segmentation with U-Net

이번 시간에는 U-Net 모델을 이용하여 image segmentation 작업을 진행해보자.

Semantic image segmenataion은 pixel수준으로 이미지의 레이블을 예측하는 문제를 지칭합니다. 다시 말해, 단순히 물체가 이미지에 존재하는지를 예측하는 것이 아니라, 각 픽셀이 어떤 클래스에 속하는지를 파악합니다.

Segmentation은 Object detection과 유사하게 "주어진 이미지에 어떤 물체가 존재하고 어디에 위치하는가?"라는 질문에 답합니다. 그러나 object detection은 물체를 bounding box로 감싸기 때문에, 박스 내에 물체가 아닌 픽셀도 포함될 수 있습니다. 

반면에, semantic image segmentation은 픽셀 단위로 정확한 물체의 마스크를 얻을 수 있어 더 세밀한 정보를 제공합니다.

<img src="resources/carseg.png" style="width:500px;height:250;">

In [None]:
import numpy as np

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, models
import torchvision.transforms.v2 as transforms

import wandb
from tqdm import tqdm
import matplotlib.pyplot as plt
from PIL import Image

from training_utilities import create_dataloaders, train_loop, calculate_pixel_accuracy, AverageMeter, save_checkpoint, load_checkpoint

In [None]:
data_root_dir = '/datasets'

train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

voc_train_dataset = datasets.VOCSegmentation(root=data_root_dir, year='2012', image_set='train', download=False,
                                          transform = train_transform)

먼저 Pascal VOC 2012 Segmentation 데이터셋을 불러오자.

이 dataset은 RGB 이미지와 이에 대응되는 mask를 리턴합니다.
mask에는 픽셀 수준으로 class를 지칭하는 정수값이 들어있습니다.

- 0 : background
- 255 : 'void' or unlabelled.
- 1~20 : 20 classes


한편, 이미지의 크기를 보면 transform이 image에만 적용되고 target에는 적용되지 않은 것을 확인할 수 있습니다.

target에도 transform을 적용하려면 새로운 데이터셋과 custom transform함수 정의가 필요합니다.

In [None]:
idx = 0
image, target = voc_train_dataset[idx]
mask_palette = target.getpalette()

print(f"Image.shape = {image.shape}, target.shape = {np.array(target).shape}")
print(f"Target values unique : {np.unique(target)}")

In [None]:
import random

def mask_tensor_to_pil(mask):
    mask_np = mask.numpy().astype(np.uint8)
    mask_pil = Image.fromarray(mask_np, mode='P')
    mask_pil.putpalette(mask_palette)
    return(mask_pil)

def visualize_samples(dataset, cols=4, rows=3, select_random = True):
    """
    Visualize a few samples from the VOCSegmentation dataset, showing both the input image and its corresponding label (segmentation mask).

    Parameters:
        dataset: A dataset object, e.g., VOCSegmentation, where each item is a tuple (image, label).
        cols (int): Number of columns in the visualization grid.
        rows (int): Number of rows in the visualization grid.
    """
    figure, ax = plt.subplots(nrows=rows, ncols=cols * 2, figsize=(12, 6))
    ax = ax.flatten()

    if select_random:
        indices = random.sample(range(len(dataset)), cols * rows)
    else:
        indices = range(cols * rows)
    
    
    for i, idx in enumerate(indices):
        # Get the image and label (segmentation mask)
        img, mask = dataset[idx]

        # unnormalize image
        mean = torch.tensor([0.485, 0.456, 0.406]).to(img.device).view(-1, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225]).to(img.device).view(-1, 1, 1)
        img = img * std + mean
        

        # Display the image
        ax[2 * i].imshow(img.numpy().transpose((1, 2, 0)))
        ax[2 * i].set_title(f"Image {i+1}")
        ax[2 * i].axis("off")

        # Display the segmentation mask (assuming it's a single-channel mask)
        if isinstance(mask, torch.Tensor):
            mask = mask_tensor_to_pil(mask)

        ax[2 * i + 1].imshow(mask, cmap="gray")
        ax[2 * i + 1].set_title(f"Label {i+1}")
        ax[2 * i + 1].axis("off")

    plt.tight_layout()
    plt.show()

In [None]:
visualize_samples(voc_train_dataset, cols=2, rows=3, select_random = True)

이미지 분류(classification) 문제와는 달리, segmentation 문제에서는 이미지와 타겟 마스크에 동일한 변환(transform)을 적용해야 합니다. 

예를 들어, 이미지에 수평 반전을 적용했다면, 타겟 마스크에도 동일한 변환을 수행하여 이미지와 레이블이 일치하도록 해야 합니다.

또한, 데이터 증강(data augmentation)이 랜덤하게 수행되는 경우에도 이미지와 마스크에 동일한 랜덤 변환이 적용되도록 설정해야 합니다. 이렇게 해야만 데이터의 일관성을 유지할 수 있습니다.

새로운 데이터셋 `UNetDataset`은 `VOCSegmentation`데이터셋에서 `image`와 `target`을 읽어와 동시에 `transforms`함수에 전달한다.

In [None]:
class UNetDataset(Dataset):
    def __init__(self, voc_dataset, transforms=None):
        self.dataset = voc_dataset
        self.transforms = transforms

        self.classes = ["background", "aeroplane", "bicycle", "bird", "boat", "bottle",
                        "bus", "car", "cat", "chair", "cow", "diningtable",
                        "dog", "horse", "motorbike", "person", "pottedplant",
                        "sheep", "sofa", "train", "tvmonitor"]


    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        image, target = self.dataset[idx]

        if self.transforms:
            image, target = self.transforms(image, target)

        return image, target


Custom transform함수를 정의하기 위해서는 `__init__` 과 `__call__`을 메서드를 포함한 호출 가능한(callable) 클래스를 만들면 됩니다.

아래 코드는 transform함수 호출시(`__call__`) 전달받은 `img`와 `mask`에 동일한 변환을 수행하는 transform함수의 예시입니다.

1. JointResize: 이미지와 타겟 마스크의 크기를 모두 변경합니다.
   - 마스크는 레이블 손실이 없도록 최근접 이웃 보간 (NEAREST interpolation)을 사용합니다.
2. JointToTensor: 이미지를 텐서로 변환하고, 마스크를 정수형 텐서로 변환합니다.
   - 이미지를 0과 1 사이의 값을 가지는 텐서로 변환하고, 마스크는 레이블 정보를 가지는 정수형 텐서로 변환합니다.
3. JointNormalize: 이미지를 주어진 평균과 표준편차로 정규화합니다. 마스크는 변형하지 않습니다.

In [None]:
import torchvision.transforms.functional as F

class JointResize(object):
    """Resize both image and target mask to the given size."""
    def __init__(self, size):
        self.size = size

    def __call__(self, img, mask):
        img = F.resize(img, self.size)
        mask = F.resize(mask, self.size, interpolation=transforms.functional.InterpolationMode.NEAREST)
        return img, mask

class JointToTensor(object):
    """Convert PIL image to tensor and mask to integer tensor"""
    def __call__(self, img, mask):
        img = F.to_tensor(img)  # Image is converted to a floating-point tensor
        mask = torch.as_tensor(np.array(mask), dtype = torch.long)  # Mask is converted to integer tensor
        return img, mask
    
class JointNormalize(object):
    """Normalize only the image, not the target mask."""
    def __init__(self, mean, std):
        self.normalize = transforms.Normalize(mean=mean, std=std)

    def __call__(self, img, mask):
        img = self.normalize(img)
        return img, mask

`JointRandomRotation` 같이 변환이 랜덤하게 수행되는 경우에도 이미지와 마스크에 동일한 변환이 적용되도록 설정하여 데이터의 일관성을 보장할 수 있습니다.

### <mark>실습</mark> JointRandomHorizontalFlip
`F.hflip`함수([docs](https://pytorch.org/vision/main/generated/torchvision.transforms.functional.hflip.html))와 `torch.rand(1).item()` 랜덤 값을 이용하여 랜덤 값이 `self.p`보다 <u>작으면</u> horizontal flip을 수행하는 Transform함수 `JointRandomHorizontalFlip`를 완성하세요.

In [None]:
class JointRandomRotation(object):
    """Randomly rotate both image and target mask by an angle within a given range."""
    def __init__(self, degrees=(-10, 10)):
        self.degrees = degrees

    def __call__(self, img, mask):
        angle = (torch.rand(1).item() * (self.degrees[1] - self.degrees[0])) + self.degrees[0]
        img = F.rotate(img, angle)
        mask = F.rotate(mask, angle, interpolation=F.InterpolationMode.NEAREST, fill = 255)
        return img, mask
    
class JointRandomHorizontalFlip(object):
    """Randomly flip both image and target mask horizontally with a given probability."""
    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, img, mask):
        ##### YOUR CODE START #####

        ##### YOUR CODE END #####
        return img, mask
    
class JointRandomCrop(object):
    """Randomly crop both image and target mask to the specified size."""
    def __init__(self, size, pad_fill_value=255):
        self.size = size
        self.fill_value = pad_fill_value #  Fill padding value (255 for void class)
        self.padding = True

    def __call__(self, img, mask):
        img_w, img_h = img.size
        crop_w, crop_h = self.size

        if self.padding and (img_w < crop_w or img_h < crop_h):
            padding = [0, 0, max(0, crop_w - img_w), max(0, crop_h - img_h)]
            img = F.pad(img, padding)
            mask = F.pad(mask, padding, fill=self.fill_value)

        #i, j, h, w = transforms.RandomCrop.get_params(img, output_size=self.size)
        i = torch.randint(0, max(0, img_h - crop_h) + 1, (1,)).item()
        j = torch.randint(0, max(0, img_w - crop_w) + 1, (1,)).item()
        img = F.crop(img, i, j, crop_h, crop_w)
        mask = F.crop(mask, i, j, crop_h, crop_w)
        return img, mask

class JointRandomRescale(object):
    """Randomly rescale both image and target mask by a factor."""
    def __init__(self, scale_range=(0.5, 2.0)):
        self.scale_range = scale_range

    def __call__(self, img, mask):
        scale_factor = torch.rand(1).item() * (self.scale_range[1] - self.scale_range[0]) + self.scale_range[0]
        img_w, img_h = img.size
        new_w, new_h = int(img_w * scale_factor), int(img_h * scale_factor)

        img = F.resize(img, (new_h, new_w))
        mask = F.resize(mask, (new_h, new_w), interpolation=F.InterpolationMode.NEAREST) 
        return img, mask

In [None]:
def load_VOC_Segmentation_datasets(data_root_dir):
    normalize = JointNormalize(mean=[0.485, 0.456, 0.406],
                               std=[0.229, 0.224, 0.225])
    
    train_transforms = transforms.Compose([
        JointRandomHorizontalFlip(p=0.5),
        JointRandomRotation(degrees = (-10, 10)),
        JointResize((256, 256)),
        JointRandomRescale(scale_range = (0.8, 1.2)),
        JointRandomCrop(size=(224, 224)),
        JointResize((224, 224)),
        JointToTensor(),
        normalize
    ])

    test_transforms = transforms.Compose([
        JointResize((224, 224)),
        JointToTensor(),
        normalize
    ])
    
    voc_train_dataset = datasets.VOCSegmentation(root=data_root_dir, year='2012', image_set='train', 
                                                 download=False, transform = None)
    voc_test_dataset = datasets.VOCSegmentation(root=data_root_dir, year='2012', image_set='val', 
                                                download=False, transform = None)
    
    train_dataset = UNetDataset(voc_train_dataset, transforms=train_transforms)
    test_dataset = UNetDataset(voc_test_dataset, transforms=test_transforms)


    return train_dataset, test_dataset


In [None]:
train_dataset, test_dataset = load_VOC_Segmentation_datasets(data_root_dir)

X, y = train_dataset[0]
print(f"Image shape : {X.shape}")
print(f"Mask shape: {y.shape}")
print(f"Mask values unique {y.unique()}\n")

print(f"Dataset size: Train {len(train_dataset)}, Test {len(test_dataset)}")

In [None]:
visualize_samples(train_dataset, cols = 2, select_random= False)

## U-Net Architecture

U-Net 아키텍처는 그 구조가 U자형이라서 붙여진 이름으로, 2015년 종양 검출을 위해 처음 제안된 이 모델은 현재까지도 다양한 semantic segmentation 작업에 널리 사용되고 있습니다.

U-Net은 기존의 Convolutional Network에서 마지막 fully connected 레이어를 transposed convolution 레이어로 대체하여, feature map의 업샘플링(upsampling을)을 수행합니다. 이 과정을 통해 feature map을 원본 이미지의 크기로 다시 확대할 수 있습니다. 

하지만 Convolutional Network의 마지막 feature map은 많은 공간적 정보를 이미 많이 잃어버린 상태입니다. 단순히 업샘플링만 한다면 세부적인 segmentation 결과를 얻기 어렵습니다. 

이를 보완하기 위해, U-Net은 입력 이미지에 대해 진행된 각 conv 연산 수와 동일한 수의 transposed convolution을 수행하고, skip connection을 사용하여 다운샘플링 과정에서의 feature map 정보를 업샘플링 레이어에 전달합니다. 이 방식은 이미지의 세부 정보를 보존하고, 더 정확한 분할 결과를 제공합니다.

<img src="resources/unet.png" style="width:700px;height:400;">


### <mark>실습</mark> Encoder (Downsampling Block) 

Encoder에서 이미지는 convolutional layer를 거치면서 높이와 너비가 감소하고 채널 수는 증가하게 됩니다.

Encoder는 두개의 [Conv2d, BatchNorm2d, ReLU]로 이루어진 `DoubleConv`와 `MaxPool2d`를 쌓아서 만듭니다.

1. `DoubleConv`는 아래와 같이 구성되어 있다.
    - Conv2d: `out_channels`개의 3x3 필터와 bias = False. padding = 1로 하여 이미지의 크기를 유지한다
    - BatchNorm2d
    - ReLU
    - Conv2d: 위와 동일
    - BatchNorm2d
    - ReLU

2. `Down` 블럭을 완성하세요
    - MaxPool2d: 2x2 kernel with stride 2
    - DoubleConv
    - <u>if `dropout_prob` > 0</u>, add [nn.Dropout2d](https://pytorch.org/docs/stable/generated/torch.nn.Dropout2d.html) layer with p = `dropout_prob`
    - `nn.Sequential`은 여러 레이어들을 순차적으로 실행하도록 묶는 방법입니다. `*layers`는 리스트로 정의된 레이어들을 nn.Sequential에 개별적으로 전달합니다. 즉, `nn.Sequential(*layers)`는 layers 리스트에 있는 레이어들이 순서대로 실행되도록 설정합니다.

In [None]:
class DoubleConv(nn.Module):
    """(convolution => BN => ReLU) * 2"""

    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        out = self.double_conv(x)
        return out


class Down(nn.Module):
    """Downscaling with 2x2 maxpool then DoubleConv"""

    def __init__(self, in_channels, out_channels, dropout_prob = .0):
        super().__init__()

        ##### YOUR CODE START #####

        ##### YOUR CODE END #####
      
        self.maxpool_conv = nn.Sequential(*layers)

    def forward(self, x):
        out = self.maxpool_conv(x)
        return out

In [None]:
down_block = Down(in_channels=64, out_channels=128, dropout_prob=0.3)

input = torch.randn(16, 64, 32, 32) 
output = down_block(input)

print("Input shape:", input.shape)
print("Output shape:", output.shape)

assert output.shape == (16, 128, 16, 16)

### <mark>실습</mark> Decoder

팽창 단계에서는 수축단계와 반대로 이미지의 크기를 다시 원본 이미지의 크기로 키우며 채널 수를 점차 줄인다.

먼저 transposed convolution을 이용하여 upsampling을 수행한 뒤 encoder block에서의 출력과 합쳐(concatenate), `DoubleConv`를 수행합니다.

Arguments:
- `upsampling_input`: Decoder block의 이전 레이어에서의 입력
- `skip_connection` Encoder block으로 부터 오는 입력

Steps:
- [nn.ConvTranspose2d](https://pytorch.org/docs/stable/generated/torch.nn.ConvTranspose2d.html): kernel size 2x2 and stride 2. 채널 수는 절반으로 줄어든다
- skip connections: `skip_connection`와 ConvTranspose2를 거친 `upsampling_input`를 concatenation한다. 일반적으로 concat 순서는 상관없지만 코드 테스트을 위해 <u>[`skip_connection`, `upsampling_input`]의 순서로</u> concat할 것.
- DoubleConv with output channels `out_channels`

(참고) 만약 `skip_connection`과 `upsampling_input`의 공간 차원이 맞지 않으면 둘중 하나를 잘라내거나(crop) padding을 붙여넣어 차원을 맞춰준다.

In [None]:
class Up(nn.Module):
    """Upscaling then double conv"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        ##### YOUR CODE START #####

        ##### YOUR CODE END #####

    def forward(self, upsampling_input, skip_connection):
        ##### YOUR CODE START #####

        ##### YOUR CODE END #####
        return out

In [None]:
up_block = Up(in_channels=128, out_channels=64)

upsampling_input = torch.randn(16, 128, 32, 32) 
skip_connection = torch.randn(16, 64, 64, 64)
output = up_block(upsampling_input, skip_connection)

print("Input shape:", input.shape)
print("Output shape:", output.shape)

assert output.shape == (16, 64, 64, 64)

### <mark>실습</mark> U-Net

<img src="resources/unet.png" style="width:700px;height:400;">

위 이미지를 참고하여 `UNet`모델을 완성하세요
- 마지막 레이어에서는 1x1 convolution을 이용하여 feature vector를 class수 `num_classes`로 매핑합니다


In [None]:
class UNet(nn.Module):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.in_channels = in_channels
        self.num_classes = num_classes

        ##### YOUR CODE START #####

        ##### YOUR CODE END #####


    def forward(self, x):
        ##### YOUR CODE START #####

        ##### YOUR CODE END #####
        return logits

In [None]:
# unit test
model = UNet(in_channels = 3, num_classes = 21)
assert model(torch.randn(4, 3, 224, 224)).shape == torch.Size((4, 21, 224, 224)), "output shape does not match"
assert sum(p.numel() for p in model.parameters()) == 31038933, "Number of model parameter does not match"

print("\033[92m All test passed!")

In [None]:
def get_model(model_name, num_classes, config):
    if model_name == "UNet":
        model = UNet(in_channels = 3, num_classes = num_classes)
    else:
        raise Exception("Model not supported: {}".format(model_name))
    
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

    print(f"Using model {model_name} with {total_params} parameters ({trainable_params} trainable)")

    return model

## <mark>실습</mark> mIoU

Sementic segmentation 평가를 위해서는 주로 mIoU (mean IoU)를 사용한다.

클래스 $c$ 에 대한 IoU값은 다음과 같이 주어진다
$$ IoU(c) = \frac{Intersection(c)}{Union(c)} = \frac{TP_c}{TP_c + FP_c + FN_c} $$


- $TP_c$(True Positives): class $c$로 옳바르게 예측한 픽셀의 수
- $FC_c$ (False Positives): 실제로 다른 class에 속하지만 class $c$로 틀리게 예측된 픽셀의 수
- $FN_c$(False Negatives) : 실제로 class $c$에 속하지만 다른 class에 속하는 것으로 예측된 핅셀의 수
- $Intersection(c)$: 예측과 ground truth가 모두 $c$인 픽셀의 수.
- $Union(c)$: 예측과 ground truth 둘중 하나가 $c$인 픽셀의 수.
- (참고) IoU값은 Jaccard Index와 같은 값임

만약 IoU가 1이면 예측과 실제(ground truth) mask가 완전히 동일한 것을 의미한다.

IoU값을 각각의 class에 대해서 계산한 뒤 이에 대한 평균을 계산하여 mIoU값을 계산한다 

mean IoU (mIoU) over $𝑁$ classes:
$$mIoU = \frac{1}{|C_{valid}|}\sum_{c \in C_{valid}}{IoU(c)}$$

- $C_{valid}$: $Union(c) > 0$ 만족하는 class들의 집합니다 (예측 혹은 ground truth 둘중 하나에 해당 class가 나타남을 의미)


위 정의를 참고하여 함수 `calculate_mIoU`를 완성하세요
- 먼저 `output` 텐서로 부터 예측된 class index를 얻는다.

In [None]:
def calculate_mIoU(output, target, num_classes):
    
    _, preds = torch.max(output, dim=1)

    iou_list = []
    for cls in range(num_classes):
        pred_mask = (preds == cls)
        target_mask = (target == cls)
        
        ##### YOUR CODE START #####
        intersection = None #TODO
        union = None #TODO
        ##### YOUR CODE END #####

        if union != 0:
            iou = intersection / union
            iou_list.append(iou)
    
    return sum(iou_list) / len(iou_list) if iou_list else 0


In [None]:
def evaluation_loop(model, device, dataloader, criterion, epoch = 0, phase = "validation"):
    loss_meter = AverageMeter('Loss', ':.4e')
    pixel_acc_meter = AverageMeter('Pixel_Acc', ':6.2f')
    mIoU_meter = AverageMeter('mIoU', ':6.4f')
    metrics_list = [loss_meter, pixel_acc_meter, mIoU_meter]

    model.eval() # switch to evaluate mode

    with torch.no_grad():
        tqdm_val = tqdm(dataloader, desc='Validation/Test', total=len(dataloader))
        for images, target in tqdm_val:
            images = images.to(device, non_blocking=True)
            target = target.to(device, non_blocking=True)

            output = model(images)
            loss = criterion(output, target)

            # calculate metrics
            pixel_acc = calculate_pixel_accuracy(output, target)
            mIoU = calculate_mIoU(output, target, model.num_classes)
            
            # Update the AverageMeters
            loss_meter.update(loss.item(), images.size(0))
            pixel_acc_meter.update(pixel_acc, images.size(0))
            mIoU_meter.update(mIoU, images.size(0))

            tqdm_val.set_postfix(avg_metrics = ", ".join([str(x) for x in metrics_list]))

        tqdm_val.close()

    wandb.log({
        "epoch" : epoch,
        f"{phase.capitalize()} Loss": loss_meter.avg, 
        f"{phase.capitalize()} Pixel Acc": pixel_acc_meter.avg,
        f"{phase.capitalize()} mIoU": mIoU_meter.avg,
    })

    return mIoU_meter.avg

## Training (모델 학습)

### Ignoring Unlabelled Pixels (Index 255)
라벨링 되지 않은 pixel들을 Loss계산에서 제외하기 위해 CrossEntropyLoss의 `ignore_index` argument를 이용한다
```
nn.CrossEntropyLoss(ignore_index=255)
```

In [None]:
def train_main(config):
    ## data and preprocessing settings
    data_root_dir = config['data_root_dir']
    num_worker = config.get('num_worker', 4)

    ## Hyper parameters
    batch_size = config['batch_size']
    learning_rate = config['learning_rate']
    start_epoch = config.get('start_epoch', 0)
    num_epochs = config['num_epochs']
    eval_interval = config.get('eval_interval', 10)


    ## checkpoint setting
    checkpoint_path = config.get('checkpoint_path', "checkpoints/checkpoint.pth")
    best_model_path = config.get('best_model_path', "checkpoints/best_model.pth")
    load_from_checkpoint = config.get('load_from_checkpoint', None)

    ## variables
    best_metric = 0

    wandb.init(
        project=config["wandb_project_name"],
        config=config
    )

    device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
    print(f"Using {device} device")

    train_dataset, test_dataset = load_VOC_Segmentation_datasets(data_root_dir)
    num_classes = len(train_dataset.classes)
    
    train_dataloader, test_dataloader = create_dataloaders(train_dataset, test_dataset, device, 
                                                           batch_size = batch_size, num_worker = num_worker)


    
    model = get_model(model_name = config["model_name"], num_classes= num_classes, config = config).to(device)

    criterion = nn.CrossEntropyLoss(ignore_index=255)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1) 

    if load_from_checkpoint:
        load_checkpoint_path = (best_model_path if load_from_checkpoint == "best" else checkpoint_path)
        start_epoch, best_metric = load_checkpoint(load_checkpoint_path, model, optimizer, scheduler, device)

    if config.get('test_mode', False):
        # Only evaluate on the test dataset
        print("Running test evaluation...")
        test_metric = evaluation_loop(model, device, test_dataloader, criterion, phase = "test")
        print(f"Test metric (mIoU): {test_metric}")
        
    else:
        # Train and validate using train/val datasets
        for epoch in range(start_epoch, num_epochs):
            train_loop(model, device, train_dataloader, criterion, optimizer, epoch)


            if (epoch + 1) % eval_interval == 0 or (epoch + 1) == num_epochs:
                test_metric = evaluation_loop(model, device, test_dataloader, criterion, epoch = epoch, phase = "validation")

                is_best = test_metric > best_metric
                best_metric = max(test_metric, best_metric)
                save_checkpoint(checkpoint_path, model, optimizer, scheduler, epoch, best_metric, is_best, best_model_path)

            scheduler.step()


    wandb.finish()


In [None]:
config = {
    'data_root_dir': '/datasets',
    'batch_size': 16,
    'learning_rate': 1e-3,
    'model_name': 'UNet',
    'num_epochs': 150,
    "eval_interval" : 10,

    "dataset": "VOC2012",
    'wandb_project_name': 'UNet',

    "checkpoint_path" : "checkpoints/checkpoint.pth",
    "best_model_path" : "checkpoints/best_model.pth",
    "load_from_checkpoint" : None,    # Options: "latest", "best", or None
}

In [None]:
train_main(config)

## Visualize your model's prediction

In [None]:
def visualize_prediction(X, y, y_pred):
    """
    Visualize a few samples from the VOCSegmentation dataset, showing both the input image and its corresponding label (segmentation mask).

    Parameters:
        dataset: A dataset object, e.g., VOCSegmentation, where each item is a tuple (image, label).
        cols (int): Number of columns in the visualization grid.
        rows (int): Number of rows in the visualization grid.
    """
    figure, ax = plt.subplots(nrows=X.shape[0], ncols=3, figsize=(12, X.shape[0] * 3))
    ax = ax.flatten()

    for i, idx in enumerate(range(X.shape[0])):
        # Get the image and label (segmentation mask)
        img, mask, mask_pred = X[i], y[i], y_pred[i]

        # unnormalize image
        mean = torch.tensor([0.485, 0.456, 0.406]).to(img.device).view(-1, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225]).to(img.device).view(-1, 1, 1)
        img = img * std + mean
        

        # Display the image
        ax[3 * i].imshow(img.numpy().transpose((1, 2, 0)))
        ax[3 * i].set_title(f"Image {i+1}")
        ax[3 * i].axis("off")

        # Display the segmentation mask (assuming it's a single-channel mask)
        mask = mask_tensor_to_pil(mask)
        mask_pred = mask_tensor_to_pil(mask_pred)
            
        ax[3 * i + 1].imshow(mask, cmap="gray")
        ax[3 * i + 1].set_title(f"Label {i+1}")
        ax[3 * i + 1].axis("off")

        ax[3 * i + 2].imshow(mask_pred, cmap="gray")
        ax[3 * i + 2].set_title(f"Prediction {i+1}")
        ax[3 * i + 2].axis("off")

    plt.tight_layout()
    plt.show()

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

train_dataset, test_dataset = load_VOC_Segmentation_datasets(config['data_root_dir'])

num_classes = len(train_dataset.classes)
train_dataloader, test_dataloader = create_dataloaders(train_dataset, test_dataset, device, 
                                                       batch_size = 16, num_worker = 4)
model = get_model(model_name = config["model_name"], num_classes= num_classes, config = config).to(device)


model_checkpoint_path = config["best_model_path"]
checkpoint = torch.load(model_checkpoint_path, map_location=device)
model.load_state_dict(checkpoint['state_dict'])
print(f"=> loaded checkpoint '{model_checkpoint_path}' with mIoU {checkpoint['best_metric']} (epoch {checkpoint['epoch']})")

model.eval()

print("Model ready for inference")

In [None]:
images, targets = next(iter(test_dataloader))
images = images.to(device)

with torch.no_grad():
    outputs = model(images)
    _, preds = torch.max(outputs, dim=1)
visualize_prediction(images.cpu(), targets.cpu(), preds.cpu())

## optional 실습
mIoU성능 개선을 위한 다양한 실험을 시도해보세요.

## 정리
Lab을 마무리 짓기 전 저장된 checkpoint를 모두 지워 저장공간을 확보한다

In [None]:
import shutil, os
if os.path.exists('checkpoints/'):
    shutil.rmtree('checkpoints/')