# Sequential MNIST & Permuted Sequential MNIST

## Overview

MNIST is a handwritten digit classification dataset (LeCun et al., 1998) that is frequently used to test deep learning models. In particular, sequential MNIST is frequently used to test a recurrent network's ability to retain information from the distant past (see paper for references). In this task, each MNIST image ($28 \times 28$) is presented to the model as a $784 \times 1$ sequence for digit classification. In the more challenging permuted MNIST (P-MNIST) setting, the sequence is reordered by a fixed random permutation.
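
As a rough illustration of the input format described above, the following sketch flattens a toy $28 \times 28$ image row by row and applies one fixed permutation to it. It uses plain Python lists rather than tensors, and the helper names (`flatten_image`, `make_permutation`, `permute_sequence`) are hypothetical; they are not part of this notebook or of PyTorch.

```python
import random

ROWS, COLS = 28, 28
SEQ_LEN = ROWS * COLS  # 784 time steps, one pixel per step


def flatten_image(image):
    """Flatten a 28x28 image (a list of rows) into one sequence, row by row."""
    return [pixel for row in image for pixel in row]


def make_permutation(seq_len, seed):
    """Draw a single random ordering of the time steps (hypothetical helper)."""
    order = list(range(seq_len))
    random.Random(seed).shuffle(order)
    return order


def permute_sequence(sequence, order):
    """Reorder a sequence so that step t holds the pixel at index order[t]."""
    return [sequence[i] for i in order]


# Toy image whose pixel value equals its raster position, so the
# effect of flattening and permuting is easy to inspect.
image = [[r * COLS + c for c in range(COLS)] for r in range(ROWS)]

sequence = flatten_image(image)                # sequential MNIST input
order = make_permutation(SEQ_LEN, seed=1111)   # drawn once, then reused
permuted = permute_sequence(sequence, order)   # P-MNIST input
```

The point to notice is that `order` is drawn once and reused for every image, so the model always sees the pixels in the same scrambled order.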

**NOTE**:
- Because a TCN's receptive field depends on the depth of the network and the filter size, we need to make sure that the model we use can cover the full sequence length of 784.

- Although this is a sequence modeling task, we only use the last output (i.e., at time $T = 784$) for the final classification.
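
To sanity-check the first point, here is a minimal sketch of the receptive-field calculation. It assumes the layout of the reference TCN architecture, in which level $i$ uses dilation $2^i$ and each level contains two causal convolutions. The source of `core.tcn.TemporalConvNet` is not shown in this notebook, so `convs_per_level=2` is an assumption, and the function name `receptive_field` is hypothetical.

```python
def receptive_field(kernel_size, levels, convs_per_level=2):
    """Receptive field of stacked dilated causal convolutions.

    Assumes level i uses dilation 2**i and holds `convs_per_level`
    convolutions with the same kernel size (an assumption about the
    TCN layout, not something read from core.tcn).
    """
    rf = 1
    for level in range(levels):
        dilation = 2 ** level
        # Each causal conv extends the field by (kernel_size - 1) * dilation.
        rf += convs_per_level * (kernel_size - 1) * dilation
    return rf


KSIZE, LEVELS, SEQ_LEN = 7, 8, 784  # values from the Settings cell

rf = receptive_field(KSIZE, LEVELS)
print(rf, rf >= SEQ_LEN)  # 3061 True
```

Under these assumptions the receptive field is $1 + 2 \cdot (7 - 1) \cdot (2^8 - 1) = 3061$, comfortably above 784. Even with the more conservative assumption of one convolution per level, the result is 1531, which still covers the whole sequence.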

## Settings

In [1]:
import torch as th
import torch.nn as nn
import numpy as np
import torch.nn.functional as F

from tqdm.notebook import tqdm

BATCH_SIZE = 128
DEVICE = "cuda:0" if th.cuda.is_available() else "cpu"  # fall back to CPU when no GPU is present
DROPOUT = 0.05
CLIP = -1
EPOCHS = 5
KSIZE = 7
LEVELS = 8
LR = 2e-3
OPTIM = "Adam"
NHID = 30
SEED = 1111
PERMUTE = False

DATA_ROOT = "/home/densechen/dataset"

N_CLASSES = 10
INPUT_CHANNELS = 1
SEQ_LEN = int(784 / INPUT_CHANNELS)

CHANNEL_SIZES = [NHID] * LEVELS

th.manual_seed(SEED)

<torch._C.Generator at 0x7f5708b534f0>

## Data Generation

In [2]:
from torchvision import datasets, transforms

def data_generator():
    transform = transforms.Compose(
                    [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    train_set = datasets.MNIST(DATA_ROOT, train=True, download=False, 
                               transform=transform)
    test_set = datasets.MNIST(DATA_ROOT, train=False, download=False,
                               transform=transform)
    train_loader = th.utils.data.DataLoader(train_set, batch_size=BATCH_SIZE)
    test_loader = th.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE)
    return train_loader, test_loader

print("Producing data...")
train_loader, test_loader = data_generator()
print("Finished.")

Producing data...
Finished.


## Build Model

In [3]:
from core.tcn import TemporalConvNet

class TCN(nn.Module):
    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):
        super().__init__()
        self.tcn = TemporalConvNet(input_size, num_channels,
            kernel_size=kernel_size, dropout=dropout)
        self.linear = nn.Linear(num_channels[-1], output_size)
    
    def forward(self, inputs):
        # inputs shape: [N, C_in, L_in]
        y1 = self.tcn(inputs)
        o = self.linear(y1[..., -1])
        return F.log_softmax(o, dim=1)

print("Building model...")
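# Fixed pixel ordering for P-MNIST; only applied when PERMUTE is True.
# NumPy's RNG is not seeded in this notebook, so the ordering differs between runs.
permute_idx = th.from_numpy(np.random.permutation(SEQ_LEN)).long()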
model = TCN(INPUT_CHANNELS, N_CLASSES, CHANNEL_SIZES, kernel_size=KSIZE, dropout=DROPOUT)

model = model.to(DEVICE)

optimizer = getattr(th.optim, OPTIM)(model.parameters(), lr=LR)

model_size = sum(p.numel() for p in model.parameters())

print(f"Model Size: {model_size/1000} K")

print("Finished.")

Building model...
Model Size: 96.04 K
Finished.
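
The reported size can be cross-checked by hand. The sketch below counts parameters under the assumption that `TemporalConvNet` follows the reference TCN block layout: two weight-normalized convolutions per level, plus a plain $1 \times 1$ convolution on the residual path whenever the input and output channel counts differ. That layout is an assumption (the source of `core.tcn` is not shown), and both function names are hypothetical.

```python
def weight_norm_conv_params(c_in, c_out, kernel_size):
    """Parameters of one weight-normalized 1D convolution.

    Weight norm stores a direction tensor (c_out * c_in * kernel_size)
    and a per-output-channel magnitude (c_out); the bias adds c_out more.
    """
    return c_out * c_in * kernel_size + c_out + c_out


def tcn_param_count(input_size, channel_sizes, kernel_size, n_classes):
    """Count parameters for the assumed TCN layout plus the linear head."""
    total = 0
    c_in = input_size
    for c_out in channel_sizes:
        total += weight_norm_conv_params(c_in, c_out, kernel_size)
        total += weight_norm_conv_params(c_out, c_out, kernel_size)
        if c_in != c_out:
            # Assumed 1x1 convolution that matches channels for the residual.
            total += c_in * c_out + c_out
        c_in = c_out
    # Final linear layer: weights plus bias.
    total += channel_sizes[-1] * n_classes + n_classes
    return total


# Values from the Settings cell: 1 input channel, 8 levels of 30 channels,
# kernel size 7, 10 classes.
total = tcn_param_count(1, [30] * 8, 7, 10)
print(total / 1000)  # 96.04
```

Under these assumptions the count comes to 96,040 parameters, which agrees with the `Model Size` printed above.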


## Run

In [4]:
def train(ep):
    model.train()
    process = tqdm(train_loader)
    for data, target in process:
        data, target = data.to(DEVICE), target.to(DEVICE)
        data = data.view(-1, INPUT_CHANNELS, SEQ_LEN)
        if PERMUTE:
            data = data[:, :, permute_idx]
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        if CLIP > 0:
            th.nn.utils.clip_grad_norm_(model.parameters(), CLIP)
        optimizer.step()
        process.set_description(
            f"Train Epoch: {ep}, Loss: {loss.item():.6f}")

def test():
    model.eval()
    correct = 0
    with th.no_grad():
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            data = data.view(-1, INPUT_CHANNELS, SEQ_LEN)
            if PERMUTE:
                data = data[:, :, permute_idx]
            output = model(data)
            # Index of the highest log-probability is the predicted class.
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {correct/len(test_loader.dataset) * 100:.2f}%')

for epoch in range(1, EPOCHS+1):
    train(epoch)
    test()

Accuracy: 95.59%
Accuracy: 97.02%
Accuracy: 97.72%
Accuracy: 97.61%
Accuracy: 98.07%
