In [1]:
import os
import time
import random
import numpy as numpy

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.utils.data as data

import torchvision.transforms as transforms
import torchvision.datasets as datasets

import matplotlib.pyplot as plt
from PIL import Image

In [2]:
ROOT = './data'
train_data = datasets.MNIST(
    root=ROOT,
    train=True,
    download=True
)

test_data = datasets.MNIST(
    root=ROOT,
    train=False,
    download=True
)

In [3]:
# split training : validation = 0.9 : 0.1
VALID_RATIO = 0.9

n_train_examples = int(len(train_data) * VALID_RATIO)
n_valid_examples = len(train_data) - n_train_examples

train_data, valid_data = data.random_split(
    train_data,
    [n_train_examples, n_valid_examples]
)

In [4]:
# compute mean and std for normalization
mean = train_data.dataset.data.float().mean() / 255
std = train_data.dataset.data.float().std() / 255

train_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[mean], std=[std])
])

test_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[mean], std=[std])
])

train_data.dataset.transform = train_transforms
valid_data.dataset.transform = test_transforms

In [5]:
# Create dataloader
BATCH_SIZE = 256

train_dataloader = data.DataLoader(
    train_data,
    shuffle=True,
    batch_size=BATCH_SIZE
)

valid_dataloader = data.DataLoader(
    valid_data,
    batch_size=BATCH_SIZE
)

![Mô hình LeNet](public/images/lenet.png)

In [6]:
class LeNetClassifier(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels=1, out_channels=6, kernel_size=5, padding='same'
        )
        self.avgpool1 = nn.AvgPool2d(kernel_size=2)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        self.avgpool2 = nn.AvgPool2d(kernel_size=2)
        self.flatten = nn.Flatten()
        self.fc_1 = nn.Linear(16 * 5 * 5, 120)
        self.fc_2 = nn.Linear(120, 84)
        self.fc_3 = nn.Linear(84, num_classes)

    def forward(self, inputs):
        outputs = self.conv1(inputs)
        outputs = self.avgpool1(outputs)
        outputs = F.relu(outputs)
        outputs = self.conv2(outputs)
        outputs = self.avgpool2(outputs)
        outputs = F.relu(outputs)
        outputs = self.flatten(outputs)
        outputs = self.fc_1(outputs)
        outputs = self.fc_2(outputs)
        outputs = self.fc_3(outputs)
        return outputs


In [7]:
def train(
    model,
    optimizer,
    criterion,
    train_dataloader,
    device,
    epoch=0,
    log_interval=50,
):
    model.train()
    total_acc, total_count = 0, 0
    losses = []
    start_time = time.time()

    for idx, (inputs, labels) in enumerate(train_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        predictions = model(inputs)

        # compute loss
        loss = criterion(predictions, labels)
        losses.append(loss.item())

        # backward
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)
        optimizer.step()
        total_acc += (predictions.argmax(1) == labels).sum().item()
        total_count += labels.size(0)

        if idx & log_interval == 0 and idx > 0:
            elapsed = time.time() - start_time
            print(
                f"| epoch {epoch:3d} | {idx:5d}/{len(train_dataloader):5d} batches {elapsed:8.3f} sec"
                f"| accuracy {total_acc / total_count:8.3f}"
            )
            total_acc, total_count = 0, 0
            start_time = time.time()

    epoch_acc = total_acc / total_count
    epoch_loss = sum(losses) / len(losses)
    return epoch_acc, epoch_loss

def evaluate(model, criterion, valid_dataloader, device):
    model.eval()
    total_acc, total_count = 0, 0

    losses = []

    with torch.no_grad():
        for _, (inputs, labels) in enumerate(valid_dataloader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            predictions = model(inputs)

            loss = criterion(predictions, labels)
            losses.append(loss.item())

            total_acc += (predictions.argmax(1) == labels).sum().item()
            total_count += labels.size(0)

    epoch_acc = total_acc / total_count
    epoch_loss = sum(losses) / len(losses)
    return epoch_acc, epoch_loss
            

In [8]:
num_classes = len(train_data.dataset.classes)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

lenet_model = LeNetClassifier(num_classes=num_classes)
lenet_model.to(device)

criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(lenet_model.parameters())

num_epochs = 10
save_model = './model'
if not os.path.exists(save_model):
    os.makedirs(save_model)

train_accs, train_losses = [], []
eval_accs, eval_losses = [], []
best_loss_eval = 100

for epoch in range(1, num_epochs + 1):
    epoch_start_time = time.time()

    train_acc, train_loss = train(
        lenet_model,
        optimizer,
        criterion,
        train_dataloader,
        device,
        epoch
    )
    train_accs.append(train_acc)
    train_losses.append(train_loss)

    eval_acc, eval_loss = evaluate(
        lenet_model,
        criterion,
        valid_dataloader,
        device
    )
    eval_accs.append(eval_acc)
    eval_losses.append(eval_loss)

    if eval_loss < best_loss_eval:
        torch.save(lenet_model.state_dict(), save_model + '/lenet_model.pt')
    
    print('-' * 60)
    print(
        f"| End of epoch {epoch:3d} | Time:{time.time() - epoch_start_time}s | Train Accuracy {train_acc:8.3f} | Train Loss {train_loss:8.3f} "
        f"| Valid Accuracy {eval_acc:8.3f} | Valid Loss {eval_loss:8.3f}"
    )
    print('-' * 60)

lenet_model.load_state_dict(torch.load(save_model + '/lenet_model.pt'))
lenet_model.eval()

| epoch   1 |     1/  211 batches    0.330 sec| accuracy    0.092
| epoch   1 |     4/  211 batches    0.126 sec| accuracy    0.292
| epoch   1 |     5/  211 batches    0.046 sec| accuracy    0.328
| epoch   1 |     8/  211 batches    0.126 sec| accuracy    0.471
| epoch   1 |     9/  211 batches    0.043 sec| accuracy    0.594
| epoch   1 |    12/  211 batches    0.122 sec| accuracy    0.634
| epoch   1 |    13/  211 batches    0.042 sec| accuracy    0.660
| epoch   1 |    64/  211 batches    2.069 sec| accuracy    0.786
| epoch   1 |    65/  211 batches    0.040 sec| accuracy    0.859
| epoch   1 |    68/  211 batches    0.121 sec| accuracy    0.879
| epoch   1 |    69/  211 batches    0.040 sec| accuracy    0.879
| epoch   1 |    72/  211 batches    0.123 sec| accuracy    0.871
| epoch   1 |    73/  211 batches    0.041 sec| accuracy    0.883
| epoch   1 |    76/  211 batches    0.123 sec| accuracy    0.901
| epoch   1 |    77/  211 batches    0.040 sec| accuracy    0.859
| epoch   

  lenet_model.load_state_dict(torch.load(save_model + '/lenet_model.pt'))


LeNetClassifier(
  (conv1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1), padding=same)
  (avgpool1): AvgPool2d(kernel_size=2, stride=2, padding=0)
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (avgpool2): AvgPool2d(kernel_size=2, stride=2, padding=0)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc_1): Linear(in_features=400, out_features=120, bias=True)
  (fc_2): Linear(in_features=120, out_features=84, bias=True)
  (fc_3): Linear(in_features=84, out_features=10, bias=True)
)

In [10]:
test_data.transform = test_transforms
test_dataloader = data.DataLoader(
    test_data,
    BATCH_SIZE,
)
test_acc, test_loss = evaluate(lenet_model, criterion, test_dataloader, device)
test_acc, test_loss

(0.9882, 0.038810593219386645)