# Imports

In [1]:
# !pip install numpy
# !pip install torch
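# !pip install scikit-learn  # PyPI name of the package imported as `sklearn`
# pickle ships with the Python standard library, so it needs no pip install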

import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import Dataset, TensorDataset, DataLoader

import numpy as np
import math
import os.path
from collections import defaultdict
import pickle

## Ensure reproducibility
Use a fixed seed so that every step and result can be reproduced.

In [2]:
SEED = 544

# Request deterministic cuDNN behaviour, then seed every RNG this notebook uses
torch.backends.cudnn.deterministic = True
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
np.random.seed(SEED)

# Credit: From PyTorch's documentation
# Use the first GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

# Data Loading

In [3]:
def read_data(fname, test_dataset=False):
    """Read a whitespace-separated, blank-line-delimited tagging file.

    Each non-empty line holds ``index word`` (test data) or ``index word tag``
    (train/dev data); one or more empty lines separate sentences.

    Args:
        fname: Path of the file to read.
        test_dataset: When True, lines carry no tag and each sentence is
            returned as a plain list of words.

    Returns:
        A list of sentences. For test data every sentence is a list of words;
        otherwise it is a ``(words, tags)`` tuple of two equally long lists.
        Leading, trailing or repeated blank lines never produce empty
        sentences.

    Raises:
        ValueError: If a non-empty line does not have the expected number of
            whitespace-separated fields.
    """
    sentences = []
    words = []
    tags = []

    def _flush():
        # Only emit a sentence when something was collected; this keeps
        # consecutive blank lines from creating zero-length sentences that
        # downstream padding / LSTM code cannot handle.
        if not words:
            return
        if test_dataset:
            sentences.append(list(words))
        else:
            sentences.append((list(words), list(tags)))
        words.clear()
        tags.clear()

    with open(fname, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                # An empty line closes the current sentence
                _flush()
                continue
            if test_dataset:
                # test data has only index and word
                index, word = line.split()
            else:
                # train/dev data has index, word, and tag
                index, word, tag = line.split()
                tags.append(tag)
            words.append(word)

    # Close the last sentence in case the file does not end with an empty line
    _flush()
    return sentences

## Read all datasets

In [4]:
# Read all datasets given
dev_data = read_data('data/dev')
test_data = read_data('data/test', test_dataset=True)

## Utility functions to process data

In [5]:
def prepare_sequence(seq, to_ix, use_unk=False):
    """Translate a sequence of tokens into their integer indices.

    Args:
        seq: Iterable of tokens (words or tags).
        to_ix: Mapping from token to index.
        use_unk: When True, tokens missing from ``to_ix`` are mapped to the
            index stored under ``'<UNK>'``; when False a missing token raises
            ``KeyError``.

    Returns:
        A list with one index per token, in the original order.
    """
    if not use_unk:
        return [to_ix[token] for token in seq]

    indices = []
    for token in seq:
        # Fall back to the unknown-word entry for out-of-vocabulary tokens
        key = token if token in to_ix else '<UNK>'
        indices.append(to_ix[key])
    return indices

def get_spelling_feature(sentence):
    """Encode the casing pattern of every word in a sentence.

    Codes:
        0 - the padding token ``'<PAD>'``
        1 - all lower case
        2 - all upper case
        3 - first character upper case
        4 - anything else

    Args:
        sentence: Iterable of non-empty word strings.

    Returns:
        A list with one casing code per word.
    """
    def casing_code(token):
        # The padding check must come first: '<PAD>' would otherwise count
        # as an all-upper-case word.
        if token == '<PAD>':
            return 0
        if token.islower():
            return 1
        if token.isupper():
            return 2
        return 3 if token[0].isupper() else 4

    return [casing_code(token) for token in sentence]

## Dataset Implementation

In [6]:
class NERDataset(Dataset):
    """Padded, index-encoded view of ``(words, tags)`` sentences.

    Every sentence is right-padded with ``'<PAD>'`` to the length of the
    longest sentence in ``data``. Words are encoded with the module-level
    ``word_to_ix`` (unknown words map to ``'<UNK>'``), tags with the
    module-level ``tag_to_ix``, and the resulting int64 tensors are placed on
    the module-level ``device``.

    Each item is the tuple ``(word ids, tag ids, padded words, spelling ids)``.
    """

    def __init__(self, data):
        # Retrieves longest sentence, for padding
        target_len = max(len(words) for words, _ in data)

        word_rows, tag_rows, spelling_rows = [], [], []
        self.X_original = []

        for words, labels in data:
            # Pad the sentences (and their tags) to the same length
            filler = ['<PAD>'] * (target_len - len(words))
            padded_words = words.copy()
            padded_words.extend(filler)
            padded_labels = labels.copy()
            padded_labels.extend(filler)

            # Convert to indices, then derive the spelling indices
            word_rows.append(prepare_sequence(padded_words, word_to_ix, use_unk=True))
            tag_rows.append(prepare_sequence(padded_labels, tag_to_ix))
            spelling_rows.append(get_spelling_feature(padded_words))
            self.X_original.append(padded_words)

        def as_device_tensor(rows):
            return torch.from_numpy(np.array(rows, dtype=np.int64)).to(device)

        self.X = as_device_tensor(word_rows)
        self.y = as_device_tensor(tag_rows)
        self.X_spelling = as_device_tensor(spelling_rows)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        return (
            self.X[index],
            self.y[index],
            self.X_original[index],
            self.X_spelling[index],
        )

# Task 1: Simple Bidirectional LSTM model

## Load vocab and word/tag -> index, and index -> word/tag

In [7]:
# Load the Task 1 word -> index mapping.
# NOTE(review): pickle.load can execute arbitrary code while unpickling;
# only load .pkl files that come from a trusted source.
with open('word_to_ix_1.pkl', 'rb') as f:
    word_to_ix = pickle.load(f)
    
# Load the Task 1 tag -> index mapping.
with open('tag_to_ix_1.pkl', 'rb') as f:
    tag_to_ix = pickle.load(f)
           
# Generate index to word/tag mappings
ix_to_word = {v: k for k, v in word_to_ix.items()}
ix_to_tag = {v: k for k, v in tag_to_ix.items()}

# Calculate the size of vocabulary & tags
# These module-level names (word_to_ix, tag_to_ix, ix_to_tag, ...) are read
# directly by NERDataset, BLSTM1 and the predict_*1 helpers, and the Task 2
# section rebinds them, so the Task 1 cells must run before that happens.
VOCAB_SIZE = len(word_to_ix)
TAGS_SIZE = len(tag_to_ix)

## Bidirectional LSTM Model with random embeddings

In [8]:
class BLSTM1(nn.Module):
    """Bidirectional LSTM tagger with randomly initialised word embeddings.

    Pipeline: embedding -> dropout -> BiLSTM -> dropout -> linear -> ELU ->
    linear, producing one score per tag for every token.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each word embedding.
        hidden_dim: Hidden size of the LSTM (per direction).
        linear_dim: Output size of the first linear layer.
        tags_size: Number of output tags.
        lstm_dropout: Dropout probability applied before and after the LSTM.
        elu_alpha: ``alpha`` of the ELU activation.

    Note:
        The embedding's ``padding_idx`` is looked up in the module-level
        ``word_to_ix`` at construction time.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, linear_dim, tags_size, lstm_dropout, elu_alpha):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim, padding_idx=word_to_ix['<PAD>'])
        self.dropout_pre_lstm = nn.Dropout(lstm_dropout)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.dropout_post_lstm = nn.Dropout(lstm_dropout)
        self.linear = nn.Linear(hidden_dim * 2, linear_dim)
        self.elu = nn.ELU(alpha=elu_alpha)
        self.linear2 = nn.Linear(linear_dim, tags_size)

    def forward(self, x):
        """Score every token of a batch of word-index sequences.

        Args:
            x: Integer tensor of word indices, batch first.

        Returns:
            Tensor of tag scores with the tag dimension last.
        """
        x = self.embeddings(x)
        x = self.dropout_pre_lstm(x)

        # nn.LSTM starts from all-zero hidden/cell states when none are passed,
        # which is what the explicit zero tensors provided before. Omitting
        # them keeps forward() independent of any notebook-level device
        # variable: the states live wherever the module and its input live.
        out, _ = self.lstm(x)

        out = self.dropout_post_lstm(out)
        out = self.linear(out)
        out = self.elu(out)
        out = self.linear2(out)

        return out

## Utility Functions for Prediction

In [9]:
# Used to predict on a development data loader
# Writes the output to a file, i.e. to dev.out
def predict_dev1(model, data_loader, fname):
    """Tag a development data loader with a word-only model and write the result.

    Works for any batch size: every row of a batch is decoded separately.

    Args:
        model: Callable module taking a batch of word indices and returning
            per-token tag scores with the tag dimension last.
        data_loader: Iterable yielding ``(X, y, X_original, X_spelling)``
            batches. ``X_original`` is indexed as ``[position][batch row]``
            (the layout the default DataLoader collation is assumed to
            produce for lists of strings) and is padded with ``'<PAD>'``.
        fname: Output path. Each line is ``idx word pred``; sentences are
            separated by one empty line.
    """
    outputs = []
    model.eval()
    with torch.no_grad():
        for X, y, X_original, X_spelling in data_loader:
            X = X.to(device)

            # Best tag id per token, as one Python list per batch row
            batch_tag_ids = torch.argmax(model(X), dim=2).tolist()

            for row, tag_ids in enumerate(batch_tag_ids):
                output = []
                for pos, tag_id in enumerate(tag_ids):
                    word = X_original[pos][row]
                    # Padding marks the end of the real sentence
                    if word == '<PAD>':
                        break
                    output.append((pos + 1, word, ix_to_tag[tag_id]))
                outputs.append(output)

    with open(fname, 'w') as f:
        # One block of lines per sentence, blocks separated by an empty line
        f.write('\n'.join(
            ''.join(f'{idx} {word} {pred}\n' for idx, word, pred in output)
            for output in outputs
        ))

# Used to predict on a test data, list of sentences
# Writes the output to a file, i.e. to test.out
def predict_test1(model, sentences, fname):
    """Tag raw test sentences with a word-only model and write the result.

    Args:
        model: Callable module taking a batch of word indices and returning
            per-token tag scores with the tag dimension last.
        sentences: List of sentences, each a list of word strings.
        fname: Output path. Each line is ``idx word pred``; sentences are
            separated by one empty line.

    Note:
        Words are encoded with the module-level ``word_to_ix`` (unknown words
        map to ``'<UNK>'``) and tags decoded with the module-level
        ``ix_to_tag``. The model takes no spelling input, so no spelling
        features are computed here.
    """
    outputs = []
    model.eval()
    with torch.no_grad():
        for sentence in sentences:
            word_ids = prepare_sequence(sentence, word_to_ix, use_unk=True)
            # Batch of one sentence, created directly on the target device
            batch = torch.tensor([word_ids], dtype=torch.int64, device=device)

            tag_ids = torch.argmax(model(batch), dim=2).flatten().tolist()

            output = []
            for i, tag_id in enumerate(tag_ids):
                word = sentence[i]
                if word == '<PAD>':
                    break
                output.append((i + 1, word, ix_to_tag[tag_id]))
            outputs.append(output)

    with open(fname, 'w') as f:
        # One block of lines per sentence, blocks separated by an empty line
        f.write('\n'.join(
            ''.join(f'{idx} {word} {pred}\n' for idx, word, pred in output)
            for output in outputs
        ))
                
# Used to predict on a development data loader
# Writes the output to a file for PERL script, i.e. to prediction.txt
def predict_perl1(model, data_loader, fname):
    """Tag a development data loader and write gold and predicted tags side by side.

    Works for any batch size: every row of a batch is decoded separately.

    Args:
        model: Callable module taking a batch of word indices and returning
            per-token tag scores with the tag dimension last.
        data_loader: Iterable yielding ``(X, y, X_original, X_spelling)``
            batches. ``X_original`` is indexed as ``[position][batch row]``
            (the layout the default DataLoader collation is assumed to
            produce for lists of strings) and is padded with ``'<PAD>'``.
        fname: Output path. Each line is ``idx word gold pred``; sentences are
            separated by one empty line.
    """
    outputs = []
    model.eval()
    with torch.no_grad():
        for X, y, X_original, X_spelling in data_loader:
            X = X.to(device)

            # Predicted and gold tag ids, one Python list per batch row
            pred_rows = torch.argmax(model(X), dim=2).tolist()
            gold_rows = y.tolist()

            for row, (gold_ids, pred_ids) in enumerate(zip(gold_rows, pred_rows)):
                output = []
                for pos, (gold_id, pred_id) in enumerate(zip(gold_ids, pred_ids)):
                    word = X_original[pos][row]
                    # Padding marks the end of the real sentence
                    if word == '<PAD>':
                        break
                    output.append((pos + 1, word, ix_to_tag[gold_id], ix_to_tag[pred_id]))
                outputs.append(output)

    with open(fname, 'w') as f:
        # One block of lines per sentence, blocks separated by an empty line
        f.write('\n'.join(
            ''.join(f'{idx} {word} {gold} {pred}\n' for idx, word, gold, pred in output)
            for output in outputs
        ))

## Convert dev data to data loader

In [10]:
# Encode the dev sentences with the vocabulary currently bound to the global
# word_to_ix / tag_to_ix (Task 1 at this point), then iterate them one sentence
# per batch in file order (shuffle=False).
dev_dataset = NERDataset(dev_data)
dev_loader = DataLoader(dev_dataset, batch_size=1, shuffle=False)

## Load model

In [11]:
# Load the fully serialized Task 1 model. map_location remaps the stored
# tensors onto `device`, so a checkpoint saved on a GPU still loads when the
# notebook has fallen back to the CPU.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source. Recent PyTorch releases default to weights_only=True, which
# rejects whole pickled modules; if loading fails there, pass
# weights_only=False — confirm against the installed version.
model_1 = torch.load('blstm1.pt', map_location=device)
model_1.to(device)

BLSTM1(
  (embeddings): Embedding(8129, 100, padding_idx=0)
  (dropout_pre_lstm): Dropout(p=0.33, inplace=False)
  (lstm): LSTM(100, 256, batch_first=True, bidirectional=True)
  (dropout_post_lstm): Dropout(p=0.33, inplace=False)
  (linear): Linear(in_features=512, out_features=128, bias=True)
  (elu): ELU(alpha=0.5)
  (linear2): Linear(in_features=128, out_features=10, bias=True)
)

In [12]:
model_1

BLSTM1(
  (embeddings): Embedding(8129, 100, padding_idx=0)
  (dropout_pre_lstm): Dropout(p=0.33, inplace=False)
  (lstm): LSTM(100, 256, batch_first=True, bidirectional=True)
  (dropout_post_lstm): Dropout(p=0.33, inplace=False)
  (linear): Linear(in_features=512, out_features=128, bias=True)
  (elu): ELU(alpha=0.5)
  (linear2): Linear(in_features=128, out_features=10, bias=True)
)

## Evaluate and export results

In [13]:
%%time

# Prediction for all cases (dev, test, and dev for perl)
# Each call writes its own output file. The helpers read the global
# word_to_ix / ix_to_tag, so this cell has to run while the Task 1 mappings
# are still bound (i.e. before the Task 2 vocabulary cell).
predict_perl1(model_1, dev_loader, 'prediction1.txt')
predict_dev1(model_1, dev_loader, 'dev1.out')
predict_test1(model_1, test_data, 'test1.out')

Wall time: 17.9 s


# Task 2: Using GloVe word embeddings

## Hyperparameters

## Load vocab and word/tag -> index, and index -> word/tag

In [14]:
# Load the Task 2 word -> index mapping. This rebinds the same global names
# used for Task 1, so everything that reads them (NERDataset, BLSTM2, the
# predict_* helpers) sees the Task 2 mappings from here on.
# NOTE(review): pickle.load can execute arbitrary code while unpickling;
# only load .pkl files that come from a trusted source.
with open('word_to_ix_2.pkl', 'rb') as f:
    word_to_ix = pickle.load(f)
    
# Load the Task 2 tag -> index mapping.
with open('tag_to_ix_2.pkl', 'rb') as f:
    tag_to_ix = pickle.load(f)
           
# Generate index to word/tag mappings
ix_to_word = {v: k for k, v in word_to_ix.items()}
ix_to_tag = {v: k for k, v in tag_to_ix.items()}

# Calculate the size of vocabulary & tags
VOCAB_SIZE = len(word_to_ix)
TAGS_SIZE = len(tag_to_ix)

## Bidirectional LSTM Model with GloVe embeddings

In [15]:
class BLSTM2(nn.Module):
    """Bidirectional LSTM tagger using pretrained word embeddings plus a spelling feature.

    Pipeline: [word embedding ; spelling embedding] -> dropout -> BiLSTM ->
    dropout -> linear -> ELU -> linear, producing one score per tag for every
    token.

    Args:
        vocab_size: Accepted for signature parity; the embedding table size is
            taken from ``embeddings`` and this value is not used.
        embedding_dim: Width of the pretrained word vectors (must match the
            second dimension of ``embeddings``).
        hidden_dim: Hidden size of the LSTM (per direction).
        linear_dim: Output size of the first linear layer.
        tags_size: Number of output tags.
        lstm_dropout: Dropout probability applied before and after the LSTM.
        elu_alpha: ``alpha`` of the ELU activation.
        embeddings: NumPy array of pretrained word vectors; it is converted to
            float32 and fine-tuned (``freeze=False``).
        spelling_embedding_dim: Size of the embedding of the five spelling
            codes (code 0 is the padding code).

    Note:
        The word embedding's ``padding_idx`` is looked up in the module-level
        ``word_to_ix`` at construction time.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, linear_dim, tags_size, lstm_dropout, elu_alpha, embeddings, spelling_embedding_dim):
        super().__init__()
        self.hidden_dim = hidden_dim

        self.embeddings_word = nn.Embedding.from_pretrained(torch.from_numpy(embeddings).float(), freeze=False, padding_idx=word_to_ix['<PAD>'])
        self.embeddings_spelling = nn.Embedding(num_embeddings=5, embedding_dim=spelling_embedding_dim, padding_idx=0)
        self.dropout_pre_lstm = nn.Dropout(lstm_dropout)
        self.lstm = nn.LSTM(embedding_dim+spelling_embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.dropout_post_lstm = nn.Dropout(lstm_dropout)
        self.linear = nn.Linear(hidden_dim * 2, linear_dim)
        self.elu = nn.ELU(alpha=elu_alpha)
        self.linear2 = nn.Linear(linear_dim, tags_size)

    def forward(self, x_word, x_spelling):
        """Score every token of a batch.

        Args:
            x_word: Integer tensor of word indices, batch first.
            x_spelling: Integer tensor of spelling codes with the same shape
                as ``x_word``.

        Returns:
            Tensor of tag scores with the tag dimension last.
        """
        x1 = self.embeddings_word(x_word)
        x2 = self.embeddings_spelling(x_spelling)
        # Both embeddings already live on the module's device, so the
        # concatenation needs no extra transfer.
        x = torch.cat((x1, x2), dim=2)
        x = self.dropout_pre_lstm(x)

        # nn.LSTM starts from all-zero hidden/cell states when none are passed,
        # which is what the explicit zero tensors provided before. Omitting
        # them keeps forward() independent of any notebook-level device
        # variable.
        out, _ = self.lstm(x)

        out = self.dropout_post_lstm(out)
        out = self.linear(out)
        out = self.elu(out)
        out = self.linear2(out)

        return out

## Utility Functions for Prediction

In [16]:
# Used to predict on a development data loader
# Writes the output to a file, i.e. to dev.out
def predict_dev2(model, data_loader, fname):
    """Tag a development data loader with a word+spelling model and write the result.

    Works for any batch size: every row of a batch is decoded separately.

    Args:
        model: Callable module taking ``(word indices, spelling codes)`` and
            returning per-token tag scores with the tag dimension last.
        data_loader: Iterable yielding ``(X, y, X_original, X_spelling)``
            batches. ``X_original`` is indexed as ``[position][batch row]``
            (the layout the default DataLoader collation is assumed to
            produce for lists of strings) and is padded with ``'<PAD>'``.
        fname: Output path. Each line is ``idx word pred``; sentences are
            separated by one empty line.
    """
    outputs = []
    model.eval()
    with torch.no_grad():
        for X, y, X_original, X_spelling in data_loader:
            # Both model inputs must sit on the same device
            X, X_spelling = X.to(device), X_spelling.to(device)

            # Best tag id per token, as one Python list per batch row
            batch_tag_ids = torch.argmax(model(X, X_spelling), dim=2).tolist()

            for row, tag_ids in enumerate(batch_tag_ids):
                output = []
                for pos, tag_id in enumerate(tag_ids):
                    word = X_original[pos][row]
                    # Padding marks the end of the real sentence
                    if word == '<PAD>':
                        break
                    output.append((pos + 1, word, ix_to_tag[tag_id]))
                outputs.append(output)

    with open(fname, 'w') as f:
        # One block of lines per sentence, blocks separated by an empty line
        f.write('\n'.join(
            ''.join(f'{idx} {word} {pred}\n' for idx, word, pred in output)
            for output in outputs
        ))

# Used to predict on a test data, list of sentences
# Writes the output to a file, i.e. to test.out
def predict_test2(model, sentences, fname):
    """Tag raw test sentences with a word+spelling model and write the result.

    Args:
        model: Callable module taking ``(word indices, spelling codes)`` and
            returning per-token tag scores with the tag dimension last.
        sentences: List of sentences, each a list of word strings.
        fname: Output path. Each line is ``idx word pred``; sentences are
            separated by one empty line.

    Note:
        Words are encoded with the module-level ``word_to_ix`` (unknown words
        map to ``'<UNK>'``) and tags decoded with the module-level
        ``ix_to_tag``.
    """
    model.eval()
    tagged_sentences = []
    with torch.no_grad():
        for sentence in sentences:
            # Each sentence is fed as a batch of one
            spelling_batch = torch.tensor(
                [get_spelling_feature(sentence)], dtype=torch.int64, device=device
            )
            word_batch = torch.tensor(
                [prepare_sequence(sentence, word_to_ix, use_unk=True)],
                dtype=torch.int64,
                device=device,
            )

            scores = model(word_batch, spelling_batch)
            best_tag_ids = torch.argmax(scores, dim=2).flatten().tolist()

            tagged = []
            for position, tag_id in enumerate(best_tag_ids):
                token = sentence[position]
                label = ix_to_tag[tag_id]
                if token == '<PAD>':
                    break
                tagged.append((position + 1, token, label))
            tagged_sentences.append(tagged)

    # One block of lines per sentence, blocks separated by an empty line
    blocks = [
        ''.join(f'{idx} {word} {pred}\n' for idx, word, pred in tagged)
        for tagged in tagged_sentences
    ]
    with open(fname, 'w') as f:
        f.write('\n'.join(blocks))
                
# Used to predict on a development data loader
# Writes the output to a file for PERL script, i.e. to prediction.txt
def predict_perl2(model, data_loader, fname):
    """Tag a development data loader with a word+spelling model; write gold and predicted tags.

    Works for any batch size: every row of a batch is decoded separately.

    Args:
        model: Callable module taking ``(word indices, spelling codes)`` and
            returning per-token tag scores with the tag dimension last.
        data_loader: Iterable yielding ``(X, y, X_original, X_spelling)``
            batches. ``X_original`` is indexed as ``[position][batch row]``
            (the layout the default DataLoader collation is assumed to
            produce for lists of strings) and is padded with ``'<PAD>'``.
        fname: Output path. Each line is ``idx word gold pred``; sentences are
            separated by one empty line.
    """
    outputs = []
    model.eval()
    with torch.no_grad():
        for X, y, X_original, X_spelling in data_loader:
            # Both model inputs must sit on the same device
            X, X_spelling = X.to(device), X_spelling.to(device)

            # Predicted and gold tag ids, one Python list per batch row
            pred_rows = torch.argmax(model(X, X_spelling), dim=2).tolist()
            gold_rows = y.tolist()

            for row, (gold_ids, pred_ids) in enumerate(zip(gold_rows, pred_rows)):
                output = []
                for pos, (gold_id, pred_id) in enumerate(zip(gold_ids, pred_ids)):
                    word = X_original[pos][row]
                    # Padding marks the end of the real sentence
                    if word == '<PAD>':
                        break
                    output.append((pos + 1, word, ix_to_tag[gold_id], ix_to_tag[pred_id]))
                outputs.append(output)

    with open(fname, 'w') as f:
        # One block of lines per sentence, blocks separated by an empty line
        f.write('\n'.join(
            ''.join(f'{idx} {word} {gold} {pred}\n' for idx, word, gold, pred in output)
            for output in outputs
        ))

## Convert dev data to data loader

In [17]:
# Rebuild the dev dataset: NERDataset encodes with whatever word_to_ix /
# tag_to_ix are bound globally, which are the Task 2 mappings at this point.
# Sentences are iterated one per batch, in file order (shuffle=False).
dev_dataset = NERDataset(dev_data)
dev_loader = DataLoader(dev_dataset, batch_size=1, shuffle=False)

## Load model

In [18]:
# Load the fully serialized Task 2 model. map_location remaps the stored
# tensors onto `device`, so a checkpoint saved on a GPU still loads when the
# notebook has fallen back to the CPU.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source. Recent PyTorch releases default to weights_only=True, which
# rejects whole pickled modules; if loading fails there, pass
# weights_only=False — confirm against the installed version.
model_2 = torch.load('blstm2.pt', map_location=device)
model_2.to(device)

BLSTM2(
  (embeddings_word): Embedding(30292, 100, padding_idx=25957)
  (embeddings_spelling): Embedding(5, 20, padding_idx=0)
  (dropout_pre_lstm): Dropout(p=0.33, inplace=False)
  (lstm): LSTM(120, 256, batch_first=True, bidirectional=True)
  (dropout_post_lstm): Dropout(p=0.33, inplace=False)
  (linear): Linear(in_features=512, out_features=128, bias=True)
  (elu): ELU(alpha=0.5)
  (linear2): Linear(in_features=128, out_features=10, bias=True)
)

In [19]:
model_2

BLSTM2(
  (embeddings_word): Embedding(30292, 100, padding_idx=25957)
  (embeddings_spelling): Embedding(5, 20, padding_idx=0)
  (dropout_pre_lstm): Dropout(p=0.33, inplace=False)
  (lstm): LSTM(120, 256, batch_first=True, bidirectional=True)
  (dropout_post_lstm): Dropout(p=0.33, inplace=False)
  (linear): Linear(in_features=512, out_features=128, bias=True)
  (elu): ELU(alpha=0.5)
  (linear2): Linear(in_features=128, out_features=10, bias=True)
)

## Evaluate and export results

In [20]:
%%time

# Prediction for all cases (dev, test, and dev for perl)
# Each call writes its own output file. The helpers read the global
# word_to_ix / ix_to_tag, which hold the Task 2 mappings at this point.
predict_perl2(model_2, dev_loader, 'prediction2.txt')
predict_dev2(model_2, dev_loader, 'dev2.out')
predict_test2(model_2, test_data, 'test2.out')

Wall time: 18.2 s
