## Import modules and set parameters

In [10]:
from torch import nn
from torch.nn import functional as F
import torch
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision
import torchvision.transforms as transforms
import os
from matplotlib import pyplot as plt
import random
import numpy as np


BATCH_SIZE = 512
EPOCHS = 30
LEARNING_RATE = 0.01
RANDOM_SEED = 42
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


## Load dataset and visualization

In [11]:

MNIST_PATH = os.path.join(os.getcwd(), "MNIST")
print(MNIST_PATH)

trainset = torchvision.datasets.MNIST(root=MNIST_PATH, train=True, download=True, transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ]))
testset = torchvision.datasets.MNIST(root=MNIST_PATH, train=False, download=True, transform=transforms.Compose([
                            transforms.ToTensor(),
                            transforms.Normalize((0.1307,), (0.3081,))
                          ]))

trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
testloader = DataLoader(testset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)

# print(len(trainset), len(testset))
# print(trainset[0][0].shape)
for batch_idx, (data, target) in enumerate(trainloader):
    print(data.shape, target.shape)
    # torch.Size([512, 1, 28, 28]) torch.Size([512])
    # print(target)
    # print(data[0][0])
    print(batch_idx)



# visualize some images


X, y = next(iter(DataLoader(trainset, batch_size=18)))


def get_labels(y):
    return [trainset.classes[label] for label in y]


def plot_images(images):
    n_images = len(images)
    rows = int(np.sqrt(n_images))
    cols = int(np.sqrt(n_images))
    fig = plt.figure()
    for i in range(rows*cols):
        ax = fig.add_subplot(rows, cols, i+1)
        ax.imshow(images[i].numpy().squeeze(), cmap="gray_r")
        ax.set_title(get_labels(y)[i])
        ax.axis("off")
    plt.show()


# plot_images(X)

e:\VSCODE\Python\DL-Hw\MNIST
torch.Size([512, 1, 28, 28]) torch.Size([512])
0
torch.Size([512, 1, 28, 28]) torch.Size([512])
1
torch.Size([512, 1, 28, 28]) torch.Size([512])
2
torch.Size([512, 1, 28, 28]) torch.Size([512])
3
torch.Size([512, 1, 28, 28]) torch.Size([512])
4
torch.Size([512, 1, 28, 28]) torch.Size([512])
5
torch.Size([512, 1, 28, 28]) torch.Size([512])
6
torch.Size([512, 1, 28, 28]) torch.Size([512])
7
torch.Size([512, 1, 28, 28]) torch.Size([512])
8
torch.Size([512, 1, 28, 28]) torch.Size([512])
9
torch.Size([512, 1, 28, 28]) torch.Size([512])
10
torch.Size([512, 1, 28, 28]) torch.Size([512])
11
torch.Size([512, 1, 28, 28]) torch.Size([512])
12
torch.Size([512, 1, 28, 28]) torch.Size([512])
13
torch.Size([512, 1, 28, 28]) torch.Size([512])
14
torch.Size([512, 1, 28, 28]) torch.Size([512])
15
torch.Size([512, 1, 28, 28]) torch.Size([512])
16
torch.Size([512, 1, 28, 28]) torch.Size([512])
17
torch.Size([512, 1, 28, 28]) torch.Size([512])
18
torch.Size([512, 1, 28, 28]) to

## Define CNN model

In [23]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(
            in_channels=1, out_channels=16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(
            in_channels=16, out_channels=32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(
            in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64*3*3, 256)
        self.fc2 = nn.Linear(256, 10)

    def forward(self, x):
        # input: 1x28x28
        out = self.conv1(x)
        out = F.relu(out)
        out = F.max_pool2d(out, 2)
        # 16x14x14
        out = self.conv2(out)
        out = F.relu(out)
        out = F.max_pool2d(out, 2)
        # 32x7x7
        out = self.conv3(out)
        out = F.relu(out)
        out = F.max_pool2d(out, 2)
        # 64x3x3
        out = out.view(-1, 64*3*3)
        out = self.fc1(out)
        out = F.relu(out)
        out = self.fc2(out)
        out = F.log_softmax(out, dim=1)
        return out


model = CNN().to(DEVICE)
def print_model_summary(model):
    print(model)
    print("Model Summary:")
    total_params = 0
    for name, param in model.named_parameters():
        if param.requires_grad:
            print(f"Layer: {name} | Size: {param.size()} | Parameters: {param.numel()}")
            total_params += param.numel()
    print(f"Total Trainable Parameters: {total_params}")

print_model_summary(model)


CNN(
  (conv1): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (fc1): Linear(in_features=576, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=10, bias=True)
)
Model Summary:
Layer: conv1.weight | Size: torch.Size([16, 1, 3, 3]) | Parameters: 144
Layer: conv1.bias | Size: torch.Size([16]) | Parameters: 16
Layer: conv2.weight | Size: torch.Size([32, 16, 3, 3]) | Parameters: 4608
Layer: conv2.bias | Size: torch.Size([32]) | Parameters: 32
Layer: conv3.weight | Size: torch.Size([64, 32, 3, 3]) | Parameters: 18432
Layer: conv3.bias | Size: torch.Size([64]) | Parameters: 64
Layer: fc1.weight | Size: torch.Size([256, 576]) | Parameters: 147456
Layer: fc1.bias | Size: torch.Size([256]) | Parameters: 256
Layer: fc2.weight | Size: torch.Size([10, 256]) | Parameters: 2560
Layer: fc2.bias | Size: tor

## Train model

### Load model

In [19]:
# check if there is pth file
import re

def save_model(model, path):
    torch.save(model.state_dict(), path)
    print(f"Model saved at {path}")

def load_model(model, path):
    model.load_state_dict(torch.load(path))
    print(f"Model loaded from {path}")

def check_model(model, path):
    if os.path.exists(path):
        load_model(model, path)
        return True
    return False

def get_latest_model(path):
    Latest_Model = re.compile(r"MNIST_CNN_(\d+).pth")
    files = os.listdir(path)
    latest_model = 0
    for file in files:
        match = Latest_Model.match(file)
        if match:
            epoch = int(match.group(1))
            if epoch > latest_model:
                latest_model = epoch
    return latest_model


MODEL_PATH = os.path.join(os.getcwd(), "MNIST_CNN_" + str(get_latest_model(os.getcwd())) + ".pth")

if check_model(model, MODEL_PATH):
    print_model_summary(model)
else:
    print("No model found, training new model...")


No model found, training new model...


### Train model


In [24]:

# define learning strategy
Loss = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

# train model
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter("runs/BERT/")

def train(model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        # loss = F.nll_loss(output, target)
        loss = Loss(output, target)
        loss.backward()
        optimizer.step()
        if(batch_idx+1)%30 == 0: 
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            writer.add_scalar("Loss/train", loss.item(), epoch*len(train_loader)+batch_idx)


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # test_loss += F.nll_loss(output, target, reduction='sum').item() # 将一批的损失相加
            test_loss += Loss(output, target).item()
            pred = output.max(1, keepdim=True)[1] # 找到概率最大的下标
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    writer.add_scalar("Loss/test", test_loss, epoch)
    writer.add_scalar("Accuracy/test", 100. * correct / len(test_loader.dataset), epoch)


for epoch in range(1, 10 + 1):
    train(model, DEVICE, trainloader, optimizer, epoch)
    test(model, DEVICE, testloader)

save_model(model, os.path.join(os.getcwd(), "MNIST_CNN_" + str(epoch) + ".pth"))


Test set: Average loss: 0.0002, Accuracy: 9610/10000 (96%)


Test set: Average loss: 0.0001, Accuracy: 9813/10000 (98%)


Test set: Average loss: 0.0001, Accuracy: 9841/10000 (98%)


Test set: Average loss: 0.0001, Accuracy: 9867/10000 (99%)


Test set: Average loss: 0.0001, Accuracy: 9883/10000 (99%)


Test set: Average loss: 0.0001, Accuracy: 9878/10000 (99%)


Test set: Average loss: 0.0001, Accuracy: 9882/10000 (99%)


Test set: Average loss: 0.0001, Accuracy: 9913/10000 (99%)


Test set: Average loss: 0.0001, Accuracy: 9892/10000 (99%)


Test set: Average loss: 0.0001, Accuracy: 9883/10000 (99%)

Model saved at e:\VSCODE\Python\DL-Hw\MNIST_CNN_10.pth
