<a href="https://colab.research.google.com/github/Chaeun26/Algorithm/blob/main/CIFAR10_ResNet_Simplified_ver.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torch.optim as optim
import os
# Those are for calculating errors.
import numpy as np
import pandas as pd

In [None]:
# BasicBlock: the residual building block of this simplified ResNet.
# -> several blocks are chained together to build the ResNet.

# Each block applies its convolutions and adds a shortcut connection.
# This is the basic block (without the bottleneck architecture).
class BasicBlock(nn.Module):
    """Residual basic block: two 3x3 conv + batch-norm layers plus a shortcut.

    Parameters:

    - in_planes: number of input channels
    - planes: number of output channels (used by both convolutions)
    - stride: stride of the first convolution (default 1); a value > 1
      reduces the width and height of the feature map
    """

    def __init__(self, in_planes, planes, stride=1):
        # to execute nn.Module.__init__()
        super(BasicBlock, self).__init__()

        # first 3x3 convolution; it carries `stride`, so it is the only
        # convolution in the block that can shrink width and height
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        # batch normalization over the `planes` output channels
        self.bn1 = nn.BatchNorm2d(planes)

        # second 3x3 convolution (stride 1, padding 1: spatial size unchanged)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # default shortcut: identity mapping (an empty nn.Sequential
        # returns its input unchanged)
        self.shortcut = nn.Sequential()

        # The residual addition in forward() needs both operands to have the
        # same shape. Use a 1x1 projection whenever the spatial size changes
        # (stride != 1) OR the channel count changes (in_planes != planes);
        # checking the stride alone breaks for stride-1 blocks that change
        # the number of channels.
        if stride != 1 or in_planes != planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes)
            )

    # forward propagation
    def forward(self, x):
        """Return relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))."""
        # convolution - batch normalization - relu
        out = F.relu(self.bn1(self.conv1(x)))
        # 2nd convolution - batch normalization
        out = self.bn2(self.conv2(out))
        # skip connection: add the (identity or projected) input
        out += self.shortcut(x)
        # final relu is applied after the addition
        out = F.relu(out)
        return out

# Define the ResNet class.
# The layer layout is based on the ImageNet ResNet architecture.
# CIFAR10 version: fewer parameters than the ImageNet model.
class ResNet(nn.Module):
    """ResNet classifier built from four stages of residual blocks.

    Parameters:

    - block: block class, called as block(in_planes, planes, stride)
    - num_blocks: sequence of four ints, the number of blocks per stage
    - num_classes: size of the output layer (default 10)
    """

    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        # running channel count; _make_layer reads and updates it while the
        # stages are being built
        self.in_planes = 64

        # stem: 3 input channels (R, G, B) -> 64 feature maps, 3x3 filters
        # output size = (input_size - filter_size + 2 * padding) / stride + 1,
        # so stride 1 / padding 1 keeps the spatial size
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        # batch normalization over the 64 stem channels
        self.bn1 = nn.BatchNorm2d(64)
        # (block class, # of filters, # of blocks in the stage, stride)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        # fully connected layer on the 512 pooled features
        self.linear = nn.Linear(512, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage: `num_blocks` blocks with `planes` output channels.

        Only the first block of the stage uses `stride`; the remaining blocks
        use stride 1, so the feature map is downsampled at most once per stage.
        """
        block_strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for block_stride in block_strides:
            # append blocks within the same stage
            layers.append(block(self.in_planes, planes, block_stride))
            # the next block receives `planes` channels as its input
            self.in_planes = planes
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class scores of shape (batch, num_classes) for image batch x."""
        out = F.relu(self.bn1(self.conv1(x)))  # stem

        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)

        # Global average pooling to 1x1. For 32x32 inputs the feature map is
        # 4x4 here, so this matches the former fixed avg_pool2d(out, 4), but
        # it also works for other input resolutions.
        out = F.adaptive_avg_pool2d(out, 1)

        # flatten to (batch, 512) and apply the fully connected layer
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


# ResNet18 factory: BasicBlock with 2 blocks in each of the four stages
# (1 stem conv + 2 convs * 8 blocks + 1 linear layer = 18 weighted layers)
def ResNet18():
    """Return a ResNet built from BasicBlock with stage depths [2, 2, 2, 2]."""
    stage_depths = [2, 2, 2, 2]
    return ResNet(BasicBlock, stage_depths)

# ResNet34 factory: BasicBlock with 3, 4, 6 and 3 blocks in the four stages
# (1 stem conv + 2 convs * 16 blocks + 1 linear layer = 34 weighted layers)
def ResNet34():
    """Return a ResNet built from BasicBlock with stage depths [3, 4, 6, 3]."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(BasicBlock, stage_depths)



In [None]:
# download dataset (without normalization)
import torchvision
import torchvision.transforms as transforms

# Training pipeline: pad by 4 pixels and take a random 32x32 crop, flip
# horizontally at random (data augmentation), then convert to a tensor.
# No normalization step is applied.
transform_train = transforms.Compose([
    transforms.RandomCrop(size=32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])

# Test pipeline: evaluate the single view of the original 32x32 image,
# only converted to a tensor.
transform_test = transforms.Compose([transforms.ToTensor()])

# CIFAR10 splits provided by torchvision (download=True, stored under ./data)
train_dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform_train,
)
test_dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform_test,
)

# Loader objects: the training loader shuffles, the test loader keeps
# the dataset order.
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=128,
    shuffle=True,
    num_workers=4,
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=100,
    shuffle=False,
    num_workers=4,
)


Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


  0%|          | 0/170498071 [00:00<?, ?it/s]

Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified


  cpuset_checked))


In [None]:
# Use the GPU when one is available and fall back to the CPU otherwise, so
# the notebook does not fail at `.to(device)` on a machine without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# which ResNet do you want to use (change net!)
net = ResNet34()
net = net.to(device)
# DataParallel wraps the model for multi-GPU execution.
# NOTE(review): the wrapper prefixes every state_dict key with 'module.',
# so removing it changes the checkpoint key names - confirm before deleting.
net = torch.nn.DataParallel(net)
# let cuDNN benchmark convolution algorithms
cudnn.benchmark = True

learning_rate = 0.1
# checkpoint file name (saved under ./checkpoint by test())
# NOTE(review): the name says "resnet18" although `net` above is ResNet34();
# left unchanged because other code may load this exact path - confirm.
file_name = 'resnet18_cifar10.pt'

# choose which loss to use & which optimizer to use
# nn.CrossEntropyLoss(): useful for classification with C classes
criterion = nn.CrossEntropyLoss()
# SGD with momentum 0.9 and weight decay 1e-4 over all model parameters
optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9, weight_decay=0.0001)

# use the same one with the referenced one to compare
def evaluate(model, data_loader, device):
    """
    Calculate the classification error for the given model and data set.

    The error is the fraction (between 0 and 1, not a percentage) of samples
    whose predicted class differs from the true label.

    The model is switched to evaluation mode while predicting and its
    previous train/eval mode is restored afterwards, so calling this in the
    middle of training neither uses training-mode layer behavior nor leaves
    the model in a different mode.

    Parameters:

    - model: A Trained Pytorch Model
    - data_loader: A Pytorch data loader object (yields (inputs, labels))
    - device: device the batches are moved to before the forward pass

    Returns:

    - error: misclassified samples / total samples
    """

    # Start from empty int64 arrays (np.int was removed from NumPy) and
    # collect one array per batch; concatenating once at the end avoids
    # re-copying the growing arrays on every batch.
    true_batches = [np.array([], dtype=np.int64)]
    pred_batches = [np.array([], dtype=np.int64)]

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, labels in data_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                # index of the largest score along dim 1 = predicted class
                _, predicted = outputs.max(1)

                true_batches.append(labels.cpu().numpy())
                pred_batches.append(predicted.cpu().numpy())
    finally:
        # restore whichever mode the caller had set
        model.train(was_training)

    y_true = np.concatenate(true_batches)
    y_pred = np.concatenate(pred_batches)

    error = np.sum(y_pred != y_true) / len(y_true)
    return error

def train(epoch):
    """Run one training epoch over `train_loader` and print progress.

    Uses the module-level `net`, `optimizer`, `criterion`, `train_loader`
    and `device`. After the epoch, the train error is computed with
    `evaluate` and printed together with the running accuracy and the
    summed batch losses.

    Parameters:

    - epoch: epoch index, only used in the printed header
    """
    print('\n[ Train epoch: %d ]' % epoch)
    net.train()

    # running statistics over the whole epoch
    running_loss = 0
    n_correct = 0
    n_seen = 0

    # iterate over mini-batches supplied by train_loader
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # one optimization step:
        # gradients accumulate in PyTorch, so clear them first
        optimizer.zero_grad()
        # forward pass
        pred = net(inputs)
        # loss between the predictions and the targets
        loss = criterion(pred, targets)
        # backpropagation
        loss.backward()
        # update the model parameters
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss

        # predicted class = index of the largest score
        predicted = pred.max(1)[1]
        batch_size = targets.size(0)
        batch_correct = predicted.eq(targets).sum().item()

        n_seen += batch_size
        n_correct += batch_correct

        # report the current batch every 100 batches
        if batch_idx % 100 == 0:
            print('\nCurrent batch:', str(batch_idx))
            print('Current benign train accuracy:', str(batch_correct / batch_size))
            print('Current benign train loss:', batch_loss)

    train_error = evaluate(net, train_loader, device)

    print('\nTotal benign train accuarcy:', 100. * n_correct / n_seen)
    print('Total benign train loss:', running_loss)
    print('Train error:', train_error)


def test(epoch):
    """Evaluate `net` on `test_loader`, print the metrics and save a checkpoint.

    Uses the module-level `net`, `criterion`, `test_loader`, `device` and
    `file_name`. The model state_dict is written to ./checkpoint/<file_name>
    on every call, overwriting the previous file.

    Parameters:

    - epoch: epoch index, only used in the printed header
    """
    print('\n[ Test epoch: %d ]' % epoch)
    # change network into evaluation mode
    net.eval()
    # accumulators (initialized)
    loss = 0
    correct = 0
    total = 0

    # no gradients are needed for testing; skipping them saves memory and time
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(test_loader):
            inputs, targets = inputs.to(device), targets.to(device)
            batch_size = targets.size(0)
            total += batch_size

            # find output value from the input value
            outputs = net(inputs)

            # Weight each batch loss by its batch size so that loss / total
            # below is the per-sample average.
            # Assumes `criterion` returns the batch mean, which is the default
            # reduction of nn.CrossEntropyLoss() - confirm if it is changed.
            loss += criterion(outputs, targets).item() * batch_size

            # calculate accuracy
            _, predicted = outputs.max(1)
            correct += predicted.eq(targets).sum().item()

    # second pass over the test set with the shared evaluate() helper
    test_error = evaluate(net, test_loader, device)

    print('\nTest accuarcy:', 100. * correct / total)
    print('Test average loss:', loss / total)
    print('Test error:', test_error)

    state = {
        'net': net.state_dict()
    }
    # create the checkpoint directory if it does not exist yet
    os.makedirs('checkpoint', exist_ok=True)
    torch.save(state, './checkpoint/' + file_name)
    print('Model Saved!')


In [None]:
# Run 40 epochs (0..39): one training pass, then one test pass
# (test() also saves a checkpoint) per epoch.
for epoch in range(40):
    train(epoch)
    test(epoch)


[ Train epoch: 0 ]


  cpuset_checked))



Current batch: 0
Current benign train accuracy: 0.1015625
Current benign train loss: 2.426790714263916

Current batch: 100
Current benign train accuracy: 0.2578125
Current benign train loss: 2.041116714477539

Current batch: 200
Current benign train accuracy: 0.2734375
Current benign train loss: 1.9775303602218628

Current batch: 300
Current benign train accuracy: 0.375
Current benign train loss: 1.7740352153778076

Total benign train accuarcy: 27.152
Total benign train loss: 804.1382712125778
Train error: 0.61156

[ Test epoch: 0 ]

Test accuarcy: 40.37
Test average loss: 0.01628780416250229
Test error: 0.5963
Model Saved!

[ Train epoch: 1 ]

Current batch: 0
Current benign train accuracy: 0.421875
Current benign train loss: 1.6428111791610718

Current batch: 100
Current benign train accuracy: 0.453125
Current benign train loss: 1.5901762247085571

Current batch: 200
Current benign train accuracy: 0.4921875
Current benign train loss: 1.5169438123703003

Current batch: 300
Current be