In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn. functional as F
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms

In [3]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu")

In [2]:
# Nx1x28x28, we think it as 28 sequence of datapoint that has 28 features

#hyperparameters
input_size = 28
sequence_length = 28
num_layers = 2
hidden_size = 256
num_classes = 10
learning_rate = 0.001
batch_size = 64
num_epochs = 2

In [37]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn= nn.RNN(input_size, hidden_size, num_layers, batch_first= True) #batch first means that the first dimension is batch size i.e N x time_seq x features
        self.fc= nn.Linear(hidden_size* sequence_length, num_classes)
    
    #we have 28 time sequence, so we concatenate all the hidden states of all the time sequence and then pass it to the fully connected layer, so it use
    # information from every hidden state, can take only last hidden state
    def forward(self, x):
        h0= torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        # h0 is hidden state at time 0, and it is of size num_layers x batch_size x hidden_size, and it stores the hidden state of the first time sequence and then updates it
        out, _= self.rnn(x, h0) #we dont want the hidden state, so we use _, as every time sequence has its own hidden state
        out= out.reshape(out.shape[0], -1) #flatten the output
        out= self.fc(out)
        return out
    
#change to gru for better results
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru= nn.GRU(input_size, hidden_size, num_layers, batch_first= True)
        self.fc= nn.Linear(hidden_size* sequence_length, num_classes)
    def forward(self, x):
        h0= torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        out, _= self.gru(x, h0)
        out= out.reshape(out.shape[0], -1)
        out= self.fc(out)
        return out
    
#now change to lstm
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm= nn.LSTM(input_size, hidden_size, num_layers, batch_first= True)
        self.fc= nn.Linear(hidden_size* sequence_length, num_classes)

    def forward(self, x):
        h0= torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        #here need to define a seperate cell state, its same as hidden state, but it is passed to the next time sequence
        c0= torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        out, _= self.lstm(x, (h0, c0))
        out= out.reshape(out.shape[0], -1)
        out= self.fc(out)
        return out
    
#instead of using informaiton from all hidden states, we can use only the last hidden state
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm= nn.LSTM(input_size, hidden_size, num_layers, batch_first= True)
        self.fc= nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0= torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        #here need to define a seperate cell state, its same as hidden state, but it is passed to the next time sequence
        c0= torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        out, _= self.lstm(x, (h0, c0))
        out= self.fc(out[:, -1, :]) #take only the last hidden state, take all training examples, take all features
        #in this we are lossing information from all the hidden states, but it is faster
        return out




#we can use nn.RNN(input_size, hidden_size, num_layers, batch_first= True, bidirectional= True)
#this will give us 2 hidden states, one from left to right and one from right to left, so we can concatenate them and pass it to the fully connected layer
#this will give us more information, but it will be slower
#we can also use nn.LSTM, nn.GRU with bidirectional=True
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm= nn.LSTM(input_size, hidden_size, num_layers, batch_first= True,
                           bidirectional= True)
        self.fc= nn.Linear(hidden_size* 2, num_classes)

    def forward(self, x):
        h0= torch.zeros(self.num_layers* 2, x.size(0), self.hidden_size).to(device) #as two layers for every hidden state, one for forward other backwrd
        c0= torch.zeros(self.num_layers* 2, x.size(0), self.hidden_size).to(device)
        
        out, _= self.lstm(x, (h0, c0)) #in _ we got (hidden_state, cell_state)
        out= self.fc(out[:, -1, :])
        return out

In [38]:
train_dataset = datasets.MNIST(root='data/', train=True, transform=transforms.ToTensor (), download=False)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)


test_dataset = datasets.MNIST(root='data/', train=False, transform=transforms.ToTensor (), download=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [39]:
model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)

In [40]:
print(model)

RNN(
  (lstm): LSTM(28, 256, num_layers=2, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=512, out_features=10, bias=True)
)


In [41]:
# Loss and optimizer
criterion = nn.CrossEntropyLoss ()
optimizer = optim.Adam(model.parameters (), lr=learning_rate)

In [42]:
for epoch in range(num_epochs):
  print(f"epoch {epoch+1}/{num_epochs}")
  for batch_idx, (data, targets) in enumerate(train_loader):
    data = data.to(device=device).squeeze(1) #because rnn need 28x28 and not 1x28x28
    targets = targets.to(device=device)
    scores = model(data)
    loss = criterion(scores, targets)
    optimizer.zero_grad()
    loss.backward()
    optimizer .step()

epoch 1/2
epoch 2/2


In [43]:
# Check accuracy on training & test to see how good our model
def check_accuracy(loader, model) :
  if loader.dataset.train:
    print ('Checking accuracy on training data')
  else:
    print ('Checking accuracy on test data')
  num_correct = 0
  num_samples = 0
  model.eval()
  with torch.no_grad():
    for x, y in loader:
      x = x. to(device=device).squeeze(1)
      y = y.to(device=device)
      scores = model(x)
      _, predictions = scores.max(1)
      num_correct += (predictions == y) .sum()
      num_samples += predictions.size (0)

    print (f'Got {num_correct} / {num_samples} with accuracy {float (num_correct)/float (num_samples) *100: .2f} ')
  model. train()

In [44]:
check_accuracy (train_loader, model)
check_accuracy (test_loader, model)

Checking accuracy on training data
Got 58981 / 60000 with accuracy  98.30 
Checking accuracy on test data
Got 9822 / 10000 with accuracy  98.22 
