In [1]:
import nltk
import numpy as np

from collections import deque

In [2]:
# fetch the NLTK data packages used by this notebook:
# 'treebank' is the tagged corpus read through nltk.corpus.treebank in extract_data,
# 'tagsets' is presumably what nltk.help.upenn_tagset() needs -- TODO confirm
nltk.download('treebank')
nltk.download('tagsets')

[nltk_data] Downloading package treebank to
[nltk_data]     C:\Users\DomainFlag\AppData\Roaming\nltk_data...
[nltk_data]   Package treebank is already up-to-date!
[nltk_data] Downloading package tagsets to
[nltk_data]     C:\Users\DomainFlag\AppData\Roaming\nltk_data...
[nltk_data]   Package tagsets is already up-to-date!


True

In [3]:
# optional: uncomment to print the reference of the Penn Treebank tag set
# nltk.help.upenn_tagset()

In [4]:
def extract_data(ratio = 0.9):
    """ Read the tagged treebank corpus and cut it into two consecutive parts

    ratio -- fraction (0..1) of the (token, tag) pairs that goes to the first part

    Returns a tuple (first_part, second_part) of lists of (token, tag) pairs,
    kept in corpus order. The cut is made on a pair index, not on a sentence
    or file boundary.
    """
    assert 0 <= ratio <= 1.0

    treebank = nltk.corpus.treebank

    # flatten the (token, tag) pairs of every corpus file into one list, in file order
    tagged_words = [
        pair
        for file_name in treebank.fileids()
        for pair in treebank.tagged_words(file_name)
    ]

    # index of the first pair that belongs to the second part
    split_at = int(len(tagged_words) * ratio)

    head, tail = tagged_words[:split_at], tagged_words[split_at:]
    return (head, tail)

In [5]:
# split with the default ratio: the first 90% of the tagged tokens for training, the rest for validation
train_data, valid_data = extract_data()

In [6]:
class Model:
    """ Bigram HMM part-of-speech tagger decoded with a windowed Viterbi search

    The model is estimated from one flat sequence of (token, label) pairs by
    relative frequencies:

        transition  P(label | prev_label) = count(prev_label, label) / count(prev_label)
        emission    P(token | label)      = count(token, label) / count(label)

    No smoothing is applied, so a label bigram that never occurs in the training
    data has probability zero. A token outside the training vocabulary gets the
    same emission score (1.0) for every label, which leaves the decision to the
    transition probabilities.
    """

    def __init__(self, data):
        """ Keep a reference to the training pairs and build the frequency tables

        data -- non-empty sequence of (token, label) pairs
        """
        self.train_data = data

        # initialize the model
        self.init(data)

    def init(self, data):
        """ Build vocabulary, label inventory and frequency tables from data

        data -- non-empty sequence of (token, label) pairs; it is iterated more
                than once, so a one-shot generator is not suitable

        Raises ValueError when data is empty.
        """
        if len(data) == 0:
            raise ValueError("training data must not be empty")

        # unpack word tokens and pos labels
        tokens, labels = zip(*data)

        # our vocabulary
        self.vocabulary, self.labels = set(tokens), set(labels)

        # sorted, because set iteration order changes with string hash randomization;
        # a fixed order keeps label indices and tie-breaking stable across kernel restarts
        self.label_types = sorted(self.labels)

        # freq token <-> pos label
        self.freq_tokens = nltk.FreqDist(data)

        # 1-gram freq and 2-gram freq for labels
        self.freq_gram_labels = (nltk.FreqDist(labels), nltk.FreqDist(nltk.bigrams(labels)))

    def _transition_prob(self, prev_target, target):
        """ P(target | prev_target); 0.0 when prev_target was never seen """
        prev_count = self.freq_gram_labels[0][prev_target]
        if prev_count == 0:
            return 0.0

        return self.freq_gram_labels[1][(prev_target, target)] / prev_count

    def _emission_prob(self, word, target):
        """ P(word | target); 1.0 for every label when the word is unknown """
        # an unknown word carries no evidence, so every label gets the same score
        if word not in self.vocabulary:
            return 1.0

        target_count = self.freq_gram_labels[0][target]
        if target_count == 0:
            return 0.0

        return self.freq_tokens[(word, target)] / target_count

    def _transition_matrix(self):
        """ Matrix M with M[i, j] = P(label_types[j] | label_types[i]) """
        return np.array([
            [self._transition_prob(prev_label, label) for label in self.label_types]
            for prev_label in self.label_types
        ], dtype=float)

    def _emission_vector(self, word):
        """ Vector v with v[j] = emission score of word under label_types[j] """
        return np.array([self._emission_prob(word, label) for label in self.label_types], dtype=float)

    def get_seq_prob(self, word, prev_target, target):
        """ Score of moving from prev_target to target and emitting word

        Returns P(target | prev_target) * P(word | target). The emission is
        normalised by the count of target (the label that emits the word), not
        by the count of the previous label.
        """
        return self._transition_prob(prev_target, target) * self._emission_prob(word, target)

    def forward(self, features, window_size = None):
        """ Tag a token sequence with the most probable label path

        features    -- sequence of tokens
        window_size -- number of tokens decoded jointly; consecutive windows share
                       one token: the label chosen for the last token of a window
                       is fixed and used as the starting state of the next window.
                       None decodes the whole sequence in a single window.

        Returns a list with one label per token ([] for an empty sequence).
        Raises ValueError when window_size is smaller than 2, because such a
        window cannot hold the shared token plus at least one new token.

        NOTE: without smoothing a window can reach a point where every path has
        probability zero; the labels from there to the end of that window then
        fall back to label_types[0].
        """
        seq_size, label_size = len(features), len(self.label_types)

        if seq_size == 0:
            return []

        if window_size is None:
            window_size = seq_size
        elif window_size < 2:
            raise ValueError("window_size must be at least 2")

        # no need for more rows than there are tokens
        window_size = min(window_size, seq_size)

        transitions = self._transition_matrix()

        # memoization of max prob for each t timestamp and indexing for backtracking
        T1 = np.zeros((window_size, label_size))
        T2 = np.zeros((window_size, label_size), dtype=int)

        # uniform prior over the labels, weighted by the emission of the first token
        T1[0] = self._emission_vector(features[0]) / label_size

        # initialize helper variables
        labels, offset = [], 0

        while True:

            # number of tokens covered by this window (the last one may be shorter)
            span = min(window_size, seq_size - offset)

            # forward pass
            for t in range(1, span):
                emissions = self._emission_vector(features[offset + t])

                # scores[i, j]: best path ending in label i at t - 1, extended with label j at t
                scores = T1[t - 1][:, np.newaxis] * transitions * emissions[np.newaxis, :]

                # argmax returns the first maximum, i.e. ties go to the lowest label index
                T2[t] = scores.argmax(axis=0)
                T1[t] = scores.max(axis=0)

                # rescale the row to avoid prob underflow; a common positive factor
                # changes neither the argmax nor the back-pointers
                peak = T1[t].max()
                if peak > 0:
                    T1[t] /= peak

            # backward pass, starting from the last row that was actually filled
            prob_max_index = int(np.argmax(T1[span - 1]))

            labels_window, prob_index = deque(), prob_max_index
            for t in range(span - 1, 0, -1):
                prob_index = int(T2[t][prob_index])

                labels_window.appendleft(self.label_types[prob_index])

            labels.extend(labels_window)

            # the last token of this window is the first token of the next one
            offset += span - 1

            # that shared token is the final token: nothing left to decode
            if offset >= seq_size - 1:
                break

            # reset mem
            T1.fill(0)
            T2.fill(0)

            # assign max prob to last word
            T1[0][prob_max_index] = 1.0

        # update with last item
        labels.append(self.label_types[prob_max_index])

        return labels

In [7]:
# create an instance of our model; the frequency tables are built inside the constructor
model = Model(train_data)

# pre-process labelled data: turn the (token, label) pairs into two parallel tuples,
# the tokens to tag (features) and their reference labels (targets)
features, targets = zip(*valid_data)

In [None]:
# tag the validation tokens, decoding them in windows of 8 tokens
output = model.forward(features, window_size = 8)

In [None]:
def validate_model(outputs, targets):
    """ Compare predicted labels with the reference labels

    outputs -- predicted labels
    targets -- reference labels, same length as outputs

    Returns (accuracy, (number of matching labels, number of labels)).
    """
    assert len(outputs) == len(targets)

    # size of annotations
    size = len(targets)

    # every position where prediction and reference agree counts once
    labels_predicted = sum(predicted == target for predicted, target in zip(outputs, targets))

    return labels_predicted / size, (labels_predicted, size)

In [None]:
# displays (accuracy, (matching labels, total labels)) for the validation split
validate_model(output, targets)