In [14]:
import torch 
import torch.nn as nn

class Residual(nn.Module):
    def __init__(self, in_channels, num_channels, use_1x1conv = False, strides = 1):
        super(Residual, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, num_channels, kernel_size = 3, padding = 1, stride = strides)
        self.conv2 = nn.Conv2d(num_channels, num_channels, kernel_size = 3, padding = 1)
        self.conv3 = None

        if use_1x1conv:
            self.conv3 = nn.Conv2d(in_channels, num_channels, kernel_size = 1, stride = strides)
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, x):
        y = self.conv1(x)
        y = self.bn1(y)
        y = nn.ReLU()(y)
        y = self.conv2(y)
        y = self.bn2(y)

        if self.conv3:
            x = self.conv3(x)
        return nn.ReLU()(y + x)

In [15]:
X = torch.randn((4, 3, 6, 6))
blk = Residual(in_channels = X.shape[1], num_channels = 3)
assert blk(X).shape == (4, 3, 6, 6)
blk(X).shape

torch.Size([4, 3, 6, 6])

In [16]:
blk = Residual(in_channels = X.shape[1], num_channels = 6, use_1x1conv = True, strides = 2)
assert blk(X).shape == (4, 6, 3, 3)
blk(X).shape

torch.Size([4, 6, 3, 3])

In [25]:
net = nn.Sequential()
net.add_module("conv", nn.Conv2d(1, 64, kernel_size = 7, stride = 2, padding = 3))
net.add_module("batchnorm", nn.BatchNorm2d(64))
net.add_module("Relu", nn.ReLU())
net.add_module("maxpool", nn.MaxPool2d(3, stride = 2, padding = 1))

In [26]:
def resnet_block(in_channels, num_channels, num_residuals, first_block = False):
    blk = nn.Sequential()
    
    for i in range(num_residuals):
        
        if i == 0 and not first_block:
            blk.add_module('residual_{}'.format(i), Residual(in_channels, num_channels, use_1x1conv = True, strides = 2))
        else:
            blk.add_module('residual_{}'.format(i), Residual(num_channels, num_channels))
    return blk

In [27]:
net.add_module('resnet_block1', resnet_block(64, 64, 2, first_block = True))
net.add_module('resnet_block2', resnet_block(64, 128, 2))
net.add_module('resnet_block3', resnet_block(128, 256, 2))
net.add_module('resnet_block4', resnet_block(256, 512, 2))

In [28]:
net.add_module('GlobalAvr', nn.AdaptiveAvgPool2d((1, 1)))
net.add_module('Flatten', nn.Flatten())
net.add_module('FC', nn.Linear(512, 10))

In [29]:
X = torch.randn((1, 1, 224, 224))
for layer in net:
    X = layer(X)
    print(layer.__class__.__name__, 'output shape:\t', X.shape)

Conv2d output shape:	 torch.Size([1, 64, 112, 112])
BatchNorm2d output shape:	 torch.Size([1, 64, 112, 112])
ReLU output shape:	 torch.Size([1, 64, 112, 112])
MaxPool2d output shape:	 torch.Size([1, 64, 56, 56])
Sequential output shape:	 torch.Size([1, 64, 56, 56])
Sequential output shape:	 torch.Size([1, 128, 28, 28])
Sequential output shape:	 torch.Size([1, 256, 14, 14])
Sequential output shape:	 torch.Size([1, 512, 7, 7])
AdaptiveAvgPool2d output shape:	 torch.Size([1, 512, 1, 1])
Flatten output shape:	 torch.Size([1, 512])
Linear output shape:	 torch.Size([1, 10])


In [30]:
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt
import random

In [31]:
num_classes = 10
epochs = 10
learning_rate = 0.001
batch_size = 128
display_step = 100
checkpoint = 'Dang_resnet.pth'
device = 'cuda' if torch.cuda.is_available() else 'cpu'
assert device == 'cuda'
print(torch.cuda.is_available())

True


In [32]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, ), (0.5, ))
])

train_dataset = datasets.MNIST('../data', train = True, download = True, transform = transform)
test_dataset = datasets.MNIST('../data', train = False, transform = transform)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size = batch_size)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size = batch_size)

In [34]:
model = net.to(device)

try:
    model.load_state_dict(torch.load(checkpoint))
except:
    print("!!! Hãy train để có checkpoint file")

!!! Hãy train để có checkpoint file


In [35]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)
best_val_loss = 999

for epoch in range(1, epochs):
    
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        if batch_idx % display_step == 0:
            print('Train epoch: {} [{}/{} ({:.0f}%)]\tTrain Loss: {:.6f}'.format(epoch,  batch_idx * len(data),
                                                                                 len(train_loader.dataset),
                                                                                 100. * batch_idx / len(train_loader),
                                                                                 loss.item()))
    model.eval()
    test_loss = 0
    correct = 0

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            output = F.log_softmax(output, dim = 1)
            test_loss += criterion(output, target)
            pred = output.argmax(dim = 1, keepdim = True)
            correct += pred.eq(target.view_as(pred)).sum().item()
        test_loss /= len(test_loader.dataset)

        if test_loss < best_val_loss:
            best_val_loss = test_loss
            torch.save(model.state_dict(), checkpoint)
            print("***********    TEST_ACC = {}%    ***********".format(correct))

***********    TEST_ACC = 9742%    ***********
***********    TEST_ACC = 9845%    ***********
***********    TEST_ACC = 9851%    ***********
***********    TEST_ACC = 9870%    ***********
***********    TEST_ACC = 9914%    ***********


In [36]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = learning_rate, momentum = 0.9, weight_decay = 1e-4)
best_val_loss = 999

for epoch in range(1, epochs):
    
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        if batch_idx % display_step == 0:
            print('Train epoch: {} [{}/{} ({:.0f}%)]\tTrain Loss: {:.6f}'.format(epoch,  batch_idx * len(data),
                                                                                 len(train_loader.dataset),
                                                                                 100. * batch_idx / len(train_loader),
                                                                                 loss.item()))
    model.eval()
    test_loss = 0
    correct = 0

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            output = F.log_softmax(output, dim = 1)
            test_loss += criterion(output, target)
            pred = output.argmax(dim = 1, keepdim = True)
            correct += pred.eq(target.view_as(pred)).sum().item()
        test_loss /= len(test_loader.dataset)

        if test_loss < best_val_loss:
            best_val_loss = test_loss
            torch.save(model.state_dict(), checkpoint)
            print("***********    TEST_ACC = {}%    ***********".format(correct))

***********    TEST_ACC = 9931%    ***********
***********    TEST_ACC = 9933%    ***********
***********    TEST_ACC = 9936%    ***********
***********    TEST_ACC = 9938%    ***********
***********    TEST_ACC = 9938%    ***********
***********    TEST_ACC = 9937%    ***********
***********    TEST_ACC = 9938%    ***********
***********    TEST_ACC = 9940%    ***********
***********    TEST_ACC = 9940%    ***********
