In [1]:
import math
import numpy as np

# torch
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.model_zoo as model_zoo
from torch.utils.data.sampler import SubsetRandomSampler # train valid split
from torch.utils.data import Dataset, DataLoader, random_split

# torchvision
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder

# 시각화
import plotly.graph_objects as go

In [2]:
def conv3x3(in_planes, out_planes, stride=1):
    "3x3 convolution with padding"
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=1, bias=False)

In [3]:
class BasicBlock(nn.Module):
    outchannel_ratio = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.bn1 = nn.BatchNorm2d(inplanes)
        self.conv1 = conv3x3(inplanes, planes, stride)        
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = conv3x3(planes, planes)
        self.bn3 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):

        out = self.bn1(x)
        out = self.conv1(out)        
        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn3(out)
       
        if self.downsample is not None:
            shortcut = self.downsample(x)
            featuremap_size = shortcut.size()[2:4]
        else:
            shortcut = x
            featuremap_size = out.size()[2:4]

        batch_size = out.size()[0]
        residual_channel = out.size()[1]
        shortcut_channel = shortcut.size()[1]

        if residual_channel != shortcut_channel:
            padding = torch.autograd.Variable(torch.cuda.FloatTensor(batch_size, residual_channel - shortcut_channel, featuremap_size[0], featuremap_size[1]).fill_(0)) 
            out += torch.cat((shortcut, padding), 1)
        else:
            out += shortcut 

        return out

In [4]:
class PyramidBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride):
        super(PyramidBlock, self).__init__()
        # 정의하려는 PyramidBlock의 내용을 구현
        # Convolutional 레이어, BatchNorm, ReLU 등을 포함
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
    
    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        return out

In [5]:
class PyramidNet(nn.Module):
    def __init__(self, num_classes, depth, alpha):
        super(PyramidNet, self).__init__()
        
        n = int((depth - 2) / 9)
        
        block = BasicBlock
        
        self.addrate = alpha / (3*n*1.0)
        
        n = int((depth - 2) / 6)
        
        self.inplanes = 16
        
        self.input_featuremap_dim = self.inplanes
        self.conv1 = nn.Conv2d(3, self.input_featuremap_dim, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(self.input_featuremap_dim)

        self.featuremap_dim = self.input_featuremap_dim 
        self.layer1 = self.pyramidal_make_layer(block, n)
        self.layer2 = self.pyramidal_make_layer(block, n, stride=2)
        self.layer3 = self.pyramidal_make_layer(block, n, stride=2)

        self.final_featuremap_dim = self.input_featuremap_dim
        self.bn_final= nn.BatchNorm2d(self.final_featuremap_dim)
        self.relu_final = nn.ReLU(inplace=True)
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(self.final_featuremap_dim, num_classes)
        # PyramidNet의 구조를 구현
        # PyramidBlock을 여러 층으로 쌓음
        
    def forward(self, x):
        # PyramidNet의 forward 연산을 구현
        # PyramidBlock들을 연결
        x = self.conv1(x)
        x = self.bn1(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        x = self.bn_final(x)
        x = self.relu_final(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x
    
    def pyramidal_make_layer(self, block, block_depth, stride=1):
        downsample = None
        if stride != 1: # or self.inplanes != int(round(featuremap_dim_1st)) * block.outchannel_ratio:
            downsample = nn.AvgPool2d((2,2), stride = (2, 2), ceil_mode=True)

        layers = []
        self.featuremap_dim = self.featuremap_dim + self.addrate
        layers.append(block(self.input_featuremap_dim, int(round(self.featuremap_dim)), stride, downsample))
        for i in range(1, block_depth):
            temp_featuremap_dim = self.featuremap_dim + self.addrate
            layers.append(block(int(round(self.featuremap_dim)) * block.outchannel_ratio, int(round(temp_featuremap_dim)), 1))
            self.featuremap_dim  = temp_featuremap_dim
        self.input_featuremap_dim = int(round(self.featuremap_dim)) * block.outchannel_ratio

        return nn.Sequential(*layers)

In [6]:
# imagefloder 생성
train_imgfolder = ImageFolder(root='data/train',
                      transform=transforms.Compose([
                          transforms.ToTensor(),
                          transforms.Resize(224),
                          transforms.CenterCrop(224)
                      ]))

test_imgfolder = ImageFolder(root='data/test',
                      transform=transforms.Compose([
                          transforms.ToTensor(),
                          transforms.Resize(224),
                          transforms.CenterCrop(224)
                      ]))

In [7]:
# validation 설정

# validation 데이터로 사용할 비율
valid_size = 0.2

# validation으로 사용할 indices 구하기
train_length = len(train_imgfolder)
indices = list(range(train_length))
np.random.shuffle(indices) # 인덱스 셔플
split = int(np.floor(valid_size * train_length))
train_idx, valid_idx = indices[split:], indices[:split]

In [8]:
# trainning, validation batch를 얻기 위한 sampler정의
train_sampler = SubsetRandomSampler(train_idx)
valid_sampler = SubsetRandomSampler(valid_idx)

In [9]:
# get data_loader
train_data_loader = DataLoader(dataset=train_imgfolder, 
                         batch_size=16, 
                         sampler=train_sampler,
                         num_workers=0
                        )

valid_data_loader = DataLoader(dataset=train_imgfolder, 
                         batch_size=16, 
                         sampler=valid_sampler,
                         num_workers=0
                        )

test_data_loader = DataLoader(dataset=test_imgfolder, 
                         batch_size=1, 
                         num_workers=0
                        )

In [10]:
# 데이터 개수 구하기
train_datanum = 0

for train_batch in train_data_loader:
    train_datanum += train_batch[0].shape[0]
    
valid_datanum = 0

for valid_batch in valid_data_loader:
    valid_datanum += valid_batch[0].shape[0]



# Train

In [15]:
# PyramidNet 모델을 생성
pyramid_net = PyramidNet(num_classes=5, depth=12, alpha=270)
pyramid_net.to('cuda') # 모델을 GPU로 이동

# loss function
criterion = nn.CrossEntropyLoss()
# optimizer
optimizer = optim.SGD(pyramid_net.parameters(), lr=1e-2, momentum=0.9, weight_decay=1e-4)

# 초기화
num_epochs = 50
print_steps = 100

## loss history
train_loss_history = []
val_loss_history = []

## acc history
train_acc_history = []
val_acc_history = []

In [20]:
# 학습 진행
for epoch in range(num_epochs):
    
    loss_sum = 0
    correct = 0
    
    for train_x, train_y in train_data_loader:
        optimizer.zero_grad()
        output = pyramid_net(train_x.cuda())
        
        loss = criterion(output.cpu(), train_y)
        
        loss.backward()
        optimizer.step()

        loss_sum += loss.item()
        
        predicted = torch.max(output, 1)[1]
        correct += (train_y == predicted.cpu()).sum()
    
    
    print(f'Epoch [{epoch+1}/{num_epochs}] Loss: {loss_sum/len(train_data_loader):.4f} Accuracy: {correct / train_datanum:.4f}')
    
    # history 추가
    train_loss_history.append(loss_sum/len(train_data_loader))
    train_acc_history.append(correct/train_datanum)
    
    
    # validation
    val_loss_sum = 0
    val_correct = 0
    
    with torch.no_grad():

        for valid_x, valid_y in valid_data_loader:
            valid_x = valid_x.to('cuda')

            val_output = pyramid_net(valid_x)
            val_loss = criterion(val_output.cpu(), valid_y)
            val_loss_sum += val_loss.item()
            
            val_predicted = torch.max(val_output, 1)[1]
            val_correct += (valid_y == val_predicted.cpu()).sum()

        print(f'Epoch [{epoch+1}/{num_epochs}] val_Loss: {val_loss_sum/len(valid_data_loader):.4f} Accuracy: {val_correct / valid_datanum:.4f}')
        
        # history 추가
        val_loss_history.append(val_loss_sum/len(valid_data_loader))
        val_acc_history.append(val_correct/valid_datanum)
        
    print('-----------------next-----------------')

Epoch [1/50] Loss: 0.9043 Accuracy: 0.6029
Epoch [1/50] val_Loss: 0.8441 Accuracy: 0.5672
-----------------next-----------------
Epoch [2/50] Loss: 0.8580 Accuracy: 0.5846
Epoch [2/50] val_Loss: 0.8700 Accuracy: 0.6119
-----------------next-----------------
Epoch [3/50] Loss: 0.8470 Accuracy: 0.6434
Epoch [3/50] val_Loss: 0.7832 Accuracy: 0.7164
-----------------next-----------------
Epoch [4/50] Loss: 0.7427 Accuracy: 0.6985
Epoch [4/50] val_Loss: 0.8638 Accuracy: 0.6716
-----------------next-----------------
Epoch [5/50] Loss: 0.7424 Accuracy: 0.6949
Epoch [5/50] val_Loss: 0.6978 Accuracy: 0.6567
-----------------next-----------------
Epoch [6/50] Loss: 0.7125 Accuracy: 0.7096
Epoch [6/50] val_Loss: 0.6917 Accuracy: 0.7015
-----------------next-----------------
Epoch [7/50] Loss: 0.6881 Accuracy: 0.7353
Epoch [7/50] val_Loss: 0.9627 Accuracy: 0.6567
-----------------next-----------------
Epoch [8/50] Loss: 0.6733 Accuracy: 0.7721
Epoch [8/50] val_Loss: 0.9725 Accuracy: 0.6269
-------

## 시각화

In [21]:
loss_x = list(range(len(val_loss_history)))

fig = go.Figure()
fig.add_trace(go.Scatter(x=loss_x,y=train_loss_history,
                         name="Train_loss"))
fig.add_trace(go.Scatter(x=loss_x,y=val_loss_history,
                         name="Val_loss"))
fig.show()

In [22]:
acc_x = list(range(len(val_acc_history)))

fig = go.Figure()
fig.add_trace(go.Scatter(x=acc_x,y=train_acc_history,
                         name="Train_acc"))
fig.add_trace(go.Scatter(x=acc_x,y=val_acc_history,
                         name="Val_acc"))
fig.show()

## model save

In [None]:
# 모델 저장
torch.save(pyramid_net.state_dict(), 'pyramidnet_custom.pth')

# Test

In [None]:
# 모델 불러오기
test_model = PyramidNet(num_classes=5, depth=12, alpha=270)
test_model.load_state_dict(torch.load('pyramidnet_custom.pth'))

In [None]:
# 초기화
test_loss = 0
test_correct = 0
test_loss_sum = 0

with torch.no_grad():
    
    for test_x, test_y in test_data_loader:
        test_x = test_x.to('cuda')
        test_y = test_y.to('cuda')

        test_output = test_model.to('cuda')(test_x)
        test_loss = criterion(test_output, test_y)
        test_loss_sum += test_loss.item()
        predicted = torch.max(test_output, 1)[1]
        test_correct += (y == predicted).sum()

In [None]:
acc = test_correct / len(test_data_loader)
print(f'Test Accuracy: {acc.item()}')