In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import torch.nn.functional as F
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

torch.backends.cudnn.enabled = False
torch.backends.cudnn.benchmark = False

import matplotlib.pyplot as plt

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0' 

import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')

device = 'cuda' if torch.cuda.is_available() else 'cpu'

import random

random.seed(777)
torch.manual_seed(777)
if device == 'cuda':
    torch.cuda.manual_seed_all(777)

# Data Loader

In [2]:
batch_size = 512

fmnist_train = dset.FashionMNIST("./", train=True, transform=transforms.ToTensor(), target_transform=None, download=True)
fmnist_test = dset.FashionMNIST("./", train=False, transform=transforms.ToTensor(), target_transform=None, download=True)

In [3]:
from sklearn.model_selection import StratifiedKFold
n_split = 5
skf = StratifiedKFold(n_splits = n_split, shuffle = True)

for train_idx, valid_idx in skf.split(np.arange(fmnist_train.__len__()), fmnist_train.targets) : 
    break;

def return_dataloader(train_idx, valid_idx) : 
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
    valid_subsampler = torch.utils.data.SubsetRandomSampler(valid_idx)

    train_data = DataLoader(dataset=fmnist_train, batch_size = batch_size, sampler = train_subsampler)
    valid_data = DataLoader(dataset=fmnist_train, batch_size = batch_size, sampler = valid_subsampler)
    

    dataloaders = {}
    dataloaders['train'] = train_data
    dataloaders['valid'] = valid_data

    return dataloaders

# Build Model

## 0. 모델 구조
- Input : 1x28x28 Image
- Model : 
    - 1x28x28 -> conv(3x3) -> bn -> relu -> 16x28x28
    - layers2n : 16x28x28 -> 16x28x28 -> 16x28x28
    - layers4n : 32x28x28 (Down Sampling) -> 32x28x28 -> 32x28x28
    - layers6n : 64x28x28 (Down Sampling) -> 64x28x28 -> 64x28x28
    - AvgPool  : 

In [4]:
dataloader = return_dataloader(train_idx, valid_idx)

## 1. Convolution Network

In [5]:
class ResidualBlock(nn.Module) : 
    def __init__(self, in_channels, out_channels, stride = 1, down_sample = False) : 
        super(ResidualBlock, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.stride = stride
        self.down_sample = down_sample

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, 
                                stride = stride, padding = 1, bias = False)

        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, 
                                stride = 1, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def down_sampling(self, x) : 
        out = F.pad(x, (0,0,0,0,0, self.out_channels - self.in_channels))
        out = nn.MaxPool2d(2, stride = self.stride)(out)
        return out

    def forward(self, x ) : 
        short_cut = x

        out = self.conv1(x)
        out = self.bn1(out) 
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.down_sample : 
            short_cut = self.down_sampling(x)
        
        out += short_cut
        out = self.relu(out)
        return out

In [6]:
class ResNet(nn.Module) : 
    def __init__(self, num_layers, block, num_classes = 10) : 
        super(ResNet, self).__init__()
        self.num_layers = num_layers

        self.conv1 = nn.Conv2d(
            in_channels= 1,
            out_channels=16,
            kernel_size=3,
            stride = 1,
            padding = 1,
            bias = False
        )
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace = True)

        self.layers_2n = self.get_layers(block, 16, 16, stride = 1)
        self.layers_4n = self.get_layers(block, 16, 32, stride = 2)
        self.layers_6n = self.get_layers(block, 32, 64, stride = 2)

        self.avg_pool = nn.AvgPool2d(7, stride = 1)
        self.fc_out = nn.Linear(64, num_classes)

        for m in self.modules() : 
            if isinstance(m, nn.Conv2d) : 
                nn.init.kaiming_normal_(m.weight, mode = 'fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d) : 
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
        
    def get_layers(self, block, in_channels, out_channels, stride) : 
        if stride == 2 :
            down_sample = True
        else : 
            down_sample = False
        
        layer_list = nn.ModuleList([block(in_channels, out_channels, stride, down_sample)])

        for _ in range(self.num_layers -1) :
            layer_list.append(block(out_channels, out_channels))

        return nn.Sequential(*layer_list)
    
    def forward(self,x) :
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.layers_2n(x)
        x = self.layers_4n(x)
        x = self.layers_6n(x)
        
        x = self.avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc_out(x)

        return x


In [7]:
def resnet18() : 
    block = ResidualBlock
    model = ResNet(3, block)
    return model

# Train_model

In [8]:
def train_model(model, dataloader, optimizer, step_lr_scheduler, criterion, num_epoch, early_stop, model_path) : 
    import time

    start_time = time.time()
    early_stop_epoch = 0
    best_val_loss = np.float('inf')

    for epoch in range(num_epoch) : 
        for phase in ['train','valid'] : 
            if phase == 'train' : 
                model.train()
                step_lr_scheduler.step()
            elif phase == 'valid' : 
                model.eval()
            
            running_loss = 0
            running_corr = 0
            total = 0 


            for x, y in dataloader[phase] : 
                x = x.to(device)
                y = y.to(device)
                optimizer.zero_grad()
                total += x.size(0)


                with torch.set_grad_enabled(phase == 'train') : 
                    output = model(x)
                    loss = criterion(output, y)
                    
                    if phase == 'train' : 
                        loss.backward()
                        optimizer.step()
                running_loss += loss.item()
                running_corr += (output.argmax(1)==y).sum().item()
            
            epoch_loss = running_loss / total
            epoch_acc = running_corr / total

            if phase == 'valid' and epoch_loss < best_val_loss : 
                print(f'On Epoch {epoch}, Best Model Saved with Valid Loss {round(epoch_loss, 6)} and Acc {round(epoch_acc, 4)*100}%')
                
                best_val_loss = epoch_loss
                best_acc = epoch_acc
                torch.save(model.state_dict(), model_path)
                early_stop_epoch = 0
        
            elif phase == 'valid' : 
                early_stop_epoch += 1

        if early_stop_epoch >= early_stop : 
            "Early Stop Occured on epoch" + str(epoch)
            break;
    time_elapsed = time.time() - start_time
    print(f'Training Complete in {time_elapsed//60}min {time_elapsed%60}sec')
    print(f'Best Validation Loss : {round(best_val_loss, 6)} with Accuracy {round(best_acc,4)*100}%')


    model.load_state_dict(torch.load(model_path))
    return model

In [9]:
resnet = resnet18().to(device)
dataloader = return_dataloader(train_idx, valid_idx)
optimizer = optim.Adam(resnet.parameters(), lr = 0.005)
loss_fn = nn.CrossEntropyLoss()
decay_epoch = [10*x for x in range(1,11)]
step_lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=decay_epoch, gamma=0.1)
trained_resnet = train_model(resnet, dataloader, optimizer, step_lr_scheduler, loss_fn, num_epoch = 100, early_stop = 5, model_path =  'resent.pth')

On Epoch 0, Best Model Saved with Valid Loss 0.001083 and Acc 77.91%
On Epoch 1, Best Model Saved with Valid Loss 0.000668 and Acc 87.31%
On Epoch 2, Best Model Saved with Valid Loss 0.000654 and Acc 87.72%
On Epoch 3, Best Model Saved with Valid Loss 0.00058 and Acc 89.62%
On Epoch 4, Best Model Saved with Valid Loss 0.000519 and Acc 90.7%
On Epoch 8, Best Model Saved with Valid Loss 0.000486 and Acc 91.67%
On Epoch 9, Best Model Saved with Valid Loss 0.000363 and Acc 93.61%
Training Complete in 15.0min 41.32019376754761sec
Best Validation Loss : 0.000363 with Accuracy 93.61%


In [10]:
test_dataloder = DataLoader(dataset = fmnist_test, batch_size  = batch_size , shuffle = False)
class_correct = [0 for x in fmnist_test.classes]
total_correct = [0 for x in fmnist_test.classes]
with torch.no_grad() : 
    for images, labels in test_dataloder : 
        images = images.to(device)
        prediction_soft = trained_resnet(images)
        prediction_hard = prediction_soft.argmax(1)

        correction = (prediction_hard.cpu() == labels).squeeze()

        for label,corr in zip(labels, correction) : 
            total_correct[label] += 1
            if corr : 
                class_correct[label] += 1
acc_per_class = [cl/to for cl, to in zip(class_correct, total_correct)]

- 옷 종류에 대해서 혼동하는 양상을 보임

In [11]:
for cls, acc in zip(fmnist_test.classes, acc_per_class) : 
    print(f'Accuracy of {cls} : {round(acc*100,2)}%')
print('')
print(f'Total Accuracy : {round(sum(class_correct) / sum(total_correct), 4)}%')

Accuracy of T-shirt/top : 86.1%
Accuracy of Trouser : 98.2%
Accuracy of Pullover : 91.5%
Accuracy of Dress : 92.0%
Accuracy of Coat : 91.3%
Accuracy of Sandal : 98.0%
Accuracy of Shirt : 80.3%
Accuracy of Sneaker : 98.1%
Accuracy of Bag : 99.1%
Accuracy of Ankle boot : 96.6%

Total Accuracy : 0.9312%
