In [None]:
import os
import random
import time
import json
import warnings 
warnings.filterwarnings('ignore')

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torchvision.models import vgg16
from torch.utils.data import Dataset, DataLoader
from utils import label_accuracy_score
import cv2

import numpy as np
import pandas as pd
from tqdm import tqdm

# 전처리를 위한 라이브러리
from pycocotools.coco import COCO
import torchvision
import torchvision.transforms as transforms

import albumentations as A
from albumentations.pytorch import ToTensorV2

# 시각화를 위한 라이브러리
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()

plt.rcParams['axes.grid'] = False

# Report the runtime environment.
print('pytorch version: {}'.format(torch.__version__))
print('GPU 사용 가능 여부: {}'.format(torch.cuda.is_available()))

# Querying the device name raises on a machine without CUDA, so only do it
# when a GPU is actually present; device_count() is safe and returns 0 there.
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))
print(torch.cuda.device_count())

device = "cuda" if torch.cuda.is_available() else "cpu"   # run on the GPU when one is available, otherwise on the CPU

## 하이퍼파라미터 세팅 및 seed 고정

In [None]:
# Training hyperparameters shared by the cells below.
batch_size = 4  # samples per batch for the train / val / test DataLoaders
num_epochs = 2  # number of passes over the training set
learning_rate = 0.0001  # learning rate handed to the Adam optimizer
model_name = 'test_fcn8s_best_model'  # base name of the checkpoint (.pt) and submission (.csv) files

val_every = 1  # run validation every `val_every` epochs

In [None]:
# Fix every random number generator used in this notebook for reproducibility.
random_seed = 21

# Python and NumPy generators
random.seed(random_seed)
np.random.seed(random_seed)

# PyTorch generators (CPU and current CUDA device)
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
# torch.cuda.manual_seed_all(random_seed)  # enable when using multiple GPUs

# cuDNN: prefer deterministic kernels over auto-tuned ones
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

## 모델 이름 및 경로 설정

In [None]:
# Where the best checkpoint is written / read.
saved_dir = 'saved'
model_path = os.path.join(saved_dir, f'{model_name}.pt')

# COCO-format annotation files, all located directly under the dataset root.
dataset_path = 'data'
train_path, val_path, test_path = (
    os.path.join(dataset_path, split_file)
    for split_file in ('train.json', 'val.json', 'test.json')
)

# Position in this list == pixel value used in the segmentation masks.
# The entries are runtime labels (looked up by name when masks are built),
# so their spelling is kept exactly as-is.
category_names = [
    'Backgroud',
    'UNKNOWN',
    'General trash',
    'Paper',
    'Paper pack',
    'Metal',
    'Glass',
    'Plastic',
    'Styrofoam',
    'Plastic bag',
    'Battery',
    'Clothing',
]

## 데이터 전처리 (transform)

In [None]:
# Per-channel statistics, expressed on a [0, 1] pixel scale.
_NORMALIZE_MEAN = (0.4185, 0.4398, 0.461)
_NORMALIZE_STD = (0.2466, 0.2345, 0.2382)


def _build_normalize_transform():
    """Return the normalize -> tensor pipeline shared by all three splits.

    The dataset already divides pixel values by 255 before the transform runs,
    so ``max_pixel_value`` must be 1.0. With the library default (255.0) the
    mean/std would be rescaled to the 0-255 range and every input would
    collapse to an almost constant value.
    """
    return A.Compose([
        A.Normalize(_NORMALIZE_MEAN, _NORMALIZE_STD, max_pixel_value=1.0),
        ToTensorV2(),
    ])


train_transform = _build_normalize_transform()
val_transform = _build_normalize_transform()
test_transform = _build_normalize_transform()

## Model

In [None]:
class MODEL(nn.Module):
    """FCN-8s style segmentation network on top of a pretrained VGG16.

    The VGG16 feature extractor is cut into three stages whose outputs are
    named pool3, pool4 and the final feature map in ``forward``. Class scores
    from the deepest stage are upsampled and summed with the pool4 and pool3
    scores, then upsampled by a factor of 8 back to the input resolution.

    Args:
        num_classes: number of output channels (one per class).
        init_weights: accepted for compatibility; not used by this class.
    """

    def __init__(self, num_classes=12, init_weights=True):
        super().__init__()

        # NOTE(review): the whole VGG16 (including its classifier head) stays
        # registered as a submodule, so it is part of state_dict() as well.
        self.pretrained_model = vgg16(pretrained=True)
        backbone_layers = list(self.pretrained_model.features.children())

        # Backbone stages: 256-channel output, then two 512-channel outputs.
        self.features_map1 = nn.Sequential(*backbone_layers[:17])
        self.features_map2 = nn.Sequential(*backbone_layers[17:24])
        self.features_map3 = nn.Sequential(*backbone_layers[24:31])

        # 1x1 classifiers on the two skip connections
        self.score_pool3_fr = nn.Conv2d(256, num_classes, 1)
        self.score_pool4_fr = nn.Conv2d(512, num_classes, 1)

        # fc6 ~ fc7, expressed as 1x1 convolutions
        fc_layers = [
            nn.Conv2d(512, 4096, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Conv2d(4096, 4096, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Dropout(),
        ]
        self.conv = nn.Sequential(*fc_layers)

        # 1x1 classifier on the deepest features
        self.score_fr = nn.Conv2d(4096, num_classes, kernel_size=1)

        # Learned upsampling: x2, x2 and finally x8
        self.upscore2 = nn.ConvTranspose2d(
            num_classes, num_classes, kernel_size=4, stride=2, padding=1)
        self.upscore2_pool4 = nn.ConvTranspose2d(
            num_classes, num_classes, kernel_size=4, stride=2, padding=1)
        self.upscore8 = nn.ConvTranspose2d(
            num_classes, num_classes, kernel_size=16, stride=8, padding=4)

    def forward(self, x):
        """Return per-pixel class scores for the image batch ``x``."""
        # Backbone, keeping the two intermediate maps for the skip connections
        pool3 = self.features_map1(x)
        pool4 = self.features_map2(pool3)
        deep = self.features_map3(pool4)

        # Coarse class scores from the deepest features
        coarse_score = self.score_fr(self.conv(deep))

        # Class scores of the skip connections
        score_pool3c = self.score_pool3_fr(pool3)
        score_pool4c = self.score_pool4_fr(pool4)

        # x2 upsample, fuse with pool4 scores
        fused_pool4 = self.upscore2(coarse_score) + score_pool4c

        # x2 upsample, fuse with pool3 scores
        fused_pool3 = self.upscore2_pool4(fused_pool4) + score_pool3c

        # x8 upsample to the final score map
        return self.upscore8(fused_pool3)

### 구현된 model에 임의의 input을 넣어 output이 잘 나오는지 test

In [None]:
# The network class defined in this notebook is named MODEL; `Model` does not
# exist and would raise a NameError.
model = MODEL(num_classes=12).to(device)
x = torch.randn([1, 3, 512, 512]).to(device)
# Shape check only, so there is no need to build the autograd graph.
with torch.no_grad():
    out = model(x)
print("input shape : ", x.shape)
print("output shape : ", out.size())

# Free the dummy tensors before training starts.
del x, out

## 데이터 전처리 함수 정의 (Dataset)

In [None]:
def get_classname(classID, cats):
    """Return the name of the first category in ``cats`` whose id is ``classID``.

    ``cats`` is an iterable of dicts with ``'id'`` and ``'name'`` keys.
    The string ``"None"`` is returned when no category matches.
    """
    matching_names = (cat['name'] for cat in cats if cat['id'] == classID)
    return next(matching_names, "None")

class CustomDataLoader(Dataset):
    """Segmentation dataset backed by a COCO-format annotation file.

    Args:
        data_dir: path of the COCO annotation json.
        mode: ``'train'`` / ``'val'`` yield ``(image, mask, image_info)``;
            ``'test'`` yields ``(image, image_info)``.
        transform: optional albumentations transform, called with
            ``image=`` (and ``mask=`` in train/val mode).

    Images are read relative to the module-level ``dataset_path`` and mask
    pixel values are positions in the module-level ``category_names`` list.
    """
    def __init__(self, data_dir, mode = 'train', transform = None):
        super().__init__()
        self.mode = mode
        self.transform = transform
        self.coco = COCO(data_dir)
        # Map positional index -> COCO image id so that annotation files whose
        # ids are not exactly 0..N-1 can be indexed as well. For ids 0..N-1
        # this is the identity mapping.
        self.image_ids = sorted(self.coco.getImgIds())
        # Categories never change, so load them once instead of once per item.
        self.cats = self.coco.loadCats(self.coco.getCatIds())

    def __getitem__(self, index: int):
        # The dataset is indexed by position, like a list
        image_id = self.image_ids[index]
        image_infos = self.coco.loadImgs([image_id])[0]

        # Load the image with cv2 and convert it to float32 RGB in [0, 1]
        image_path = os.path.join(dataset_path, image_infos['file_name'])
        images = cv2.imread(image_path)
        if images is None:
            # cv2.imread signals an unreadable/missing file by returning None
            raise FileNotFoundError(f"Could not read image: {image_path}")
        images = cv2.cvtColor(images, cv2.COLOR_BGR2RGB).astype(np.float32)
        images /= 255.0

        if (self.mode in ('train', 'val')):
            ann_ids = self.coco.getAnnIds(imgIds=image_infos['id'])
            anns = self.coco.loadAnns(ann_ids)

            # masks: 2D array of shape (height, width)
            # Each pixel holds the position of its class in `category_names`
            # (Background = 0, UNKNOWN = 1, General trash = 2, ..., Clothing = 11)
            masks = np.zeros((image_infos["height"], image_infos["width"]))
            for ann in anns:
                className = get_classname(ann['category_id'], self.cats)
                pixel_value = category_names.index(className)
                # Where annotations overlap, the larger class index wins
                masks = np.maximum(self.coco.annToMask(ann) * pixel_value, masks)
            masks = masks.astype(np.float32)

            # Apply the albumentations transform to image and mask together
            if self.transform is not None:
                transformed = self.transform(image=images, mask=masks)
                images = transformed["image"]
                masks = transformed["mask"]

            return images, masks, image_infos

        if self.mode == 'test':
            # Apply the albumentations transform to the image only
            if self.transform is not None:
                transformed = self.transform(image=images)
                images = transformed["image"]

            return images, image_infos

        # Fail loudly instead of silently returning None for an unknown mode
        raise ValueError(
            f"Unsupported mode: {self.mode!r} (expected 'train', 'val' or 'test')")

    def __len__(self) -> int:
        # Number of images in the annotation file
        return len(self.image_ids)

## Dataset 정의 및 DataLoader 할당

In [None]:
def collate_fn(batch):
    """Transpose a list of samples into a tuple of per-field tuples.

    E.g. ``[(img0, mask0, info0), (img1, mask1, info1)]`` becomes
    ``((img0, img1), (mask0, mask1), (info0, info1))``, leaving the stacking
    of tensors to the caller.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# One dataset per split, each with its own annotation file and transform
train_dataset = CustomDataLoader(train_path, mode='train', transform=train_transform)
val_dataset = CustomDataLoader(val_path, mode='val', transform=val_transform)
test_dataset = CustomDataLoader(test_path, mode='test', transform=test_transform)

# DataLoaders: only the training split is shuffled.
# Data is loaded in the main process (no num_workers is passed).
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
)

val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=collate_fn,
)

test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    collate_fn=collate_fn,
)

## train, validation, test 함수 정의

In [None]:
def train(num_epochs, model, data_loader, val_loader, criterion, optimizer, saved_dir, val_every, device):
    """Train ``model`` and checkpoint it whenever the validation loss improves.

    Args:
        num_epochs: number of passes over ``data_loader``.
        model: network to optimize; called as ``model(images)``.
        data_loader: yields ``(images, masks, infos)`` tuples of per-sample items.
        val_loader: loader handed to ``validation``.
        criterion: loss called as ``criterion(outputs, masks)``.
        optimizer: optimizer stepping the model parameters.
        saved_dir: currently unused; ``save_model`` writes to its default path.
        val_every: run validation every ``val_every`` epochs.
        device: device the batches are moved to.
    """
    # Start from +inf so the first validation always produces a checkpoint,
    # whatever the scale of the loss.
    best_loss = float('inf')
    best_mIoU = 0.0
    best_epoch = 0

    for epoch in range(num_epochs):
        model.train()
        with tqdm(data_loader, unit="batch") as loader:
            # 1-based, matching the epoch number reported by validation
            loader.set_description(f"Epoch {epoch + 1}")
            loss_sum, n_batch = 0., 0.

            for images, masks, _ in loader:
                images = torch.stack(images)       # (batch, channel, height, width)
                masks = torch.stack(masks).long()  # (batch, height, width)

                # Move the batch to the target device
                images, masks = images.to(device), masks.to(device)

                # Forward pass
                outputs = model(images)

                # Loss and parameter update
                loss = criterion(outputs, masks)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # Show the running mean loss in the progress bar
                n_batch += 1
                loss_sum += loss.item()
                loader.set_postfix(loss=loss_sum / n_batch)

        # Periodic validation; keep the checkpoint with the lowest loss
        if (epoch + 1) % val_every == 0:
            avrg_loss, avrg_mIoU = validation(epoch + 1, model, val_loader, criterion, device)
            if avrg_loss < best_loss:
                best_loss, best_mIoU = avrg_loss, avrg_mIoU
                best_epoch = epoch + 1
                save_model(model)

            # Report the metrics of the best epoch, not of the current one
            print(f'Best performance at epoch: {best_epoch}, loss: {best_loss:.4f}, mIoU: {best_mIoU:.4f}')

In [None]:
def validation(epoch, model, data_loader, criterion, device):
    """Evaluate ``model`` on ``data_loader`` without gradient tracking.

    Args:
        epoch: epoch number shown in the progress bar.
        model: network to evaluate (switched to eval mode here).
        data_loader: yields ``(images, masks, infos)`` tuples of per-sample items.
        criterion: loss called as ``criterion(outputs, masks)``.
        device: device the batches are moved to.

    Returns:
        ``(mean_loss, mean_mIoU)``, both averaged over batches. The per-batch
        score is element ``[2]`` of ``label_accuracy_score`` (presumably the
        mean IoU -- see ``utils``).
    """
    model.eval()
    total_loss = 0
    cnt = 0
    mIoU_list = []

    with torch.no_grad():
        with tqdm(data_loader, unit="batch") as loader:
            loader.set_description(f"Valid {epoch}")

            for images, masks, _ in loader:
                images = torch.stack(images)       # (batch, channel, height, width)
                masks = torch.stack(masks).long()  # (batch, height, width)

                images, masks = images.to(device), masks.to(device)

                outputs = model(images)
                loss = criterion(outputs, masks)
                total_loss += loss.item()
                cnt += 1

                # The class count is the channel dimension of the logits, so it
                # follows the model instead of being hard-coded.
                n_class = outputs.size(1)
                preds = torch.argmax(outputs, dim=1).detach().cpu().numpy()

                mIoU = label_accuracy_score(masks.detach().cpu().numpy(), preds, n_class=n_class)[2]
                mIoU_list.append(mIoU)

                loader.set_postfix(loss=total_loss / cnt, mIoU=np.mean(mIoU_list))

    return total_loss / cnt, np.mean(mIoU_list)

## 모델 저장 함수 및 Loss function, Optimizer 정의

In [None]:
def save_model(model, output_path=model_path):
    """Save ``model.state_dict()`` to ``output_path``.

    The parent directory is created when missing, so the first save does not
    fail on a fresh checkout.
    """
    output_dir = os.path.dirname(output_path)
    if output_dir:  # empty when output_path is a bare file name
        os.makedirs(output_dir, exist_ok=True)
    torch.save(model.state_dict(), output_path)

# Pixel-wise cross entropy over the class logits
criterion = nn.CrossEntropyLoss()
# Adam over all model parameters, with a small weight decay
optimizer = optim.Adam(
    model.parameters(),
    lr=learning_rate,
    weight_decay=1e-6,
)

## 모델 학습

In [None]:
# Run the training loop with the objects configured above
train(
    num_epochs=num_epochs,
    model=model,
    data_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    saved_dir=saved_dir,
    val_every=val_every,
    device=device,
)

## 저장된 Best Model 불러오기 (학습된 이후) 

In [None]:
# Read the state dict stored at `model_path`, mapping its tensors onto the
# current device, and load it into the model.
checkpoint = torch.load(model_path, map_location=device)
model.load_state_dict(checkpoint)

In [None]:
# Inspect the predictions for the first test batch
model.eval()
temp_images, image_infos = next(iter(test_loader))

# Inference only, so no autograd graph is needed
with torch.no_grad():
    outs = model(torch.stack(temp_images).to(device))
# argmax over the class dimension. No squeeze(): it would drop the batch
# dimension for a batch of one and make dim=1 point at the wrong axis.
oms = torch.argmax(outs, dim=1).detach().cpu().numpy()

# Iterate over the images actually in the batch (it may be smaller than batch_size)
for i in range(len(oms)):
    fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(16, 16))

    print('Shape of Original Image :', list(temp_images[i].shape))
    print('Shape of Predicted : ', list(oms[i].shape))
    # (class index, class name) pairs, kept in a fixed order
    print('Unique values, category of transformed mask : \n', [(int(c), category_names[int(c)]) for c in np.unique(oms[i])])

    # Input image. NOTE: this is the normalized network input, so matplotlib
    # clips values outside [0, 1] when displaying it.
    ax1.imshow(temp_images[i].permute([1,2,0]))
    ax1.grid(False)
    ax1.set_title("Original image : {}".format(image_infos[i]['file_name']), fontsize = 15)

    # Predicted class map
    ax2.imshow(oms[i])
    ax2.grid(False)
    ax2.set_title("Predicted : {}".format(image_infos[i]['file_name']), fontsize = 15)

    plt.show()

## submission을 위한 test 함수 정의

In [None]:
def test(model, data_loader, device):
    """Predict a class map for every image in ``data_loader``.

    Each predicted mask is resized to ``size x size`` (256) and flattened.

    Args:
        model: trained network; called as ``model(images)``.
        data_loader: yields ``(images, image_infos)`` tuples of per-sample items.
        device: device the batches are moved to.

    Returns:
        ``(file_names, preds_array)``: the list of image file names and an
        int64 array of shape ``(num_images, size * size)``.
    """
    size = 256
    transform = A.Compose([A.Resize(size, size)])
    model.eval()

    file_names = []
    pred_batches = []

    print('Start prediction.')
    with torch.no_grad():
        # Iterate over the loader passed in, not the notebook-level test_loader
        for imgs, image_infos in data_loader:

            # inference at the input resolution
            outs = model(torch.stack(imgs).to(device))
            oms = torch.argmax(outs, dim=1).detach().cpu().numpy()

            # resize every mask to (size x size)
            resized_masks = []
            for img, mask in zip(imgs, oms):
                # The transform needs an image next to the mask. Use this
                # batch's own image, in (height, width, channel) layout so its
                # spatial size matches the mask.
                image_hwc = np.ascontiguousarray(img.permute(1, 2, 0).numpy())
                resized_masks.append(transform(image=image_hwc, mask=mask)['mask'])

            batch_preds = np.array(resized_masks)
            batch_preds = batch_preds.reshape([batch_preds.shape[0], size*size]).astype(np.int64)
            pred_batches.append(batch_preds)

            file_names.extend(info['file_name'] for info in image_infos)
    print("End prediction.")

    # Concatenate once at the end instead of re-copying the array every batch
    if pred_batches:
        preds_array = np.vstack(pred_batches)
    else:
        preds_array = np.empty((0, size*size), dtype=np.int64)

    return file_names, preds_array

## submission.csv 생성

In [None]:
# Open sample_submission.csv
submission = pd.read_csv('./submission/sample_submission.csv', index_col=None)

# Predict on the test set
file_names, preds = test(model, test_loader, device)

# Build all prediction rows at once and append them in a single concat.
# (DataFrame.append was removed in pandas 2.0 and re-copies the frame per row.)
prediction_rows = pd.DataFrame(
    [
        {"image_id": file_name, "PredictionString": ' '.join(str(e) for e in string.tolist())}
        for file_name, string in zip(file_names, preds)
    ],
    columns=["image_id", "PredictionString"],
)
submission = pd.concat([submission, prediction_rows], ignore_index=True)

# Save as submission csv
submission.to_csv(f"./submission/{model_name}.csv", index=False)

## Reference

