In [33]:
import os
import math
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pandas as pd
import numpy as np
import shutil

from datetime import datetime
from torch.utils.data import Dataset, DataLoader

In [10]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
use_cuda = True
if use_cuda and torch.cuda.is_available():
    torch.device('cuda')

In [16]:
# constants
EMBED_DIM = 300
BATCH_SIZE = 16
EPOCHS = 50
PAD_WORD = '`'
PAD_TAG = -1

# Helpers

In [3]:
def test_train_split(df, ratio=.2):
    max_doc = df.iloc[-1].doc
    # :ratio: - percentage of data left to testing
    split_doc = max_doc * (1 - ratio)

    df_train = df[df.doc <= split_doc]
    df_test = df[df.doc > split_doc]

    return df_train, df_test

In [4]:
def create_data(df):
    sentences = []

    doc_ids = list(set(df.doc))
    for doc_id in doc_ids:
        sent_ids = list(set(df[df.doc == doc_id].sentence))
        df_docs = df[df.doc == doc_id]
        for sent_id in sent_ids:
            df_sents = df_docs[df_docs.sentence == sent_id]

            words = [word for word in df_sents.word]
            tags = [tag for tag in df_sents.type]

            sentences.append((words, tags))
    return sentences

In [5]:
def sentences_to_indices(sentences, word_to_ix, tag_to_ix):
    '''
        sentences - [[token, ...], [token, ...], ...]
        converts sentences to word indices
    '''
    all_word_indices = []
    all_target_indices = []
    for sent in sentences:
        word_indices, target_indices = get_indices_train(
            sent, word_to_ix, tag_to_ix)
        all_word_indices.append(word_indices)
        all_target_indices.append(target_indices)
    return all_word_indices, all_target_indices


def get_indices_train(sentence, word_to_ix, tag_to_ix):
    '''
        retrieves indices of sentence from helpping dictionaries,
        maximum length of word in sentence
    '''
    word_indices = []
    target_indices = []
    for token in range(len(sentence[0])):
        # read word index from dictionary
        # if word is not in the dictionary treat as unknown word
        tok = sentence[0][token] if sentence[0][
            token] in word_to_ix else PAD_WORD
        tag = sentence[1][token] if sentence[1][token] in tag_to_ix else PAD_TAG
        try:
            word_indices.append(word_to_ix[tok])
        except:
            print(sentence[0])
        try:
            target_indices.append(tag_to_ix[tag])
        except:
            print(sentence[1])
            print(tag, sentence[1][token])

    return torch.tensor(word_indices,
                        dtype=torch.long), torch.tensor(target_indices,
                                                        dtype=torch.long)

# Model

In [6]:
class Event_tagger(nn.Module):
    def __init__(self,
                 word_to_ix,
                 tag_to_ix,
                 ix_to_tag,
                 weights_matrix,
                 batch_size,
                 word_embed_dim=300,
                 lstm_num_layers=2,
                 bidirectional=True,
                 dropout=.5):
        '''
            initialize models
            batch_size      - size of batches for traininig
            word_embed_dim  - dimension of word embeddings
            lstm_num_layers - number of LSTM layers
            dropout         - rate for dropout layer
        '''
        super(Event_tagger, self).__init__()

        # dictionaries
        self.word_to_ix = word_to_ix
        self.tag_to_ix = tag_to_ix
        self.ix_to_tag = ix_to_tag

        # parameters
        self.batch_size = batch_size
        self.word_embed_dim = word_embed_dim
        self.lstm_hidden_dim = self.word_embed_dim
        self.lstm_num_layers = lstm_num_layers
        self.dropout = dropout

        self.num_directions = 2 if bidirectional else 1

        self.word_embeds, num_embeds = create_embed_layer(weights_matrix, True)
        self.lstm_hidden_embeds = self.init_hidden_embeddings(self.batch_size)

        self.lstm = nn.LSTM(self.word_embed_dim,
                            self.lstm_hidden_dim,
                            dropout=self.dropout,
                            num_layers=self.lstm_num_layers,
                            bidirectional=bidirectional,
                            batch_first=True)
        self.dense = nn.Linear(self.lstm_hidden_dim * self.num_directions,
                               len(self.tag_to_ix))

    def init_hidden_embeddings(self, batch_size):
        '''
            initialize hidden embeddings as zeros
        '''
        return (torch.zeros(self.lstm_num_layers * self.num_directions,
                            batch_size, self.lstm_hidden_dim),
                torch.zeros(self.lstm_num_layers * self.num_directions,
                            batch_size, self.lstm_hidden_dim))
        
    def forward(self, words_batch):
        '''
            words_batch - contains indices of words in current batch with shape
                        (batch_size, num_words_in_sentence)

            creates word level word embeddings from words_batch

            runs Bi-directional LSTM over input word representation to get
            final word representation which is fed to linear layer and softmax
            activation function to generate probability distribution for event
            tag set
        '''
        # create word-level word embeddings
        word_embed_word_level = words_batch.view(-1)
        word_embed_word_level = self.word_embeds(word_embed_word_level)
        batch_sent_embed = word_embed_word_level.view(words_batch.shape[0], -1,
                                                 word_embed_word_level.shape[-1])

        # create final word representation from LSTM
        lstm_out, self.lstm_hidden_embeds = self.lstm(batch_sent_embed,
                                                      self.lstm_hidden_embeds)

        # get probabilities for event tag
        tag_space = self.dense(lstm_out)
        tag_scores = F.log_softmax(tag_space, dim=2)
        return tag_scores

# Test

In [14]:
def test_sentences(model, df):
    batch_size = 1

    model.lstm_hidden_embeds = model.init_hidden_embeddings(batch_size)
    model.lstm.flatten_parameters()

    word_to_ix = model.word_to_ix
    ix_to_tag = model.ix_to_tag
    tag_to_ix = model.tag_to_ix
    tag_to_ix[-1] = len(tag_to_ix)
    # print(tag_to_ix['-1'])

    # load test dataset
    data = create_data(df)
    print(data[0])

    predicted_tags = []
    y_true = []

    word_indices, target_indices = sentences_to_indices(
        data, word_to_ix, tag_to_ix)
    # run test dataset through model
    for i in range(len(word_indices)):
        tag_scores = model(torch.stack([word_indices[i]]))
        out_probs = torch.squeeze(tag_scores)
        tmp = []
        for j, pset in enumerate(out_probs):
            if j >= len(word_indices[i]):
                break
            _, predicted_ix = torch.max(pset, 0)
            predicted_tags.append(predicted_ix.item())
            tmp.append(predicted_ix.item())
        y_true.extend(target_indices[i])
    return predicted_tags, y_true

## Evaluate performance

In [39]:
def conf_matrix(y_true, y_pred, tag_to_ix):
    y_true = [int(y) for y in y_true]
    labels = set(tag_to_ix.values())
    neg_class = tag_to_ix['O']

    tp, fp, fn = {}, {}, {}
    for label in labels:
        tp[label], fp[label], fn[label] = 0, 0, 0

    for true, pred in zip(y_true, y_pred):
        if true == neg_class:
            continue
        tp[true] += 1 if true == pred else 0  # true positive
        fn[true] += 1 if true != pred else 0  # false negative
        fp[pred] += 1 if true != pred else 0  # false positive
    return tp, fp, fn


def precision_score(tp, fp, tag_to_ix):
    labels = set(tag_to_ix.values())
    neg_class = tag_to_ix['O']

    tp_sum, tp_fp_sum = 0, 0
    for label in labels:
        if label == neg_class:
            continue
        tp_sum += tp[label]
        tp_fp_sum += tp[label] + fp[label]

    precision = tp_sum / tp_fp_sum
    return precision


def recall_score(tp, fn, tag_to_ix):
    labels = set(tag_to_ix.values())
    neg_class = tag_to_ix['O']

    tp_sum, tp_fn_sum = 0, 0
    for label in labels:
        if label == neg_class:
            continue
        tp_sum += tp[label]
        tp_fn_sum += tp[label] + fn[label]

    recall = tp_sum / tp_fn_sum
    return recall


def f1_score(precision, recall):
    return (2 * precision * recall) / (precision + recall)

# Main

In [12]:
model_path = '../models/biLSTM-model.pt'
model = torch.load(model_path)

df = pd.read_csv('../output.csv')
df_train, df_test = test_train_split(df)

In [17]:
predicted_tags, y_true = test_sentences(model, df_test)

(['Автобус', 'столкнулся', 'с', '"', 'Газелью', '"', 'под', 'Уфой', ':', '10', 'пострадавших', 'УФА', ',', '2', 'июля', '.'], ['O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-T-GPE-Location', 'O', 'B-T-Person', 'I-T-Person', 'B-T-GPE-Location', 'O', 'B-T-Time', 'I-T-Time', 'O'])


In [40]:
tp, fp, fn = conf_matrix(y_true, predicted_tags, model.tag_to_ix)

precision = precision_score(tp, fp, model.tag_to_ix)
recall = recall_score(tp, fn, model.tag_to_ix)
f1 = f1_score(precision, recall)
print('Precision score {:.3f}'.format(precision))
print('Recall score {:.3f}'.format(recall))
print('F1 score {:.3f}'.format(f1))

Precision score 0.808
Recall score 0.663
F1 score 0.728
