In [2]:
import torch
import torch.nn.functional as F
from torchvision import transforms
from torchvision import datasets
from torch import nn
from torch import optim
from torch.utils.data import DataLoader
#Since Python 2.7.9, urllib verifies the SSL certificate whenever it opens an https link,
#and raises urllib2.URLError when the target site uses a self-signed certificate.
#Disabling certificate verification globally works around this.
#(The CIFAR10 dataset download address had this problem.)
#NOTE(review): the assignment below replaces the default https context factory for the
#whole process, so every later https request made through it skips certificate checks.
import ssl

ssl._create_default_https_context = ssl._create_unverified_context

#### CIFAR10 Dataset
<font size = 2>

The CIFAR-10 dataset consists of 60000 32x32 colour images in 10 classes, with 6000 images per class. There are 50000 training images and 10000 test images.

The dataset is divided into five training batches and one test batch, each with 10000 images. The test batch contains exactly 1000 randomly-selected images from each class. The training batches contain the remaining images in random order, but some training batches may contain more images from one class than another. Between them, the training batches contain exactly 5000 images from each class.
    
Here are the classes in the dataset, as well as 10 random images from each:
    
<div>
<img src = "CIFAR10_1.png" style = "zoom:70%" />
</div>
  
The classes are completely mutually exclusive. There is no overlap between automobiles and trucks. "Automobile" includes sedans, SUVs, things of that sort. "Truck" includes only big trucks. Neither includes pickup trucks.

In [3]:
#prepare CIFAR10 dataset
def get_loaders(BatchSize, root = '/home/hardli/python/pytorch/datasets'):
    '''
    Build the CIFAR10 training and test DataLoaders.

    param BatchSize: number of images per batch, used for both loaders
    param root: directory the CIFAR10 files are stored in / downloaded to;
                the default keeps the location this notebook was written with
    return: tuple (CIFAR_train, CIFAR_test) of DataLoaders
    '''

    def _make_loader(train):
        #one code path for both splits, so they cannot drift apart
        dataset = datasets.CIFAR10(
            root = root,
            train = train,
            download = True,
            transform = transforms.Compose([
                transforms.ToTensor(),
                transforms.Resize((32,32))
            ])
        )
        #only the training split is shuffled; evaluation metrics do not depend
        #on batch order, so the test split is iterated in a fixed order
        return DataLoader(dataset, batch_size = BatchSize, shuffle = train)

    CIFAR_train = _make_loader(train = True)
    CIFAR_test = _make_loader(train = False)

    return CIFAR_train, CIFAR_test

#### Data in CIFAR10
<font size = 2>
    
Each batch yielded by the CIFAR10 DataLoader is a list with 2 elements. One is the image data, with shape [3,32,32] per image (so [b,3,32,32] for a batch of b images); the other is the labels, a one-dimensional tensor with one entry per image.

#### LeNet
<font size = 2>

LeNet has the structure shown below. We make one small adjustment to the original structure: the **sub-sampling layers** are replaced by **pooling layers**:
    
<div>
<img src = "LeNet.png" style = "zoom:70%" />
</div>

The inputs are CIFAR10 images of 32x32 pixels each.
    
And the hidden layers are arranged as:
    
    1. Convolutional layer with 6 5x5 kernels, stride 1, no padding;
    2. Pooling layer with 2x2 windows, stride 2, no padding;
    3. Convolutional layer with 16 5x5 kernels, stride 1, no padding;
    4. Pooling layer with 2x2 window, stride 2, no padding;
    5. Linear fully connected layers;

In [4]:
#create LeNet class
class LeNet5(nn.Module):
    '''
    LeNet-5 style classifier: two conv + average-pool stages followed by
    three fully connected layers.

    input:  [b,3,32,32]
    output: [b,10] logits
    '''

    def __init__(self):

        super().__init__()

        #feature extractor, input has 3 (RGB) channels
        feature_layers = [
            #[b,3,32,32] -> [b,6,28,28]
            nn.Conv2d(3, 6, kernel_size = 5, stride = 1, padding = 0),
            #[b,6,28,28] -> [b,6,14,14]
            nn.AvgPool2d(kernel_size = 2, stride = 2, padding = 0),
            #[b,6,14,14] -> [b,16,10,10]
            nn.Conv2d(6, 16, kernel_size = 5, stride = 1, padding = 0),
            #[b,16,10,10] -> [b,16,5,5]
            nn.AvgPool2d(kernel_size = 2, stride = 2, padding = 0),
        ]
        self.conv_unit = nn.Sequential(*feature_layers)

        #classifier head, fed with the flattened [b,16*5*5] features (see forward())
        classifier_layers = [
            #16*5*5 -> 120
            nn.Linear(16*5*5, 120),
            nn.ReLU(),
            #120 -> 84
            nn.Linear(120, 84),
            nn.ReLU(),
            #84 -> 10
            nn.Linear(84, 10),
        ]
        self.FC_unit = nn.Sequential(*classifier_layers)

    def forward(self,x):
        '''
        param x: [b,3,32,32], b is the number of pictures in the batch
        return: logits of shape [b,10]
        '''
        #[b,3,32,32] -> [b,16,5,5]
        features = self.conv_unit(x)
        #flatten everything but the batch dimension: [b,16,5,5] -> [b,16*5*5]
        flat = features.view(features.size(0), -1)
        #[b,16*5*5] -> [b,10]
        return self.FC_unit(flat)

#### ResNet
<font size = 2>

ResNet has a **skipping/shortcut** path that lets the input bypass the **layer block** and be added to the output of the hidden layers of that **layer block**:
    
<div>
<img src = "ResNet3.png" style = "zoom:70%" />
</div>

The inputs are CIFAR10 images, each of shape [3,32,32].
    
The **weight layers** here are implemented as **convolutional layers**.
    
$F(x)$ is the output of the hidden layers, and $x$ is the input.

In [5]:
#create class ResNet_block: one block with skipping/shortcut unit
class ResNet_block(nn.Module):
    '''
    Residual block: two 3x3 convolutions, each followed by batch normalisation,
    whose result is added to a shortcut branch computed from the block input.
    '''

    def __init__(self, ch_in, ch_out, stride = 1):

        '''
        param ch_in: number of channels of the input
        param ch_out: number of channels of the output
        param stride: stride of the first convolution and of the projection shortcut
        '''

        super(ResNet_block, self).__init__()
        #main branch
        #conv1 changes the channels and, when stride > 1, shrinks [h,w]
        self.conv1 = nn.Conv2d(ch_in, ch_out, kernel_size = 3, stride = stride, padding = 1)
        self.bn1 = nn.BatchNorm2d(ch_out)
        #conv2 keeps both the channels and [h,w]
        self.conv2 = nn.Conv2d(ch_out, ch_out, kernel_size = 3, stride = 1, padding = 1)
        self.bn2 = nn.BatchNorm2d(ch_out)

        #shortcut branch: identity by default
        self.extra = nn.Sequential()
        #the identity only fits when the main branch changes neither the channel
        #count nor the spatial size; checking ch_in != ch_out alone let a strided
        #block with equal channels add tensors of different [h,w]
        if stride != 1 or ch_in != ch_out:
            self.extra = nn.Sequential(
                #projection: [b,ch_in,h,w] -> [b,ch_out,h',w']
                #kernel_size = 1 with padding = 0 and the same stride yields exactly
                #the [h',w'] that conv1 (kernel_size = 3, padding = 1) produces
                nn.Conv2d(ch_in, ch_out, kernel_size = 1, stride = stride, padding = 0),
                nn.BatchNorm2d(ch_out)
            )

    def forward(self, x):

        '''
        param x: [b,ch_in,h,w]
        return: [b,ch_out,h',w'], where [h',w'] is the output size of conv1
        '''

        #x itself is kept untouched because the shortcut branch still needs it
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        #skipping/shortcut: element-wise addition of two [b,ch_out,h',w'] tensors
        out = out + self.extra(x)
        return out

In [6]:
#create a Residual Network
class ResNet18(nn.Module):
    '''
    Small residual network for 3-channel images and 10 classes:
    a stem convolution, four ResNet_block stages, global average pooling
    and a linear classifier that returns raw logits.
    '''

    def __init__(self):

        super(ResNet18,self).__init__()

        #stem: [b,3,h,w] -> [b,64,h',w']
        #kernel_size = 3 with stride = 3 and no padding shrinks the picture,
        #e.g. a 32x32 input becomes 10x10
        self.first_conv = nn.Sequential(
            nn.Conv2d(3,64,kernel_size = 3,stride = 3,padding = 0),
            nn.BatchNorm2d(64)
            )

        #every block below is built with stride = 2, so each one also shrinks [h,w]
        #block1: channels 64 -> 128
        self.block1 = ResNet_block(64,128,stride = 2)
        #block2: channels 128 -> 256
        self.block2 = ResNet_block(128,256,stride = 2)
        #block3: channels 256 -> 512
        self.block3 = ResNet_block(256,512,stride = 2)
        #block4: channels 512 -> 512
        self.block4 = ResNet_block(512,512,stride = 2)

        #out layer: 512 pooled features -> 10 class scores
        self.outlayer = nn.Linear(512*1*1, 10)

    def forward(self,x):
        '''
        param x: [b,3,h,w]
        return: logits of shape [b,10]
        '''
        #[b,3,h,w] -> [b,64,h',w']
        x = F.relu(self.first_conv(x))
        #channels 64 -> 128 -> 256 -> 512 -> 512
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        #adaptive pooling maps whatever [h,w] is left onto the designated
        #shape [1,1], a.k.a. [b,c,h,w] -> [b,c,1,1]
        x = F.adaptive_avg_pool2d(x,[1,1])
        #change shape of x: [b,c,1,1] -> [b,c*1*1]
        x = x.view(x.size(0), -1)
        #return the raw logits: a ReLU here would clamp every class score to be
        #non-negative before the softmax that the cross-entropy loss applies
        x = self.outlayer(x)
        return x

In [8]:
def run(batch_size = 3000, epochs = 10, lr = 0.001, model_name = None):
    '''
    Train a model on CIFAR10 and print the test accuracy after every epoch.

    param batch_size: number of pictures per batch for training and testing
    param epochs: number of passes over the training set
    param lr: learning rate of the Adam optimizer
    param model_name: 1 for LeNet5, 2 for ResNet18; when None the choice is
                      asked for interactively
    raise ValueError: if the selection is neither 1 nor 2
    '''

    CIFAR_train, CIFAR_test = get_loaders(batch_size)

    #use the first GPU when one is available, otherwise fall back to the CPU
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    #create a network model
    if model_name is None:
        model_name = int(input('Select a model: (1)LeNet5 or (2)ResNet?'))
    if model_name == 1:
        model = LeNet5().to(device)
    elif model_name == 2:
        model = ResNet18().to(device)
    else:
        #fail early instead of hitting an unbound 'model' further down
        raise ValueError(f'unknown model selection {model_name!r}: expected 1 or 2')
    #create loss function
    criterion = nn.CrossEntropyLoss().to(device)
    #create an optimizer
    optimizer = optim.Adam(model.parameters(), lr = lr)
    #show model details
    print(model)

    for epoch in range(epochs):

        #train mode
        model.train()
        for img, label in CIFAR_train:
            #img:   [b,3,32,32]
            #label: [b]
            img, label = img.to(device), label.to(device)
            #logits: [b,10]
            logits = model(img)
            #loss: scalar
            #nn.CrossEntropyLoss() includes the softmax operation
            loss = criterion(logits, label)

            #backpropagation
            #clean gradients, or gradients will accumulate
            optimizer.zero_grad()
            #get gradients of parameters
            loss.backward()
            #update parameters with their gradients
            optimizer.step()

        #loss of the last training batch, printed as a plain number
        print(f'loss of {epoch}th epoch: {loss.item()}')

        #test mode
        model.eval()
        #testing needs no gradients, so no graph has to be built
        with torch.no_grad():

            #count the correct classifications
            total_correct = 0
            #count the total number of testing pictures
            total_num = 0

            for img, label in CIFAR_test:
                #img:   [b,3,32,32]
                #label: [b]
                img, label = img.to(device), label.to(device)
                #logits: [b,10], one score per class for every picture;
                #the class with the largest score is the predicted label
                logits = model(img)
                #preds: [b], index of the maximum along the class dimension
                preds = logits.argmax(dim = 1)
                #count how many predicted labels equal the target labels
                total_correct += torch.eq(preds, label).float().sum().item()
                #img.size(0) = b, the number of pictures in this batch
                total_num += img.size(0)

            #acc: scalar
            acc = total_correct / total_num
            print(f'accuracy of epoch {epoch} is: {acc}')

In [10]:
#start the training / evaluation loop only when this code is executed as the main module
if __name__ == '__main__':
    run()

Files already downloaded and verified
Files already downloaded and verified
Select a model: (1)LeNet5 or (2)ResNet?2
ResNet18(
  (first_conv): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(3, 3))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (block1): ResNet_block(
    (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (extra): Sequential(
      (0): Conv2d(64, 128, kernel_size=(1, 1), stride=(2, 2))
      (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
  )
  (block2): ResNet_block(
    (conv1): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (bn1): BatchNorm2d(256,