In [1]:
import os

import numpy as np
import pandas as pd
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

from utils.nn_data_classifier import load_data, Classifier
from utils.preprocess import preprocess, RNNDataset

pd.options.mode.chained_assignment = None

# TensorBoard writer used by the training/validation helpers below; its log directory is 'logs'.
writer = SummaryWriter(log_dir='logs')



Load the dataset

In [2]:
# Both MNIST splits share the same storage root, tensor conversion and download flag;
# only the `train` switch differs between them.
_mnist_kwargs = dict(root='./historicals/',
                     transform=transforms.ToTensor(),
                     download=True)

train_dataset = torchvision.datasets.MNIST(train=True, **_mnist_kwargs)
test_dataset = torchvision.datasets.MNIST(train=False, **_mnist_kwargs)

In [3]:
# --- Input geometry: every sample is reshaped to (SEQ_LEN, input_size) before the LSTM ---
SEQ_LEN = 28
input_size = 28
num_classes = 10

# --- Model capacity ---
num_layers = 2
hidden_size = 500

# --- Optimisation settings ---
batch_size = 64
num_epochs = 2
learning_rate = 0.001

In [4]:
# Training batches are drawn in shuffled order.
train_loader = DataLoader(dataset=train_dataset,
                          batch_size=batch_size,
                          shuffle=True)

# Accuracy does not depend on sample order, so the evaluation split is left unshuffled:
# this avoids needless work and makes the intermediate progress printouts reproducible.
test_loader = DataLoader(dataset=test_dataset,
                         batch_size=batch_size,
                         shuffle=False)

Define the LSTM model

In [5]:
from torch import nn
class RNN_module(nn.Module):
    """Sequence classifier: a stacked LSTM followed by a linear read-out layer.

    Args:
        hidden_size: Number of features in the LSTM hidden state.
        input_size: Number of features per time step of the input sequence.
        output_size: Number of values produced by the final linear layer.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, hidden_size, input_size, output_size, num_layers):
        super().__init__()
        self._num_layers = num_layers
        self._input_size = input_size
        self._hidden_size = hidden_size
        self._output_size = output_size

        # batch_first=True: inputs and outputs are laid out as (batch, seq, feature).
        self.lstm = nn.LSTM(input_size=self._input_size, hidden_size=self._hidden_size,
                            num_layers=self._num_layers, batch_first=True)
        self.fc = nn.Linear(in_features=self._hidden_size, out_features=self._output_size)

    def __str__(self):
        return f"RNN LSTM Model w/ {self._input_size} features and {self._num_layers} layers and {self._hidden_size} of hidden size"

    def forward(self, input):
        """Classify a batch of sequences from the LSTM output at the last time step.

        Args:
            input: Tensor laid out as (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, output_size) holding the linear layer's output.
        """
        # No explicit (h_0, c_0) is passed: nn.LSTM then starts from zero states.
        # The previous version allocated float64 zero tensors on every call and never used them.
        lstm_output, _ = self.lstm(input)

        # Only the last time step of the top LSTM layer feeds the classifier head.
        return self.fc(lstm_output[:, -1, :])

In [6]:
# Instantiate the LSTM classifier from the hyperparameters configured earlier in the notebook.
lstm_model = RNN_module(
    hidden_size=hidden_size,
    input_size=input_size,
    output_size=num_classes,
    num_layers=num_layers,
)


In [7]:
# Cross-entropy loss applied to the model outputs in epoch_training.
criterion = nn.CrossEntropyLoss()
# NOTE: no optimizer is created here; train_loop builds its own Adam optimizer.

In [8]:
# Directory that receives one checkpoint per epoch (train_loop writes 'model_<epoch>.pth' into it).
CHECKPOINT_PATH = './models_parameters/MNIST/'
# File that train_loop overwrites whenever an epoch reaches a new best validation accuracy.
BEST_PATH = './models_parameters/MNIST/best_model.pth'

def epoch_training(model: RNN_module, train_dataloader, epoch, total_epochs, optimizer):
    """Run one optimisation pass over ``train_dataloader`` and log progress to TensorBoard.

    Every 50 batches the mean loss and the accuracy (in percent) of that window are
    written to the module-level ``writer``; at the end of the pass the mean loss over
    the whole epoch is logged under ``'Epoch loss: '``.

    Args:
        model: Network to optimise; it is switched to training mode.
        train_dataloader: Sized iterable of ``(sequence, label)`` batches. Each
            ``sequence`` is reshaped to ``(-1, SEQ_LEN, input_size)`` before the forward pass.
        epoch: Zero-based index of the current epoch.
        total_epochs: Total number of epochs (only used in the progress message).
        optimizer: Optimizer that updates ``model``'s parameters.
    """
    log_every = 50  # number of batches between two progress reports
    n_of_steps = len(train_dataloader)

    window_loss = 0.0      # summed batch losses since the last report
    window_correct = 0     # correct predictions since the last report
    window_samples = 0     # samples seen since the last report
    epoch_loss = 0.0       # summed batch losses over the whole epoch

    model.train()

    for current_batch, (sequence, label) in enumerate(train_dataloader):
        # forward: compute the loss for the current parameters
        sequence = sequence.reshape(-1, SEQ_LEN, input_size)
        outputs = model(sequence)
        loss = criterion(input=outputs, target=label)

        # backward: update the parameters from the current loss
        optimizer.zero_grad()  # clear stale gradients, otherwise they accumulate
        loss.backward()        # back-propagate to get a gradient for every parameter
        optimizer.step()       # apply the update

        predictions = torch.argmax(outputs, 1)
        window_correct += (predictions == label).sum().item()
        window_samples += label.shape[0]
        window_loss += loss.item()
        epoch_loss += loss.item()

        if (current_batch + 1) % log_every == 0:
            global_step = epoch * n_of_steps + current_batch
            print(f"epoch {epoch+1}/{total_epochs}, current step(batch): {current_batch+1}/{n_of_steps}, loss = {loss.item():.4f} ")
            writer.add_scalar('training loss: ', window_loss / log_every, global_step)
            # Accuracy is correct / samples (in percent, like the validation metric),
            # not correct predictions divided by the number of batches.
            writer.add_scalar('accuracy: ', 100.0 * window_correct / window_samples, global_step)
            window_loss = 0.0
            window_correct = 0
            window_samples = 0

    # Log the mean loss of the epoch rather than whatever the last mini-batch produced;
    # skip it entirely when the loader was empty (there is no loss to report).
    if n_of_steps > 0:
        writer.add_scalar('Epoch loss: ', epoch_loss / n_of_steps, epoch + 1)


def epoch_validate(model, validation_dataloader, epoch, total_epochs):
    """Measure classification accuracy on ``validation_dataloader`` without updating the model.

    Args:
        model: Network to evaluate. It is put in eval mode for the duration of the
            call and returned to its previous mode afterwards.
        validation_dataloader: Iterable of ``(sequence, label)`` batches. Each
            ``sequence`` is reshaped to ``(-1, SEQ_LEN, input_size)`` before the forward pass.
        epoch: Zero-based index of the current epoch.
        total_epochs: Total number of epochs (only used in the progress message).

    Returns:
        Accuracy in percent; ``0.0`` when the loader yields no samples.
    """
    n_corrects = 0
    n_samples = 0

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for sequence, label in validation_dataloader:
                # forward only: predictions for the current parameters, no loss needed
                sequence = sequence.reshape(-1, SEQ_LEN, input_size)
                predictions = torch.argmax(model(sequence), 1)

                n_samples += label.shape[0]
                n_corrects += (predictions == label).sum().item()
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    # Guard against an empty loader instead of dividing by zero.
    acc = 100.0 * n_corrects / n_samples if n_samples else 0.0

    print(f"epoch {epoch+1}/{total_epochs} accuracy: {acc}")
    writer.add_scalar('Validation Accuracy: ', acc, epoch+1)

    return acc


def train_loop(model: RNN_module, train_dataloader: DataLoader, validation_dataloader: DataLoader, epochs: int,
               learning_rate: float = 0.001):
    """Train ``model`` for ``epochs`` epochs, validating and checkpointing after each one.

    After every epoch a checkpoint (epoch number, model state, optimizer state) is
    written to ``CHECKPOINT_PATH`` as ``model_<epoch>.pth``. When the validation
    accuracy exceeds the best value seen so far, the same checkpoint is also written
    to ``BEST_PATH``.

    Args:
        model: Network to train.
        train_dataloader: Batches used for optimisation.
        validation_dataloader: Batches used to score each epoch.
        epochs: Number of passes over ``train_dataloader``.
        learning_rate: Step size for the Adam optimizer (defaults to the previously
            hard-coded 0.001).
    """
    # torch.save does not create missing directories; make sure they exist up front
    # so a full epoch of training is not lost to a failed save.
    os.makedirs(CHECKPOINT_PATH, exist_ok=True)
    best_dir = os.path.dirname(BEST_PATH)
    if best_dir:
        os.makedirs(best_dir, exist_ok=True)

    max_accuracy = 0
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        epoch_training(model, train_dataloader, epoch, epochs, optimizer)
        accuracy = epoch_validate(model, validation_dataloader, epoch, epochs)

        checkpoint = {
            'epoch': epoch+1,
            'model_state': model.state_dict(),
            'optim_state': optimizer.state_dict()
        }

        # Keep a separate copy of the best-scoring epoch.
        if accuracy > max_accuracy:
            max_accuracy = accuracy
            torch.save(checkpoint, BEST_PATH)

        torch.save(checkpoint, os.path.join(CHECKPOINT_PATH, f'model_{epoch+1}.pth'))

In [9]:
def test_loop(test_dataloader: DataLoader, model: nn.Module):
    with torch.no_grad():
        n_corrects = 0
        n_samples = 0

        for current_batch, (sequence, label) in enumerate(test_dataloader):
            #forward: we are calculating the loss given the parameters
            outputs = model(sequence)
            predictions = torch.argmax(outputs, 1)

            n_samples += outputs.shape[0]
            n_corrects += (predictions == label).sum().item()

            if (current_batch + 1) % 200 == 0:
                print(f"test batch: {current_batch+1}/{len(test_dataloader)}, current accuracy: {100 * n_corrects / n_samples}")

        acc = 100.0 * n_corrects / n_samples
        print(f"final test accuracy: {acc}")


In [10]:
# Grab the model's state dict before the training cell runs.
# NOTE(review): `c` is not referenced again in the cells shown here — confirm it is still needed.
c = lstm_model.state_dict()

In [11]:
# Train for the number of epochs set with the other hyperparameters (single source of truth).
# The MNIST test split doubles as the validation set here.
epochs = num_epochs
train_loop(lstm_model, train_dataloader=train_loader, validation_dataloader=test_loader, epochs=epochs)

KeyboardInterrupt: 

Load the model that performed best on validation

In [None]:
# Build a fresh network with the training architecture, then overwrite its weights
# with those stored in the best-validation checkpoint.
best_model = RNN_module(
    hidden_size=hidden_size,
    input_size=input_size,
    output_size=num_classes,
    num_layers=num_layers,
)

checkpoint = torch.load(BEST_PATH)
print(f'Model type: {best_model}')
print(f'Best performing model found at {checkpoint["epoch"]}ºepoch')

# Strict key matching is load_state_dict's default, so it is not spelled out.
best_model.load_state_dict(checkpoint['model_state'])
best_model.eval()

# test_loop(test_dataloader=test_loader, model=best_model)

In [None]:
# Read the 'model_state' entry of two saved checkpoints (model_1.pth and model_69.pth) into `a` and `b`.
# NOTE(review): these files live under ./models_parameters/LSTM/, not the MNIST directory used by
# CHECKPOINT_PATH/BEST_PATH, and `a`/`b` are not used in the cells shown — confirm this cell is still needed.
checkpoint = torch.load('./models_parameters/LSTM/checkpoints_2/model_1.pth')

a = checkpoint['model_state']

# `checkpoint` is rebound here; after this cell it refers to the model_69 file.
checkpoint = torch.load('./models_parameters/LSTM/checkpoints_2/model_69.pth')

b = checkpoint['model_state']

In [None]:
# Close the TensorBoard writer created in the first cell now that logging is finished.
writer.close()