# Imports

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms

In [15]:
device = torch.device('cude' if torch.cuda.is_available() else 'cpu')
input_size = 784 # 28x28 image size
# --------- CNN -----------
in_channels = 1 # only gray img
# -------- RNN ---------
input_size_rnn = 28
sequence_lenght = 28
hidden_size = 256
num_layers = 2
# ---------------------
num_classes = 10
batch_size = 64
learning_rate = 0.001
num_epochs = 2

load_model = False

In [9]:
class NN(nn.Module):
    def __init__(self, input_size, num_classes):
        super(NN,self).__init__()
        self.fc1 = nn.Linear(input_size,50)
        self.fc2 = nn.Linear(50,num_classes)
        
    def forward(self,x):
        x = x.reshape(x.shape[0], -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

class CNN(nn.Module):
    def __init__(self, in_channels=1, num_classes = 10):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=8, kernel_size=(3,3), stride=(1,1), padding=(1,1))
        self.pool = nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=(3,3), stride=(1,1), padding=(1,1))
        self.fc1 = nn.Linear(16*7*7, num_classes)
        
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc1(x)
        
        return x
    
class RNN(nn.Module): # + LSTM
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
#         self.rnn = nn.RNN(input_size,hidden_size,num_layers, batch_first=True)
        self.lstm = nn.LSTM(input_size,hidden_size,num_layers, batch_first=True)
#         self.fc = nn.Linear(hidden_size*sequence_lenght, num_classes)
        self.fc = nn.Linear(hidden_size, num_classes)
        
    def forward(self, x):
        x = x.squeeze(1)
        # x - torch.Size([64, 28, 28])
        # h0 - torch.Size([2, 64, 256])
        # out - torch.Size([64, 28, 256])
        
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) # hidden state - stores the data of prev step and may get flushed to zero through forget gate if irrelevent
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) # cell state - stores data of whole network except of flused steps
        # Forward Bias
        out, _  = self.lstm(x,(h0,c0))
        # out = out.reshape(out.shape[0], -1)
        out = self.fc(out[:,-1,:])
        
        return out

In [10]:
def save_checkpoint(state, filename='model_checkpoint.pth.tar'):
    print('===> Saving Checkpoint')
    torch.save(state, filename)
def load_checkpoint(checkpoint):
    print('===> Loading Model')
    model.load_state_dict(checkpoint['state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer'])

# Loading Data & Initializing Model

In [12]:
train_data = datasets.MNIST(root='/datasets', transform=transforms.ToTensor(), download=True, train=True)
train_loader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
test_data = datasets.MNIST(root='/datasets', transform=transforms.ToTensor(), download=True, train=False)
test_loader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=True)

In [13]:
# model = NN(input_size=input_size,num_classes=num_classes).to(device)
# model = CNN(in_channels=in_channels,num_classes=num_classes).to(device)
model = RNN(input_size_rnn, hidden_size, num_layers, num_classes).to(device)

In [14]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(),lr=learning_rate)

In [17]:
if load_model:
    load_checkpoint(torch.load('model.pth.tar'))

else:
    for epoch in range(num_epochs):
        if epoch % 2 == 0 or epoch==(num_epochs-1):
            checkpoint = {'state_dict': model.state_dict(), 'optimizer': optimizer.state_dict()}
            save_checkpoint(checkpoint)

        for batch_idx, (data,targets) in enumerate(train_loader):
            # get data to cuda if possible
            data = data.to(device)
            targets = targets.to(device)

            # converting [64,1,28,28] to [64,784]
            # data = data.reshape(data.shape[0], -1)

            # forward
            scores = model(data)
            loss = criterion(scores, targets)

            # backward
            optimizer.zero_grad() # Nullyfing optimizers so it does save anything of back prop
            loss.backward()

            # gradient decent
            optimizer.step()

===> Saving Checkpoint
===> Saving Checkpoint


In [14]:
def get_accuracy(loader, model):
    if loader.dataset.train:
        print('Checking accuracy on training data')
    else:
        print('Checking accuracy on testing data')
        
    num_currect = 0
    num_samples = 0
    model.eval()
    
    with torch.no_grad():
        for x,y in loader:
            x = x.to(device)
            y = y.to(device)
            
            scores = model(x)
            _, predictions = scores.max(1)
            num_currect += (predictions == y).sum()
            num_samples += predictions.size(0)
            
        print(f'got {num_currect} / {num_samples} samples with accuracy: {float(num_currect)/float(num_samples)*100:.2f}')
        
    model.train()

In [15]:
get_accuracy(train_loader,model)
get_accuracy(test_loader,model)

Checking accuracy on training data
got 58591 / 60000 samples with accuracy: 97.65
Checking accuracy on testing data
got 9757 / 10000 samples with accuracy: 97.57


read this: https://d2l.ai/chapter_recurrent-modern/lstm.html