## Long Short-Term Memory (LSTM) for Part-of-Speech Tagging
- [Bi-LSTM in TF example](https://github.com/monikkinom/ner-lstm)

```bash
# Install PyTorch : Linux, Pip, Python 2.7, CUDA 8.0
!pip install http://download.pytorch.org/whl/cu80/torch-0.3.0.post4-cp27-cp27mu-linux_x86_64.whl 
```

In [1]:
from __future__ import print_function

import os
import numpy as np

import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

- Let input sentence be $w_1, ..., w_M$, where $w_i \in V$, $V$ is vocabulary
- Let $T$ be tag set and $y_i$ the tag of word $w_i$
- Prediction of the tag of word $w_i$ is denoted by $\hat{y_i}$ and output is a sequence $\hat{y_1}, ..., \hat{y_M}$, where $\hat{y_i} \in T$
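- To predict, run an LSTM over the sentence. The hidden state at time step $i$ is denoted by $h_i$. Assign each tag a unique index and let $A$ and $b$ be the weight matrix and bias of an affine map from the hidden state to one score per tag. Then the prediction rule for $\hat{y_i}$ is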

$$\hat{y}_i = \operatorname*{argmax}_j \left(\log \operatorname{Softmax}(A h_i + b)\right)_j$$

In [2]:
# Toy corpus: every example is a (tokens, gold POS tags) pair of equal length

training_data = [
    ('The dog ate the apple'.split(), ["DET", "NN", "V", "DET", "NN"]),
    ('Everybody read that book'.split(), ["NN", "V", "DET", "NN"]),
]

testing_data = [
    ('The dog ate the book'.split(), ["DET", "NN", "V", "DET", "NN"]),
]

# Vocabulary: each distinct training word receives the next unused integer id,
# in order of first appearance (lookup is case-sensitive: 'The' != 'the').
word_to_idx = {}
for sent, tags in training_data:
    for word in sent:
        # setdefault leaves an already-known word untouched
        word_to_idx.setdefault(word, len(word_to_idx))

print('word_to_idx: ', word_to_idx)

# Fixed tag inventory and the integer id of each tag
tag_to_idx = {'DET': 0, 'NN': 1, 'V': 2}

def prepare_sequence(seq, to_idx):
    """Encode a sequence of symbols as a 1-D LongTensor wrapped in a Variable.

    seq    -- iterable of keys (words or tags)
    to_idx -- mapping from each key to its integer id

    Raises KeyError if an element of `seq` is missing from `to_idx`.
    """
    encoded = []
    for symbol in seq:
        encoded.append(to_idx[symbol])
    return autograd.Variable(torch.LongTensor(encoded))

word_to_idx:  {'Everybody': 5, 'ate': 2, 'apple': 4, 'that': 7, 'read': 6, 'dog': 1, 'book': 8, 'the': 3, 'The': 0}


In [3]:
## Hyper-parameters

# Dimensions are deliberately tiny: this is a development / toy configuration
EMBEDDING_DIM = 6  # size of each word-embedding vector
HIDDEN_DIM = 6  # size of the LSTM hidden state
num_epochs = 25  # passes over training_data per run of the training cell
learning_rate = 0.1  # step size handed to the SGD optimizer
# Best accuracy seen so far, kept as a 1-element FloatTensor; replaced by the
# stored value when a checkpoint is loaded
best_accuracy = torch.FloatTensor([0])
# Number of epochs already trained; replaced by the stored value when a checkpoint is loaded
start_epoch = 0

# Path of the checkpoint file written by torch.save and read back by torch.load
# (a PyTorch-serialised dict, not an HDF5 file)
resume_weights = './lstm-ner/checkpoint.pth.tar'

# CUDA is switched off by hand; use torch.cuda.is_available() to auto-detect a GPU
cuda = False # torch.cuda.is_available()

# Seed for reproducibility (CPU generator, plus the CUDA generator when enabled)
torch.manual_seed(1)
if cuda:
    torch.cuda.manual_seed(1)

In [4]:
# Create the model
class LSTM_Tagger(nn.Module):
    """Single-layer LSTM tagger: embedding -> LSTM -> linear -> log-softmax.

    The model processes one sentence at a time (mini-batch size 1) and keeps
    the LSTM state in `self.hidden`. The state is carried over between calls
    to `forward`, so callers must assign `model.hidden = model.init_hidden()`
    before each new sentence.
    """

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        """
        embedding_dim -- size of each word-embedding vector
        hidden_dim    -- size of the LSTM hidden state
        vocab_size    -- number of distinct word ids
        tagset_size   -- number of distinct tag ids
        """
        super(LSTM_Tagger, self).__init__()
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)

        # LSTM takes word embeddings as inputs and outputs hidden states
        # with dimensionality hidden_dim
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)

        # Linear layer that maps from hidden state space to tag space
        self.hidden_to_tag = nn.Linear(hidden_dim, tagset_size)
        self.hidden = self.init_hidden()

    def init_hidden(self):
        """Return a fresh all-zero (h_0, c_0) pair, each of shape (1, 1, hidden_dim).

        The axes are (num_layers, minibatch_size, hidden_dim). The zeros are
        allocated from a parameter's storage so they share the parameters'
        tensor type and device; a state created with plain `torch.zeros` would
        stay a CPU float tensor after `model.cuda()` or `model.double()`.
        """
        reference = next(self.parameters()).data

        def zeros():
            return autograd.Variable(reference.new(1, 1, self.hidden_dim).zero_())

        return (zeros(), zeros())

    def forward(self, sentence):
        """Score every tag for every word of one sentence.

        sentence -- 1-D LongTensor (Variable) of word ids

        Returns a (len(sentence), tagset_size) tensor of log-probabilities and
        stores the final LSTM state in `self.hidden`.
        """
        embeddings = self.word_embeddings(sentence)
        # Reshape to (seq_len, batch=1, embedding_dim) as expected by nn.LSTM
        lstm_out, self.hidden = self.lstm(
            embeddings.view(len(sentence), 1, -1), self.hidden)
        tag_space = self.hidden_to_tag(lstm_out.view(len(sentence), -1))
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores

In [5]:
# Helper functions
def train(model, optimizer, loss_fn, train_data):
    """Run one epoch of per-sentence SGD updates over `train_data`.

    model      -- tagger exposing `hidden`, `init_hidden()` and a forward pass
                  that returns per-word tag scores
    optimizer  -- optimizer built over `model.parameters()`
    loss_fn    -- callable `(tag_scores, targets) -> loss`
    train_data -- list of tuples, e.g. [(['dog', 'ate'], ['NN', 'V']), (...), ...]

    Words and tags are encoded with the notebook-level `word_to_idx` and
    `tag_to_idx`; tensors are moved to the GPU when the notebook-level `cuda`
    flag is set. Returns None.
    """
    # evaluate() leaves the model in eval mode, so switch back before updating
    model.train()

    for sentence, tags in train_data:

        # Prepare inputs
        sentence_in = prepare_sequence(sentence, word_to_idx)
        targets = prepare_sequence(tags, tag_to_idx)

        if cuda:
            sentence_in, targets = sentence_in.cuda(), targets.cuda()

        # Clear gradients as PyTorch accumulates gradients
        model.zero_grad()

        # Clear out the hidden state of the LSTM so sentences stay independent
        model.hidden = model.init_hidden()

        # Forward pass
        tag_scores = model(sentence_in)  # Element i, j is the score for tag j for word i

        # Compute loss with the loss passed in (not the notebook-level global)
        loss = loss_fn(tag_scores, targets)

        # Backward + Optimize
        loss.backward()
        optimizer.step()
        
def evaluate(model, data):
    """Return the fraction of sentences in `data` whose tags are all predicted correctly.

    model -- tagger exposing `hidden`, `init_hidden()` and a forward pass that
             returns per-word tag scores
    data  -- list of (words, tags) tuples, same layout as the training data

    The metric is sentence-level exact match, not per-token accuracy: one wrong
    tag makes the whole sentence count as wrong. Returns a float in [0, 1], and
    0.0 for empty `data`. The model is left in eval mode.
    """
    model.eval()

    if len(data) == 0:
        # Nothing to score; avoid dividing by zero
        return 0.0

    correct = 0
    for sentence, tags in data:

        # Prepare inputs
        sentence_in = prepare_sequence(sentence, word_to_idx)
        targets = prepare_sequence(tags, tag_to_idx)

        if cuda:
            sentence_in, targets = sentence_in.cuda(), targets.cuda()

        # Start from a clean LSTM state so the prediction does not depend on
        # whichever sentence the model processed last
        model.hidden = model.init_hidden()

        # Forward pass
        tag_scores = model(sentence_in)

        # Bring both sides of the comparison back to the CPU
        # (.cpu() returns a copy, so the result has to be kept)
        if cuda:
            tag_scores, targets = tag_scores.cpu(), targets.cpu()

        # Prediction
        _, pred = torch.max(tag_scores, 1)  # argmax for axis 1

        # One point per fully correct sentence
        correct += int(torch.equal(pred, targets))

    return correct / float(len(data))

# Keep only a single checkpoint, the best over test accuracy.
def save_checkpoint(state, is_best, filename=None):
    """Save `state` to `filename` when a new best has been achieved.

    state    -- object to serialise with torch.save (here a dict holding the
                epoch, the model state_dict and the best accuracy)
    is_best  -- when False nothing is written and a message is printed instead
    filename -- destination path or writable file object; required when
                `is_best` is True

    Missing parent directories of a path are created, so the first save on a
    fresh checkout does not fail. Raises ValueError if `is_best` is True and
    no filename was given.
    """
    if not is_best:
        print ("=> Validation Accuracy did not improve")
        return

    if filename is None:
        raise ValueError('save_checkpoint needs a filename when is_best is True')

    print ("=> Saving a new best")

    # File-like objects have no directory to prepare
    if not hasattr(filename, 'write'):
        directory = os.path.dirname(filename)
        if directory and not os.path.isdir(directory):
            os.makedirs(directory)

    torch.save(state, filename)  # save checkpoint

In [6]:
# Build the tagger sized to the vocabulary and tag set, plus its training loss
model = LSTM_Tagger(
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
    vocab_size=len(word_to_idx),
    tagset_size=len(tag_to_idx),
)
loss_function = nn.NLLLoss()  # pairs with the log-softmax output of the model

# When the CUDA flag is set, move the model and the loss to the GPU
# before the optimizer collects the parameters
if cuda:
    model.cuda()
    loss_function.cuda()

# Plain stochastic gradient descent over every model parameter
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

In [7]:
# If best model weights exist then resume from them
if os.path.isfile(resume_weights):
    print('Loading checkpoint: "{}" ...'.format(resume_weights))

    # SECURITY: torch.load unpickles the file, which can execute arbitrary
    # code -- only load checkpoints that come from a trusted source.
    # map_location keeps every stored tensor on the CPU, so a checkpoint that
    # was saved from a GPU run can still be opened on a CPU-only machine;
    # load_state_dict then copies the values into the model's own parameters.
    checkpoint = torch.load(resume_weights,
                            map_location=lambda storage, location: storage)

    # Restore the training bookkeeping along with the weights
    start_epoch = checkpoint['epoch']
    best_accuracy = checkpoint['best_accuracy']
    model.load_state_dict(checkpoint['state_dict'])
    print('Loaded checkpoint "{}" (trained for {} epochs)'.format(resume_weights, start_epoch))

Loading checkpoint: "./lstm-ner/checkpoint.pth.tar" ...
Loaded checkpoint "./lstm-ner/checkpoint.pth.tar" (trained for 129 epochs)


In [8]:
# Train the model: one pass over training_data per epoch, then score the
# held-out sentences and checkpoint whenever the score strictly improves.
# NOTE(review): model selection uses testing_data, so the reported "test"
# accuracy is not an unbiased estimate (and the "Validation Accuracy" message
# printed by save_checkpoint refers to this same set).
# NOTE(review): `epoch` restarts at 0 on every run; only the value stored in
# the checkpoint is offset by start_epoch.
for epoch in range(num_epochs):
    train(model, optimizer, loss_function, training_data)
    acc = evaluate(model, testing_data)
    print('Epoch-{} Test Set: Accuracy: {:.2f}'.format(epoch, acc))

    # Wrap the float in a 1-element FloatTensor so it matches best_accuracy
    acc = torch.FloatTensor([acc])

    # Get bool not ByteTensor; strict '>' means a tie is not a new best
    is_best = bool(acc.numpy() > best_accuracy.numpy())

    # Get greater tensor to keep track of best_accuracy
    best_accuracy = torch.FloatTensor(max(acc.numpy(), best_accuracy.numpy()))

    # Save checkpoint (written only when is_best is True)
    save_checkpoint({'epoch': start_epoch + epoch + 1,
                     'state_dict': model.state_dict(),
                     'best_accuracy': best_accuracy,}, is_best, filename=resume_weights)

Epoch-0 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-1 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-2 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-3 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-4 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-5 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-6 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-7 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-8 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-9 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-10 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-11 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-12 Test Set: Accuracy: 1.00
=> Validation Accuracy did not improve
Epoch-13 Test Set: Accuracy: 1.00
=> Validation Accuracy did 

In [9]:
# Predictions
def is_exact_match(model, sentence, tags):
    """Return True when the model predicts every tag of `sentence` correctly.

    model    -- trained tagger exposing `hidden` and `init_hidden()`
    sentence -- list of words, all present in `word_to_idx`
    tags     -- list of gold tags, one per word
    """
    inputs = prepare_sequence(sentence, word_to_idx)
    gold = prepare_sequence(tags, tag_to_idx)
    if cuda:
        inputs, gold = inputs.cuda(), gold.cuda()

    # Reset the LSTM state so the result does not depend on the sentence the
    # model happened to process last
    model.hidden = model.init_hidden()

    _, predicted = torch.max(model(inputs), 1)  # argmax over the tag axis
    return torch.equal(predicted, gold)


print('Test: ', is_exact_match(model, testing_data[0][0], testing_data[0][1]))
print('Train: ', is_exact_match(model, training_data[0][0], training_data[0][1]))

Test:  True
Train:  True
