In [1]:
import torch
from torch import nn, optim
from torch.autograd import Variable as var
from torch.nn import functional as F
import torchtext.vocab as vocab
from tqdm import tqdm
from pprint import pprint
import json
import _pickle as pkl

In [2]:
def clean(token):
    """Normalise a raw token before vocabulary lookup.

    Leading/trailing punctuation is stripped first. After that a trailing
    ``'s`` is dropped and a trailing ``'t`` loses its apostrophe
    (e.g. ``can't`` -> ``cant``). The two suffix rules are applied in that
    order, each at most once.
    """
    word = token.strip(".,?!-:;'()[]\"`")
    # (suffix to look for, text that replaces it)
    suffix_rules = (("'s", ""), ("'t", "t"))
    for suffix, replacement in suffix_rules:
        if word.endswith(suffix):
            word = word[:-len(suffix)] + replacement
    return word

# Load the GloVe '6B' embeddings with 50-dimensional vectors through torchtext.
glove = vocab.GloVe(dim=50, name='6B')

def get_vector(word):
    """Return the embedding row of ``glove.vectors`` that belongs to ``word``.

    The word is looked up in ``glove.stoi`` without any fallback, so a word
    that is missing from the mapping propagates the lookup error
    (presumably ``KeyError`` -- confirm against torchtext).
    """
    row = glove.stoi[word]
    return glove.vectors[row]

# Register an unknown/out-of-vocabulary token backed by an all-zero vector.
# The zero row is appended at the END of the table, so its index is the current
# row count. (The previous `len(glove.stoi)+1` pointed one past the new row and
# would index out of range in an embedding lookup.)
glove.stoi['<unk>'] = glove.vectors.size(0) # add token->index for unknown/oov
# Use the table's own width instead of a hard-coded 50 so the cell keeps working
# if a different embedding dimension is loaded.
glove.vectors = torch.cat((glove.vectors, torch.zeros(1, glove.vectors.size(1)))) # add index->vec for unknown/oov

def vectorize(input_txt, max_len, only_idx=True):
    """Turn text into a fixed-length GloVe representation.

    Parameters
    ----------
    input_txt : str or iterable of str
        Either a raw string (split on whitespace into words) or an already
        tokenised sequence of words. Previously a raw string was iterated
        character by character, so every "token" was a single letter.
    max_len : int
        Output length. Longer inputs are truncated on the right; shorter
        inputs are LEFT-padded with the out-of-vocabulary index.
    only_idx : bool
        If True return vocabulary indices, otherwise return the
        corresponding embedding vectors.

    Returns
    -------
    torch.LongTensor of shape ``(max_len,)`` when ``only_idx`` is True, else a
    float tensor of shape ``(max_len, embedding_dim)`` gathered from
    ``glove.vectors`` (padded the same way as the index form).
    """
    raw_tokens = input_txt.split() if isinstance(input_txt, str) else input_txt
    cleaned = (clean(w) for w in raw_tokens)  # clean each token exactly once
    tokens = [w for w in cleaned if w.strip()]

    # The zero vector appended to glove.vectors is its last row; that row is
    # used for both unknown words and padding (400000 for the 6B vocabulary).
    pad_idx = glove.vectors.size(0) - 1
    # min() keeps a stray mapping entry from pointing past the end of the table.
    indices = [min(glove.stoi.get(w, pad_idx), pad_idx) for w in tokens][:max_len]
    indices = [pad_idx] * (max_len - len(indices)) + indices
    idx_tensor = torch.LongTensor(indices)

    if only_idx:
        return idx_tensor
    # Gather the rows; unknown words and padding map to the zero vector instead
    # of raising, and the result is padded like the index form.
    return glove.vectors[idx_tensor]
    
def make_data(raw_X, context_len=600, question_len=100):
    """Build one index tensor per example from raw (context, question, answer) triples.

    Parameters
    ----------
    raw_X : iterable of 3-item sequences
        Each item is ``(context, question, answer)``; the answer text is ignored.
    context_len : int
        Fixed length the lower-cased context is vectorised to (default 600).
    question_len : int
        Fixed length the lower-cased question is vectorised to (default 100).

    Returns
    -------
    list of torch.LongTensor
        One tensor of length ``context_len + question_len`` per example:
        the context indices followed by the question indices.
    """
    X = []
    for (c, q, _) in raw_X:
        context_rep = vectorize(c.lower(), context_len, only_idx=True)
        ques_rep = vectorize(q.lower(), question_len, only_idx=True)
        X.append(torch.cat((context_rep, ques_rep)))  # context followed by question
    return X

In [3]:
# Read the dataset. The encoding is pinned to UTF-8 so that decoding does not
# depend on the platform's locale default.
with open('../data/data.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

In [4]:
# Sanity check: (number of rows, embedding width) of the embedding table.
print(glove.vectors.shape)

torch.Size([400001, 50])


In [5]:
# Inspect a single training example and run it through make_data.
idx = 5
example_X = data['X_train'][idx]
example_y = data['y_train'][idx]
# example_X holds (context, question, answer); example_y is the answer span.
for label, value in (
    ("Context:", example_X[0]),
    ("Question:", example_X[1]),
    ("Answer Span:", example_y),
    ("Answer:", example_X[2]),
):
    print(label, value)
X = make_data([example_X])

Context: The storage in sensory memory and short-term memory generally has a strictly limited capacity and duration, which means that information is not retained indefinitely. By contrast, long-term memory can store much larger quantities of information for potentially unlimited duration (sometimes a whole life span). Its capacity is immeasurable. For example, given a random seven-digit number we may remember it for only a few seconds before forgetting, suggesting it was stored in our short-term memory. On the other hand, we can remember telephone numbers for many years through repetition; this information is said to be stored in long-term memory.
Question: Why can't some memories be held onto forever?
Answer Span: [0, 106]
Answer: The storage in sensory memory and short-term memory generally has a strictly limited capacity and duration


In [6]:
# Take the first `num_ex` training examples for a quick pass.
num_ex = 10
head = slice(num_ex)
X_pass = make_data(data['X_train'][head])
y_pass = data['y_train'][head]

In [7]:
class ModelV1(nn.Module):
    """LSTM span predictor on top of GloVe embeddings.

    Every example is a 1-D sequence of vocabulary indices (as produced by
    ``make_data``). The sequence is embedded, run through an LSTM, and two
    linear heads emit log-probabilities of width ``output_size``: one for the
    answer start and one for the answer end.

    Recognised ``config`` keys (all optional, defaults in parentheses):
    ``input_size`` (700), ``hidden_size`` (128), ``output_size`` (5000),
    ``n_layers`` (1), ``vocab`` (400001), ``embedding_dim`` (50),
    ``Bidirectional`` (True), ``learning_rate`` (1e-3), ``batch_size`` (1),
    ``epochs`` (2), ``opt`` ("SGD"; only "Adam" selects Adam).
    """

    def __init__(self, config):
        super(ModelV1, self).__init__()

        self.input_size = config.get("input_size", 700)
        self.hidden_size = config.get("hidden_size", 128)
        self.output_size = config.get("output_size", 5000)
        self.n_layers = config.get("n_layers", 1)
        self.vocab_size = config.get("vocab", 400001)
        self.emb_dim = config.get("embedding_dim", 50)
        self.bidir = config.get("Bidirectional", True)
        self.dirs = int(self.bidir)+1
        self.lr = config.get("learning_rate", 1e-3)
        self.batch_size = config.get("batch_size", 1)
        self.epochs = config.get("epochs", 2)

        # Any value other than 'Adam' falls back to SGD.
        optimizer_name = config.get("opt", "SGD")
        self.opt = optim.Adam if optimizer_name == 'Adam' else optim.SGD

        self.encoder = nn.Embedding(self.vocab_size, self.emb_dim)
        self.lstm = nn.LSTM(self.emb_dim, self.hidden_size, self.n_layers, bidirectional=self.bidir)
        # Each head reads the state of ONE direction, hence hidden_size inputs.
        self.decoder_start = nn.Linear(self.hidden_size, self.output_size)
        self.decoder_end = nn.Linear(self.hidden_size, self.output_size)
        self.init_weights()

    def init_weights(self):
        """Load GloVe vectors into the embedding and initialise both heads."""
        weight_scale = 0.01
        # clone() gives the embedding its own storage; assigning glove.vectors
        # directly made optimiser steps overwrite the global GloVe table.
        self.encoder.weight.data = glove.vectors.clone()
        self.decoder_start.bias.data.fill_(0)
        self.decoder_start.weight.data.uniform_(-weight_scale, weight_scale)
        self.decoder_end.bias.data.fill_(0)
        self.decoder_end.weight.data.uniform_(-weight_scale, weight_scale)

    def init_hidden(self, batch_size=None):
        """Return a zero state of shape (n_layers*dirs, batch_size, hidden_size).

        ``batch_size`` defaults to ``self.batch_size``. The tensor has the
        dtype/device of the model's first parameter.
        """
        if batch_size is None:
            batch_size = self.batch_size
        weight = next(self.parameters())
        return weight.new_zeros(self.n_layers*self.dirs, batch_size, self.hidden_size)

    def forward(self, inputs, hidden):
        """Compute start/end log-probabilities for a batch.

        Parameters
        ----------
        inputs : sequence of equal-length 1-D index sequences
            One entry per example. All entries are used; the batch dimension
            of ``hidden`` must therefore equal ``len(inputs)``.
        hidden : tuple (h0, c0)
            Initial LSTM state, e.g. two calls to ``init_hidden``.

        Returns
        -------
        (out_start, out_end) : two tensors of shape (batch, output_size)
            holding log-probabilities.
        """
        device = self.encoder.weight.device
        batch = torch.stack(
            [torch.as_tensor(seq, dtype=torch.long, device=device) for seq in inputs],
            dim=1,
        )  # (seq_len, bs)
        embeds = self.encoder(batch)  # (seq_len, bs, emb_dim)
        lstm_op, _ = self.lstm(embeds, hidden)  # (seq_len, bs, hidden_size*dirs)

        # In the LSTM output the first hidden_size features belong to the forward
        # direction and the remaining ones to the reverse direction. The forward
        # state has seen the whole sequence only at the LAST step, the reverse
        # state only at the FIRST step -- so those are the summaries to decode.
        forward_summary = lstm_op[-1, :, :self.hidden_size]
        if self.bidir:
            reverse_summary = lstm_op[0, :, self.hidden_size:]
        else:
            # No reverse direction available: both heads share the forward summary.
            reverse_summary = forward_summary

        # NOTE(review): the ReLU clamps negative logits before the softmax; kept
        # as in the original architecture, but worth revisiting.
        out_start = F.log_softmax(F.relu(self.decoder_start(reverse_summary)), dim=-1)
        out_end = F.log_softmax(F.relu(self.decoder_end(forward_summary)), dim=-1)
        return (out_start, out_end)

    def fit(self, X, y):
        """Train on ``X``/``y`` and return the list of per-epoch summed losses.

        ``X`` is a sequence of index sequences, ``y`` a sequence of
        ``[start, end]`` pairs. Examples are consumed in consecutive batches of
        ``self.batch_size``; a trailing incomplete batch is skipped. The loss
        of a batch is the sum of the NLL losses of the start and end heads.
        Progress is printed once per epoch.
        """
        optimizer = self.opt(self.parameters(), lr=self.lr)
        bs = self.batch_size
        losses = [] # epoch loss
        for epoch in range(self.epochs):
            print("epoch:", epoch, end=', loss: ')
            bloss = 0.0 # summed batch losses of this epoch
            for i in range(0, len(y)-bs+1, bs):
                Xb = X[i:i+bs]
                hidden = (self.init_hidden(len(Xb)), self.init_hidden(len(Xb)))
                optimizer.zero_grad()
                out_start, out_end = self.forward(Xb, hidden)
                yb = torch.as_tensor(y[i:i+bs], dtype=torch.long).to(out_start.device)
                loss = F.nll_loss(out_start, yb[:, 0]) \
                     + F.nll_loss(out_end, yb[:, 1])
                # .item() extracts the Python float; indexing a 0-dim loss fails.
                bloss += loss.item()
                loss.backward()
                optimizer.step()
            losses.append(bloss)
            print(losses[-1], end=', change: ')
            if len(losses)>1:
                diff = losses[-2]-losses[-1]
                # Guard the division and scale to a real percentage, since the
                # value is printed with a '%' sign.
                rel_diff = diff/losses[-2] if losses[-2] else 0.0
                print("%s"%(100.0*rel_diff), "%")
            else:
                print("00.0%")
        return losses

    def predict(self, X):
        """Return the most likely (start, end) indices for the examples in ``X``.

        For a single example the result has shape ``(2,)``; for several
        examples it has shape ``(len(X), 2)``. No gradients are tracked.
        """
        with torch.no_grad():
            hidden = (self.init_hidden(len(X)), self.init_hidden(len(X)))
            s_pred, e_pred = self.forward(X, hidden)
        _, s_index = torch.max(s_pred, -1)
        _, e_index = torch.max(e_pred, -1)
        # squeeze(0) only removes the batch axis when there is exactly one example.
        return torch.stack((s_index, e_index), -1).squeeze(0)

In [8]:
# Hyper-parameters overriding the ModelV1 defaults for this run.
conf = dict(
    learning_rate=0.5,
    epochs=5,
    hidden_size=2,
)
model = ModelV1(config=conf)

In [9]:
# Train on the small subset; `res` receives the per-epoch loss list returned by fit.
res = model.fit(X=X_pass, y=y_pass)

epoch: 0, loss: 

KeyboardInterrupt: 

In [None]:
# Compare predicted and actual answers for training examples 60-69.
# The prediction lives in its own name (`pred_span`): reusing `res` here would
# overwrite the loss history returned by model.fit, which is plotted afterwards.
for example, _gold_span in zip(data['X_train'][60:70], data['y_train'][60:70]):
    context = example[0]
    answer = example[2]
    pred_span = model.predict(make_data([example])).data.tolist()
    print("Predicted span:", pred_span)
    print("Predicted Answer:", context[pred_span[0]:pred_span[1]])
    print("Actual:", answer)
    print("="*50)

In [None]:
import matplotlib.pyplot as plt

# Plot the values held in `res` against their position in the list.
positions = list(range(len(res)))
plt.plot(positions, res)
plt.show()