Basic Setup


In [141]:
pip install torch torchvision




In [142]:
import torch
print("cuda" if torch.cuda.is_available() else "cpu")
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/My Drive/LSTM/

cuda
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/My Drive/LSTM


Language Class

In [0]:
class Lang:
    def __init__(self, name):
        self.name = name
        self.word2index = {}
        self.word2count = {}
        self.index2word = {}
        self.n_words = 0

    def addSentence(self, sentence):
        for word in sentence.split(' '):
            self.addWord(word)

    def addWord(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1

Encoder GRU setup

In [0]:
import torch.nn as nn
import torch

class EncoderGRU(nn.Module):
    def __init__(self, input_size, hidden_size, dropout=0.1, bidirectional=True):
        super(EncoderGRU, self).__init__()
        self.bidirectional = bidirectional
        self.hidden_size = hidden_size
        self.gru = nn.GRU(input_size, hidden_size, bidirectional=bidirectional)

    def forward(self, input, hidden):
        output, hidden = self.gru(input, hidden)
        return output, hidden

    def initHidden(self, batch=1):
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        return torch.zeros(1 + self.bidirectional, batch, self.hidden_size, device=device)

Decoder GRU with Attention

In [0]:
class AttnDecoderGRU(nn.Module):
    def __init__(self, hidden_size, output_size, dropout=0.1, max_input_length=470):
        super(AttnDecoderGRU, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        # self.dropout_p = dropout_p
        self.max_input_length = max_input_length

        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        self.attn = nn.Linear(self.hidden_size * 2, self.max_input_length)
        self.attn_combine = nn.Linear(self.hidden_size * 3, self.hidden_size)
        # self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden, encoder_outputs):
        embedded = self.embedding(input).view(1, 1, -1)
        # embedded = self.dropout(embedded)

        attn_weights = F.softmax(
            self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
        attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                                 encoder_outputs.unsqueeze(0))

        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)

        output = F.relu(output)
        output, hidden = self.gru(output, hidden)

        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights

    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size, device=device)

Cross - Validation Fold generator

In [0]:
import random
import torch
from sklearn import model_selection

def indexesFromSentence(lang, sentence):
    return [lang.word2index[word] for word in sentence.split(' ')]


def tensorFromSentence(lang, sentence, device):
    indexes = indexesFromSentence(lang, sentence)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

def split(pairs, lang, device):
    train = []
    test = []
    for label in pairs:
        label_tensor = tensorFromSentence(lang, label, device)
        iters = pairs[label]
        test_index = random.randint(0, len(iters) - 1)
        accept_prob = random.random()
        for i in range(len(iters)):
            if i == test_index and len(iters) != 1 and accept_prob > 0.5:
                test.append([iters[i], label_tensor])
            else:
                train.append([iters[i], label_tensor])
    return train, test

def kfoldSplit(pairs, lang, device, split=10):
    folds = []
    inputs = []
    outputs = []
    for label in pairs:
        for iter in pairs[label]:
            inputs.append(iter)
            outputs.append(label)
    
    skf = model_selection.StratifiedKFold(n_splits=split, shuffle=True)
    indices = skf.split(inputs, outputs)

    for train_indices, test_indices in indices:
        curr_train = []
        curr_test = []
        for indices in train_indices:
            curr_train.append([inputs[indices], tensorFromSentence(lang,  outputs[indices], device)])
        for indices in test_indices:
            curr_test.append([inputs[indices], tensorFromSentence(lang,  outputs[indices], device)])
        folds.append([curr_train, curr_test])
    
    return folds
        


Accuracy calculator and result documentation

In [0]:
import torch


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def evaluate(encoder, decoder, sentence, output_lang, sil0, sil1, max_input_length=470, max_output_length=5):
    with torch.no_grad():
        input_tensor = sentence
        input_length = len(sentence)
        encoder_hidden = encoder.initHidden()

        encoder_outputs = torch.zeros(max_input_length, encoder.hidden_size * 2, device=device)

        for ei in range(input_length):
            encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)
            encoder_outputs[ei] = encoder_output

        decoder_input = torch.tensor([[sil0]], device=device)
        decoder_attentions = torch.zeros(max_output_length, max_input_length)

        layers, batches, hidden_num = encoder_hidden.size()
        decoder_hidden = encoder_hidden.view(1,2,1,hidden_num)[0][1].view(1,1,hidden_num)
        decoded_words = []

        for di in range(max_output_length):
            decoder_output, decoder_hidden, decoder_attn = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            topv, topi = decoder_output.data.topk(1)
            decoder_attentions[di] = decoder_attn.data
            if topi.item() == sil1:
                decoded_words.append('sil1')
                break
            else:
                decoded_words.append(output_lang.index2word[topi.item()])

            decoder_input = topi.squeeze().detach()

        return decoded_words, decoder_attentions[:di+1]

def calculateTrainingAccuracy(encoder, decoder, pairs, output_lang, sil0, sil1, file_name=None, write = True, max_input_length=470, max_output_length=5):
    total = 0
    correct = 0
    results = None
    if write:
        results = open(file_name, 'w')
    attention = None
    for pair in pairs:
        output_words, attention = evaluate(encoder, decoder, pair[0], output_lang, sil0, sil1, max_input_length=max_input_length, max_output_length=max_output_length)
        output_sentence = ' '.join(output_words)
        sent = [output_lang.index2word[i.item()] for i in pair[1]]
        true_sentence = ' '.join(sent)
        if write:
            print('Predicted Sentence: ', output_sentence)
            print('True Sentence: ' , true_sentence)
            plt.matshow(attention.numpy())
            print('Predicted Sentence: ', output_sentence, file=results)
            print('True Sentence: ' , true_sentence, file=results)
        answer = None
        if output_sentence == true_sentence:
            correct += 1
            answer = "CORRECT"
        else:
            answer = "INCORRECT"
        total += 1
        if write:
            print('Result: ', answer, file=results)
    if write:
        print('Recognition Total: ', str(correct/total), file=results)
        results.close()
    return correct/total



LSTM training methods

In [0]:
import torch
import torch.nn as nn
import random 
import time
import torch.optim as optim
import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import copy

teacher_forcing_ratio = 0.5

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def showPlot(points):
    plt.figure()
    fig, ax = plt.subplots()
    loc = ticker.MultipleLocator(base=0.2)
    ax.yaxis.set_major_locator(loc)
    plt.plot(points)
    plt.show()


def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)


def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))

def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, sil0, sil1, max_input_length = 470):
    encoder_hidden = encoder.initHidden()

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    input_length = len(input_tensor)
    target_length = target_tensor.size(0)

    loss = 0

    encoder_outputs = torch.zeros(max_input_length, encoder.hidden_size*2, device=device)

    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)
        encoder_outputs[ei] = encoder_output
    
    print(encoder_outputs)
    print(encoder_outputs.size())

    decoder_input = torch.tensor([[sil0]], device=device)

    layers, batches, hidden_num = encoder_hidden.size()
    decoder_hidden = encoder_hidden.view(1,2,1,hidden_num)[0][1].view(1,1,hidden_num)

    use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False

    if use_teacher_forcing:
        # Teacher forcing: Feed the target as the next input
        for di in range(target_length):
            decoder_output, decoder_hidden, _ = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            loss += criterion(decoder_output, target_tensor[di])
            decoder_input = target_tensor[di]  # Teacher forcing

    else:
        # Without teacher forcing: use its own predictions as the next input
        for di in range(target_length):
            decoder_output, decoder_hidden, _ = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            topv, topi = decoder_output.topk(1)
            decoder_input = topi.squeeze().detach()  # detach from history as input

            loss += criterion(decoder_output, target_tensor[di])
            if decoder_input.item() == sil1:
                break

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length

def testSetLoss(encoder, decoder, input_tensor, target_tensor, criterion, sil0, sil1, max_input_length=470):
    with torch.no_grad():
        input_tensor = input_tensor
        input_length = len(input_tensor)
        target_length = target_tensor.size(0)

        loss = 0

        encoder_hidden = encoder.initHidden()
        encoder_outputs = torch.zeros(max_input_length, encoder.hidden_size * 2, device=device)

        for ei in range(input_length):
            encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)
            encoder_outputs[ei] = encoder_output

        decoder_input = torch.tensor([[sil0]], device=device)

        layers, batches, hidden_num = encoder_hidden.size()
        decoder_hidden = encoder_hidden.view(1,2,1,hidden_num)[0][1].view(1,1,hidden_num)
        
        for di in range(target_length):
            decoder_output, decoder_hidden, _ = decoder(decoder_input, decoder_hidden, encoder_outputs)
            topv, topi = decoder_output.data.topk(1)
            loss += criterion(decoder_output, target_tensor[di])
            if topi.item() == sil1:
                break
            decoder_input = topi.squeeze().detach()

        return loss.item() / target_length

def trainIters(encoder, decoder, epochs, train_set, test_set, sil0, sil1, output_lang, lr=1e-4, lr_decay=1, lr_drop_epoch=10, l2_penalty = 0, max_input_length=470, max_output_length = 6):
    start = time.time()
    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    test_loss_total = 0

    encoder_optimizer = optim.Adam(encoder.parameters(), lr=lr, weight_decay = l2_penalty)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=lr, weight_decay = l2_penalty)

    best_test_acc = -1
    best_encoder = None
    best_decoder = None

    criterion = nn.NLLLoss()

    for iter in range(1, epochs + 1):
        if iter == lr_drop_epoch:
            encoder_optimizer = optim.Adam(encoder.parameters(), lr=lr * (lr_decay)**(iter), weight_decay = l2_penalty)
            decoder_optimizer = optim.Adam(decoder.parameters(), lr=lr * (lr_decay)**(iter), weight_decay = l2_penalty)

        for pairs in train_set:
            input_tensor = pairs[0]
            target_tensor = pairs[1]
            loss = train(input_tensor, target_tensor, encoder,
                        decoder, encoder_optimizer, decoder_optimizer, criterion, sil0, sil1, max_input_length=max_input_length)
            print_loss_total += loss

        for pair in test_set:
            input_tensor = pair[0]
            target_tensor = pair[1]
            test_loss_total += testSetLoss(encoder, decoder, input_tensor, target_tensor, criterion, sil0, sil1, max_input_length=max_input_length)

        print_loss_avg = print_loss_total / len(train_set)
        test_loss_avg = test_loss_total / len(test_set)
        print_loss_total = 0
        test_loss_total = 0
        test_acc = calculateTrainingAccuracy(encoder, decoder, test_set, output_lang, sil0, sil1, write=False, max_input_length=max_input_length, max_output_length=max_output_length)
        train_acc = calculateTrainingAccuracy(encoder, decoder, train_set, output_lang, sil0, sil1, write=False, max_input_length=max_input_length, max_output_length=max_output_length)
        print('%s (%d %d%%) train loss: %.4f train acc: %.4f test loss: %.4f test acc: %.4f' % (timeSince(start, iter / epochs),
                                        iter, iter / epochs * 100, print_loss_avg, train_acc, test_loss_avg, test_acc))
        
        if test_acc > best_test_acc:
            best_test_acc = test_acc
            best_encoder = copy.deepcopy(encoder)
            best_decoder = copy.deepcopy(decoder)

        plot_losses.append(test_loss_avg)

    showPlot(plot_losses)
    return best_encoder, best_decoder

Main script - uses above files to run everything

In [149]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import glob
import random
import math

#########    HYPERPARAMETERS   ############
random.seed(42)
users = ["Ravi"]
file_name = "Ravi"
num_features = 0
hidden_size = 64
epochs = 256
limit_features = False
lr = 1e-4
lr_decay = 0.95
lr_drop = 20
dropout = 0
num_layers = 1
k_fold = False
folds = 5
bidirectional = True
expansion_factor = 2
l2_penalty = 0
###########################################

sil0 = 0
sil1 = 0

def expand(dataset_as_array, factor):
    expanded_array = []
    for pair in dataset_as_array:
        content = pair[0]
        label = pair[1]

        expanded_pair = [[[],label] for i in range(factor)]
        for frame in range(len(content)):
            expanded_pair[frame % factor][0].append(content[frame])
        expanded_array.extend(expanded_pair)
    return expanded_array


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
eng = Lang("english")
pairs = {}
max_input_length = 0
max_output_length = 0
print("Reading data from files...")
for user in users:
    for file in glob.glob("data/"+user+"/*.ark"):
        label = "sil0_"+file.split(".")[1]+"_sil1"
        label = label.replace("_", " ")
        eng.addSentence(label)

        max_output_length = max(max_output_length - 1, len(label.split(" ")))

        sil0 = eng.word2index["sil0"]
        sil1 = eng.word2index["sil1"]
        content = []
        f = open(file)
        for x in f:
            line = x
            if "[" in x:
                line = x.split("[ ")[1]
            elif "]" in x:
                line = x.split("]")[0]
            features = []
            line = line.strip("\n").split(" ")
            if limit_features:
                line = line[-num_features:]
            for f in line:
                try:
                    features.append(float(f)*1000)
                except:
                    pass
            if len(features) != 0:
                num_features = len(features)
                content.append(torch.tensor(features, dtype=torch.float, device=device).view(1, 1, -1))
        max_input_length = max(max_input_length, math.ceil(len(content)/2))
        if label in pairs:
            temp = pairs[label]
            temp.append(content)
            pairs[label] = temp
        else:
            pairs[label] = [content]

print("Max Input length = "+str(max_input_length) + " Max output length = " + str(max_output_length))

for label in pairs:
    print("Label = " + label + " Number of iterations = " + str(len(pairs[label])))
if not k_fold:    
    print("Splitting data into train and test...")
    train_set, test_set = split(pairs, eng, device)
    train_set, test_set = expand(train_set, expansion_factor), expand(test_set, expansion_factor)
    encoder = EncoderGRU(num_features, hidden_size, dropout=dropout, bidirectional=bidirectional).to(device)
    decoder = AttnDecoderGRU(hidden_size, eng.n_words, dropout=dropout, max_input_length=max_input_length).to(device)
    print("Split done. Elements in train: %d and elements in test: %d. Starting training..." % (len(train_set), len(test_set)))
    best_encoder, best_decoder = trainIters(encoder, decoder, epochs, train_set, test_set, sil0, sil1, eng, lr=lr, lr_decay=lr_decay, lr_drop_epoch=lr_drop, l2_penalty=l2_penalty, max_input_length=max_input_length, max_output_length=max_output_length)
    print("Training done. Printing stats to file....")
    calculateTrainingAccuracy(best_encoder, best_decoder, test_set, eng, sil0, sil1, 'results/'+file_name+'/results.txt')
    print("Saving Models")
    torch.save(best_encoder.state_dict(), "models/"+file_name+"/encoderLSTM.pt")
    torch.save(best_decoder.state_dict(), "models/"+file_name+"/decoderLSTM.pt")

else:
    print("Generating folds...")
    trainTestFolds = kfoldSplit(pairs, eng, device, split=folds)
    print("Fold generation done...")
    fold_num = 1
    for curr_fold in trainTestFolds:
        encoder = EncoderGRU(num_features, hidden_size, dropout, bidirectional=bidirectional).to(device)
        decoder = AttnDecoderGRU(hidden_size, eng.n_words, bidirectional=bidirectional, max_input_length=max_input_length).to(device)
        print("Starting training on fold %d. %d elements in curr_fold[0] and %d in curr_fold[1]" % (fold_num, len(curr_fold[0]), len(curr_fold[1])))
        best_encoder, best_decoder = trainIters(encoder, decoder, epochs, curr_fold[0], curr_fold[1], sil0, sil1, eng, lr=lr, lr_decay=lr_decay, lr_drop_epoch=lr_drop, l2_penalty=l2_penalty, max_input_length=max_input_length, max_output_length=max_output_length)
        print("Training done. Saving predictions to file...")
        calculateTrainingAccuracy(best_encoder, best_decoder, curr_fold[1], eng, sil0, sil1, 'results/'+file_name+'/results_fold'+str(fold_num)+'.txt')
        print("Saving Models")
        torch.save(best_encoder.state_dict(), "models/"+file_name+"/encoderLSTM_fold"+str(fold_num)+".pt")
        torch.save(best_decoder.state_dict(), "models/"+file_name+"/decoderLSTM_fold"+str(fold_num)+".pt")
        fold_num += 1



Reading data from files...
Max Input length = 231 Max output length = 6
Label = sil0 monkey in blue box sil1 Number of iterations = 3
Label = sil0 alligator above wall sil1 Number of iterations = 3
Label = sil0 snake below bed sil1 Number of iterations = 3
Label = sil0 lion above flowers sil1 Number of iterations = 3
Label = sil0 monkey below wagon sil1 Number of iterations = 3
Label = sil0 snake above box sil1 Number of iterations = 3
Label = sil0 blue alligator above grey wall sil1 Number of iterations = 2
Label = sil0 snake in grey wagon sil1 Number of iterations = 3
Label = sil0 alligator in wagon sil1 Number of iterations = 3
Label = sil0 lion above blue bed sil1 Number of iterations = 3
Label = sil0 white snake in blue flowers sil1 Number of iterations = 3
Label = sil0 orange monkey in grey box sil1 Number of iterations = 3
Label = sil0 white alligator above blue wall sil1 Number of iterations = 3
Label = sil0 monkey in orange flowers sil1 Number of iterations = 3
Label = sil0 li

KeyboardInterrupt: ignored