# Import packages

In [681]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline

In [682]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Subset

# Configurations

In [683]:
device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
config = {
    "batch_size": 16,
    "test_batch_size": 1000,
    "epochs": 14,
    "lr": 1.0, # Learning rate
    "gamma": 0.7, # Learning rate step gamma
    "dry_run": False,
    "seed": 1,
    "log_interval": 20,
    "data_dir": "./data",
    "subset_size": 500
}

# Import and Preprocessing Dataset

In [684]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

train_set = MNIST(root = './data', train = True, download = True, transform = transform)
test_set = MNIST(root = './data', train = False, download = False, transform = transform)

# Nerual Network Definition

In [685]:
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1_layers = nn.Sequential(
            nn.Conv2d(1, 64, 5, 1),
            nn.LeakyReLU(0.1),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 64, 5, 1),
            nn.LeakyReLU(0.1),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2),
            nn.Dropout(0.25),
        )
        self.conv2_layers = nn.Sequential(
            nn.Conv2d(64, 128, 3, 1),
            nn.LeakyReLU(0.1),
            nn.BatchNorm2d(128),
            nn.Conv2d(128, 128, 3, 1),
            nn.LeakyReLU(0.1),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(2),
            nn.Dropout(0.25),
        )
        self.fc_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(1152, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 10),
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x):
        x = self.conv1_layers(x)
        x = self.conv2_layers(x)
        return self.fc_layers(x)

# Training and Testing

In [686]:
def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args['log_interval'] == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            if args['dry_run']:
                break

In [687]:
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

# KPI1: Use 500 training samples.

## Randomly select 500 samples

In [688]:
def get_balanced_subset_indices(targets, num_classes, num_per_class):
    indices = []
    for i in range(num_classes):
        class_indices = (targets == i).nonzero(as_tuple=True)[0]
        random_indices = np.random.choice(class_indices, num_per_class, replace=False)
        indices.extend(random_indices)
    return indices

In [689]:
num_classes = 10
num_per_class = 50

train_indices = get_balanced_subset_indices(train_set.targets, num_classes, num_per_class)

train_subset = Subset(train_set, train_indices)

In [690]:
# rows = 5
# fig, axes = plt.subplots(rows, 10, figsize=(10 ,rows))

# for i in range (10):
#     ds  = [x for x, y in train_subset if y == i]
#     for j in range(rows):
#         ax = axes[j,i]
#         ax.imshow(ds[j][0].numpy().reshape(28,28), cmap='gray')
#         ax.axis('off')

# plt.show()

In [691]:
train_kwargs = {'batch_size': config['batch_size']}
test_kwargs = {'batch_size': config['test_batch_size']}
if torch.cuda.is_available():
    cuda_kwargs = {'num_workers': 1,
                   'pin_memory': True,
                   'shuffle': True}
    train_kwargs.update(cuda_kwargs)
    test_kwargs.update(cuda_kwargs)

train_loader = DataLoader(train_set, **train_kwargs)
subset_train_loader = DataLoader(train_subset, **train_kwargs)
test_loader = DataLoader(test_set, **test_kwargs)

In [692]:
def train_and_test_model(train_loader):
    model = Net().to(device)
    optimizer = optim.Adadelta(model.parameters(), lr=config['lr'])

    scheduler = StepLR(optimizer, step_size=1, gamma=config['gamma'])
    for epoch in range(1, config['epochs'] + 1):
        train(config, model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

In [693]:
train_and_test_model(subset_train_loader)


Test set: Average loss: 5.8653, Accuracy: 1120/10000 (11%)


Test set: Average loss: 2.7862, Accuracy: 3257/10000 (33%)


Test set: Average loss: 2.2571, Accuracy: 3900/10000 (39%)


Test set: Average loss: 1.7154, Accuracy: 4684/10000 (47%)


Test set: Average loss: 1.1397, Accuracy: 5879/10000 (59%)


Test set: Average loss: 0.8678, Accuracy: 6716/10000 (67%)


Test set: Average loss: 0.6582, Accuracy: 7436/10000 (74%)


Test set: Average loss: 0.4996, Accuracy: 8187/10000 (82%)


Test set: Average loss: 0.4191, Accuracy: 8613/10000 (86%)


Test set: Average loss: 0.3912, Accuracy: 8735/10000 (87%)


Test set: Average loss: 0.3514, Accuracy: 8939/10000 (89%)


Test set: Average loss: 0.3372, Accuracy: 8993/10000 (90%)


Test set: Average loss: 0.3301, Accuracy: 9019/10000 (90%)


Test set: Average loss: 0.3225, Accuracy: 9045/10000 (90%)



### Data Augmentation

In [694]:
from torch.utils.data import ConcatDataset

augment_transform = transforms.Compose([
    transforms.RandomResizedCrop(28, scale=(0.9, 1.1)),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# 对子集应用数据增强
concat_dataset = train_subset
train_subset.dataset.transform = augment_transform

for i in range(5):
    concat_dataset = ConcatDataset([concat_dataset, train_subset])

augmented_train_loader = DataLoader(concat_dataset, **train_kwargs)

train_and_test_model(augmented_train_loader)


Test set: Average loss: 2.6489, Accuracy: 3560/10000 (36%)


Test set: Average loss: 0.8594, Accuracy: 7330/10000 (73%)


Test set: Average loss: 0.4510, Accuracy: 8385/10000 (84%)


Test set: Average loss: 0.2308, Accuracy: 9292/10000 (93%)


Test set: Average loss: 0.1729, Accuracy: 9509/10000 (95%)


Test set: Average loss: 0.1482, Accuracy: 9580/10000 (96%)


Test set: Average loss: 0.1502, Accuracy: 9595/10000 (96%)


Test set: Average loss: 0.1476, Accuracy: 9603/10000 (96%)


Test set: Average loss: 0.1446, Accuracy: 9603/10000 (96%)


Test set: Average loss: 0.1410, Accuracy: 9625/10000 (96%)


Test set: Average loss: 0.1412, Accuracy: 9625/10000 (96%)


Test set: Average loss: 0.1416, Accuracy: 9624/10000 (96%)


Test set: Average loss: 0.1424, Accuracy: 9620/10000 (96%)


Test set: Average loss: 0.1424, Accuracy: 9625/10000 (96%)

