<a href="https://colab.research.google.com/github/rakesh4real/pytorch-examples/blob/main/rnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [37]:
from tqdm import tqdm
import torch
import torchvision
import torch.nn as nn  
import torch.optim as optim
import torch.nn.functional as F  
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms

In [38]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

---
---

In [39]:
input_size = 28
hidden_size = 256
n_layers = 2
n_classes = 10
seq_len = 28
learning_rate = 0.005
batch_size = 64
num_epochs = 2

In [56]:
"""
basic rnn 
"""

class RNN(nn.Module):
  
  def __init__(self, seq_length, input_size, hidden_size, n_layers, n_classes):
    """
    :param input_size: num of features of input
    """
    super(RNN, self).__init__()

    self.n_layers = n_layers
    self.hidden_size = hidden_size

    self.rnn = nn.RNN(input_size, self.hidden_size, self.n_layers,
                      batch_first = True)
    self.fc = nn.Linear(
        self.hidden_size*seq_length,
        n_classes
    )


  def forward(self, x):
    """
    :param x: is of shape (batch_size, seq_len, input_size) where `seq_len` is 
              number of individual sequences and `input_size` is num of features.
              For eg.
                + In case of text, seq_len is size of word and input_size is
                  vocabulary size as it will be ohe
                + In case of image, seq_len is height and input_size is width
                  (note: same as ohe if you can imagine)

                     input size (one-hot vocab size)
                   s +-+-+-+-+
                   e +-+-+-+-+
                   q +-+-+-+-+  ....
                     +-+-+-+-+
                   l +-+-+-+-+
                   e +-+-+-+-+
                   n +-+-+-+-+
                     +-+-+-+-+

    """

    # 3-D: (n_layers, bath_size, hidden_size)
    h0 = torch.zeros(
        (self.n_layers, x.shape[0], self.hidden_size)
    ).to(device)

    # pass
    out, _ = self.rnn(x, h0)

    # classification layer on output of last time step
    out = out.reshape(out.shape[0], -1) # flatten
    out = self.fc(out)
    
    return out

In [41]:
# test
model = RNN(100, input_size, n_layers, hidden_size, n_classes).to(device)
test_batch_seq = torch.rand(64, 100, 28).to(device)

model(test_batch_seq).shape

torch.Size([64, 10])

In [42]:
"""
rnn gru

Only change required is `self.rnn = nn.GRU ...`
"""

class RNN_GRU(nn.Module):
  
  def __init__(self, seq_length, input_size, hidden_size, n_layers, n_classes):
    """
    :param input_size: num of features of input
    """
    super(RNN_GRU, self).__init__()

    self.n_layers = n_layers
    self.hidden_size = hidden_size

    self.rnn = nn.GRU(input_size, self.hidden_size, self.n_layers,
                      batch_first = True)
    self.fc = nn.Linear(
        self.hidden_size*seq_length,
        n_classes
    )


  def forward(self, x):
    """
    :param x: is of shape (batch_size, seq_len, input_size) where `seq_len` is 
              number of individual sequences and `input_size` is num of features.
              For eg.
                + In case of text, seq_len is size of word and input_size is
                  vocabulary size as it will be ohe
                + In case of image, seq_len is height and input_size is width
                  (note: same as ohe if you can imagine)

                     input size (one-hot vocab size)
                   s +-+-+-+-+
                   e +-+-+-+-+
                   q +-+-+-+-+  ....
                     +-+-+-+-+
                   l +-+-+-+-+
                   e +-+-+-+-+
                   n +-+-+-+-+
                     +-+-+-+-+

    """

    # 3-D: (n_layers, bath_size, hidden_size)
    h0 = torch.zeros(
        (self.n_layers, x.shape[0], self.hidden_size)
    ).to(device)

    # pass
    out, _ = self.rnn(x, h0)

    # classification layer on output of last time step
    out = out.reshape(out.shape[0], -1) # flatten
    out = self.fc(out)
    
    return out


In [43]:
# test
model = RNN_GRU(100, input_size, n_layers, hidden_size, n_classes).to(device)
test_batch_seq = torch.rand(64, 100, 28).to(device)

model(test_batch_seq).shape

torch.Size([64, 10])

**Stacked RNN with different hidden size at each layer**

```
import torch 
from torch import nn
from torch.autograd import Variable

# layer1
# input_dim=10, output_dim=20
rnn1 = nn.LSTM(10, 20, 1)
input = Variable(torch.randn(5, 3, 10))
output1, hn = rnn1(input)

# layer2
# input_dim=20 output_dim=30
rnn2 = nn.LSTM(20, 30, 1)
output2, hn2 = rnn2(output1) 
```

In [46]:
"""
rnn lstm

only two differences compared to above code:
  1. self.lstm = nn.LSTM( ...
  2. extra input (memory cells) to lstm
"""

"""
rnn gru

Only change required is `self.rnn = nn.GRU ...`
"""

class RNN_LSTM(nn.Module):
  
  def __init__(self, seq_length, input_size, hidden_size, n_layers, n_classes):
    """
    :param input_size: num of features of input
    """
    super(RNN_LSTM, self).__init__()

    self.n_layers = n_layers
    self.hidden_size = hidden_size

    self.lstm = nn.LSTM(input_size, self.hidden_size, self.n_layers,
                      batch_first = True)
    self.fc = nn.Linear(
        self.hidden_size*seq_length,
        n_classes
    )


  def forward(self, x):
    """
    :param x: is of shape (batch_size, seq_len, input_size) where `seq_len` is 
              number of individual sequences and `input_size` is num of features.
              For eg.
                + In case of text, seq_len is size of word and input_size is
                  vocabulary size as it will be ohe
                + In case of image, seq_len is height and input_size is width
                  (note: same as ohe if you can imagine)

                     input size (one-hot vocab size)
                   s +-+-+-+-+
                   e +-+-+-+-+
                   q +-+-+-+-+  ....
                     +-+-+-+-+
                   l +-+-+-+-+
                   e +-+-+-+-+
                   n +-+-+-+-+
                     +-+-+-+-+

    """

    # 3-D: (n_layers, bath_size, hidden_size)
    h0 = torch.zeros(
        (self.n_layers, x.shape[0], self.hidden_size)
    ).to(device)
    c0 = torch.zeros(
        (self.n_layers, x.shape[0], self.hidden_size)
    ).to(device)

    # pass
    out, _ = self.lstm(
        x, (h0, c0)
    ) # out: tensor of shape (batch_size, seq_length, hidden_size)

    # classification layer on output of last time step
    out = out.reshape(out.shape[0], -1) # flatten
    out = self.fc(out)
    
    return out

In [47]:
# test
model = RNN_LSTM(100, input_size, n_layers, hidden_size, n_classes).to(device)
test_batch_seq = torch.rand(64, 100, 28).to(device)

model(test_batch_seq).shape

torch.Size([64, 10])

---
---

In [48]:
"""
load data
"""

train_dataset = datasets.MNIST(root="dataset/", train=True, transform=transforms.ToTensor(), download=True)
test_dataset = datasets.MNIST(root="dataset/", train=False, transform=transforms.ToTensor(), download=True)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True)

In [61]:
"""
instantiate model
"""
#model = RNN(seq_len, input_size, hidden_size, n_layers, n_classes).to(device)
model = RNN_LSTM(seq_len, input_size, hidden_size, n_layers, n_classes).to(device)

In [62]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [63]:
log_every = 1
hist = {
    'train_loss': []
}

for epoch in range(num_epochs):
    for batch_idx, (data, targets) in enumerate(train_loader):
        # Get data to cuda if possible
        data = data.to(device=device).squeeze(1)
        targets = targets.to(device=device)

        # forward
        scores = model(data)
        loss = criterion(scores, targets)

        # backward
        optimizer.zero_grad()
        loss.backward()

        # gradient descent or adam step
        optimizer.step()

    # log 
    # ======================================================
    if epoch % log_every == 0:
      print(f"loss: {loss.item()}")
      hist['train_loss'].append(loss.item())

loss: 0.01947651244699955
loss: 0.0009912042878568172


In [64]:
def check_accuracy(loader, model):
    if loader.dataset.train:
        print("Checking accuracy on training data")
    else:
        print("Checking accuracy on test data")

    num_correct = 0
    num_samples = 0

    # Set model to eval
    model.eval()

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device=device).squeeze(1)
            y = y.to(device=device)

            scores = model(x)
            _, predictions = scores.max(1)
            num_correct += (predictions == y).sum()
            num_samples += predictions.size(0)

        print(
            f"Got {num_correct} / {num_samples} with \
              accuracy {float(num_correct)/float(num_samples)*100:.2f}"
        )
    # Set model back to train
    model.train()

check_accuracy(train_loader, model)
check_accuracy(test_loader, model)

Checking accuracy on training data
Got 59208 / 60000 with               accuracy 98.68
Checking accuracy on test data
Got 9844 / 10000 with               accuracy 98.44
