In [0]:
# Mount Google Drive inside the Colab runtime so files stored there can be read below.
from google.colab import drive
# force_remount=True asks Colab to mount again even if /content/gdrive is already mounted.
drive.mount("/content/gdrive", force_remount=True)

Mounted at /content/gdrive


In [0]:
# Folder on the mounted drive that contains data.zip.
# Alternative location kept for reference:
# root_dir = "/content/gdrive/My Drive/PDF slides/Term 7/50.039 Theory and practice of Deep Learning/HW5"
root_dir = "/content/gdrive/My Drive/" # CS root dir

from zipfile import ZipFile
import os

# Unpack the archive only when no "data" path exists yet in the working directory,
# so re-running this cell does not extract it again.
if not os.path.exists("data"):
    with ZipFile(os.path.join(root_dir, 'data.zip'), 'r') as zipObj:
        # Extract all the contents of the zip file into the current working directory
        zipObj.extractall()

# Preprocess

In [0]:
from __future__ import unicode_literals, print_function, division
from io import open
import glob
import os
import torch
import unicodedata
import string
from tqdm.notebook import tqdm

class utils:
    """Loads the per-language name files and encodes names/categories as tensors.

    On construction every file matching ``data/names/*.txt`` is read: the file
    stem becomes a category and its ASCII-folded lines become that category's
    names.

    Attributes set by ``__init__``:
        path: glob pattern of the name files.
        all_letters: alphabet used for one-hot encoding.
        n_letters: ``len(all_letters)``.
        category_lines: dict mapping category -> list of names.
        all_categories: list of categories; the position of a category in this
            list is its integer class label.
        n_categories: ``len(all_categories)``.
    """

    def __init__(self):
        self.path = 'data/names/*.txt'
        self.all_letters = string.ascii_letters + " .,;'"
        self.n_letters = len(self.all_letters)
        self.category_lines = {}
        self.all_categories = []

        # Build the category_lines dictionary, a list of names per language.
        # glob.glob() returns paths in arbitrary order; sorting keeps the
        # category -> index mapping identical across instances and runs.
        for filename in sorted(glob.glob(self.path)):
            category = os.path.splitext(os.path.basename(filename))[0]
            self.all_categories.append(category)
            self.category_lines[category] = self.readLines(filename)

        self.n_categories = len(self.all_categories)

    # Turn a Unicode string to plain ASCII, thanks to https://stackoverflow.com/a/518232/2809427
    def unicodeToAscii(self, s):
        """Return ``s`` with combining marks removed and only ``all_letters`` kept."""
        return ''.join(
            c for c in unicodedata.normalize('NFD', s)
            if unicodedata.category(c) != 'Mn'
            and c in self.all_letters
        )

    def readLines(self, filename):
        """Read a UTF-8 file and return its lines, each folded to ASCII."""
        # The context manager closes the handle; the bare open().read() used
        # previously left it to the garbage collector.
        with open(filename, encoding='utf-8') as handle:
            lines = handle.read().strip().split('\n')
        return [self.unicodeToAscii(line) for line in lines]

    def letterToIndex(self, letter):
        """Return the index of ``letter`` in ``all_letters`` (e.g. "a" = 0), or -1 if absent."""
        return self.all_letters.find(letter)

    def letterToTensor(self, letter):
        """Just for demonstration: turn a letter into a <1 x n_letters> one-hot tensor.

        A letter outside ``all_letters`` yields an all-zero tensor.
        """
        tensor = torch.zeros(1, self.n_letters)
        index = self.letterToIndex(letter)
        # find() reports "not found" as -1; writing to index -1 would silently
        # mark the last alphabet symbol instead, so unknown letters are skipped.
        if index >= 0:
            tensor[0][index] = 1
        return tensor

    def lineToTensor(self, line):
        """Turn a line into a <line_length x 1 x n_letters> tensor of one-hot letter vectors.

        Characters outside ``all_letters`` are left as all-zero rows.
        """
        tensor = torch.zeros(len(line), 1, self.n_letters)
        for li, letter in enumerate(line):
            index = self.letterToIndex(letter)
            if index >= 0:  # see letterToTensor: -1 must not wrap to the last column
                tensor[li][0][index] = 1
        return tensor

    def category_to_tensor(self, category):
        """Return the class index of ``category`` as a 1-element long tensor.

        Raises:
            ValueError: if ``category`` is not in ``all_categories``.
        """
        return torch.tensor([self.all_categories.index(category)], dtype=torch.long)

    def batch_categoryFromOutput(self, output):
        """Return, for each row along dim 0 of ``output``, the index of its largest score.

        Each row is expected to be 1-D (one score per category); the result is
        a plain list of Python ints.
        """
        ret_val = []
        for outp in torch.unbind(output, dim=0):
            _, top_i = outp.topk(1, 0)
            ret_val.append(top_i[0].item())
        return ret_val

# Define model

In [0]:
import torch.nn as nn

class RNN(nn.Module):
    """Single-step recurrent cell made of two linear layers.

    Both layers read the concatenation of the current input and the previous
    hidden state: ``i2h`` produces the next hidden state and ``i2o`` produces
    the class scores, which are passed through a log-softmax over dim 1.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        joined_size = input_size + hidden_size

        self.hidden_size = hidden_size
        self.i2h = nn.Linear(joined_size, hidden_size)
        self.i2o = nn.Linear(joined_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        """Run one step; returns ``(log_probs, next_hidden)``."""
        joined = torch.cat((input, hidden), 1)
        next_hidden = self.i2h(joined)
        log_probs = self.softmax(self.i2o(joined))
        return log_probs, next_hidden

    def initHidden(self):
        """Return an all-zero hidden state of shape (1, hidden_size)."""
        return torch.zeros(1, self.hidden_size)

class LSTM_batchy(nn.Module):
    """nn.LSTM followed by a linear read-out and a log-softmax over the last dim.

    The read-out is applied to the LSTM output at every position, so the
    returned scores keep the leading dimensions of the LSTM output.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super().__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, input, hidden_states):
        """Return ``(log_probs, (hn, cn))`` for ``input`` and the ``(h0, c0)`` pair."""
        h0, c0 = hidden_states
        lstm_out, final_states = self.lstm(input, (h0, c0))
        hn, cn = final_states
        log_probs = self.softmax(self.out(lstm_out))
        return log_probs, (hn, cn)

    def initHidden(self, batch_size):
        """Return zero-filled ``(h0, c0)``; also remembers ``batch_size`` on the module."""
        self.batch_size = batch_size
        state_shape = (self.num_layers*1, self.batch_size, self.hidden_size)
        h0 = torch.zeros(state_shape)
        c0 = torch.zeros(state_shape)
        return (h0, c0)

class GRU(nn.Module):
    """Single-layer nn.GRU followed by a linear read-out and a log-softmax over the last dim."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.gru = nn.GRU(input_size, hidden_size, 1)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, input, hidden):
        """Return ``(log_probs, hidden)`` where ``hidden`` is the GRU's final state."""
        gru_out, hidden = self.gru(input, hidden)
        log_probs = self.softmax(self.out(gru_out))
        return log_probs, hidden

    def initHidden(self, batch_size):
        """Return a zero hidden state of shape (1, batch_size, hidden_size).

        Also remembers ``batch_size`` on the module.
        """
        self.batch_size = batch_size
        state_shape = (1, self.batch_size, self.hidden_size)
        return torch.zeros(state_shape)

# Helper Functions

In [0]:
import random

def randomChoice(l):
    """Return one element of the sequence ``l`` at a uniformly random position."""
    last_position = len(l) - 1
    position = random.randint(0, last_position)
    return l[position]

def randomTrainingExample(util_class=None):
    """Draw one random (category, name) pair together with its tensor encodings.

    A category is picked at random first, then a name within that category.

    The category list, the names and the encoders live on ``utils`` in this
    notebook, not in module globals, so they are read from a ``utils``
    instance here.

    Args:
        util_class: a ``utils`` instance. When omitted a new ``utils()`` is
            built, which re-reads the name files; pass an existing instance
            when calling this repeatedly.

    Returns:
        Tuple ``(category, line, category_tensor, line_tensor)``.
    """
    if util_class is None:
        util_class = utils()
    category = randomChoice(util_class.all_categories)
    line = randomChoice(util_class.category_lines[category])
    category_tensor = util_class.category_to_tensor(category)
    line_tensor = util_class.lineToTensor(line)
    return category, line, category_tensor, line_tensor

# Training the network

In [0]:
def train_lstm(criterion, model, device, optimizer, category_tensor, line_tensor, batch_size):
    """Run one optimisation step of an LSTM-style model on a single batch.

    The model is called once on ``line_tensor`` with freshly initialised
    hidden and cell states; only the last entry along the first output
    dimension is scored against ``category_tensor``.

    Returns:
        ``(output, loss_value)``: that last output entry and the loss as a
        Python float.
    """
    initial_hidden, initial_cell = model.initHidden(batch_size)
    model.train()
    model.zero_grad()
    initial_hidden = initial_hidden.to(device)
    initial_cell = initial_cell.to(device)

    sequence_output, (_hidden, _cell) = model(line_tensor, (initial_hidden, initial_cell))
    output = sequence_output[-1]

    # squeeze(1) only has an effect when dim 1 of the last-step output has size 1
    loss = criterion(output.squeeze(1), category_tensor)
    loss.backward()
    # the optimizer applies the update (replaces a manual p -= lr * grad loop)
    optimizer.step()

    return output, loss.item()

def train_gru(criterion, model, device, optimizer, category_tensor, line_tensor, batch_size):
    """Run one optimisation step of a GRU-style model on a single batch.

    The model is called once on ``line_tensor`` with a freshly initialised
    hidden state; only the last entry along the first output dimension is
    scored against ``category_tensor``.

    Returns:
        ``(output, loss_value)``: that last output entry and the loss as a
        Python float.
    """
    model.train()
    initial_hidden = model.initHidden(batch_size)
    model.zero_grad()
    initial_hidden = initial_hidden.to(device)

    sequence_output, _hidden = model(line_tensor, initial_hidden)
    output = sequence_output[-1]

    loss = criterion(output, category_tensor)
    loss.backward()
    optimizer.step()

    return output, loss.item()

In [0]:
# define dataloader
import os
import torch
import random
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader


class nameLanguageDataset(Dataset):
    """ Name Language Dataset

    Flattens a ``{language: [names]}`` dict into (name, language) pairs,
    shuffles them and exposes one of three slices:

    * ``"train"``: the first ``train_ratio`` fraction,
    * ``"val"``: the first half of the remainder,
    * ``"test"``: the second half of the remainder.

    The shuffle uses a private RNG seeded with ``seed``. Instances built from
    the same dict with the same seed therefore see the same permutation, which
    is what keeps the train/val/test slices disjoint when a separate instance
    is created per split. Shuffling each instance independently (unseeded)
    would let the same name land in several splits.

    Args:
        language_name_dict: mapping language -> list of names.
        dataset_type: ``"train"``, ``"val"`` or ``"test"``.
        train_ratio: fraction of all pairs assigned to the training slice.
        transform: stored on the instance; not applied anywhere in this class.
        seed: seed of the shuffling RNG. Use the same value for every split of
            one experiment; ``None`` gives a fresh, unrepeatable shuffle.

    Raises:
        ValueError: if ``dataset_type`` is not one of the three split names.
    """

    _SPLIT_NAMES = ("train", "val", "test")

    def __init__(self, language_name_dict, dataset_type, train_ratio, transform=None, seed=0):
        if dataset_type not in self._SPLIT_NAMES:
            raise ValueError(
                "dataset_type must be one of %r, got %r" % (self._SPLIT_NAMES, dataset_type)
            )

        self.keys = language_name_dict.keys()
        self.dataset_type = dataset_type
        self.train_ratio = train_ratio
        self.transform = transform
        self.utils = utils()

        pairs = [(name, key) for key in self.keys for name in language_name_dict[key]]
        # Private, seeded RNG: reproducible and independent of the global random state.
        random.Random(seed).shuffle(pairs)
        # Plain comprehensions (instead of zip(*pairs)) also work for an empty dict.
        self.names = [name for name, _ in pairs]
        self.labels = [label for _, label in pairs]

        self.split_index = int(len(self.names)*train_ratio)
        self.test_split_index = int(len(self.names)*(1+train_ratio)/2)

        self.names_train = self.names[:self.split_index]
        self.names_val = self.names[self.split_index:self.test_split_index]
        self.names_test = self.names[self.test_split_index:]

        self.labels_train = self.labels[:self.split_index]
        self.labels_val = self.labels[self.split_index:self.test_split_index]
        self.labels_test = self.labels[self.test_split_index:]

    def _current_split(self):
        """Return ``(names, labels)`` of the slice selected by ``dataset_type``."""
        if self.dataset_type == "train":
            return self.names_train, self.labels_train
        if self.dataset_type == "val":
            return self.names_val, self.labels_val
        if self.dataset_type == "test":
            return self.names_test, self.labels_test
        # Only reachable if dataset_type was reassigned after construction.
        raise ValueError("unknown dataset_type %r" % (self.dataset_type,))

    def __len__(self):
        """Number of samples in the selected slice."""
        return len(self._current_split()[0])

    def __getitem__(self, idx):
        """Return ``(label, name, label_tensor, name_tensor)`` for sample ``idx``."""
        names, labels = self._current_split()
        name = names[idx]
        label = labels[idx]
        return (label, name, self.utils.category_to_tensor(label), self.utils.lineToTensor(name))

from torch.nn.utils.rnn import pad_sequence
# from torch.autograd import Variable
import math

# create iterator to return batch size of tensors
class iteratefromDict():
    """Re-usable batch iterator over an indexable dataset.

    Each dataset item is expected to carry its label tensor at position 2 and
    its sample tensor at position 3. A batch is ``(samples, labels)`` where the
    samples are zero-padded to a common length with ``pad_sequence`` and the
    labels are a 1-D long tensor.

    The read position is reset when the dataset is exhausted, so the same
    object can be looped over once per epoch. The last batch of an epoch may
    be smaller than ``batch_size``; ``batch_size`` itself is never modified.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """

    def __init__(self, dataset, batch_size):
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1, got %r" % (batch_size,))
        self.dataset = dataset
        self.batch_size = batch_size
        self.count = 0

    def __iter__(self):
        return self

    def __len__(self):
        """Number of batches per epoch, counting a trailing partial batch."""
        return math.ceil(len(self.dataset) / self.batch_size)

    def __next__(self):
        total = len(self.dataset)
        if self.count >= total:
            # Epoch finished: rewind so the next loop starts from the beginning.
            self.count = 0
            raise StopIteration()

        # Clamp with a local bound; overwriting self.batch_size here would make
        # every later epoch run with the size of the last partial batch.
        stop = min(self.count + self.batch_size, total)
        # Fetch each item once: building an item may be expensive.
        items = [self.dataset[index] for index in range(self.count, stop)]
        self.count = stop

        samples = [item[3] for item in items]
        labels = [item[2] for item in items]

        # The padded stack gains a batch dimension at position 1;
        # squeeze(2) then drops dim 2 when it has size 1.
        mini_batch_sample = pad_sequence(samples, padding_value = 0).squeeze(2)
        mini_batch_label = torch.as_tensor([int(label) for label in labels], dtype=torch.long)

        return mini_batch_sample, mini_batch_label

In [0]:
def eval_model(model, model_type, device, criterion, batch_size, category_tensor, line_tensor):
    """Score one batch without tracking gradients.

    ``model_type == "gru"`` uses a single hidden state; any other value is
    treated as an LSTM-style model with a (hidden, cell) pair. Only the last
    entry along the first output dimension is scored.

    Returns:
        ``(output, loss_value)``: that last output entry and the loss as a
        Python float.
    """
    model.eval()
    uses_single_state = model_type == "gru"

    if uses_single_state:
        hidden = model.initHidden(batch_size).to(device)
    else:
        hidden, cell = (state.to(device) for state in model.initHidden(batch_size))

    with torch.no_grad():
        if uses_single_state:
            sequence_output, _ = model(line_tensor, hidden)
        else:
            sequence_output, (_, _) = model(line_tensor, (hidden, cell))

    output = sequence_output[-1]
    loss = criterion(output.squeeze(1), category_tensor)
    return output, loss.item()

In [0]:
def eval_phase(dataloader, device, model, model_type, criterion):
    """Evaluate ``model`` on every batch of ``dataloader`` and return the mean loss.

    Each batch is ``(line_tensor, category_tensor)``; the batch size passed to
    ``eval_model`` is read from dimension 1 of the line tensor.

    Returns:
        The average of the per-batch losses as a Python float, or ``nan`` when
        the loader yields no batches.
    """
    batch_losses = []
    for data in dataloader:
        line_tensor, category_tensor = data[0].to(device), data[1].to(device)
        _, loss = eval_model(model, model_type, device, criterion, data[0].shape[1], category_tensor, line_tensor)
        batch_losses.append(loss)

    # Average over all batches: reporting only the final batch's loss makes the
    # validation figure depend on whichever samples happen to come last.
    if not batch_losses:
        return float("nan")
    return sum(batch_losses) / len(batch_losses)

In [0]:
def train_phase(trainloader, device, model, model_type, criterion, optimizer):
    """Train ``model`` for one pass over ``trainloader`` and return the mean batch loss.

    Args:
        model_type: ``"lstm"`` (uses ``train_lstm``) or ``"gru"`` (uses ``train_gru``).

    Returns:
        ``np.mean`` of the per-batch losses.

    Raises:
        ValueError: if ``model_type`` is neither ``"lstm"`` nor ``"gru"``.
    """
    # Resolve the step function up front. With an unrecognised model_type the
    # loop body would otherwise run no training step and still record a loss
    # (an unbound name on the first batch).
    if model_type == "lstm":
        train_step = train_lstm
    elif model_type == "gru":
        train_step = train_gru
    else:
        raise ValueError("model_type must be 'lstm' or 'gru', got %r" % (model_type,))

    train_loss = []
    for data in trainloader:
        line_tensor, category_tensor = data[0].to(device), data[1].to(device)
        # dimension 1 of the batch's line tensor is passed on as the batch size
        _, loss = train_step(criterion, model, device, optimizer, category_tensor, line_tensor, data[0].shape[1])
        train_loss.append(loss)
    return np.mean(train_loss)

In [0]:
def test_phase(util_class, dataloader, device, model, model_type, criterion):
    best_weights = None
    best_loss = 1000
    correct = 0
    test_loss = []
    total = 0
    for iter, data in enumerate(dataloader):
        line_tensor, category_tensor = data[0].to(device), data[1].to(device)
        total += data[0].shape[1]
        output, loss = eval_model(model, model_type, device, criterion, data[0].shape[1], category_tensor, line_tensor)
        category = util_class.batch_categoryFromOutput(output)
        batch_correct = (torch.tensor(category) == category_tensor.cpu()).sum()
        correct += batch_correct
        test_loss.append(loss)
    acc = correct/float(total)
    return np.mean(test_loss), acc.data

In [0]:
import matplotlib.pyplot as plt
import torch.nn as nn

# name_lang_dataset_train = nameLanguageDataset(util_class.category_lines, "train", 0.8)
# name_lang_dataset_val = nameLanguageDataset(util_class.category_lines, "val", 0.8)
# name_lang_dataset_test = nameLanguageDataset(util_class.category_lines, "test", 0.8)

 
def train_test(util_class, model, model_type, device, dataset, batch_size, epochs):
    """Train ``model`` for ``epochs`` epochs, validating and testing after each one.

    Uses NLLLoss and SGD with learning rate 0.005. ``dataset`` (a
    ``{language: [names]}`` dict) is wrapped into train/val/test datasets with
    a train ratio of 0.8, each served through its own batch iterator. The
    validation loss is only printed; it is not part of the return value.

    Returns:
        ``(epochs_test_loss, epochs_train_loss, epochs_test_acc)``, three
        lists with one entry per epoch.
    """
    model.to(device)
    criterion = nn.NLLLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.005)

    # One dataset view per split, then one batch iterator per dataset.
    split_datasets = [
        nameLanguageDataset(dataset, split_name, 0.8)
        for split_name in ("train", "val", "test")
    ]
    trainloader, valloader, testloader = [
        iteratefromDict(split_dataset, batch_size) for split_dataset in split_datasets
    ]

    epochs_train_loss = []
    epochs_test_loss = []
    epochs_test_acc = []

    for step in range(epochs):
        for banner_line in ('\n', "Epoch %i" % step, "="*10):
            print(banner_line)

        # training pass
        loss_per_epoch = train_phase(trainloader, device, model, model_type, criterion, optimizer)
        epochs_train_loss.append(loss_per_epoch)
        print("Current training loss: %f at epoch %i" % (loss_per_epoch, step))

        # validation pass (reported only)
        val_loss = eval_phase(valloader, device, model, model_type, criterion)
        print("Current val loss: %f at epoch: %i" % (val_loss, step))

        # test pass
        test_loss, test_acc = test_phase(util_class, testloader, device, model, model_type, criterion)
        print("Test loss: %f , test accuracy %f at epoch: %i" % (test_loss, test_acc, step))
        epochs_test_loss.append(test_loss)
        epochs_test_acc.append(test_acc)

    return epochs_test_loss, epochs_train_loss, epochs_test_acc

# Task 1

In [0]:
def taskA():
    """Task 1 sweep: LSTMs with 1 and 2 layers, plus a GRU, at hidden sizes 128 and 256.

    Every model is trained with batch size 1 for 8 epochs. The GRU is only
    trained in the 1-layer pass.

    Returns:
        ``(lstm_metrics, gru_metrics)``, both indexed as
        ``metrics[num_layers][hidden_size]`` and holding the three per-epoch
        lists returned by ``train_test``.
        NOTE: the test-loss key is spelled "epoch_test_loss" here (no "s").
    """
    util_class = utils()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    hidden_sizes = [128, 256]
    epochs = 8
    lstm_metrics = {}
    gru_metrics = {}

    for num_layers in (1, 2):
        include_gru = num_layers == 1
        lstm_metrics[num_layers] = {}
        if include_gru:
            gru_metrics[num_layers] = {}

        for hidden_size in hidden_sizes:
            for banner_line in ("="*20, "Layer: %i , hidden layer size: %i" % (num_layers, hidden_size), "="*20):
                print(banner_line)

            if include_gru:
                for banner_line in ("++++++++++", "+GRU phase", "++++++++++"):
                    print(banner_line)
                gru_batch = GRU(util_class.n_letters, hidden_size, util_class.n_categories)
                gru_test_loss, gru_train_loss, gru_test_acc = train_test(
                    util_class, gru_batch, "gru", device, util_class.category_lines, 1, epochs
                )
                gru_metrics[num_layers][hidden_size] = {
                    "epoch_test_loss": gru_test_loss,
                    "epochs_train_loss": gru_train_loss,
                    "epochs_test_acc": gru_test_acc,
                }
                # the LSTM banner is only printed in the pass that also ran a GRU
                for banner_line in ("="*40, "++++++++++", "+LSTM phase", "++++++++++"):
                    print(banner_line)

            lstm_batch = LSTM_batchy(util_class.n_letters, hidden_size, util_class.n_categories, num_layers)
            lstm_test_loss, lstm_train_loss, lstm_test_acc = train_test(
                util_class, lstm_batch, "lstm", device, util_class.category_lines, 1, epochs
            )
            lstm_metrics[num_layers][hidden_size] = {
                "epoch_test_loss": lstm_test_loss,
                "epochs_train_loss": lstm_train_loss,
                "epochs_test_acc": lstm_test_acc,
            }

    return lstm_metrics, gru_metrics
    

In [0]:
def plot_taskA(lstm_met, gru_met):
    """Plot the per-epoch test accuracy of every model configuration in Task 1.

    Both arguments are indexed as ``metrics[num_layers][hidden_size]`` and must
    contain an ``'epochs_test_acc'`` list. One figure is shown per
    configuration: all LSTM configurations first, then all GRU ones. The
    configurations are taken from the dicts themselves, so every trained GRU
    hidden size is plotted, not just 128.
    """
    for layers in sorted(lstm_met):
        for hidden_num in sorted(lstm_met[layers]):
            _plot_test_accuracy(
                lstm_met[layers][hidden_num]['epochs_test_acc'],
                f"LSTM {layers} layer(s), {hidden_num} hidden",
            )

    for layers in sorted(gru_met):
        plural = "" if layers == 1 else "s"
        for hidden_num in sorted(gru_met[layers]):
            _plot_test_accuracy(
                gru_met[layers][hidden_num]['epochs_test_acc'],
                f"GRU {layers} layer{plural}, {hidden_num} hidden",
            )


def _plot_test_accuracy(accuracies, title):
    """Show one labelled accuracy-per-epoch curve."""
    # float() accepts both Python numbers and 0-dim tensors
    plt.plot([float(value) for value in accuracies])
    plt.title(title)
    plt.xlabel("Epoch")
    plt.ylabel("Test accuracy")
    plt.show()

In [0]:
# Task 1 driver: run the LSTM/GRU sweep, then plot the collected metrics.
# The __main__ guard keeps this from running if the notebook's code is imported as a module.
if __name__ == "__main__":
    lstm_metrics, gru_metrics = taskA()
    plot_taskA(lstm_metrics, gru_metrics)


Layer: 1 , hidden layer size: 128
++++++++++
+GRU phase
++++++++++


Epoch 0


KeyboardInterrupt: ignored

# Task 2

In [0]:
def taskB():
    """Task 2 sweep: train a 1-layer, 128-unit LSTM with batch sizes 1, 10 and 30.

    Each model is trained for 10 epochs.

    Returns:
        Dict mapping batch size to the three per-epoch lists returned by
        ``train_test`` under the keys ``"epochs_test_loss"``,
        ``"epochs_train_loss"`` and ``"epochs_test_acc"``.
    """
    util_class = utils()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    n_hidden, n_layer, epochs = 128, 1, 10

    lstm_metrics = {}
    for batch_size in (1, 10, 30):
        lstm_batch = LSTM_batchy(util_class.n_letters, n_hidden, util_class.n_categories, n_layer)
        test_loss, train_loss, test_acc = train_test(
            util_class, lstm_batch, "lstm", device, util_class.category_lines, batch_size, epochs
        )
        lstm_metrics[batch_size] = {
            "epochs_test_loss": test_loss,
            "epochs_train_loss": train_loss,
            "epochs_test_acc": test_acc,
        }
    return lstm_metrics

In [0]:
def plot_taskB(lstm_metrics):
    """Show one figure per (batch size, metric) pair of the Task 2 results.

    Reads the batch sizes 1, 10 and 30 from ``lstm_metrics`` and, for each,
    plots test accuracy, training loss and test loss in that order.
    """
    metric_names = ('epochs_test_acc', 'epochs_train_loss', 'epochs_test_loss')
    combinations = [(size, name) for size in (1, 10, 30) for name in metric_names]

    for batch_size, metric in combinations:
        curve = lstm_metrics[batch_size][metric]
        plt.plot(curve)
        plt.title(f"LSTM trained on batch size {batch_size}. Metric: {metric}")
        plt.show()

In [0]:
# Task 2 driver: run the batch-size sweep, then plot its metrics.
# NOTE: this rebinds `lstm_metrics`, which the Task 1 cell also assigns.
if __name__ == "__main__":
    lstm_metrics = taskB()
    plot_taskB(lstm_metrics)
