In [99]:
import os
from typing import Tuple, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch import nn
from torch import optim
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import SEMEION
from torchvision.transforms import transforms
from tqdm import tqdm


def train_model(model: nn.Module, optimizer, crit, data_loader: DataLoader, device: str = "cuda:0") -> List[float]:
    """Run one optimisation pass over ``data_loader`` and return the per-batch losses.

    Args:
        model: Network to optimise; it is moved to ``device`` and put in training mode.
        optimizer: Optimizer whose ``zero_grad``/``step`` are called once per batch.
        crit: Loss callable invoked as ``crit(prediction, target)``.
        data_loader: Iterable yielding ``(x, y)`` batches.
        device: Requested device; silently replaced by ``"cpu"`` when CUDA is unavailable.

    Returns:
        One Python float per batch, in iteration order.
    """
    if not torch.cuda.is_available():
        device = "cpu"
    model.to(device)
    # Make sure train-only layers (dropout, batch norm, ...) are active.
    model.train()

    losses = []
    for x, y in data_loader:
        x = x.to(device)
        y = y.to(device)
        optimizer.zero_grad()
        y_hat = model(x).squeeze()
        # squeeze() removes *every* singleton axis, including the batch axis when the
        # batch holds a single sample (e.g. a trailing partial batch); restore it so
        # the prediction still lines up with the target.
        if x.size(0) == 1:
            y_hat = y_hat.unsqueeze(0)
        loss = crit(y_hat, y)
        losses.append(loss.item())
        loss.backward()
        optimizer.step()
    return losses


def val_model(model: nn.Module, data_loader: DataLoader, device: str = "cuda:0") -> float:
    """Compute classification accuracy of ``model`` over ``data_loader``.

    Args:
        model: Network to evaluate; it is moved to ``device``. It is switched to
            evaluation mode for the duration of the call and its previous
            train/eval mode is restored afterwards.
        data_loader: Iterable yielding ``(x, y)`` batches, ``y`` holding class indices.
        device: Requested device; silently replaced by ``"cpu"`` when CUDA is unavailable.

    Returns:
        Fraction of samples whose highest-scoring class equals the label, or ``0.0``
        when the loader yields no samples.
    """
    if not torch.cuda.is_available():
        device = "cpu"
    model.to(device)

    was_training = model.training
    model.eval()
    total = 0
    correct = 0
    try:
        with torch.no_grad():
            for x, y in data_loader:
                x = x.to(device)
                y = y.to(device)

                y_hat = model(x).squeeze()
                # squeeze() also drops the batch axis for a single-sample batch;
                # put it back so dim=1 is still the class axis.
                if x.size(0) == 1:
                    y_hat = y_hat.unsqueeze(0)
                # softmax is monotonic, so the arg-max of the raw scores is the
                # predicted class; no need to normalise first.
                idxes = y_hat.argmax(dim=1)
                total += y.size(0)
                correct += (idxes == y).sum().item()
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)
    return correct / total if total else 0.0


def get_semeion_data(root: str = "./data", image_size: int = 28, batch_size: int = 32, channel: int = 1,
                     train_percent: float = 0.8) -> Tuple[
    DataLoader, DataLoader]:
    """Load SEMEION from ``root`` and return shuffled ``(train_loader, test_loader)``.

    Each sample is converted to a tensor, resized to ``image_size`` x ``image_size``,
    randomly cropped back to ``image_size`` after 2 pixels of padding, and finally
    tiled ``channel`` times along the first axis.

    NOTE(review): the transform is attached to the dataset before it is split, so
    the random crop is applied to test samples as well as training samples.

    Args:
        root: Directory passed to ``SEMEION``; the data is not downloaded.
        image_size: Side length used by both the resize and the crop.
        batch_size: Batch size of both loaders.
        channel: Repeat factor for the first tensor axis.
        train_percent: Fraction of samples (rounded down) placed in the training split.
    """

    def _tile_channels(img):
        # Replicate the image along the channel axis `channel` times.
        return img.repeat(channel, 1, 1)

    pipeline = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((image_size, image_size)),
        transforms.RandomCrop(image_size, padding=2),
        transforms.Lambda(_tile_channels),
    ])
    dataset = SEMEION(root=root, download=False, transform=pipeline)

    # Random train/test partition followed by one shuffled loader per part.
    n_total = len(dataset)
    n_train = int(n_total * train_percent)
    parts = random_split(dataset, [n_train, n_total - n_train])
    train_loader, test_loader = (
        DataLoader(part, batch_size=batch_size, shuffle=True) for part in parts
    )
    return train_loader, test_loader


def enhance_data(data: SEMEION, random_times: int = 5, image_size: int = 28, batch_size: int = 32,
                 train_percent: float = 0.8) -> Tuple[DataLoader, DataLoader]:
    """Build augmented, in-memory train/test loaders from ``data``.

    Every ``(image, label)`` pair of ``data`` is expanded into ``random_times``
    samples by applying ToTensor -> Resize -> RandomCrop(padding=2) that many times.
    The train/test partition is drawn over the *source images*, and all crops of one
    image go to the same side, so near-duplicates of a training image never end up
    in the test split.

    Args:
        data: Dataset yielding ``(image, label)`` pairs; assumed to be created
            without a transform so that ``ToTensor`` can consume the image
            (as the training cell does) - TODO confirm for other callers.
        random_times: Number of random crops generated per source image (>= 1).
        image_size: Side length used by both the resize and the crop.
        batch_size: Batch size of both loaders.
        train_percent: Fraction of source images (rounded down) used for training.

    Returns:
        ``(train_loader, test_loader)``, both shuffled.

    Raises:
        ValueError: If ``random_times`` is smaller than 1 or ``data`` is empty.
    """
    if random_times < 1:
        raise ValueError("random_times must be at least 1")

    crop_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((image_size, image_size)),
        transforms.RandomCrop(image_size, padding=2),
    ])

    # Crops are stored contiguously per image: sample i * random_times + k is the
    # k-th crop of source image i.
    images = []
    labels = []
    for image, label in data:
        for _ in range(random_times):
            images.append(crop_transform(image))
            labels.append(label)
    if not images:
        raise ValueError("data is empty; nothing to augment")

    augmented = torch.utils.data.TensorDataset(torch.stack(images), torch.tensor(labels))

    # Split at image level, then expand each image id to the indices of its crops.
    num_images = len(labels) // random_times
    num_train_images = int(num_images * train_percent)
    shuffled_ids = torch.randperm(num_images).tolist()

    def _crop_indices(image_ids):
        return [image_id * random_times + k for image_id in image_ids for k in range(random_times)]

    train_data = torch.utils.data.Subset(augmented, _crop_indices(shuffled_ids[:num_train_images]))
    test_data = torch.utils.data.Subset(augmented, _crop_indices(shuffled_ids[num_train_images:]))
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True)
    return train_loader, test_loader


## LeNet

In [100]:
class CNNTorch(nn.Module):
    """LeNet-style CNN: two 5x5 conv + 2x2 max-pool stages followed by three linear layers.

    The flattened feature size is fixed at ``16 * 4 * 4``, which is what the
    convolutional stack produces for 28x28 inputs.

    Args:
        num_classes: Size of the final linear layer's output.
        in_channels: Number of channels of the input images (defaults to 1).
    """

    def __init__(self, num_classes=10, in_channels=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.pool = nn.MaxPool2d(2, 2)

        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    def forward(self, x):
        """Return class scores for a batch of images.

        For a ``(B, in_channels, 28, 28)`` input the result has shape
        ``(B, 1, num_classes)``: the singleton middle axis comes from the 3-D
        ``view`` below and is kept so existing callers (which squeeze it away)
        see the same output shape as before.
        """
        relu = nn.functional.relu  # reached through `nn`, so no extra import alias is needed
        batch_size = x.size(0)
        x = self.pool(relu(self.conv1(x)))  # 6 x 12 x 12 for a 28x28 input
        x = self.pool(relu(self.conv2(x)))  # 16 x 4 x 4 for a 28x28 input
        x = x.view(batch_size, -1, 16 * 4 * 4)  # (B, 1, 256) for a 28x28 input
        x = relu(self.fc1(x))  # last axis -> 120
        x = relu(self.fc2(x))  # last axis -> 84
        x = self.fc3(x)  # last axis -> num_classes
        return x

In [None]:
# Per-epoch history: mean training loss plus accuracy on the train and test splits.
epoch_losses = []
train_scores = []
test_scores = []
num_epoch = 1000

model = CNNTorch(num_classes=10)
optimizer = optim.Adam(params=model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

# Load SEMEION without a transform and expand it into augmented in-memory loaders.
data = SEMEION(root='./data', download=False)
train_loader, test_loader = enhance_data(
    data,
    random_times=10,
    image_size=28,
    batch_size=1024,
)

for epoch in tqdm(range(num_epoch)):
    # One optimisation pass; keep the mean of the per-batch losses.
    epoch_loss = train_model(model, optimizer, criterion, train_loader)
    epoch_losses.append(np.mean(epoch_loss))

    # Accuracy on the training split, then on the held-out split.
    train_score = val_model(model, train_loader)
    test_score = val_model(model, test_loader)
    train_scores.append(train_score)
    test_scores.append(test_score)

 95%|████████████████████████████████████████████████████████████████████████████████████████████████████▋     | 950/1000 [04:31<00:15,  3.30it/s]

## 保存数据 (Save results)

In [None]:
# Persist the per-epoch history as CSV. The output directory is created first
# because DataFrame.to_csv does not create missing parent directories.
os.makedirs('./results', exist_ok=True)
resultData = pd.DataFrame({'epoch_loss': epoch_losses, 'train_score': train_scores, 'test_score': test_scores})
resultData.to_csv(f'./results/{type(model).__name__}-result.csv', index=False)

## 绘制图像 (Plot curves)

In [None]:
# Training loss per epoch
fig, ax = plt.subplots()
ax.plot(epoch_losses)
ax.set_title('epoch loss')
ax.legend(['train epoch loss'])
fig.savefig(f'./results/{type(model).__name__}-epoch_loss.png')
plt.show()

In [None]:
# Accuracy per epoch on the train and test splits
fig, ax = plt.subplots()
for curve, curve_label in ((train_scores, 'train score'), (test_scores, 'test score')):
    ax.plot(curve, label=curve_label)
ax.set_title('epoch score')
ax.legend()
fig.savefig(f'./results/{type(model).__name__}-epoch_score.png')
plt.show()