### DELETE AFTER AND KEEP script predict_bilstm_word.py

---------------

In [1]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from torch.utils.data import DataLoader, TensorDataset
import pickle

In [3]:
# Load the dictionaries
with open('models/bilstm_word/id2word.pkl', 'rb') as f:
    id2word = pickle.load(f)
with open('models/bilstm_word/id2tag.pkl', 'rb') as f:
    id2tag = pickle.load(f)
with open('models/bilstm_word/vocab.pkl', 'rb') as f:
    vocab = pickle.load(f)
with open('models/bilstm_word/nertags.pkl', 'rb') as f:
    nertags = pickle.load(f)

In [5]:
class BiLSTM(nn.Module):
    def __init__(self, embedding_size, hidden_size, total_words, num_class):
        super(BiLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.wordembed = nn.Embedding(total_words, embedding_size)
        self.dropout = nn.Dropout(p=0.5)
        self.bilstm = nn.LSTM(embedding_size, hidden_size, bidirectional=True, batch_first=True)
        self.linear = nn.Linear(2 * hidden_size, num_class)

    def forward(self, x, xlengths):
        x = torch.nn.utils.rnn.pack_padded_sequence(x, xlengths.cpu(), batch_first=True, enforce_sorted=False)
        x, _ = torch.nn.utils.rnn.pad_packed_sequence(x, batch_first=True)
        word_embedding = self.wordembed(x)
        word_embedding = self.dropout(word_embedding)

        out, (h, c) = self.bilstm(word_embedding)
        out = self.linear(out)
        out = out.view(-1, out.shape[2])
        out = torch.nn.functional.log_softmax(out, dim=1)
        return out

# Initialize your model instance with the same architecture as the trained model
model = BiLSTM(embedding_size=100, hidden_size=100, total_words=len(vocab), num_class=len(nertags))

# Load the saved model state dictionary
model.load_state_dict(torch.load('models/bilstm_word/trained_bilstm_model_state_dict.pth'))

# Put the model in evaluation mode
model.eval()


BiLSTM(
  (wordembed): Embedding(23626, 100)
  (dropout): Dropout(p=0.5, inplace=False)
  (bilstm): LSTM(100, 100, batch_first=True, bidirectional=True)
  (linear): Linear(in_features=200, out_features=10, bias=True)
)

In [7]:
state_dict = model.state_dict()

# Print the keys and shapes of the parameters in the state dictionary
for key, value in state_dict.items():
    print(key, value.shape)

wordembed.weight torch.Size([23626, 100])
bilstm.weight_ih_l0 torch.Size([400, 100])
bilstm.weight_hh_l0 torch.Size([400, 100])
bilstm.bias_ih_l0 torch.Size([400])
bilstm.bias_hh_l0 torch.Size([400])
bilstm.weight_ih_l0_reverse torch.Size([400, 100])
bilstm.weight_hh_l0_reverse torch.Size([400, 100])
bilstm.bias_ih_l0_reverse torch.Size([400])
bilstm.bias_hh_l0_reverse torch.Size([400])
linear.weight torch.Size([10, 200])
linear.bias torch.Size([10])


In [9]:
def out_predictions(model, loader, output_file):
    with open(output_file, 'w') as f:
        with torch.no_grad():
            for step, (X, Y, xlen) in enumerate(loader):
                Y = pack_padded_sequence(Y, xlen, batch_first=True, enforce_sorted=False)
                Y, _ = pad_packed_sequence(Y, batch_first=True)
                ypred = model(X.long().to(device), xlen.to(device))
                ypred = torch.argmax(ypred.to('cpu'), dim=1)
                ypred = ypred.view(Y.shape[0], -1)
                for i in range(len(ypred)):
                    for j in range(len(ypred[i])):
                        word = id2word[int(X[i, j])]
                        tag = id2tag[int(ypred[i, j])]
                        f.write(f"{word}\t{tag}\n")
                    f.write('\n')

# Assuming 'device' and 'id2word', 'id2tag' are defined elsewhere
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [10]:
def load_data(datapath):
    sentences = []
    tags = []
    with open(datapath) as f:
        lines = f.readlines()
        sentence = []
        tag = []
        for line in lines:
            line = line.strip()  
            if line: 
                word, tag_label = line.split('\t')
                if vocab is not None:
                    if word in vocab.keys():
                        sentence.append(vocab[word])
                    else:
                        sentence.append(vocab['<oov>'])
                if nertags is not None:
                    tag.append(nertags[tag_label])
            else:  
                if sentence:
                    sentences.append(sentence)
                    tags.append(tag)
                    sentence = []
                    tag = []

    max_length = max(len(x) for x in sentences)
    x_lengths = [len(x) for x in sentences]
    X_test = []
    Y_test = []
    for sent, tag in zip(sentences, tags):
        length_to_append = max_length - len(sent)
        X_test.append(sent + [0] * length_to_append)  
        Y_test.append(tag + [0] * length_to_append) 

    X_test = torch.Tensor(X_test)
    Y_test = torch.Tensor(Y_test)
    x_lengths = torch.Tensor(x_lengths)

    return X_test, Y_test, x_lengths

In [13]:
# SPAN-F1 SCORE

def readBIO(path):
    ents = []
    curEnts = []
    for line in open(path):
        line = line.strip()
        if line == '':
            ents.append(curEnts)
            curEnts = []
        elif line[0] == '#' and len(line.split('\t')) == 1:
            continue
        else:
            curEnts.append(line.split('\t')[1])
    return ents

def toSpans(tags):
    spans = set()
    for beg in range(len(tags)):
        if tags[beg][0] == 'B':
            end = beg
            for end in range(beg+1, len(tags)):
                if tags[end][0] != 'I':
                    break
            spans.add(str(beg) + '-' + str(end) + ':' + tags[beg][2:])
            #print(end-beg)
    return spans

def getInstanceScores(predPath, goldPath):
    goldEnts = readBIO(goldPath)
    predEnts = readBIO(predPath)
    entScores = []
    tp = 0
    fp = 0
    fn = 0
    for goldEnt, predEnt in zip(goldEnts, predEnts):
        goldSpans = toSpans(goldEnt)
        predSpans = toSpans(predEnt)
        overlap = len(goldSpans.intersection(predSpans))
        tp += overlap
        fp += len(predSpans) - overlap
        fn += len(goldSpans) - overlap
        
    prec = 0.0 if tp+fp == 0 else tp/(tp+fp)
    rec = 0.0 if tp+fn == 0 else tp/(tp+fn)
    f1 = 0.0 if prec+rec == 0.0 else 2 * (prec * rec) / (prec + rec)
    return f1

In [14]:
noise_type = 'capitalization_swap'
rate = 0.1

testdatapath = f'data/altered/{noise_type}_rate_{rate}.txt'

# Test dataset preparation
Xtest, Ytest, x_testlengths = load_data(testdatapath)

testdataset = TensorDataset(Xtest, Ytest, x_testlengths)
loader_test = DataLoader(testdataset, batch_size=1, shuffle=False)
prediction_path = f'predictions/bilstm_word/{noise_type}_rate_{rate}.txt'

# Output predictions
out_predictions(model, loader_test, prediction_path)

In [16]:
span_f1_score = getInstanceScores(prediction_path,'data/gold.txt')
print('Span-F1 score: ', span_f1_score)

Span-F1 score:  0.6900822626860154
