## 드라이브 마운트

In [1]:
from google.colab import drive
drive.mount('/content/drive')

basic_path = '/content/drive/MyDrive/test/'

Mounted at /content/drive


## 라이브러리 Import

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F
from torchsummary import summary

In [3]:
def get_device():
    return torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [4]:
def get_mean_std(channel, training_dataset):
  if channel.lower() is 'rgb':
    mean_rgb = [np.mean(x.numpy(), axis=(1, 2)) for x,_ in training_dataset]
    std_rgb = [np.std(x.numpy(), axis=(1, 2)) for x,_ in training_dataset]

    mean_r = np.mean([m[0] for m in mean_rgb])
    mean_g = np.mean([m[1] for m in mean_rgb])
    mean_b = np.mean([m[2] for m in mean_rgb])

    std_r = np.mean([s[0] for s in std_rgb])
    std_g = np.mean([s[1] for s in std_rgb])
    std_b = np.mean([s[2] for s in std_rgb])
    return [mean_r, mean_g, mean_b], [std_r, std_g, std_b]
  else:
    mean = [np.mean(x.numpy(), axis=(1, 2)) for x,_ in training_dataset]
    std = [np.std(x.numpy(), axis=(1, 2)) for x,_ in training_dataset]

    return np.mean([m[0] for m in mean]), np.mean([s[0] for s in std])

## DenseNet 구현

### BottleneckLayer 구현

In [5]:
class BottleneckLayer(nn.Module):
  def __init__(self, in_channel, base_out_channel, dropout_rate=0.2):
    super(BottleneckLayer, self).__init__()
    self.norm1 = nn.BatchNorm2d(in_channel)
    self.relu = nn.ReLU(inplace=True)
    self.conv1 = nn.Conv2d(in_channel, base_out_channel*4, kernel_size=1, stride=1, padding=0, bias=False)
    self.norm2 = nn.BatchNorm2d(base_out_channel*4)
    self.conv2 = nn.Conv2d(base_out_channel*4, base_out_channel, kernel_size=3, stride=1, padding=1, bias=False)

    self.dropout_rate =dropout_rate
  
  def forward(self, x):
      y = self.conv1(self.relu(self.norm1(x)))
      if self.dropout_rate > 0:
        y = F.dropout(y, p=self.dropout_rate, training=self.training)
      y = self.conv2(self.relu(self.norm2(y)))
      if self.dropout_rate > 0:
        y = F.dropout(y, p=self.dropout_rate, training=self.training)
      return torch.cat([x, y], 1)

### DenseBlock 구현

In [6]:
class DenseBlock(nn.Sequential):
    def __init__(self, in_channel, growth_rate, number_of_layer):
        super(DenseBlock, self).__init__()
        current_in_channel = in_channel
        for number in range(number_of_layer):
            # DenseBlock 내부에 쌓이는 Layer 구성
            current_in_channel = in_channel + (growth_rate * number)
            current_layer_name = 'DENSEBLOCK_LAYER_' + str(number)
            self.add_module(current_layer_name, BottleneckLayer(current_in_channel, growth_rate))

### Transition Layer 구현

In [7]:
class TransitionLayer(nn.Sequential):
    def __init__(self, in_channel, reduction):
        super(TransitionLayer, self).__init__()
        self.add_module('BATCH_NORM', nn.BatchNorm2d(in_channel))
        self.add_module('RELU', nn.ReLU(inplace=True))
        self.add_module('CONV_1x1', nn.Conv2d(in_channel, int(in_channel*reduction), kernel_size=1, stride=1, bias=False))
        self.add_module('AVG_POOL_2x2', nn.AvgPool2d(kernel_size=2, stride=2))

### 모델 구현

In [8]:
class MyDenseNet(nn.Module):
    def __init__(self, size_of_channel, growth_rate, number_of_class, interation, reduction = 0.5):
        super(MyDenseNet, self).__init__()
        self.input = nn.Conv2d(size_of_channel, growth_rate*2, kernel_size=7, stride=2, padding=3, bias=True)
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        # 1) e.g., Type is DenseNet-121: input → dense → transition: 64 → 256 → 128
        in_channel = growth_rate*2
        self.dense_block_1 = DenseBlock(in_channel, growth_rate, number_of_layer=interation[0])
        out_channel = in_channel + (growth_rate*interation[0])
        self.transition_layer_1 = TransitionLayer(out_channel, reduction)
        
        # 2) e.g., Type is DenseNet-121: input → dense → transition: 128 → 512 → 256
        in_channel = int(out_channel*reduction)
        self.dense_block_2 = DenseBlock(in_channel, growth_rate, number_of_layer=interation[1])
        out_channel = in_channel + (growth_rate*interation[1])
        self.transition_layer_2 = TransitionLayer(out_channel, reduction)
        
        # 3) e.g., Type is DenseNet-121: input → dense → transition: 256 → 1024 → 512
        in_channel = int(out_channel*reduction)
        self.dense_block_3 = DenseBlock(in_channel, growth_rate, number_of_layer=interation[2])
        out_channel = in_channel + (growth_rate*interation[2])
        self.transition_layer_3 = TransitionLayer(out_channel, reduction)
        
        # 4) e.g., Type is DenseNet-121: input → dense → transition: 512 → 1024
        in_channel = int(out_channel*reduction)
        self.dense_block_4 = DenseBlock(in_channel, growth_rate, number_of_layer=interation[3])
        out_channel = in_channel + (growth_rate*interation[3])
        
        self.fc_layer = nn.Linear(out_channel, number_of_class)
        self.__initialize_weights__()

    def __initialize_weights__(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        input_feature_map = self.max_pool(self.input(x))
        
        dense_block_output_1 = self.dense_block_1(input_feature_map)
        transition_layer_output_1 = self.transition_layer_1(dense_block_output_1)
        
        dense_block_output_2 = self.dense_block_2(transition_layer_output_1)
        transition_layer_output_2 = self.transition_layer_2(dense_block_output_2)
        
        dense_block_output_3 = self.dense_block_3(transition_layer_output_2)
        transition_layer_output_3 = self.transition_layer_3(dense_block_output_3)
        
        dense_block_output_4 = self.dense_block_4(transition_layer_output_3)
        
        avg_pool_output = nn.functional.adaptive_avg_pool2d(dense_block_output_4, (1, 1))
        flatten = avg_pool_output.view(avg_pool_output.size(0), -1)
        
        output = self.fc_layer(flatten)
        return output

In [9]:
GROWTH_RATE = 32
ITERATIONS_PER_DEPTH = {
    'DenseNet-121': [6, 12, 24, 16],
    'DenseNet-169': [6, 12, 32, 32],
    'DenseNet-201': [6, 12, 48, 32],
    'DenseNet-264': [6, 12, 64, 48],
};

## Train / Test 코드

In [10]:
def train(device, dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    
    for batch, (X, y) in enumerate(dataloader):
      X, y = X.to(device), y.to(device)
      
      pred = model(X.cuda())
      loss = loss_fn(pred, y)
      
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      
      print('.', end='')
      if batch % 100 == 0:
          print()
          loss, current = loss.item(), batch*len(X)
          print(f'loss: {loss:>7f}   [{current:>5d}/{size:>5d}]')

In [11]:
def test(device, dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f'Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loass: {test_loss:>8f}\n')

In [12]:
device = get_device()
print('Device:', device)

Device: cuda


## Model - CIFAR-10

In [13]:
training_dataset_cifar10 = datasets.CIFAR10(
    root=basic_path + '/data', train=True, download=True, transform=transforms.ToTensor(),
)
test_dataset_cifar10 = datasets.CIFAR10(
    root=basic_path + '/data', train=False, download=True, transform=transforms.ToTensor(),
)

rgb_mean, rgb_std = get_mean_std('rgb', training_dataset_cifar10)
rgb_transform = transforms.Compose([
  transforms.ToTensor(),
  transforms.RandomHorizontalFlip(),
  transforms.Resize(224),
  transforms.Normalize(mean=[rgb_mean], std=[rgb_std])
])
training_dataset_cifar10.transform = rgb_transform
test_dataset_cifar10.transform = rgb_transform

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to /content/drive/MyDrive/test//data/cifar-10-python.tar.gz


  0%|          | 0/170498071 [00:00<?, ?it/s]

Extracting /content/drive/MyDrive/test//data/cifar-10-python.tar.gz to /content/drive/MyDrive/test//data
Files already downloaded and verified


In [14]:
training_dataloader_cifar10 = DataLoader(training_dataset_cifar10, batch_size=64)
test_dataloader_cifar10 = DataLoader(test_dataset_cifar10, batch_size=64)

for X, y in test_dataloader_cifar10:
  print('Shape of X [N, C, H, W]:', X.shape)
  print('Shape of y:', y.shape, y.dtype)
  break

Shape of X [N, C, H, W]: torch.Size([64, 3, 224, 224])
Shape of y: torch.Size([64]) torch.int64


In [15]:
number_of_cifar10_classes = 10
model_cifar10 = MyDenseNet(3, GROWTH_RATE, number_of_cifar10_classes, ITERATIONS_PER_DEPTH['DenseNet-121'])
model_cifar10 = model_cifar10.to(device)

In [16]:
summary(model_cifar10, input_size=(3, 224, 224))

  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 112, 112]           9,472
         MaxPool2d-2           [-1, 64, 56, 56]               0
       BatchNorm2d-3           [-1, 64, 56, 56]             128
              ReLU-4           [-1, 64, 56, 56]               0
            Conv2d-5          [-1, 128, 56, 56]           8,192
       BatchNorm2d-6          [-1, 128, 56, 56]             256
              ReLU-7          [-1, 128, 56, 56]               0
            Conv2d-8           [-1, 32, 56, 56]          36,864
   BottleneckLayer-9           [-1, 96, 56, 56]               0
      BatchNorm2d-10           [-1, 96, 56, 56]             192
             ReLU-11           [-1, 96, 56, 56]               0
           Conv2d-12          [-1, 128, 56, 56]          12,288
      BatchNorm2d-13          [-1, 128, 56, 56]             256
             ReLU-14          [-1, 128,

In [None]:
epochs = 40
for epoch in range(epochs):
  print(f'Epoch {epoch + 1}\n---------------------------------')
  train(device, training_dataloader_cifar10, model_cifar10, nn.CrossEntropyLoss(),
        torch.optim.SGD(model_cifar10.parameters(),lr=0.1))
  test(device, test_dataloader_cifar10, model_cifar10, nn.CrossEntropyLoss())

Epoch 1
---------------------------------
.
loss: 2.367886   [    0/50000]
....................................................................................................
loss: 1.881196   [ 6400/50000]
....................................................................................................
loss: 1.647252   [12800/50000]
....................................................................................................
loss: 1.842821   [19200/50000]
....................................................................................................
loss: 1.748433   [25600/50000]
....................................................................................................
loss: 1.820175   [32000/50000]
....................................................................................................
loss: 1.569057   [38400/50000]
....................................................................................................
loss: 1.612423   [44800/50000]
.

In [None]:
dest_model_path = basic_path + 'densenet_models/model_cifar10.pth'
print(dest_model_path)
torch.save(model_cifar10.state_dict(), dest_model_path)