## Import

In [12]:
import os
import cv2
from PIL import Image
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2

from torchvision import models
from torchsummary import summary

In [13]:
# GPU 사용이 가능할 경우, GPU를 사용할 수 있게 함.'
os.environ['CUDA_VISIBLE_DEVICES'] = '2'
device = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device)
print(device)

print(os.environ.get('CUDA_VISIBLE_DEVICES'))

cuda
2


## Utils

In [14]:
# RLE 인코딩 함수
def rle_encode(mask):
    pixels = mask.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return ' '.join(str(x) for x in runs)

## Custom Dataset

In [15]:
class CustomDataset(Dataset):
    def __init__(self, csv_file, transform=None, infer=False):
        self.data = pd.read_csv(csv_file)
        self.transform = transform
        self.infer = infer

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        directory_path = "/mnt/nas27/Dataset/Samsung_DM"
        img_path = self.data.iloc[idx, 1]
        img_path = os.path.join(directory_path, img_path[2:])
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        #image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        
        if self.infer:
            if self.transform:
                image = self.transform(image=image)['image']
            return image
        
        mask_path = self.data.iloc[idx, 2]
        mask_path = os.path.join(directory_path, mask_path[2:])
        mask = cv2.imread(mask_path)
        #mask = cv2.cvtColor(mask, cv2.COLOR_BGR2RGB)
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
        mask[mask == 255] = 12 #배경을 픽셀값 12로 간주

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        return image, mask

## Data Loader

In [16]:
# 이미지 전처리 클래스
class ImageTransform():
  """
  훈련, 검증 동작 다르게 설정
  이미지 크기 resize, 색상 표준화
  훈련시 RandomResizedCrop, RandomHorizontalFilp으로 데이터 확장
  """
  def __init__(self, resize, mean, std):
    self.data_transform = {
        'train' : transforms.Compose([
            #transforms.RandomResizedCrop(
            #    resize, scale = (0.5, 1.0)), # 데이터 확장
            transforms.RandomHorizontalFlip(), # 데이터 확장
            transforms.ToTensor(), # Tensor로 변환
            transforms.Normalize(mean = mean, std = std) #표준화
        ]),
        'test': transforms.Compose([
            #transforms.Resize(resize), # Resize
            #transforms.CenterCrop(resize), # 중앙을 resize*resize로 crop
            transforms.ToTensor(), # Tensor로 변환
            transforms.Normalize(mean = mean, std = std) # 표준화
        ])

    }
  def __call__(self, img, phase = 'train'):
    """
    phase : 'train' or 'test'
    전처리 모드 지정
    """
    return self.data_transform[phase](img)

In [17]:
transform = A.Compose(
    [   
        #A.Resize(224, 224),
        A.Resize(256, 256),
        A.Normalize(),
        
        # 변형
        A.VerticalFlip(p=0.5),
        # A.RandomRotate90(p=0.5),
        A.HueSaturationValue(p=0.2),
        
        ToTensorV2()
    ]
)

dataset = CustomDataset(csv_file=os.path.join("/mnt/nas27/Dataset/Samsung_DM",'./train_source.csv'), transform=transform)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)
valid_dataset = CustomDataset(csv_file=os.path.join("/mnt/nas27/Dataset/Samsung_DM",'./val_source.csv'), transform=transform)
valid_dataloader = DataLoader(valid_dataset, batch_size=32, shuffle=False, num_workers=4)


In [18]:
# 클래스(레이블) 수
num_classes = 12

# 클래스별 IoU를 계산하기 위한 함수
def calculate_iou_per_class(y_true, y_pred, class_id):
    intersection = np.sum((y_true == class_id) & (y_pred == class_id))
    union = np.sum((y_true == class_id) | (y_pred == class_id))
    iou = intersection / union if union > 0 else 0
    return iou

## Define Model

In [19]:
class Decoder(nn.Module):
    def __init__(self, in_channel, mid_channel, out_channel):
        super(Decoder, self).__init__()
        
        self.conv = nn.Conv2d(in_channel, mid_channel, kernel_size=3, stride=1, padding=1) #keep ratio
        self.conv_trans = nn.ConvTranspose2d(mid_channel, out_channel, kernel_size=4, stride=2, padding=1)
        
    def forward(self, x):
        x = F.relu(self.conv(x), inplace=True)
        x = F.relu(self.conv_trans(x), inplace=True)
        return x
    
class Unet_resnet18(nn.Module):
    def __init__(self, n_classes):
        super(Unet_resnet18, self).__init__()
        
        #encoder
        self.encoder = models.resnet18(pretrained=False)
        
        self.pool = nn.MaxPool2d(2, 2)
        self.conv1 = nn.Sequential(self.encoder.conv1, self.encoder.bn1,
                                  self.encoder.relu, self.pool) #64
        self.conv2 = self.encoder.layer1 #64
        self.conv3 = self.encoder.layer2 #128
        self.conv4 = self.encoder.layer3 #256
        self.conv5 = self.encoder.layer4 #depth 512
        
        #center
        self.center = Decoder(512, 312, 256)
        
        #decoder
        self.decoder5 = Decoder(256+512, 256, 256)
        self.decoder4 = Decoder(256+256, 128, 128)
        self.decoder3 = Decoder(128+128, 64, 64)
        self.decoder2 = Decoder(64+64, 32, 32)
        self.decoder1 = Decoder(32, 16, 16)
        self.decoder0 = nn.Conv2d(in_channels=16, out_channels=8, kernel_size=3, stride=1, padding=1)
    
        self.final = nn.Conv2d(8, n_classes, kernel_size=1)
        
    def forward(self, x):
        
        #encoder
        conv1 = self.conv1(x) #64x64
        conv2 = self.conv2(conv1) #32x32
        conv3 = self.conv3(conv2) #16x16
        conv4 = self.conv4(conv3) #8x8
        conv5 = self.conv5(conv4) #4x4
        
        center = self.center(self.pool(conv5)) #4x4
        #decoder
        dec5 = self.decoder5(torch.cat([center, conv5], 1)) #8x8
        dec4 = self.decoder4(torch.cat([dec5, conv4], 1)) #16x16
        dec3 = self.decoder3(torch.cat([dec4, conv3], 1)) #32x32
        dec2 = self.decoder2(torch.cat([dec3, conv2], 1)) #64x64
        dec1 = self.decoder1(dec2) #128x128
        dec0 = F.relu(self.decoder0(dec1))
        
        final = torch.sigmoid(self.final(dec0))
        
        return final

In [20]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

class Decoder(nn.Module):
    def __init__(self, in_channel, mid_channel, out_channel):
        super(Decoder, self).__init__()
        self.conv = nn.Conv2d(in_channel, mid_channel, kernel_size=3, stride=1, padding=1)
        self.conv_trans = nn.ConvTranspose2d(mid_channel, out_channel, kernel_size=4, stride=2, padding=1)

    def forward(self, x):
        x = F.relu(self.conv(x), inplace=True)
        x = F.relu(self.conv_trans(x), inplace=True)
        return x

class Unet_resnet50(nn.Module):
    def __init__(self, n_classes):
        super(Unet_resnet50, self).__init__()

        # Encoder (ResNet-50)
        self.encoder = models.resnet50(pretrained=True)

        self.pool = nn.MaxPool2d(2, 2)
        self.conv1 = nn.Sequential(self.encoder.conv1, self.encoder.bn1, self.encoder.relu, self.pool)
        self.conv2 = self.encoder.layer1
        self.conv3 = self.encoder.layer2
        self.conv4 = self.encoder.layer3
        self.conv5 = self.encoder.layer4

        # Center
        self.center = Decoder(2048, 1024, 512)  # Adjusted for ResNet-50

        # Decoder
        self.decoder5 = Decoder(512 + 2048, 1024, 512)  # Adjusted for ResNet-50
        self.decoder4 = Decoder(512 + 1024, 512, 256)  # Adjusted for ResNet-50
        self.decoder3 = Decoder(256 + 512, 256, 128)   # Adjusted for ResNet-50
        self.decoder2 = Decoder(128 + 256, 128, 64)    # Adjusted for ResNet-50
        self.decoder1 = Decoder(64, 32, 32)
        self.decoder0 = nn.Conv2d(in_channels=32, out_channels=16, kernel_size=3, stride=1, padding=1)

        self.final = nn.Conv2d(16, n_classes, kernel_size=1)

    def forward(self, x):
        # Encoder
        conv1 = self.conv1(x)
        conv2 = self.conv2(conv1)
        conv3 = self.conv3(conv2)
        conv4 = self.conv4(conv3)
        conv5 = self.conv5(conv4)

        center = self.center(self.pool(conv5))

        # Decoder
        dec5 = self.decoder5(torch.cat([center, conv5], 1))
        dec4 = self.decoder4(torch.cat([dec5, conv4], 1))
        dec3 = self.decoder3(torch.cat([dec4, conv3], 1))
        dec2 = self.decoder2(torch.cat([dec3, conv2], 1))
        dec1 = self.decoder1(dec2)
        dec0 = F.relu(self.decoder0(dec1))

        final = torch.sigmoid(self.final(dec0))

        return final



In [21]:
# model 초기화
model = Unet_resnet18(n_classes = 13).to(device)
#model = Unet_resnet50(n_classes = 13).to(device)

# loss function과 optimizer 정의
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)


## Model Train

In [22]:
import wandb
import random

# start a new wandb run to track this script
wandb.init(
    # set the wandb project where this run will be logged
    project="practice_09_28",
    
    # track hyperparameters and run metadata
    config={
    "learning_rate": 0.005,
    "architecture": "CNN",
    "dataset": "Samsung",
    "epochs": 20,
    }
)

for epoch in range(5):  # 5 에폭 동안 학습합니다.
          
    # 클래스별 IoU를 누적할 리스트 초기화
    train_class_ious = []
    
    # 학습
    model.train()
    epoch_loss = 0
    
    for images, masks in tqdm(dataloader):
        images = images.float().to(device)
        masks = masks.long().to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks.squeeze(1))
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

        # train 클래스별 IoU 계산
        outputs = torch.softmax(outputs, dim=1).cpu()
        outputs = torch.argmax(outputs, dim=1).numpy()

        for class_id in range(num_classes):
            iou = calculate_iou_per_class(np.array(masks.cpu()), np.array(outputs), class_id)
            train_class_ious.append(iou)
            
    train_class_ious = np.array(train_class_ious).reshape(num_classes, -1)
    #print(np.shape(train_class_ious))
    train_class_ious = np.mean(train_class_ious, axis=1)
    #print(train_class_ious)
    
    for class_id, iou in enumerate(train_class_ious):
        print(f'Class {class_id} IoU: {iou:.4f}')
     
        
    # mIoU 계산
    train_mIoU = np.mean(train_class_ious)
    

    # validation
    val_loss = 0
    val_class_ious = []  # 클래스별 IoU를 누적할 리스트 초기화
    with torch.no_grad():
        model.eval()
        for images, masks in tqdm(valid_dataloader):
            images = images.float().to(device)
            masks = masks.long().to(device)
            outputs = model(images)

            # validation loss 계산
            val_loss += criterion(outputs, masks.squeeze(1)).item()

            # validation 클래스별 IoU 계산
            outputs = torch.softmax(outputs, dim=1).cpu()
            outputs = torch.argmax(outputs, dim=1).numpy()

            for class_id in range(num_classes):
                iou = calculate_iou_per_class(np.array(masks.cpu()), np.array(outputs), class_id)
                val_class_ious.append(iou)
                
    val_class_ious = np.array(val_class_ious).reshape(num_classes, -1)
    val_class_ious = np.mean(val_class_ious, axis=1)
    
    for class_id, iou in enumerate(val_class_ious):
        print(f'Class {class_id} IoU: {iou:.4f}')       
   
    # mIoU 계산
    val_mIoU = np.mean(val_class_ious)
    
    # 에폭마다 결과 출력
    print(f"\nEpoch{epoch+1}")
    print(f"Train Loss: {epoch_loss/len(dataloader)}, Train mIoU Score: {train_mIoU:.4f}")
    print(f"Validation Loss: {val_loss/len(valid_dataloader)}, Validation mIoU Score: {val_mIoU:.4f}")
    print("___________________________________________________________________________________________\n")
    
    # log metrics to wandb
    wandb.log({"train score": train_mIoU, "train loss": epoch_loss})
    wandb.log({"val score": val_mIoU, "val loss": val_loss})
    
    
# [optional] finish the wandb run, necessary in notebooks
wandb.finish()




0,1
train loss,█▁▁▁
train score,█▁▁▁
val loss,▁█▆█
val score,▁▁▁▁

0,1
train loss,151.59922
train score,0.01339
val loss,32.82194
val score,0.01447


100%|██████████| 69/69 [02:12<00:00,  1.92s/it]


Class 0 IoU: 0.0141
Class 1 IoU: 0.0143
Class 2 IoU: 0.0145
Class 3 IoU: 0.0120
Class 4 IoU: 0.0142
Class 5 IoU: 0.0139
Class 6 IoU: 0.0136
Class 7 IoU: 0.0117
Class 8 IoU: 0.0137
Class 9 IoU: 0.0137
Class 10 IoU: 0.0144
Class 11 IoU: 0.0115


100%|██████████| 15/15 [00:17<00:00,  1.16s/it]


Class 0 IoU: 0.0221
Class 1 IoU: 0.0113
Class 2 IoU: 0.0108
Class 3 IoU: 0.0124
Class 4 IoU: 0.0234
Class 5 IoU: 0.0121
Class 6 IoU: 0.0111
Class 7 IoU: 0.0109
Class 8 IoU: 0.0246
Class 9 IoU: 0.0118
Class 10 IoU: 0.0116
Class 11 IoU: 0.0119

Epoch1
Train Loss: 2.3387430681698564, Train mIoU Score: 0.0135
Validation Loss: 2.2608136494954425, Validation mIoU Score: 0.0145
___________________________________________________________________________________________



100%|██████████| 69/69 [01:18<00:00,  1.14s/it]


Class 0 IoU: 0.0141
Class 1 IoU: 0.0140
Class 2 IoU: 0.0139
Class 3 IoU: 0.0115
Class 4 IoU: 0.0139
Class 5 IoU: 0.0140
Class 6 IoU: 0.0139
Class 7 IoU: 0.0114
Class 8 IoU: 0.0137
Class 9 IoU: 0.0144
Class 10 IoU: 0.0144
Class 11 IoU: 0.0118


100%|██████████| 15/15 [00:17<00:00,  1.16s/it]


Class 0 IoU: 0.0221
Class 1 IoU: 0.0113
Class 2 IoU: 0.0110
Class 3 IoU: 0.0124
Class 4 IoU: 0.0234
Class 5 IoU: 0.0121
Class 6 IoU: 0.0112
Class 7 IoU: 0.0109
Class 8 IoU: 0.0247
Class 9 IoU: 0.0118
Class 10 IoU: 0.0117
Class 11 IoU: 0.0119

Epoch2
Train Loss: 2.2601535804029824, Train mIoU Score: 0.0134
Validation Loss: 2.240032116572062, Validation mIoU Score: 0.0145
___________________________________________________________________________________________



 38%|███▊      | 26/69 [00:36<01:00,  1.40s/it]


KeyboardInterrupt: 

## Inference

In [None]:
# test_dataset = CustomDataset(csv_file='./test.csv', transform=transform, infer=True)
# test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=4)

In [None]:
# with torch.no_grad():
#     model.eval()
#     result = []
#     for images in tqdm(test_dataloader):
#         images = images.float().to(device)
#         outputs = model(images)
#         outputs = torch.softmax(outputs, dim=1).cpu()
#         outputs = torch.argmax(outputs, dim=1).numpy()
#         # batch에 존재하는 각 이미지에 대해서 반복
#         for pred in outputs:
#             pred = pred.astype(np.uint8)
#             pred = Image.fromarray(pred) # 이미지로 변환
#             pred = pred.resize((960, 540), Image.NEAREST) # 960 x 540 사이즈로 변환
#             pred = np.array(pred) # 다시 수치로 변환
#             # class 0 ~ 11에 해당하는 경우에 마스크 형성 / 12(배경)는 제외하고 진행
#             for class_id in range(12):
#                 class_mask = (pred == class_id).astype(np.uint8)
#                 if np.sum(class_mask) > 0: # 마스크가 존재하는 경우 encode
#                     mask_rle = rle_encode(class_mask)
#                     result.append(mask_rle)
#                 else: # 마스크가 존재하지 않는 경우 -1
#                     result.append(-1)
        

## Submission

In [None]:
# submit = pd.read_csv('./sample_submission.csv')
# submit['mask_rle'] = result
# submit

In [None]:
# submit.to_csv('./baseline_submit.csv', index=False)