## Import

In [15]:
import os
import cv2
from PIL import Image
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

from tqdm import tqdm

#데이터 증강기법 활용을 위한 라이브러리 임포트
import albumentations as A
from albumentations.pytorch import ToTensorV2
from albumentations import (
    Compose, HorizontalFlip, Rotate, RandomBrightnessContrast,
    Resize, Normalize)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


## Utils

In [16]:
# RLE 인코딩 함수
def rle_encode(mask):
    pixels = mask.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return ' '.join(str(x) for x in runs)

## Custom Dataset

In [17]:
class CustomDataset(Dataset):
    def __init__(self, csv_file, transform=None, infer=False):
        self.data = pd.read_csv(csv_file)
        self.transform = transform
        self.infer = infer

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path = self.data.iloc[idx, 1]
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        if self.infer:
            if self.transform:
                image = self.transform(image=image)['image']
            return image
        
        mask_path = self.data.iloc[idx, 2]
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        mask[mask == 255] = 12 #배경을 픽셀값 12로 간주

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        return image, mask

## Data Loader

In [29]:
transform = Compose([
    Resize(224, 224),  # 이미지 크기 조정
    HorizontalFlip(p=0.5),  # 50%의 확률로 좌우 뒤집기
    Rotate(limit=10, p=0.3),  # 최대 10도 회전, 30%의 확률로 적용
    RandomBrightnessContrast(p=0.2),  # 밝기와 대비 조절, 20%의 확률로 적용
    Normalize(),  # 이미지 정규화
    ToTensorV2()  # 텐서 형식으로 변환
])

dataset = CustomDataset(csv_file='/home/work/CPS_Project/Samsung AI-Challenge/open/train_source.csv', transform=transform)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True, num_workers=4)

Vdataset = CustomDataset(csv_file='/home/work/CPS_Project/Samsung AI-Challenge/open/val_source.csv', transform=transform)
Vdataloader = DataLoader(dataset, batch_size=1, shuffle=True, num_workers=4)

In [30]:
for images, masks in dataloader:
    print(images.shape, masks.shape)
    break
for images, masks in Vdataloader:
    print(images.shape, masks.shape)
    break

torch.Size([16, 3, 224, 224]) torch.Size([16, 224, 224])
torch.Size([1, 3, 224, 224]) torch.Size([1, 224, 224])


In [31]:
mask_path = "/home/work/CPS_Project/Samsung AI-Challenge/open/train_source_gt/TRAIN_SOURCE_0024.png"
mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

unique_values = np.unique(mask)
print(unique_values)


[  0   1   2   3   4   6   7   8  11 255]


## Define Model

In [21]:
# U-Net의 기본 구성 요소인 Double Convolution Block을 정의합니다.
def double_conv(in_channels, out_channels):
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, 3, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(out_channels, out_channels, 3, padding=1),
        nn.ReLU(inplace=True)
    )

# 간단한 U-Net 모델 정의
class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()
        self.dconv_down1 = double_conv(3, 64)
        self.dconv_down2 = double_conv(64, 128)
        self.dconv_down3 = double_conv(128, 256)
        self.dconv_down4 = double_conv(256, 512)
        self.dconv_down5 = double_conv(512, 1024)  # 추가된 층

        self.maxpool = nn.MaxPool2d(2)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        self.dconv_up4 = double_conv(512 + 1024, 512)  # 추가된 층
        self.dconv_up3 = double_conv(256 + 512, 256)
        self.dconv_up2 = double_conv(128 + 256, 128)
        self.dconv_up1 = double_conv(128 + 64, 64)

        self.conv_last = nn.Conv2d(64, 13, 1) # 12개 class + 1 background

    def forward(self, x):
        conv1 = self.dconv_down1(x)
        x = self.maxpool(conv1)

        conv2 = self.dconv_down2(x)
        x = self.maxpool(conv2)

        conv3 = self.dconv_down3(x)
        x = self.maxpool(conv3)   

        x = self.dconv_down4(x)

        x = self.upsample(x)        
        x = torch.cat([x, conv3], dim=1)

        x = self.dconv_up3(x)
        x = self.upsample(x)        
        x = torch.cat([x, conv2], dim=1)

        x = self.dconv_up2(x)
        x = self.upsample(x)        
        x = torch.cat([x, conv1], dim=1)

        x = self.dconv_up1(x)

        out = self.conv_last(x)

        return out

## Model Train

In [23]:
# model 초기화
model = UNet().to(device)

# loss function과 optimizer 정의
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# training loop
for epoch in range(2):  # 20 에폭 동안 학습합니다.
    model.train()
    epoch_loss = 0
    for images, masks in tqdm(dataloader):
        images = images.float().to(device)
        masks = masks.long().to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks.squeeze(1))
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f'Epoch {epoch+1}, Loss: {epoch_loss/len(dataloader)}')

100%|██████████| 138/138 [00:50<00:00,  2.76it/s]


Epoch 1, Loss: 1.461506044951038


100%|██████████| 138/138 [00:49<00:00,  2.76it/s]

Epoch 2, Loss: 0.7333139790141064





## Inference

In [24]:
transform = A.Compose(
    [   
        A.Resize(224, 224),
        A.Normalize(),
        ToTensorV2()
    ]
)

test_dataset = CustomDataset(csv_file='/home/work/CPS_Project/Samsung AI-Challenge/open/test.csv', transform=transform, infer=True)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=4)

In [33]:
with torch.no_grad():
    model.eval()
    images = images.to('cuda')  
    outputs = model(images)
    print(outputs.shape)

torch.Size([1, 13, 224, 224])


In [None]:
with torch.no_grad():
    model.eval()
    result = []
    for images in tqdm(test_dataloader):
        images = images.float().to(device)
        outputs = model(images)
        outputs = torch.softmax(outputs, dim=1).cpu()
        outputs = torch.argmax(outputs, dim=1).numpy()
        # batch에 존재하는 각 이미지에 대해서 반복
        for pred in outputs:
            pred = pred.astype(np.uint8)
            pred = Image.fromarray(pred) # 이미지로 변환
            pred = pred.resize((960, 540), Image.NEAREST) # 960 x 540 사이즈로 변환
            pred = np.array(pred) # 다시 수치로 변환
            # class 0 ~ 11에 해당하는 경우에 마스크 형성 / 12(배경)는 제외하고 진행
            for class_id in range(12):
                class_mask = (pred == class_id).astype(np.uint8)
                if np.sum(class_mask) > 0: # 마스크가 존재하는 경우 encode
                    mask_rle = rle_encode(class_mask)
                    result.append(mask_rle)
                else: # 마스크가 존재하지 않는 경우 -1
                    result.append(-1)

  pred = pred.resize((960, 540), Image.NEAREST) # 960 x 540 사이즈로 변환
100%|██████████| 119/119 [01:25<00:00,  1.38it/s]


## Submission

In [None]:
submit = pd.read_csv('/home/work/CPS_Project/Samsung AI-Challenge/open/sample_submission.csv')
submit['mask_rle'] = result
submit

Unnamed: 0,id,mask_rle
0,TEST_0000_class_0,218414 13 219374 13 220325 26 221285 26 222237...
1,TEST_0000_class_1,-1
2,TEST_0000_class_2,1 450 601 810 1561 814 2517 818 3477 818 4437 ...
3,TEST_0000_class_3,514090 34 515050 34 516010 34
4,TEST_0000_class_4,-1
...,...,...
22771,TEST_1897_class_7,871 9 884 30 1831 9 1844 30 2800 34 3760 34 47...
22772,TEST_1897_class_8,48 587 678 124 1008 587 1638 124 1972 583 2598...
22773,TEST_1897_class_9,204202 5 205162 5 206122 5 207078 9 208038 9 2...
22774,TEST_1897_class_10,-1


In [None]:
submit.to_csv('./baseline_submit_5(Augmented_layer+).csv', index=False)