## Import

In [1]:
import copy
import os
import random
import warnings

import cv2
import numpy as np
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import torchvision.models as models
from torchvision import transforms

from tqdm.auto import tqdm

warnings.filterwarnings(action='ignore')

In [2]:
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
device

device(type='cuda')

## Hyperparameter Setting

In [4]:
# Central place for the values the rest of the notebook reads via CFG[...].
CFG = dict(
    IMG_SIZE=224,        # side length images are resized to
    EPOCHS=20,           # number of training epochs
    LEARNING_RATE=1e-4,  # optimizer learning rate
    BATCH_SIZE=16,       # batch size for every DataLoader
    SEED=128,            # value passed to seed_everything
)

## Fix Random Seed

In [5]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy's global generator and PyTorch (CPU and
    all CUDA devices), exports ``PYTHONHASHSEED``, and configures cuDNN for
    reproducible runs.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # also cover additional GPUs; a no-op when CUDA is unavailable
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark mode lets cuDNN auto-tune and pick different algorithms from
    # run to run, which undoes the determinism requested above
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED']) # fix the seed

## Data Load

In [6]:
# Read the training table and the test table from CSV files in the working directory.
train_df = pd.read_csv('./train0.csv')
test_df = pd.read_csv('./test.csv')

## Data Preprocessing

In [7]:
# Train / Validation Split (80% : 20%): the first 80% of rows stay in train_df, the rest become val_df.
# The split is positional (no shuffling). val_df must be sliced before train_df is rebound.
# NOTE: this cell rebinds train_df, so re-running it without reloading the CSV shrinks the training set again.
train_len = int(len(train_df) * 0.8)
val_df = train_df.iloc[train_len:]
train_df = train_df.iloc[:train_len]

In [8]:
# Every column from index 2 onward is treated as label data and reshaped into one 4x4 grid per row.
train_labels = train_df.iloc[:,2:].values.reshape(-1, 4, 4)
val_labels = val_df.iloc[:,2:].values.reshape(-1, 4, 4)

In [9]:
(train_labels.shape, val_labels.shape)

((167998, 4, 4), (42000, 4, 4))

## CustomDataset

In [10]:
class CustomDataset(Dataset):
    """Dataset that reads images from disk with OpenCV and optionally pairs them with labels.

    Args:
        img_path_list: Indexable sequence of image file paths.
        label_list: Indexable sequence of per-image labels, or ``None`` for
            unlabeled data; in that case ``__getitem__`` returns only the image.
        transform: Optional callable applied to the RGB image array.
    """

    def __init__(self, img_path_list, label_list, transform=None):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transform = transform

    def __getitem__(self, index):
        """Return ``(image, label)`` when labels were given, otherwise ``image``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the file at the stored path.
        """
        img_path = self.img_path_list[index]

        # cv2.imread does not raise on a missing or unreadable file; it returns
        # None, which would otherwise only fail later inside cvtColor with an
        # error that does not mention the path.
        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f'Could not read image: {img_path}')
        # OpenCV loads channels as BGR; convert to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform is not None:
            image = self.transform(image)

        if self.label_list is not None:
            # stored labels are shifted down by one so the smallest label value 1 maps to 0
            label = torch.tensor(self.label_list[index], dtype=torch.long) - 1
            return image, label
        else:
            return image

    def __len__(self):
        """Number of samples, i.e. the number of image paths."""
        return len(self.img_path_list)
    


In [11]:
# Both pipelines are identical: convert the image array to a tensor, resize it to
# IMG_SIZE x IMG_SIZE, then normalize each channel.
# NOTE(review): the mean/std values look like the commonly used ImageNet statistics — confirm
# they are appropriate for this dataset.
train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Applied to both the validation and the test dataset below.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [12]:
# Training data: image paths + 4x4 label grids, shuffled every epoch.
train_dataset = CustomDataset(train_df['img_path'].values, train_labels, train_transform)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

# Validation data: same structure, fixed order, uses the test-time transform.
val_dataset = CustomDataset(val_df['img_path'].values, val_labels, test_transform)
val_loader = DataLoader(val_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

# Test data: no labels, so the dataset yields images only; order is kept for the submission.
test_dataset = CustomDataset(test_df['img_path'].values, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [13]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ConvBlock(nn.Module):
    """Conv2d -> BatchNorm2d -> ReLU.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        kernel_size: Convolution kernel size (default 3). Padding is derived
            as ``kernel_size // 2`` so that odd kernel sizes keep the spatial
            size unchanged; for the default of 3 this is the padding of 1
            that was previously hard-coded.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3):
        super(ConvBlock, self).__init__()
        # tie padding to the kernel size instead of assuming a 3x3 kernel
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, padding=kernel_size // 2)
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply convolution, batch normalization and ReLU to ``x``."""
        return self.relu(self.bn(self.conv(x)))

class DeconvBlock(nn.Module):
    """Decoder stage: upsample x2, reduce channels, concatenate a skip tensor, fuse with a ConvBlock.

    Args:
        in_channels: Channels of the tensor being upsampled.
        mid_channels: Channels after the 3x3 reduction convolution. The fusing
            block is built for ``2 * mid_channels`` inputs, so the skip tensor
            is expected to carry ``mid_channels`` channels as well.
        out_channels: Channels produced by the fusing ConvBlock.
    """

    def __init__(self, in_channels, mid_channels, out_channels):
        super().__init__()
        self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.conv_mid = nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1)
        self.bn_mid = nn.BatchNorm2d(mid_channels)
        self.relu_mid = nn.ReLU()
        self.conv_block = ConvBlock(2 * mid_channels, out_channels)

    def forward(self, x, skip):
        """Upsample ``x``, reduce it, concatenate ``skip`` along dim 1 and fuse."""
        upsampled = self.up(x)
        reduced = self.relu_mid(self.bn_mid(self.conv_mid(upsampled)))
        merged = torch.cat([reduced, skip], dim=1)
        return self.conv_block(merged)
    
class BaseModel(nn.Module):
    """Encoder-decoder network with skip connections and a strided output head.

    Five conv+pool stages halve the resolution each time, a sixth conv sits at
    the bottleneck, and five DeconvBlocks climb back up while consuming the
    encoder features in reverse order. The head max-pools once and applies a
    28x28 convolution with stride 28, so a 224x224 input produces a
    ``(B, 16, 4, 4)`` tensor.
    """

    def __init__(self):
        super().__init__()
        # Contraction path
        self.conv1 = ConvBlock(3, 16)
        self.pool1 = nn.MaxPool2d(2)
        self.conv2 = ConvBlock(16, 32)
        self.pool2 = nn.MaxPool2d(2)
        self.conv3 = ConvBlock(32, 64)
        self.pool3 = nn.MaxPool2d(2)
        self.conv4 = ConvBlock(64, 128)
        self.pool4 = nn.MaxPool2d(2)
        self.conv5 = ConvBlock(128, 256)
        self.pool5 = nn.MaxPool2d(2)
        self.conv6 = ConvBlock(256, 512)

        # Expansion path
        self.up7 = DeconvBlock(512, 256, 256)
        self.up8 = DeconvBlock(256, 128, 128)
        self.up9 = DeconvBlock(128, 64, 64)
        self.up10 = DeconvBlock(64, 32, 32)
        self.up11 = DeconvBlock(32, 16, 16)

        # Output head
        self.final_pool = nn.MaxPool2d(2)
        self.final_conv = nn.Conv2d(16, 16, kernel_size=28, stride=28)
        self.final_bn = nn.BatchNorm2d(16)

    def forward(self, x):
        """Run the encoder, decoder and output head on an image batch ``x``."""
        encoder_stages = (
            (self.conv1, self.pool1),
            (self.conv2, self.pool2),
            (self.conv3, self.pool3),
            (self.conv4, self.pool4),
            (self.conv5, self.pool5),
        )

        # Contraction path: remember each pre-pool feature map for the decoder
        skips = []
        for conv, pool in encoder_stages:
            features = conv(x)
            skips.append(features)
            x = pool(features)
        x = self.conv6(x)

        # Expansion path: deepest skip first
        decoder_stages = (self.up7, self.up8, self.up9, self.up10, self.up11)
        for up, skip in zip(decoder_stages, reversed(skips)):
            x = up(x, skip)

        x = self.final_pool(x)
        return self.final_bn(self.final_conv(x))  # (B,16,4,4) for 224x224 input

## Train

In [14]:
def train(model, optimizer, scheduler, train_loader, val_loader, device):
    """Train ``model`` and return it loaded with its best-validation weights.

    Runs ``CFG['EPOCHS']`` epochs with cross-entropy loss. After each epoch the
    scheduler is stepped once, the model is evaluated with ``validation`` and
    a summary line is printed. Whenever validation accuracy improves, a copy of
    the weights is kept; those weights are restored before returning.

    Args:
        model: Network to train; moved to ``device``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        device: Device to run on.

    Returns:
        ``model`` itself, holding the weights from the epoch with the highest
        validation accuracy (or the final weights if accuracy never rose
        above 0).
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)

    best_val_acc = 0
    best_state = None
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for imgs, labels in tqdm(iter(train_loader)):
            imgs = imgs.float().to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            output = model(imgs)
            loss = criterion(output, labels)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        scheduler.step()

        _val_loss, _val_acc = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val ACC : [{_val_acc:.5f}]')

        if best_val_acc < _val_acc:
            best_val_acc = _val_acc
            # Snapshot the weights: keeping a reference to `model` would just
            # alias the object that later epochs keep updating.
            best_state = copy.deepcopy(model.state_dict())

    if best_state is not None:
        model.load_state_dict(best_state)
    return model

In [15]:
def validation(model, criterion, val_loader, device):
    """Evaluate ``model`` on ``val_loader`` without tracking gradients.

    Args:
        model: Network to evaluate; switched to eval mode.
        criterion: Loss function called as ``criterion(output, labels)``.
        val_loader: Iterable of ``(images, labels)`` batches.
        device: Device to run on.

    Returns:
        Tuple ``(mean_loss, mean_accuracy)``: the mean of the per-batch losses,
        and the mean over all samples of the fraction of label positions whose
        argmax prediction (over dim 1 of the output) matches the label.
    """
    model.eval()
    val_loss = []
    val_acc = []

    with torch.no_grad():
        for imgs, labels in tqdm(iter(val_loader)):
            imgs = imgs.float().to(device)
            labels = labels.to(device)

            output = model(imgs)

            loss = criterion(output, labels)
            val_loss.append(loss.item())

            # predicted class per label position
            predicted_labels = torch.argmax(output, dim=1)

            # Per-sample accuracy for the whole batch at once: flatten every
            # sample's positions and average the matches. This avoids one
            # device sync per sample and does not assume 16 positions.
            matches = (predicted_labels == labels).float()
            per_sample_acc = matches.reshape(matches.shape[0], -1).mean(dim=1)
            val_acc.extend(per_sample_acc.tolist())

    _val_loss = np.mean(val_loss)
    _val_acc = np.mean(val_acc)

    return _val_loss, _val_acc

## Run!!

In [17]:
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
from torch.optim import AdamW

# Build the network, pair it with AdamW and a warm-restart cosine schedule
# (first cycle of 10 scheduler steps, each later cycle twice as long), then train.
model = BaseModel().to(device)
optimizer = AdamW(model.parameters(), lr=CFG['LEARNING_RATE'])
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)
infer_model = train(model, optimizer, scheduler, train_loader, val_loader, device)

  0%|          | 0/10500 [00:00<?, ?it/s]

RuntimeError: Given groups=1, weight of size [256, 128, 3, 3], expected input[16, 256, 7, 7] to have 128 channels, but got 256 channels instead

## Inference

In [None]:
def inference(model, test_loader, device):
    """Predict class indices for every batch yielded by ``test_loader``.

    Args:
        model: Trained network; switched to eval mode.
        test_loader: Iterable of image batches (no labels).
        device: Device to run on.

    Returns:
        List with one NumPy array of 16 predicted class indices per sample,
        in loader order.
    """
    model.eval()
    collected = []
    with torch.no_grad():
        for batch in tqdm(iter(test_loader)):
            logits = model(batch.float().to(device))

            # take the highest-scoring class along dim 1, then lay each sample out as 16 values
            flat_pred = torch.argmax(logits, dim=1).view(-1, 16)
            collected += list(flat_pred.cpu().detach().numpy())

    return collected

In [None]:
# Predict the test set with the model returned by train(); preds holds one entry per test sample.
preds = inference(infer_model, test_loader, device)

## Submission

In [None]:
# Load the submission template; its columns from index 1 onward are overwritten with predictions below.
submit = pd.read_csv('./sample_submission.csv')

In [None]:
len(preds)

In [None]:
# Write the predictions into every column after the first one.
submit.iloc[:, 1:] = preds
# The dataset subtracted 1 from the labels for training, so add 1 back to restore the original label range.
submit.iloc[:, 1:] += 1

In [None]:
# Save the filled template without the DataFrame index column.
submit.to_csv('./baseline_submit.csv', index=False)

In [None]:
# Sanity check: the number of submission rows should match the number of predictions.
# NOTE(review): this runs after the CSV has already been written; consider checking before saving.
print("Length of submit index:", len(submit.index))
print("First few elements of submit index:", submit.index[:10])
print("Length of preds:", len(preds))
print("First few elements of preds:", preds[:10])
