## 0. Library 불러오기 및 경로설정

In [1]:
import os
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms
from torchvision.transforms import Resize, ToTensor, Normalize

In [2]:
# Path to the test dataset folder; set this to match your environment.
test_dir = '/opt/ml/input/data/eval'

## 1. Model 정의

In [3]:
class Conv_block(nn.Module):
    """Convolution followed by batch normalisation and an optional ReLU.

    Computes ``relu(batchnorm(conv(x)))`` when ``activation`` is truthy and
    ``batchnorm(conv(x))`` otherwise.

    Args:
        in_channels: Input channel count handed to ``nn.Conv2d``.
        out_channels: Output channel count for both the convolution and the
            batch-norm layer.
        activation: When falsy, the trailing ReLU is skipped.
        **kwargs: Forwarded untouched to ``nn.Conv2d`` (``kernel_size``,
            ``stride``, ``padding``, ...).
    """

    def __init__(self, in_channels, out_channels, activation=True, **kwargs) -> None:
        super().__init__()
        # Submodules are registered in the same order as before so that
        # ``children()`` and ``state_dict()`` ordering stay stable.
        self.relu = nn.ReLU()
        self.conv = nn.Conv2d(in_channels, out_channels, **kwargs)
        self.batchnorm = nn.BatchNorm2d(out_channels)
        self.activation = activation

    def forward(self, x):
        """Apply conv -> batch-norm, then ReLU only if it is enabled."""
        normalized = self.batchnorm(self.conv(x))
        return self.relu(normalized) if self.activation else normalized

In [5]:
class Res_block(nn.Module):
    """Residual block built from three ``Conv_block`` layers plus a shortcut.

    The main path is 1x1 (``in_channels`` -> ``red_channels``), 3x3
    (``red_channels`` -> ``2 * red_channels``) and 1x1
    (``2 * red_channels`` -> ``out_channels``, no ReLU). The shortcut
    ``iden`` depends on the channel configuration:

    * ``in_channels == 64``: 1x1 convolution with stride 1.
    * ``in_channels == out_channels``: ``nn.Identity``.
    * otherwise: 1x1 convolution with stride 2, and the first main-path
      layer also uses stride 2.

    Args:
        in_channels: Channels of the incoming tensor.
        red_channels: Channels after the first 1x1 layer.
        out_channels: Channels produced by the block.
        is_plain: When true, the shortcut is not added in ``forward``
            (the ``iden`` module is still created).
    """

    def __init__(self, in_channels, red_channels, out_channels, is_plain=False):
        super().__init__()
        self.relu = nn.ReLU()
        self.is_plain = is_plain

        keeps_resolution = in_channels == 64 or in_channels == out_channels
        widened = red_channels * 2

        # Only the first layer differs between variants: it strides by 2
        # when the block changes resolution.
        entry_kwargs = dict(kernel_size=1, padding=0)
        if not keeps_resolution:
            entry_kwargs['stride'] = 2

        self.convseq = nn.Sequential(
            Conv_block(in_channels, red_channels, **entry_kwargs),
            Conv_block(red_channels, widened, kernel_size=3, padding=1),
            Conv_block(widened, out_channels, activation=False, kernel_size=1, padding=0),
        )

        # The shortcut is created after the main path, as in the original
        # construction order.
        if in_channels == 64:
            self.iden = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1)
        elif in_channels == out_channels:
            self.iden = nn.Identity()
        else:
            self.iden = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=2)

    def forward(self, x):
        """Run the main path, add the shortcut unless plain, then apply ReLU."""
        residual = self.convseq(x)
        if not self.is_plain:
            residual = residual + self.iden(x)
        return self.relu(residual)

In [8]:
class MyModel(nn.Module):
    """Residual CNN classifier made of strided ``Conv_block`` stems and
    stacks of ``Res_block`` layers.

    Layout (channels): 3 -> 32 -> 64 -> [1 res] -> 128 -> [2 res] -> 256 ->
    [8 res] -> 512 -> [8 res] -> 1024 -> [4 res] -> global average pool ->
    linear classifier. ``conv2``, ``conv4``, ``conv6``, ``conv8`` and
    ``conv10`` use stride 2; every ``Res_block`` here keeps its resolution.

    Args:
        num_classes: Size of the final linear layer's output.
        is_plain: Forwarded to every ``Res_block``; when true the residual
            shortcut is not added.
    """

    def __init__(self, num_classes: int = 1000, is_plain=False):
        super(MyModel, self).__init__()
        # Never read inside this class; kept so external code that touches
        # ``model.layers`` keeps working.
        self.layers = []
        self.ksize = 3
        same_pad = self.ksize // 2

        def stem(c_in, c_out, stride):
            # The keyword must be spelled ``kernel_size``: it is forwarded
            # through Conv_block's **kwargs to nn.Conv2d, which rejects
            # unknown keywords (the previous ``krenel_size`` raised TypeError).
            return Conv_block(c_in, c_out, kernel_size=self.ksize,
                              padding=same_pad, stride=stride)

        def stage(channels, depth):
            # ``depth`` identical residual blocks with a channels // 2 bottleneck.
            return nn.Sequential(
                *[Res_block(channels, channels // 2, channels, is_plain)
                  for _ in range(depth)]
            )

        # Modules are created in the same order and under the same attribute
        # names as before, so existing checkpoints' state_dict keys still match.
        # L1
        self.conv1 = stem(3, 32, (1, 1))
        self.conv2 = stem(32, 64, (2, 2))
        # Res1
        self.conv3_x = stage(64, 1)
        # L2
        self.conv4 = stem(64, 128, (2, 2))
        # Res2
        self.conv5_x = stage(128, 2)
        # L3
        self.conv6 = stem(128, 256, (2, 2))
        # Res3
        self.conv7_x = stage(256, 8)
        # L4
        self.conv8 = stem(256, 512, (2, 2))
        # Res4
        self.conv9_x = stage(512, 8)
        # L5
        self.conv10 = stem(512, 1024, (2, 2))
        # Res5
        self.conv11_x = stage(1024, 4)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        self.classifier = nn.Sequential(
            nn.Linear(1024, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            x: Image batch with 3 channels (``conv1`` expects 3 input channels).
        """
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3_x(x)
        x = self.conv4(x)
        x = self.conv5_x(x)
        x = self.conv6(x)
        x = self.conv7_x(x)
        x = self.conv8(x)
        x = self.conv9_x(x)
        x = self.conv10(x)
        x = self.conv11_x(x)
        x = self.avgpool(x)

        # Collapse the pooled (batch, 1024, 1, 1) map to (batch, 1024).
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

## 2. Test Dataset 정의

In [9]:
class TestDataset(Dataset):
    """Dataset that opens images from a list of file paths.

    Args:
        img_paths: Sequence of image file paths, indexed by ``__getitem__``.
        transform: Callable applied to each opened image; a falsy value
            (e.g. ``None``) returns the opened image unchanged.
    """

    def __init__(self, img_paths, transform):
        self.img_paths, self.transform = img_paths, transform

    def __getitem__(self, index):
        """Open the image at ``index`` and return it, transformed if configured."""
        picture = Image.open(self.img_paths[index])
        return self.transform(picture) if self.transform else picture

    def __len__(self):
        """Number of image paths held by the dataset."""
        return len(self.img_paths)

## 3. Inference

In [10]:
# Load the submission metadata and locate the image folder.
submission = pd.read_csv(os.path.join(test_dir, 'info.csv'))
image_dir = os.path.join(test_dir, 'images')

# Build the test dataset and its DataLoader (order is preserved so that
# predictions line up with the rows of ``submission``).
image_paths = [os.path.join(image_dir, img_id) for img_id in submission.ImageID]
transform = transforms.Compose([
    Resize((512, 384), Image.BILINEAR),
    ToTensor(),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.2, 0.2, 0.2)),
])
dataset = TestDataset(image_paths, transform)

loader = DataLoader(
    dataset,
    shuffle=False
)

# Define the model. NOTE: no checkpoint is loaded here, so the weights are
# freshly initialised; load trained weights (e.g. via torch.load) before
# relying on the predictions.
# Use the GPU when one is available and fall back to the CPU otherwise,
# instead of failing on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MyModel(num_classes=18).to(device)
model.eval()

# Predict a class for every test image and collect the results.
all_predictions = []
with torch.no_grad():  # inference only: no autograd bookkeeping needed
    for images in loader:
        images = images.to(device)
        pred = model(images).argmax(dim=-1)
        all_predictions.extend(pred.cpu().numpy())
submission['ans'] = all_predictions

# Save the submission file.
submission.to_csv(os.path.join(test_dir, 'submission.csv'), index=False)
print('test inference is done!')

test inference is done!
