# 基于CIFAR-10数据集的图片分类任务

In [None]:
!pip install torchsummary

In [None]:
# train cifar dataset

import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from torchsummary import summary
import matplotlib.pyplot as plt
import time

# Preprocessing shared by both splits: convert images to tensors, then
# normalise every RGB channel with mean=0.5 / std=0.5 so values lie in [-1, 1].
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Download CIFAR-10 into `cifar_root` (skipped when already present) and wrap
# the train / test splits as datasets.
cifar_root = '/Users/lvangge/Documents/ /code/codes_/神经网络深度学习/pj2'
trainset = torchvision.datasets.CIFAR10(root=cifar_root, train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR10(root=cifar_root, train=False, download=True, transform=transform)

# `shuffle` is a boolean flag (the former value 2 only worked because it is
# truthy). The training split is reshuffled every epoch; the test split keeps
# a fixed order so that evaluation batches are reproducible.
trainloader = DataLoader(trainset, batch_size=64, shuffle=True, num_workers=2)
testloader = DataLoader(testset, batch_size=64, shuffle=False, num_workers=2)


## **定义训练函数和测试函数，返回损失函数数值和错误率，主要训练架构**

In [None]:
def train(dataloader, loss_func, optimizer, model, device):
    """Run one optimisation pass over every batch of ``dataloader``.

    Args:
        dataloader: iterable of ``(inputs, labels)`` batches exposing a
            ``dataset`` attribute that supports ``len()``.
        loss_func: callable mapping ``(predictions, labels)`` to a scalar loss.
        optimizer: optimizer whose gradients are reset and stepped per batch.
        model: network that is called on each input batch.
        device: device every batch is moved to before the forward pass.

    Returns:
        Tuple ``(mean loss per batch, fraction of misclassified samples)``.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    loss_sum = 0
    n_wrong = 0

    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        batch_loss = loss_func(outputs, labels)

        # Standard update: clear stale gradients, back-propagate, step.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()
        # The predicted class is the index of the largest class probability.
        predicted = torch.argmax(F.softmax(outputs, dim=1), dim=1)
        n_wrong += predicted.ne(labels).type(torch.float).sum().item()

    return loss_sum / n_batches, n_wrong / n_samples

def test(dataloader, loss_func, optimizer, model, device=device):
    size = len(dataloader.dataset)
    num_batch = len(dataloader)
    test_loss, test_err = 0, 0
    
    for x, y in dataloader:
        x, y = x.to(device), y.to(device)
        pred = model(x)
        loss = loss_func(pred, y)

        test_loss += loss.item()
        test_err += (torch.argmax(F.softmax(pred, dim=1), dim=1) != y).type(torch.float).sum().item()

    test_loss /= num_batch
    test_err /= size

    return test_loss, test_err
print('function done')

**主要训练框架,初步训练模型**

In [None]:
nclass = 10
class Model(nn.Module):
    """LeNet-style CNN mapping 3x32x32 images to ``nclass`` class logits.

    Layout: two (5x5 conv -> ReLU -> 2x2 max-pool) stages, then a 64-unit
    hidden layer with dropout, then a linear output layer.
    """

    def __init__(self) -> None:
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, kernel_size=5) # 32 -> 28
        self.pool1 = nn.MaxPool2d(kernel_size=2) # 28 -> 14
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5) # 14 -> 10
        self.pool2 = nn.MaxPool2d(kernel_size=2) # 10 -> 5
        self.fc1 = nn.Linear(16*5*5, 64)
        self.dr = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(64, nclass)

    def forward(self, x):
        """Return unnormalised class scores of shape ``(batch, nclass)``."""
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.dr(x)
        # Emit raw logits: the cross-entropy loss used for training applies
        # log-softmax itself, and a ReLU here would clamp every negative
        # score to zero and block its gradient.
        x = self.fc2(x)

        return x

# Build the network, print a per-layer summary (computed on CPU), then move
# the model to the GPU when one is available.
model = Model()
summary(model, input_size=(3,32,32), device='cpu')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Baseline optimisation setup: cross-entropy loss with SGD + momentum.
loss_func = nn.CrossEntropyLoss()
lr = 1e-3
optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum = 0.9)

if __name__ == '__main__':
    epochs = 10
    # Per-epoch histories; plot() reads these module-level lists.
    train_loss = []
    train_err = []
    test_loss = []
    test_err =[]
    start_time = time.time()  # kept for ad-hoc timing of the run

    template = ('Epoch:{:2d}, train_err:{:.1f}%, train_loss:{:.3f}, test_err:{:.1f}%, test_loss:{:.3f}')

    for epoch in range(epochs):
        model.train()
        e_train_loss, e_train_err = train(trainloader, loss_func, optimizer, model, device)

        model.eval()
        e_test_loss, e_test_err = test(testloader, loss_func, optimizer, model, device)

        # Each metric goes into its own history (the two loss histories used
        # to be swapped, which mislabelled the curves in the loss plot).
        train_loss.append(e_train_loss)
        train_err.append(e_train_err)
        test_loss.append(e_test_loss)
        test_err.append(e_test_err)

        print(template.format(epoch+1, e_train_err*100, e_train_loss, e_test_err*100, e_test_loss))

def plot():
    """Show the recorded loss and error histories as two side-by-side panels.

    Reads the module-level ``epochs``, ``train_loss``, ``test_loss``,
    ``train_err`` and ``test_err`` filled in by the training loop.
    """
    xs = range(epochs)
    panels = (
        ('Loss Plot', (('Train Loss', train_loss), ('Test Loss', test_loss))),
        ('Error Plot', (('Train Error', train_err), ('Test Error', test_err))),
    )

    plt.figure(figsize=(12,3))
    for column, (title, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, column)
        for label, series in curves:
            plt.plot(xs, series, label=label)
        plt.legend()
        plt.title(title)

    plt.show()

plot()


**尝试使用更复杂的模型**

In [None]:
nclass = 10
class Model(nn.Module):
    """Wider three-convolution CNN mapping 3x32x32 images to class logits.

    The third 5x5 convolution collapses the 5x5 feature map to 1x1, so the
    flattened feature vector has 128 entries.
    """

    def __init__(self) -> None:
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=5) # 32 -> 28
        self.pool1 = nn.MaxPool2d(kernel_size=2) # 28 -> 14
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5) # 14 -> 10
        self.pool2 = nn.MaxPool2d(kernel_size=2) # 10 -> 5
        self.conv3 = nn.Conv2d(64, 128, kernel_size=5) # 5 -> 1
        self.fc1 = nn.Linear(128, 64)
        self.dr = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(64, nclass)

    def forward(self, x):
        """Return unnormalised class scores of shape ``(batch, nclass)``."""
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = F.relu(self.conv3(x))
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.dr(x)
        # Emit raw logits: the cross-entropy loss used for training applies
        # log-softmax itself, and a ReLU here would clamp every negative
        # score to zero and block its gradient.
        x = self.fc2(x)

        return x

# Choose the compute device up front, then build the network, print its
# per-layer summary (computed on CPU) and finally move it to that device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Model()
summary(model, input_size=(3,32,32), device='cpu')
model.to(device)

In [None]:
# Same optimisation setup as the baseline: cross-entropy with SGD + momentum.
loss_func = nn.CrossEntropyLoss()
lr = 1e-3
optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum = 0.9)

if __name__ == '__main__':
    epochs = 10
    # Per-epoch histories; plot() reads these module-level lists.
    train_loss = []
    train_err = []
    test_loss = []
    test_err =[]
    start_time = time.time()  # kept for ad-hoc timing of the run

    template = ('Epoch:{:2d}, train_err:{:.1f}%, train_loss:{:.3f}, test_err:{:.1f}%, test_loss:{:.3f}')

    for epoch in range(epochs):
        model.train()
        e_train_loss, e_train_err = train(trainloader, loss_func, optimizer, model, device)

        model.eval()
        e_test_loss, e_test_err = test(testloader, loss_func, optimizer, model, device)

        # Each metric goes into its own history (the two loss histories used
        # to be swapped, which mislabelled the curves in the loss plot).
        train_loss.append(e_train_loss)
        train_err.append(e_train_err)
        test_loss.append(e_test_loss)
        test_err.append(e_test_err)

        print(template.format(epoch+1, e_train_err*100, e_train_loss, e_test_err*100, e_test_loss))

def plot():
    """Show the recorded loss and error histories as two side-by-side panels.

    Reads the module-level ``epochs``, ``train_loss``, ``test_loss``,
    ``train_err`` and ``test_err`` filled in by the training loop. The loss
    axis is limited to [0, 3] and the error axis to [0, 1].
    """
    xs = range(epochs)
    panels = (
        ('Loss Plot', 3, (('Train Loss', train_loss), ('Test Loss', test_loss))),
        ('Error Plot', 1, (('Train Error', train_err), ('Test Error', test_err))),
    )

    plt.figure(figsize=(12,3))
    for column, (title, y_max, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, column)
        for label, series in curves:
            plt.plot(xs, series, label=label)
        plt.ylim(0, y_max)
        plt.legend()
        plt.title(title)

    plt.show()

plot()

**更换复杂的模型后大大提升了网络中神经元的数量，但是对训练结果的改进却很小，下面尝试使用其他的优化器，e.g. Adam**

In [None]:
nclass = 10
class Model(nn.Module):
    """LeNet-style CNN mapping 3x32x32 images to ``nclass`` class logits.

    Layout: two (5x5 conv -> ReLU -> 2x2 max-pool) stages, then a 64-unit
    hidden layer with dropout, then a linear output layer.
    """

    def __init__(self) -> None:
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, kernel_size=5) # 32 -> 28
        self.pool1 = nn.MaxPool2d(kernel_size=2) # 28 -> 14
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5) # 14 -> 10
        self.pool2 = nn.MaxPool2d(kernel_size=2) # 10 -> 5
        self.fc1 = nn.Linear(16*5*5, 64)
        self.dr = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(64, nclass)

    def forward(self, x):
        """Return unnormalised class scores of shape ``(batch, nclass)``."""
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.dr(x)
        # Emit raw logits: the cross-entropy loss used for training applies
        # log-softmax itself, and a ReLU here would clamp every negative
        # score to zero and block its gradient.
        x = self.fc2(x)

        return x

# Choose the compute device up front, then build the network, print its
# per-layer summary (computed on CPU) and finally move it to that device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Model()
summary(model, input_size=(3,32,32), device='cpu')
model.to(device)

In [None]:
# Switch the optimizer to Adam; loss and learning rate are unchanged.
loss_func = nn.CrossEntropyLoss()
lr = 1e-3
optimizer = torch.optim.Adam(model.parameters(),lr = lr)

if __name__ == '__main__':
    epochs = 50
    # Per-epoch histories; plot() reads these module-level lists.
    train_loss = []
    train_err = []
    test_loss = []
    test_err =[]
    start_time = time.time()  # kept for ad-hoc timing of the run

    template = ('Epoch:{:2d}, train_err:{:.1f}%, train_loss:{:.3f}, test_err:{:.1f}%, test_loss:{:.3f}')

    for epoch in range(epochs):
        model.train()
        e_train_loss, e_train_err = train(trainloader, loss_func, optimizer, model, device)

        model.eval()
        e_test_loss, e_test_err = test(testloader, loss_func, optimizer, model, device)

        # Each metric goes into its own history (the two loss histories used
        # to be swapped, which mislabelled the curves in the loss plot).
        train_loss.append(e_train_loss)
        train_err.append(e_train_err)
        test_loss.append(e_test_loss)
        test_err.append(e_test_err)

        # Report only every fifth epoch to keep the log short.
        if (epoch+1) % 5 == 0:
            print(template.format(epoch+1, e_train_err*100, e_train_loss, e_test_err*100, e_test_loss))

def plot():
    """Show the recorded loss and error histories as two side-by-side panels.

    Reads the module-level ``epochs``, ``train_loss``, ``test_loss``,
    ``train_err`` and ``test_err`` filled in by the training loop. The loss
    axis is limited to [0, 3] and the error axis to [0, 1].
    """
    xs = range(epochs)
    panels = (
        ('Loss Plot', 3, (('Train Loss', train_loss), ('Test Loss', test_loss))),
        ('Error Plot', 1, (('Train Error', train_err), ('Test Error', test_err))),
    )

    plt.figure(figsize=(12,3))
    for column, (title, y_max, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, column)
        for label, series in curves:
            plt.plot(xs, series, label=label)
        plt.ylim(0, y_max)
        plt.legend()
        plt.title(title)

    plt.show()

plot()

**Adam优化器在第一个epoch表现良好，但是后续收敛速度减慢，考虑调整学习率为0.01**

In [None]:
# Adam again, with the learning rate raised to 1e-2.
# NOTE(review): `model` is not re-created in this cell, so training continues
# from whatever weights the previous run left behind -- confirm this is the
# intended comparison.
loss_func = nn.CrossEntropyLoss()
lr = 1e-2
optimizer = torch.optim.Adam(model.parameters(),lr = lr)

if __name__ == '__main__':
    epochs = 50
    # Per-epoch histories; plot() reads these module-level lists.
    train_loss = []
    train_err = []
    test_loss = []
    test_err =[]
    start_time = time.time()  # kept for ad-hoc timing of the run

    template = ('Epoch:{:2d}, train_err:{:.1f}%, train_loss:{:.3f}, test_err:{:.1f}%, test_loss:{:.3f}')

    for epoch in range(epochs):
        model.train()
        e_train_loss, e_train_err = train(trainloader, loss_func, optimizer, model, device)

        model.eval()
        e_test_loss, e_test_err = test(testloader, loss_func, optimizer, model, device)

        # Each metric goes into its own history (the two loss histories used
        # to be swapped, which mislabelled the curves in the loss plot).
        train_loss.append(e_train_loss)
        train_err.append(e_train_err)
        test_loss.append(e_test_loss)
        test_err.append(e_test_err)

        print(template.format(epoch+1, e_train_err*100, e_train_loss, e_test_err*100, e_test_loss))

def plot():
    """Show the recorded loss and error histories as two side-by-side panels.

    Reads the module-level ``epochs``, ``train_loss``, ``test_loss``,
    ``train_err`` and ``test_err`` filled in by the training loop. The loss
    axis is limited to [0, 3] and the error axis to [0, 1].
    """
    xs = range(epochs)
    panels = (
        ('Loss Plot', 3, (('Train Loss', train_loss), ('Test Loss', test_loss))),
        ('Error Plot', 1, (('Train Error', train_err), ('Test Error', test_err))),
    )

    plt.figure(figsize=(12,3))
    for column, (title, y_max, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, column)
        for label, series in curves:
            plt.plot(xs, series, label=label)
        plt.ylim(0, y_max)
        plt.legend()
        plt.title(title)

    plt.show()

plot()

## 观察可知此时训练结果出现震荡，考虑采用学习率波动衰减策略

使用CosineAnnealingLR

In [None]:
from torch.optim.lr_scheduler import CosineAnnealingLR

In [None]:
def train(dataloader, loss_func, optimizer, model, device):
    """Run one optimisation pass over every batch of ``dataloader``.

    Args:
        dataloader: iterable of ``(inputs, labels)`` batches exposing a
            ``dataset`` attribute that supports ``len()``.
        loss_func: callable mapping ``(predictions, labels)`` to a scalar loss.
        optimizer: optimizer whose gradients are reset and stepped per batch.
        model: network that is called on each input batch.
        device: device every batch is moved to before the forward pass.

    Returns:
        Tuple ``(mean loss per batch, fraction of misclassified samples)``.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    loss_sum = 0
    n_wrong = 0

    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        batch_loss = loss_func(outputs, labels)

        # Standard update: clear stale gradients, back-propagate, step.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()
        # The predicted class is the index of the largest class probability.
        predicted = torch.argmax(F.softmax(outputs, dim=1), dim=1)
        n_wrong += predicted.ne(labels).type(torch.float).sum().item()

    return loss_sum / n_batches, n_wrong / n_samples


In [None]:
# Start from a freshly initialised network for the scheduled-learning-rate run.
model = Model()
summary(model, input_size=(3,32,32), device='cpu')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Adam with a cosine-annealed learning rate; the scheduler is stepped once
# per epoch in the loop below.
lr = 1e-3
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
lr_schedule = CosineAnnealingLR(optimizer, T_max = 25)

if __name__ == '__main__':
    epochs = 50
    # Per-epoch histories; plot() reads these module-level lists.
    train_loss = []
    train_err = []
    test_loss = []
    test_err =[]
    start_time = time.time()  # kept for ad-hoc timing of the run

    template = ('Epoch:{:2d}, train_err:{:.1f}%, train_loss:{:.3f}, test_err:{:.1f}%, test_loss:{:.3f}')

    for epoch in range(epochs):
        model.train()
        e_train_loss, e_train_err = train(trainloader, loss_func, optimizer, model, device)
        lr_schedule.step()

        model.eval()
        e_test_loss, e_test_err = test(testloader, loss_func, optimizer, model, device)

        # Each metric goes into its own history (the two loss histories used
        # to be swapped, which mislabelled the curves in the loss plot).
        train_loss.append(e_train_loss)
        train_err.append(e_train_err)
        test_loss.append(e_test_loss)
        test_err.append(e_test_err)

        # Report only every fifth epoch to keep the log short.
        if (epoch+1) % 5 == 0:
            print(template.format(epoch+1, e_train_err*100, e_train_loss, e_test_err*100, e_test_loss))

def plot():
    """Show the recorded loss and error histories as two side-by-side panels.

    Reads the module-level ``epochs``, ``train_loss``, ``test_loss``,
    ``train_err`` and ``test_err`` filled in by the training loop. The loss
    axis is limited to [0, 3] and the error axis to [0, 1].
    """
    xs = range(epochs)
    panels = (
        ('Loss Plot', 3, (('Train Loss', train_loss), ('Test Loss', test_loss))),
        ('Error Plot', 1, (('Train Error', train_err), ('Test Error', test_err))),
    )

    plt.figure(figsize=(12,3))
    for column, (title, y_max, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, column)
        for label, series in curves:
            plt.plot(xs, series, label=label)
        plt.ylim(0, y_max)
        plt.legend()
        plt.title(title)

    plt.show()

plot()

训练效果无明显进步，尝试配合使用复杂模型和Adam优化器

In [None]:
nclass = 10
class Model(nn.Module):
    """Wider three-convolution CNN mapping 3x32x32 images to class logits.

    The third convolution uses a 3x3 kernel, leaving a 3x3 feature map, so
    the flattened feature vector has 128*3*3 entries.
    """

    def __init__(self) -> None:
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=5) # 32 -> 28
        self.pool1 = nn.MaxPool2d(kernel_size=2) # 28 -> 14
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5) # 14 -> 10
        self.pool2 = nn.MaxPool2d(kernel_size=2) # 10 -> 5
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3) # 5 -> 3
        self.fc1 = nn.Linear(128*3*3, 64)
        self.dr = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(64, nclass)

    def forward(self, x):
        """Return unnormalised class scores of shape ``(batch, nclass)``."""
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = F.relu(self.conv3(x))
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.dr(x)
        # Emit raw logits: the cross-entropy loss used for training applies
        # log-softmax itself, and a ReLU here would clamp every negative
        # score to zero and block its gradient.
        x = self.fc2(x)

        return x

# Choose the compute device, build the wider network, print its per-layer
# summary (computed on CPU) and move it to the chosen device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Model()
summary(model, input_size=(3,32,32), device='cpu')
model.to(device)

# Cross-entropy loss optimised with Adam.
lr = 1e-3
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),lr = lr)

if __name__ == '__main__':
    epochs = 50
    # Per-epoch histories; plot() reads these module-level lists.
    train_loss, train_err, test_loss, test_err = [], [], [], []
    start_time = time.time()

    for epoch in range(epochs):
        model.train()
        e_train_loss, e_train_err = train(trainloader, loss_func, optimizer, model, device)

        model.eval()
        e_test_loss, e_test_err = test(testloader, loss_func, optimizer, model)

        train_loss.append(e_train_loss)
        train_err.append(e_train_err)
        test_loss.append(e_test_loss)
        test_err.append(e_test_err)

        template = ('Epoch:{:2d}, train_err:{:.1f}%, train_loss:{:.3f}, test_err:{:.1f}%, test_loss:{:.3f}')
        # Only every fifth epoch is reported.
        if (epoch+1) % 5 != 0:
            continue
        print(template.format(epoch+1, e_train_err*100, e_train_loss, e_test_err*100, e_test_loss))

    # Wall-clock duration of the whole run.
    end_time = time.time()
    print('runing time: {:.2f}s'.format(end_time - start_time))

def plot():
    """Show the recorded loss and error histories as two side-by-side panels.

    Reads the module-level ``epochs``, ``train_loss``, ``test_loss``,
    ``train_err`` and ``test_err`` filled in by the training loop. The loss
    axis is limited to [0, 3] and the error axis to [0, 1].
    """
    xs = range(epochs)
    panels = (
        ('Loss Plot', 3, (('Train Loss', train_loss), ('Test Loss', test_loss))),
        ('Error Plot', 1, (('Train Error', train_err), ('Test Error', test_err))),
    )

    plt.figure(figsize=(12,3))
    for column, (title, y_max, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, column)
        for label, series in curves:
            plt.plot(xs, series, label=label)
        plt.ylim(0, y_max)
        plt.legend()
        plt.title(title)

    plt.show()

plot()

训练效果改进明显，但是在使用复杂模型跑50个epoch后过拟合明显

## 尝试进行数据增广

使用torchvision的transform

In [None]:
import torch
import torchvision.transforms as transforms

# Evaluation pipeline: tensor conversion plus per-channel normalisation
# (mean=0.5, std=0.5), with no random augmentation.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,0.5,0.5), std=(0.5,0.5,0.5)),
])

# Training pipeline: random augmentations on the input image, followed by the
# same tensor conversion and normalisation as the evaluation pipeline.
# NOTE(review): vertical flips and rotations of up to 45 degrees are strong
# augmentations for 32x32 images -- confirm they are intended.
train_transform = transforms.Compose([
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomRotation(degrees=(-45, 45)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomCrop(size=32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,0.5,0.5), std=(0.5,0.5,0.5)),
])

print('transform done')

In [None]:
cifar_root = '/Users/lvangge/Documents/ /code/codes_/神经网络深度学习/pj2'
def load_data(root,is_train,transform):
    """Build a DataLoader over a CIFAR-10 split already stored under ``root``.

    Args:
        root: directory holding the CIFAR-10 files (nothing is downloaded).
        is_train: truthy for the training split, falsy for the test split.
        transform: transform applied to every image when it is loaded.

    Returns:
        DataLoader yielding batches of 128 samples. The training split is
        reshuffled every epoch; the test split keeps a fixed order so that
        evaluation batches are reproducible.
    """
    # Honour the `root` argument (the module-level path used to be read
    # instead) and avoid shadowing the builtin `set`.
    dataset = torchvision.datasets.CIFAR10(root=root, train=is_train, download=False, transform=transform)
    dataloader = DataLoader(dataset, batch_size=128, shuffle=bool(is_train), num_workers=4,pin_memory=True)
    return dataloader
print('dataloader done')

In [None]:
nclass = 10
class Model(nn.Module):
    """Wider three-convolution CNN mapping 3x32x32 images to class logits.

    The third convolution uses a 3x3 kernel, leaving a 3x3 feature map, so
    the flattened feature vector has 128*3*3 entries.
    """

    def __init__(self) -> None:
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=5) # 32 -> 28
        self.pool1 = nn.MaxPool2d(kernel_size=2) # 28 -> 14
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5) # 14 -> 10
        self.pool2 = nn.MaxPool2d(kernel_size=2) # 10 -> 5
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3) # 5 -> 3
        self.fc1 = nn.Linear(128*3*3, 64)
        self.dr = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(64, nclass)

    def forward(self, x):
        """Return unnormalised class scores of shape ``(batch, nclass)``."""
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = F.relu(self.conv3(x))
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.dr(x)
        # Emit raw logits: the cross-entropy loss used for training applies
        # log-softmax itself, and a ReLU here would clamp every negative
        # score to zero and block its gradient.
        x = self.fc2(x)

        return x

# Build the network, print a per-layer summary (computed on CPU), then move
# the model to the GPU when one is available.
model = Model()
summary(model, input_size=(3,32,32), device='cpu')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
model.to(device)

loss_func = nn.CrossEntropyLoss().to(device)
lr = 1e-3
optimizer = torch.optim.Adam(model.parameters(),lr = lr)

if __name__ == '__main__':
    epochs = 50
    # Per-epoch histories; plot() reads these module-level lists.
    train_loss = []
    train_err = []
    test_loss = []
    test_err =[]
    start_time = time.time()

    # Build both loaders once, before the loop. The augmented loader used to
    # be bound to a misspelled name (`trianloader`), so training kept running
    # on the earlier, un-augmented `trainloader`.
    trainloader = load_data(cifar_root, is_train=True, transform=train_transform)
    testloader = load_data(cifar_root, is_train=False, transform=test_transform)

    template = ('Epoch:{:2d}, train_err:{:.1f}%, train_loss:{:.3f}, test_err:{:.1f}%, test_loss:{:.3f}')

    for epoch in range(epochs):
        model.train()
        e_train_loss, e_train_err = train(trainloader, loss_func, optimizer, model, device=device)

        model.eval()
        e_test_loss, e_test_err = test(testloader, loss_func, optimizer, model, device=device)

        train_loss.append(e_train_loss)
        train_err.append(e_train_err)
        test_loss.append(e_test_loss)
        test_err.append(e_test_err)

        # Report only every fifth epoch to keep the log short.
        if (epoch+1) % 5 == 0:
            print(template.format(epoch+1, e_train_err*100, e_train_loss, e_test_err*100, e_test_loss))
    end_time = time.time()
    print('runing time: {:.2f}s'.format(end_time - start_time))

def plot():
    """Show the recorded loss and error histories as two side-by-side panels.

    Reads the module-level ``epochs``, ``train_loss``, ``test_loss``,
    ``train_err`` and ``test_err`` filled in by the training loop. The loss
    axis is limited to [0, 3] and the error axis to [0, 1].
    """
    xs = range(epochs)
    panels = (
        ('Loss Plot', 3, (('Train Loss', train_loss), ('Test Loss', test_loss))),
        ('Error Plot', 1, (('Train Error', train_err), ('Test Error', test_err))),
    )

    plt.figure(figsize=(12,3))
    for column, (title, y_max, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, column)
        for label, series in curves:
            plt.plot(xs, series, label=label)
        plt.ylim(0, y_max)
        plt.legend()
        plt.title(title)

    plt.show()

plot()

相比增广前，模型效果无太大变化

## 尝试使用ResNet-18网络

In [None]:
import torch
import torch.nn as nn
# from torch.optim.lr_scheduler import CosineAnnealingLR

class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions plus a shortcut branch.

    The shortcut is an empty ``nn.Sequential`` (identity) when the input and
    output shapes agree; otherwise it is a strided 1x1 convolution followed by
    batch normalisation. ``downsample`` is accepted but not used.
    """

    expansion = 1

    def __init__(self,in_planes, planes, stride=1, downsample=None) -> None:
        super().__init__()
        out_planes = self.expansion * planes

        # Main branch: conv -> BN -> ReLU -> conv -> BN.
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(planes)

        # Shortcut branch: project only when resolution or width changes.
        needs_projection = stride != 1 or in_planes != out_planes
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        return self.relu(out)

print('basic network done')

In [None]:
class ResNet(nn.Module):
    """ResNet-style classifier: a 3x3 stem, four residual stages, a linear head.

    Args:
        block: residual block class exposing an ``expansion`` attribute and
            constructed as ``block(in_planes, planes, stride)``.
        num_blocks: sequence giving the number of blocks in each of the four
            stages.
        num_classes: size of the output layer.
    """

    def __init__(self, block, num_blocks, num_classes=10) -> None:
        super().__init__()
        # Channel count fed to the next block; updated by _make_layer.
        self.in_planes = 64

        # Stem: stride-1 3x3 convolution, batch norm, ReLU.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu1 = nn.ReLU()

        # Stages 2-4 use stride 2 in their first block.
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

        # Head: global average pooling followed by a linear classifier.
        self.avg1 = nn.AdaptiveAvgPool2d((1,1))
        self.fc1 = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage; only its first block uses ``stride``."""
        per_block_strides = [stride]
        per_block_strides.extend([1] * (num_blocks - 1))

        stage = []
        for block_stride in per_block_strides:
            stage.append(block(self.in_planes, planes, block_stride))
            # Later blocks consume this block's output width.
            self.in_planes = planes * block.expansion

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return the class scores produced by the final linear layer."""
        x = self.relu1(self.bn1(self.conv1(x)))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        x = self.avg1(x)
        return self.fc1(torch.flatten(x, 1))
print('ResNet done')

In [None]:
def train(dataloader, loss_func, optimizer, model, device):
    """Run one optimisation pass over every batch of ``dataloader``.

    Args:
        dataloader: iterable of ``(inputs, labels)`` batches exposing a
            ``dataset`` attribute that supports ``len()``.
        loss_func: callable mapping ``(predictions, labels)`` to a scalar loss.
        optimizer: optimizer whose gradients are reset and stepped per batch.
        model: network that is called on each input batch.
        device: device every batch is moved to before the forward pass.

    Returns:
        Tuple ``(mean loss per batch, fraction of misclassified samples)``.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    loss_sum = 0
    n_wrong = 0

    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        batch_loss = loss_func(outputs, labels)

        # Standard update: clear stale gradients, back-propagate, step.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()
        # The predicted class is the index of the largest class probability.
        predicted = torch.argmax(F.softmax(outputs, dim=1), dim=1)
        n_wrong += predicted.ne(labels).type(torch.float).sum().item()

    return loss_sum / n_batches, n_wrong / n_samples

def test(dataloader, loss_func, optimizer, model, device):
    size = len(dataloader.dataset)
    num_batch = len(dataloader)
    test_loss, test_err = 0, 0
    
    for x, y in dataloader:
        x, y = x.to(device), y.to(device)
        pred = model(x)
        loss = loss_func(pred, y)

        test_loss += loss.item()
        test_err += (torch.argmax(F.softmax(pred, dim=1), dim=1) != y).type(torch.float).sum().item()

    test_loss /= num_batch
    test_err /= size

    return test_loss, test_err
print('function done')

In [None]:
# ResNet with two blocks in each of the four stages; print a per-layer summary
# (computed on CPU), then move the model to the GPU when one is available.
model = ResNet(BasicBlock, [2, 2, 2, 2])
summary(model, input_size=(3,32,32), device='cpu')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
model.to(device)

loss_func = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(),lr = 1e-3, weight_decay=1e-4)

if __name__ == '__main__':
    epochs = 50
    # Per-epoch histories; plot() reads these module-level lists.
    train_loss = []
    train_err = []
    test_loss = []
    test_err =[]
    start_time = time.time()

    # Build both loaders once, before the loop. The original referenced the
    # undefined name `trian_transform` (a NameError) and bound the result to
    # the misspelled `trianloader`, so the augmented data was never used.
    trainloader = load_data(cifar_root, is_train=True, transform=train_transform)
    testloader = load_data(cifar_root, is_train=False, transform=test_transform)

    template = ('Epoch:{:2d}, train_err:{:.1f}%, train_loss:{:.3f}, test_err:{:.1f}%, test_loss:{:.3f}')

    for epoch in range(epochs):
        model.train()
        e_train_loss, e_train_err = train(trainloader, loss_func, optimizer, model, device=device)

        model.eval()
        e_test_loss, e_test_err = test(testloader, loss_func, optimizer, model, device=device)

        train_loss.append(e_train_loss)
        train_err.append(e_train_err)
        test_loss.append(e_test_loss)
        test_err.append(e_test_err)

        # Report only every fifth epoch to keep the log short.
        if (epoch+1) % 5 == 0:
            print(template.format(epoch+1, e_train_err*100, e_train_loss, e_test_err*100, e_test_loss))
    end_time = time.time()
    print('runing time: {:.2f}s'.format(end_time - start_time))

def plot():
    """Show the recorded loss and error histories as two side-by-side panels.

    Reads the module-level ``epochs``, ``train_loss``, ``test_loss``,
    ``train_err`` and ``test_err`` filled in by the training loop. The loss
    axis is limited to [0, 3] and the error axis to [0, 1].
    """
    xs = range(epochs)
    panels = (
        ('Loss Plot', 3, (('Train Loss', train_loss), ('Test Loss', test_loss))),
        ('Error Plot', 1, (('Train Error', train_err), ('Test Error', test_err))),
    )

    plt.figure(figsize=(12,3))
    for column, (title, y_max, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, column)
        for label, series in curves:
            plt.plot(xs, series, label=label)
        plt.ylim(0, y_max)
        plt.legend()
        plt.title(title)

    plt.show()

plot()