## LSTM Part-of-Speech Tagger
### Step 1: Importing the <code>torch</code>  and other libraries

In [324]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import numpy as np
import glob, os, random

torch.manual_seed(1)

<torch._C.Generator at 0x7f85775170b0>

### Step 2: Get the helper functions

In [387]:
# Read the data file
def read_data(filepath):
    data = []
    with open(filepath, 'r', encoding="utf8") as f:
        for line in f:
            data.append(tuple(zip(*[wt.split('/') for wt in line.strip().split()])))
    return data

# Get vocabs and tagset
def get_vocab_tagset(data):
    token_size, vocabs, tagset = 0, set(), set()
    for words, tags in data:
        token_size += len(words)
        vocabs.update(words)
        tagset.update(tags)
    return vocabs, tagset, token_size

# convert the sequencies to indexes and tensors
def prepare_sequence(seq, to_ix):
    idxs = [to_ix[w] if w in to_ix else 0 for w in seq]
    return torch.tensor(idxs, dtype=torch.long)

# indexing list elements: use this for vocabs and tagset
def to_index(aList):
    elem_to_idx = {e:aList.index(e)+1 for e in aList}
    elem_to_idx['Unknown'] = 0  
    idx_to_elem = {i:e for e,i in elem_to_idx.items()}
    return elem_to_idx, idx_to_elem

# with torch.no_grad():
#     y_gold, y_pred = [], []
#     for sent in test_set:
#         y_gold.extend(sent[1])
        
#         y_pred.extend(test_sent(sent))
#     score = [yg==yp for yg,yp in zip(y_gold, y_pred)]
#     return sum(score)/len(score)
#     inputs = prepare_sequence(tagged_test_sent[0], word_to_idx)
#     tag_scores = model(inputs)
#     return score_to_tag(untagged_sent, tag_scores, idx_to_tag)

# def test_all(test_set):
#     y_gold, y_pred = [], []
#     for sent in test_set:
#         y_gold.extend(sent[1])
        
#         y_pred.extend(test_sent(sent))
#     score = [yg==yp for yg,yp in zip(y_gold, y_pred)]
#     return sum(score)/len(score)

# import numpy as np
def score_to_tag(sent, tag_scores, i_to_tag):
    tagged=[]
    for w, preds in list(zip(sent, tag_scores)):
        preds = list(np.array(preds))
        idx = preds.index(max(preds))
        tagged.append(f"{i_to_tag[idx]}")
    return tagged

# tagged_files = [#"igbo/ITC5.coarse*", "igbo/fiction.coarse*", #coarse
#                 #"igbo/ITC5.fine*", "igbo/fiction.fine*", #fine
#                 "welsh/*pos_coarse*",
#                 #"welsh/*sem_coarse*", 
#                 #"welsh/*both_fine*", "welsh/*both_coarse*",
#                 #"english/corpus.small"
#                 #"swedish/*coarse*"
#                 ]
# tagged data file
tagged_data_file = "welsh/cy_pos_coarse_tagged"

### Step 3: Preparing the data

In [388]:
# Read training data for each folder
print(f"\nReading {tagged_data_file} ...", end='')
data = read_data(file)
vocabs, tagset, token_size = get_vocab_tagset(data)

# #Uncomment below for data statistics
# print(f"\nSentences:\t{len(training_data)}\nToken size:\t{token_size}\nVocab size:\t{len(vocabs)}\nTagset size:\t{len(tagset)}")

# Shuffle and split data
random.seed(7)
random.shuffle(data)
test_size = 0.1 #90% of data for training; 10% of data for testing
train_set, test_set = data[:-int(len(data)*test_size)], data[-int(len(data)*test_size):]

EMBEDDING_DIM = 100
HIDDEN_DIM = 5    
word_to_idx, idx_to_word = to_index(list(vocabs))
tag_to_idx, idx_to_tag = to_index(list(tagset))


Reading welsh/cy_pos_coarse_tagged ...

### Step 4: Creating the model

In [389]:
class LSTMTagger(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        super(LSTMTagger, self).__init__()
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)

        # The LSTM takes word embeddings as inputs, and outputs hidden states
        # with dimensionality hidden_dim.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)

        # The linear layer that maps from hidden state space to tag space
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores

### Step 5: Training the model

In [392]:
model = LSTMTagger(EMBEDDING_DIM, HIDDEN_DIM, len(word_to_idx), len(tag_to_idx))
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

# Note that element i,j of the output is the score for tag j for word i.
# Here we don't need to train, so the code is wrapped in torch.no_grad()

for epoch in range(100):  # again, normally you would NOT do 300 epochs, it is toy data
    print(f"Epoch:{epoch} = {test_all(test_set)*100:.2f}")
    for sentence, tags in train_set:
        # Step 1. Remember that Pytorch accumulates gradients.
        # We need to clear them out before each instance
        model.zero_grad()

        # Step 2. Get our inputs ready for the network, that is, turn them into
        # Tensors of word indices.
        sentence_in = prepare_sequence(sentence, word_to_idx)
        targets = prepare_sequence(tags, tag_to_idx)

        # Step 3. Run our forward pass.
        tag_scores = model(sentence_in)

        # Step 4. Compute the loss, gradients, and update the parameters by
        #  calling optimizer.step()
        loss = loss_function(tag_scores, targets)
        loss.backward()
        optimizer.step()

Epoch:0 = 9.87
Epoch:1 = 21.02
Epoch:2 = 20.68
Epoch:3 = 19.06
Epoch:4 = 19.57
Epoch:5 = 19.23
Epoch:6 = 19.06
Epoch:7 = 18.81
Epoch:8 = 18.64
Epoch:9 = 18.21
Epoch:10 = 17.79
Epoch:11 = 17.79
Epoch:12 = 17.70
Epoch:13 = 18.04
Epoch:14 = 17.96
Epoch:15 = 17.79
Epoch:16 = 17.96
Epoch:17 = 17.79
Epoch:18 = 17.79
Epoch:19 = 17.45
Epoch:20 = 17.36
Epoch:21 = 17.45
Epoch:22 = 17.45
Epoch:23 = 17.19
Epoch:24 = 17.11
Epoch:25 = 17.19
Epoch:26 = 17.11
Epoch:27 = 17.11
Epoch:28 = 17.28
Epoch:29 = 17.19
Epoch:30 = 17.28
Epoch:31 = 17.11
Epoch:32 = 17.11
Epoch:33 = 17.19
Epoch:34 = 17.11
Epoch:35 = 16.85
Epoch:36 = 17.02
Epoch:37 = 16.94
Epoch:38 = 16.94
Epoch:39 = 17.11
Epoch:40 = 17.11
Epoch:41 = 16.94
Epoch:42 = 17.02
Epoch:43 = 16.94
Epoch:44 = 16.85
Epoch:45 = 16.68
Epoch:46 = 16.68
Epoch:47 = 16.60
Epoch:48 = 16.26
Epoch:49 = 16.51
Epoch:50 = 16.60
Epoch:51 = 16.51
Epoch:52 = 16.34
Epoch:53 = 16.34
Epoch:54 = 16.34
Epoch:55 = 16.68
Epoch:56 = 16.51
Epoch:57 = 16.51
Epoch:58 = 16.43
Epoch:59

In [316]:
# print("\nScores after training:")
def test_score():
    with torch.no_grad():
        inputs = prepare_sequence(untagged_sent, word_to_idx)
        tag_scores = model(inputs)
        model_tagged = score_to_tag(untagged_sent, tag_scores, idx_to_tag)
        #print(" ".join(model_tagged))
        scores = [t1==t2 for t1,t2 in zip(tagged_sent, model_tagged)]
        return(f"Score:\t{sum(scores)/len(scores)*100:.2f}%")
test_score()

'Score:\t76.92%'

In [198]:
with open('all_corpora/welsh/cy_both_coarse_tagged', 'w', encoding='utf8') as f1:
    with open('all_corpora/welsh/cy_both_fine_tagged', 'r', encoding='utf8') as f2:
        i=0
        for line in f2:
            words, tags = tuple(zip(*[wt.split('/') for wt in line.strip().split()]))
            pos_tags = [tag.split('|')[0] for tag in tags]
            sem_tags = [re.findall(r'[A-Za-z]+\d*',tag.split('|')[1])[0] for tag in tags]
            coarse_pos_sem = ["|".join(ps) for ps in zip(pos_tags,sem_tags)]
            word_coarse_pos_sem = [f"{w}/{t}" for w,t in zip(words, coarse_pos_sem)]
#             print(f"{words}\n\n{tags}\n\n{both}\n\n{' '.join(word_both_tags)}")
            i+=1
#             print(f"{i}\t{' '.join(word_coarse_pos_sem)}")
            f1.write(f"{' '.join(word_coarse_pos_sem)}\n")
#             break