## 卷积神经网络识别

### 准备数据

In [1]:
import torch
import torchvision
import numpy as np
import pandas as pd
from torch.utils.data import random_split
from torch.utils.data import DataLoader
import os
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader
from torch import nn
import torch.nn.functional as F

### 下载MNIST数据集用作测试`data loader`

In [2]:
# Download MNIST only when it is not already present under ../data
_dl = not os.path.exists('../data/MNIST')

# Convert images to tensors, then standardise them with the mean / std
# values given below.
transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize((0.1307,), (0.3081,)),
])

# Arguments shared by the train and test dataset constructors
_mnist_args = dict(root = '../data', download = _dl, transform = transform)
trainset_all = torchvision.datasets.MNIST(train = True, **_mnist_args)
testset = torchvision.datasets.MNIST(train = False, **_mnist_args)

samples = []
print('trainset size: ', len(trainset_all))
print('testset size: ', len(testset))

trainset size:  60000
testset size:  10000


### 分割训练集

In [3]:
# Hold out a fixed number of training samples for validation.
SEED = 20220106
valid_size = 10000
train_size = len(trainset_all) - valid_size

# A dedicated, seeded generator keeps the split identical across runs.
_split_generator = torch.Generator().manual_seed(SEED)
trainset, valset = random_split(
    trainset_all,
    [train_size, valid_size],
    generator = _split_generator,
)

print('Trainset size: ', len(trainset))
print('Validation set size: ', len(valset))

Trainset size:  50000
Validation set size:  10000


### 设置模型参数和`data loader`

#### 设置模型参数

In [4]:
# Hyperparameters
batch_size = 32

# CNN architecture: 8 conv2d layers followed by 3 fully connected layers.
# Output channels of conv1 .. conv8
conv1c, conv2c, conv3c, conv4c = 16, 32, 64, 128
conv5c, conv6c, conv7c, conv8c = 256, 128, 64, 32

# Kernel size of conv1 .. conv8 (all layers use 3x3 kernels)
conv1k = conv2k = conv3k = conv4k = 3
conv5k = conv6k = conv7k = conv8k = 3

# Widths of the fully connected layers (fc3 is the number of classes)
fc1, fc2, fc3 = 2048, 512, 100
batchnorm = True
dropout = 0.1

# Optimiser settings used for training
lr = 0.001
weight_decay = 1e-5

#### 设置`data loader`

In [5]:
# Loaders for the MNIST split: the training loader is shuffled and drops the
# last incomplete batch, the validation loader keeps every sample in order.
_loader_args = dict(batch_size = batch_size, num_workers = 8)
trainloader = DataLoader(trainset, shuffle = True, drop_last = True, **_loader_args)
valloader = DataLoader(valset, shuffle = False, drop_last = False, **_loader_args)
# testloader = DataLoader(testset, batch_size, shuffle = False, drop_last = False)

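##### 用于测试CNN代码的数据

注意：下面的单元会用随机生成的数据覆盖上文定义的 `batch_size`、`trainset`、`valset`、`trainloader` 和 `valloader`，之后的训练使用的是这份随机数据，而不是 MNIST。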

In [6]:
# Synthetic input used only to smoke-test the CNN code.
# NOTE: this rebinds `batch_size` for every cell that runs afterwards.
batch_size = 2
SEED = 20220106

# Dedicated, seeded generators make the random images and labels identical
# on every run without touching the global NumPy / PyTorch random state.
_label_rng = np.random.default_rng(SEED)
_image_gen = torch.Generator().manual_seed(SEED)

# Each sample is (image tensor of shape [3, 134, 134], integer label in [1, 99]).
images = [
    (
        torch.randn(3, 134, 134, generator = _image_gen),
        int(_label_rng.integers(1, 100)),
    )
    for _ in range(batch_size * 5)
]

class mySet(Dataset):
    """Minimal in-memory dataset that serves samples from a sequence.

    Args:
        images: indexable collection of samples; it is stored as-is and
            every lookup is forwarded to it.
    """

    def __init__(self, images):
        super().__init__()
        self.data = images

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.data)

    def __getitem__(self, x):
        """Return the sample stored at index ``x``."""
        return self.data[x]

# Wrap the synthetic samples in a dataset and report its size.
myevalset = mySet(images)
print('My dataset size: ', len(myevalset))

# Reserve a fixed number of samples for validation; the rest is for training.
SEED = 20220106
valid_size = 4
train_size = len(myevalset) - valid_size
_split_generator = torch.Generator().manual_seed(SEED)
trainset, valset = random_split(
    myevalset,
    [train_size, valid_size],
    generator = _split_generator,
)

print('Trainset size: ', len(trainset))
print('Validation set size: ', len(valset))

# Single-process loaders (num_workers = 0) for the synthetic data.
_loader_args = dict(batch_size = batch_size, num_workers = 0)
trainloader = DataLoader(trainset, shuffle = True, drop_last = True, **_loader_args)
valloader = DataLoader(valset, shuffle = False, drop_last = False, **_loader_args)

My dataset size:  10
Trainset size:  6
Validation set size:  4


### 设置CNN模型

In [7]:
class CNN(nn.Module):
    """Convolutional classifier: 8 conv layers followed by 3 fully connected layers.

    The first fully connected layer expects 32*15*15 features, which is what
    the convolution / pooling stack produces for inputs of shape
    [batch_size, 3, 134, 134]. The network outputs log-probabilities over
    100 classes.

    Args:
        batchnorm: when True, a BatchNorm2d layer is applied after the ReLU of
            every convolution block.
        dropout: drop probability applied after fc1 and fc2.
    """

    def __init__(self, batchnorm = True, dropout = 0.1):
        super().__init__()
        # Convolution stack; conv3, conv5 and conv8 use no padding and are
        # each followed by a 2x2 max-pool in forward().
        self.conv1 = nn.Conv2d(3, 16, kernel_size=(3,3), padding = 1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=(3,3), padding = 1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=(3,3), padding = 0)
        self.conv4 = nn.Conv2d(64, 128, kernel_size=(3,3), padding = 1)
        self.conv5 = nn.Conv2d(128, 256, kernel_size=(3,3), padding = 0)
        self.conv6 = nn.Conv2d(256, 128, kernel_size=(3,3), padding = 1)
        self.conv7 = nn.Conv2d(128, 64, kernel_size=(3,3), padding = 1)
        self.conv8 = nn.Conv2d(64, 32, kernel_size=(3,3), padding = 0)

        self.pool = nn.MaxPool2d(kernel_size = 2)
        self.dropout = nn.Dropout(p = dropout)

        self.batchnorm = batchnorm
        if batchnorm:
            self.bn1 = nn.BatchNorm2d(16)
            self.bn2 = nn.BatchNorm2d(32)
            self.bn3 = nn.BatchNorm2d(64)
            self.bn4 = nn.BatchNorm2d(128)
            self.bn5 = nn.BatchNorm2d(256)
            self.bn6 = nn.BatchNorm2d(128)
            self.bn7 = nn.BatchNorm2d(64)
            self.bn8 = nn.BatchNorm2d(32)

        self.fc1 = nn.Linear(32*15*15, 2048)
        self.fc2 = nn.Linear(2048, 512)
        self.fc3 = nn.Linear(512, 100)

    def _device(self):
        """Return the device holding the model parameters (CPU if there are none)."""
        try:
            return next(self.parameters()).device
        except StopIteration:
            return torch.device('cpu')

    def _conv_block(self, x, index, pool = False):
        """Apply conv<index> -> optional max-pool -> ReLU -> optional batch norm."""
        out = getattr(self, 'conv%d' % index)(x)
        if pool:
            out = self.pool(out)
        out = F.relu(out)
        if self.batchnorm:
            out = getattr(self, 'bn%d' % index)(out)
        return out

    def forward(self, x):
        """Compute class log-probabilities.

        Args:
            x: input batch of shape [batch_size, 3, 134, 134].

        Returns:
            Tensor of shape [batch_size, 100] holding log-softmax scores.
        """
        out = self._conv_block(x, 1)                  # [batch_size, 16, 134, 134]
        out = self._conv_block(out, 2)                # [batch_size, 32, 134, 134]
        out = self._conv_block(out, 3, pool = True)   # [batch_size, 64, 66, 66]
        out = self._conv_block(out, 4)                # [batch_size, 128, 66, 66]
        out = self._conv_block(out, 5, pool = True)   # [batch_size, 256, 32, 32]
        out = self._conv_block(out, 6)                # [batch_size, 128, 32, 32]
        out = self._conv_block(out, 7)                # [batch_size, 64, 32, 32]
        out = self._conv_block(out, 8, pool = True)   # [batch_size, 32, 15, 15]

        out = out.reshape(out.shape[0], -1)           # [batch_size, 32*15*15]
        out = self.dropout(F.relu(self.fc1(out)))     # [batch_size, 2048]
        out = self.dropout(F.relu(self.fc2(out)))     # [batch_size, 512]
        return F.log_softmax(self.fc3(out), dim = 1)  # [batch_size, 100]

    def fit(self, trainloader, valloader, lr = 0.001, weight_decay = 1e-5, max_epoch = 1, checkpoints_path = './checkpoints'):
        """Train the model with Adam and NLL loss, checkpointing after every epoch.

        Batches are moved to whatever device the model parameters live on, so
        the same code runs on CPU and GPU.

        Args:
            trainloader: iterable with a length, yielding (image, label) batches.
            valloader: iterable of (image, label) batches, evaluated every 200
                training batches.
            lr: Adam learning rate.
            weight_decay: Adam weight decay.
            max_epoch: number of passes over ``trainloader``.
            checkpoints_path: directory for ``model_cnn.pt``; created if missing.

        Returns:
            (train_batch_losses, val_acc): lists of (global_batch_index, loss)
            and (global_batch_index, accuracy) tuples.
        """
        train_batch_losses = []
        val_acc = []

        loss_fn = nn.NLLLoss(reduction = 'mean')
        # Kept on the instance so that save_checkpoint() can store its state.
        self.optimizer = Adam(self.parameters(), lr = lr, weight_decay = weight_decay)
        optimizer = self.optimizer
        batches_per_epoch = len(trainloader)
        device = self._device()

        # torch.save does not create missing directories.
        os.makedirs(checkpoints_path, exist_ok = True)
        checkpoint_file = os.path.join(checkpoints_path, 'model_cnn.pt')

        # evaluation() restores the mode it found, so training mode must be
        # enabled explicitly here.
        self.train()

        for epoch in range(max_epoch):
            epoch_loss = 0.0
            for i, (image, label) in enumerate(trainloader):
                optimizer.zero_grad()

                image = image.to(device)
                label = label.to(device)

                pred = self(image)
                loss = loss_fn(pred, label)
                loss.backward()
                optimizer.step()

                # Read the scalar once; every .item() call forces a device sync.
                batch_loss = loss.item()
                epoch_loss += batch_loss
                train_batch_losses.append((epoch*batches_per_epoch+i, batch_loss))

                if (i % 200 == 0):
                    print('Epoch %d, Batch %d loss: %f'%(epoch, i, batch_loss))
                    acc, _cm = self.evaluation(valloader)
                    val_acc.append((epoch * batches_per_epoch + i, acc))
                    print('   Accuracy after epoch %d batch %d: %f'%(epoch, i, acc))

            # An empty loader would otherwise cause a division by zero.
            mean_loss = epoch_loss/batches_per_epoch if batches_per_epoch else float('nan')
            print("\n##### Epoch %d average loss: "%epoch, mean_loss, ' #####\n')
            self.save_checkpoint(checkpoint_file, epoch, mean_loss)
        return train_batch_losses, val_acc

    @torch.no_grad()
    def evaluation(self, evalloader):
        """Compute accuracy and a confusion matrix over ``evalloader``.

        The model is switched to eval mode for the duration of the call and
        put back into the mode it was in before.

        Args:
            evalloader: iterable of (image, label) batches.

        Returns:
            (accuracy, confusion_matrix): accuracy is a float (0.0 when the
            loader yields no samples); confusion_matrix is a square numpy
            array indexed as [true label, predicted label] whose side equals
            the number of outputs of ``fc3``.
        """
        num_classes = self.fc3.out_features
        conf_mat = np.zeros((num_classes, num_classes))
        device = self._device()

        was_training = self.training
        self.eval()
        num_correct = 0
        num_total = 0
        try:
            for image, label in evalloader:
                pred = torch.argmax(self(image.to(device)), dim = 1).cpu()
                label = label.cpu()

                num_correct += torch.sum(pred == label).item()
                num_total += len(label)
                # Unbuffered add so repeated (label, pred) pairs are all counted.
                np.add.at(conf_mat, (label.numpy(), pred.numpy()), 1)
        finally:
            # Restore the previous mode instead of forcing train mode.
            self.train(was_training)

        accuracy = num_correct/num_total if num_total else 0.0
        return accuracy, conf_mat

    def save_checkpoint(self, path, epoch, loss):
        """Save model weights, epoch, loss and (if available) optimizer state.

        Args:
            path: file path or file-like object passed to ``torch.save``.
            epoch: epoch index stored in the checkpoint.
            loss: loss value stored in the checkpoint.
        """
        # The optimizer is set by fit() or load_checkpoint(); it is absent on
        # a model that has never been trained or restored.
        optimizer = getattr(self, 'optimizer', None)
        optim_state = optimizer.state_dict() if optimizer is not None else None
        checkpoint = {
            "model_state_dict": self.state_dict(),
            "epoch": epoch,
            "loss": loss,
            "optimizer_state_dict": optim_state
        }
        torch.save(checkpoint, path)

    def load_checkpoint(self, path, optimizer = Adam):
        """Restore model weights (and optimizer state, if saved) from ``path``.

        Args:
            path: checkpoint written by ``save_checkpoint``.
            optimizer: optimizer class used to rebuild ``self.optimizer`` when
                the checkpoint contains optimizer state.

        Returns:
            (epoch, loss) as stored in the checkpoint.
        """
        # SECURITY: torch.load unpickles the file; only load trusted checkpoints.
        # map_location lets a checkpoint saved on GPU be loaded on a CPU model.
        checkpoint = torch.load(path, map_location = self._device())
        self.load_state_dict(checkpoint['model_state_dict'])
        if checkpoint['optimizer_state_dict'] is not None:
            self.optimizer = optimizer(self.parameters())
            self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        epoch = checkpoint['epoch']
        loss = checkpoint['loss']
        return epoch, loss

#### 实例化CNN model，测试训练代码

In [8]:
# Build the model and move it to the GPU.
# NOTE: Module.cuda() moves the parameters in place and returns the same
# object, so `cpu_model_cnn` and `gpu_model_cnn` are two names for ONE model
# that lives on the GPU; there is no separate CPU copy.
cpu_model_cnn = CNN(True, 0.1)
gpu_model_cnn = cpu_model_cnn.cuda()
gpu_model_cnn.train()

# fit() writes ./checkpoints/model_cnn.pt after every epoch and torch.save
# does not create missing directories, so make sure the folder exists.
os.makedirs('./checkpoints', exist_ok = True)

train_losses_cnn, val_acc_cnn = gpu_model_cnn.fit(
    trainloader,
    valloader,
    lr = 0.001,
    weight_decay = 1e-5,
    max_epoch = 2,
)

Epoch 0, Batch 0 loss: 4.644213
   Accuracy after epoch 0 batch 0: 0.000000

##### Epoch 0 average loss:  4.725253740946452  #####

Epoch 1, Batch 0 loss: 0.048848
   Accuracy after epoch 1 batch 0: 0.000000

##### Epoch 1 average loss:  0.42368133983654843  #####



#### 所使用输入数据格式如下

In [9]:
# Each entry is an (image, label) tuple: a random tensor of shape
# [3, 134, 134] paired with an integer class label.
images

[(tensor([[[-0.8523, -0.5109, -1.2456,  ..., -0.6834,  0.4318, -0.0993],
           [ 0.8662, -2.1869,  1.2805,  ...,  0.8843, -0.5892,  0.6235],
           [-0.2933,  1.1440, -0.7612,  ..., -0.3511,  0.5238,  1.3164],
           ...,
           [-1.8759,  1.0829, -2.2164,  ...,  0.7284,  0.5715, -1.6558],
           [ 0.4807,  0.8289,  0.1104,  ..., -0.4549, -0.7552, -0.1324],
           [ 1.0142,  0.6923,  0.4540,  ...,  0.3183, -0.6326,  1.3648]],
  
          [[ 0.1489, -0.3089,  0.1681,  ...,  0.8227, -0.8666,  1.3754],
           [-0.3518, -0.2872, -0.7408,  ..., -0.4545,  0.0135, -0.7661],
           [ 1.3796,  1.4480,  1.0072,  ...,  1.5683, -0.1576, -1.5803],
           ...,
           [-0.9696, -0.6755,  0.8583,  ..., -0.5503,  0.8215,  0.1836],
           [-0.8790, -1.5808, -2.7467,  ...,  1.3471, -0.4356, -0.0131],
           [ 0.4348, -0.5804, -0.2268,  ...,  0.1363, -0.0494,  0.9567]],
  
          [[ 0.5248,  0.4384,  1.2423,  ..., -0.2706,  1.0613,  2.8675],
           