# imports

In [27]:
import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F # relu, tanh # also included in nn package
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# create network

In [5]:
# hyperparameters
input_size = 28
sequence_length = 28 # each row of the image creates the sequence
num_layers = 2
hidden_size = 256
num_classes = 10
lr = 0.001
batch_size = 64
num_epochs = 2

In [32]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, 
                 gru=False):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch is the first axis, so we need batch_first=True
        # N x time_seq x num_features
        if gru:
            self.rnn = nn.GRU(input_size, hidden_size, num_layers,
                              batch_first=True) # RNN will work for any number of sequences
        else:
            self.rnn = nn.RNN(input_size, hidden_size, num_layers,
                              batch_first=True) # RNN will work for any number of sequences
        # use all hidden states for the prediction
        self.fc1 = nn.Linear(hidden_size*sequence_length, num_classes)

    def forward(self, x):
        # hidden state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        # forward prop
        out, _ = self.rnn(x, h0)
        out = out.reshape(out.shape[0], -1) # sequence_length*hidden_size
        # use all hidden states for the prediction
        out = self.fc1(out)
        return out

    
model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
x = torch.randn(64, 28, 28).to(device)
model(x).shape

torch.Size([64, 10])

In [44]:
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch is the first axis, so we need batch_first=True
        # N x time_seq x num_features
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True) # RNN will work for any number of sequences
        # just use the last hidden state for the prediction
        self.fc1 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # hidden state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        # forward prop
        out, _ = self.lstm(x, (h0, c0))
        # just use the last hidden state for the prediction
        out = self.fc1(out[:,-1,:])
        return out
    
model = LSTM(input_size, hidden_size, num_layers, num_classes).to(device)
x = torch.randn(64, 28, 28).to(device)
model(x).shape

torch.Size([64, 10])

In [49]:
# bidirectional LSTM
class BLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(BLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, bidirectional=True)
        # multiply by 2 (one forward, one backward), concatenated together
        self.fc = nn.Linear(hidden_size*2, num_classes)

    def forward(self, x):
        # multiply by 2 (one forward, one backward), concatenated together
        h0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(device)

        # forward
        out, (hidden_state, cell_state) = self.lstm(x, (h0, c0))
        out = self.fc(out[:,-1,:]) # take last hidden state
        
        return out

model = BLSTM(input_size, hidden_size, num_layers, num_classes).to(device)
x = torch.randn(64, 28, 28).to(device)
model(x).shape

torch.Size([64, 10])

# load data

In [50]:
train_dataset = datasets.MNIST(root='dataset/', train=True,
                               transform=transforms.ToTensor(),
                               download=True) # training_set
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = datasets.MNIST(root='dataset/', train=False,
                               transform=transforms.ToTensor(),
                               download=True) # training_set
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True)

# initialize network

In [53]:
#model = RNN(input_size, hidden_size, num_layers, num_classes, gru=True).to(device)
#model = LSTM(input_size, hidden_size, num_layers, num_classes).to(device)
model = BLSTM(input_size, hidden_size, num_layers, num_classes).to(device)

# loss and optimizer

In [54]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# train network

In [55]:
for epoch in tqdm.tqdm(range(num_epochs)):
    for batch_idx, (data, targets) in enumerate(train_loader):
        # put data on device
        data = data.to(device).squeeze(1) # remove the extra dimension
        targets = targets.to(device)

        # forward
        scores = model(data)
        loss = criterion(scores, targets)

        # backward
        optimizer.zero_grad() # set all gradients to 0 for each batch
                              # to not save gradient for each minibatch
        loss.backward()

        # gradient descent or adam step
        optimizer.step()

100%|██████████| 2/2 [00:13<00:00,  6.91s/it]


# check accuracy

In [56]:
def check_accuracy(loader, model):
    if loader.dataset.train:
        print("Checking accuracy on training data")
    else:
        print("Checking accuracy on test data")

    num_correct = 0
    num_samples = 0
    model.eval()

    # don't need to compute gradients when checking accuracy
    with torch.no_grad():
        for x,y in loader:
            x = x.to(device).squeeze(1) # remove the extra dimension
            y = y.to(device)

            scores = model(x)
            _, preds = scores.max(1) # index of maximum value for second dim
            num_correct += (preds == y).sum()
            num_samples += preds.size(0)

        print(f'Got {num_correct} / {num_samples} with accuracy {float(num_correct)/float(num_samples)*100:.2f}')

check_accuracy(train_loader, model)
check_accuracy(test_loader, model)

Checking accuracy on training data
Got 57929 / 60000 with accuracy 96.55
Checking accuracy on test data
Got 9602 / 10000 with accuracy 96.02
