# 6. DenseNet

In [1]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import numpy as np
import os
import torchvision.transforms as transforms
from torch.nn import functional as F

In [2]:
device = torch.device('cuda:0')

In [3]:
# 트레이닝 데이터셋을 다운로드한다.
transform_train = transforms.Compose([
    transforms.Resize(7),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),  # 회전
    transforms.RandomAffine(0, shear=10, scale=(0.8, 1.2)),  # 이동 및 왜곡
    transforms.ToTensor(),
    transforms.Normalize((0.5), (0.5)),
])


transform_test = transforms.Compose([
    transforms.Resize(7),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),  # 회전
    transforms.RandomAffine(0, shear=10, scale=(0.8, 1.2)),  # 이동 및 왜곡
    transforms.ToTensor(),
    transforms.Normalize((0.5), (0.5)),
])

training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor()
)

# 테스트 데이터셋을 다운로드한다.
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor()
)

train_dataloader = DataLoader(training_data, batch_size=64)
test_dataloader = DataLoader(test_data, batch_size=64)

In [4]:
def conv_block(num_channels):
    return nn.Sequential(
        nn.LazyConv2d(num_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(num_channels),  # 배치 정규화 추가
        nn.ReLU())

In [5]:
class DenseBlock(nn.Module):
    def __init__(self, num_convs, num_channels):
        super(DenseBlock, self).__init__()
        layer = []
        for i in range(num_convs):
            layer.append(conv_block(num_channels))
        self.net = nn.Sequential(*layer)

    def forward(self, X):
        for blk in self.net:
            Y = blk(X)
            # Concatenate input and output of each block along the channels
            X = torch.cat((X, Y), dim=1)
        return X

In [6]:
blk = DenseBlock(2, 10)
X = torch.randn(4, 3, 8, 8)
Y = blk(X)
Y.shape



torch.Size([4, 23, 8, 8])

In [7]:
def transition_block(num_channels):
    return nn.Sequential(
        nn.ReLU(),
        nn.LazyConv2d(num_channels, kernel_size=1),
        nn.AvgPool2d(kernel_size=2, stride=1))  # Changed stride to 1


In [8]:
blk = transition_block(10)
blk(Y).shape

torch.Size([4, 10, 7, 7])

In [9]:
def init_cnn(module):
    # Initialize weights for CNNs
    if type(module) == nn.Linear or type(module) == nn.Conv2d:
        nn.init.xavier_uniform_(module.weight)
        
class DenseNet(nn.Module):
    def b1(self):
        return nn.Sequential(
            nn.LazyConv2d(64, kernel_size=7, stride=2, padding=3),  # First conv layer
            nn.BatchNorm2d(64),  # Batch normalization
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1))  # Max pooling
    
    def __init__(self, num_channels=64, growth_rate=32, arch=(4, 4, 4, 4),
                 lr=0.1, num_classes=10, dropout_rate=0.5):
        super(DenseNet, self).__init__()
        self.net = nn.Sequential(self.b1())  # Adding the first block `b1`
        for i, num_convs in enumerate(arch):
            self.net.add_module(f'dense_blk{i+1}', DenseBlock(num_convs, growth_rate))
            num_channels += num_convs * growth_rate
            if i != len(arch) - 1:
                num_channels //= 2
                self.net.add_module(f'tran_blk{i+1}', transition_block(num_channels))
            # Dropout layer
            self.net.add_module(f'dropout{i+1}', nn.Dropout(dropout_rate))
        self.net.add_module('last', nn.Sequential(
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1)), nn.Flatten(),
            nn.LazyLinear(num_classes)))

    def para_init(self):
        self.net.apply(init_cnn)

    def forward(self, x):
        return self.net(x)

In [10]:
print(DenseNet())

DenseNet(
  (net): Sequential(
    (0): Sequential(
      (0): LazyConv2d(0, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    )
    (dense_blk1): DenseBlock(
      (net): Sequential(
        (0): Sequential(
          (0): LazyConv2d(0, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU()
        )
        (1): Sequential(
          (0): LazyConv2d(0, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU()
        )
        (2): Sequential(
          (0): LazyConv2d(0, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (1): BatchNorm2d(32, 

In [11]:
def train_loop(dataloader, model, loss_fn, optimzer, device):
    size = len(dataloader.dataset)
    model.train() # 모델을 훈련 모드로 설정
    
    for batch, (X, y) in enumerate(dataloader):
        X = X.to(device)
        y = y.to(device)
        pred = model(X) # 포워드 패스 수행
        loss = loss_fn(pred, y) # CE 연산
        
        optimzer.zero_grad() # 0 으로 초기화
        loss.backward() # 역전파하여 그래디언트 계산
        optimzer.step() # 연산된 그래디언트를 사용해 파라미터를 업데이트
        
        if batch % 100 == 0: # 매 100회차 마다 다음 내용 출력
            loss, current = loss.item(), batch * len(X)
            #print(f'loss: {loss}, [{current:>5d}/{size:>5d}]')

def test_loop(dataloader, model, loss_fn, device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0
    model.eval() # 모델을 실행 모드로 설정
    
    with torch.no_grad(): # 그래디언트 연산 안함
        for X, y in dataloader:
            X = X.to(device)
            y = y.to(device)
            
            pred = model(X) # 포워드 패스 수행
            test_loss += loss_fn(pred, y) # CE 연산
            correct += (pred.argmax(1) == y).type(torch.float).sum().item() # 결과 일치하는지 확인
    
    test_loss /= num_batches
    correct /= size
    print(f'Test Error: \n Accuracy: {(100*correct):>0.1f}% Average Loss: {test_loss:>8f}\n')

In [12]:
def run(device):
    #device = 'cuda:0' if torch.cuda.is_available() else 'cpu'   
    #device = 'cpu'
    print(f"사용할 장치: {device}")

    model = DenseNet().to(device)

    learning_rate = 0.001
    batch_size = 64
    epochs = 10


    loss_fn = nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=1e-4)

    for t in range(epochs):
        print(f'Epoch {t+1}\n--------------------------------------')
        train_loop(train_dataloader, model, loss_fn, optimizer, device)
        test_loop(train_dataloader, model, loss_fn, device)
    print("Done!")

In [13]:
run(device)

사용할 장치: cuda:0
Epoch 1
--------------------------------------
Test Error: 
 Accuracy: 53.5% Average Loss: 1.476431

Epoch 2
--------------------------------------
Test Error: 
 Accuracy: 70.3% Average Loss: 0.894926

Epoch 3
--------------------------------------
Test Error: 
 Accuracy: 75.1% Average Loss: 0.690411

Epoch 4
--------------------------------------
Test Error: 
 Accuracy: 77.4% Average Loss: 0.600286

Epoch 5
--------------------------------------
Test Error: 
 Accuracy: 79.7% Average Loss: 0.524975

Epoch 6
--------------------------------------
Test Error: 
 Accuracy: 80.1% Average Loss: 0.512616

Epoch 7
--------------------------------------
Test Error: 
 Accuracy: 82.0% Average Loss: 0.468492

Epoch 8
--------------------------------------
Test Error: 
 Accuracy: 83.8% Average Loss: 0.437227

Epoch 9
--------------------------------------
Test Error: 
 Accuracy: 85.1% Average Loss: 0.407206

Epoch 10
--------------------------------------
Test Error: 
 Accuracy: 85.5