In [1]:
# Jupyter "magic methods" -- only need to be run once per kernel restart
%load_ext autoreload
%aimport helpers, tests
%autoreload 1

In [2]:
import matplotlib.pyplot as plt
import numpy as np

from collections import Counter, defaultdict
from helpers import show_model, Dataset
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution
from collections import namedtuple

import nltk

In [3]:
data = Dataset("tags-universal.txt", "brown-universal.txt", train_test_split=0.8)
print("There are {} sentences in the corpus.".format(len(data)))
print("There are {} sentences in the training set.".format(len(data.training_set)))
print("There are {} sentences in the testing set.".format(len(data.testing_set)))

There are 57340 sentences in the corpus.
There are 45872 sentences in the training set.
There are 11468 sentences in the testing set.


In [4]:
print("There are a total of {} samples of {} unique words in the corpus."
      .format(data.N, len(data.vocab)))
print("There are {} samples of {} unique words in the training set."
      .format(data.training_set.N, len(data.training_set.vocab)))
print("There are {} samples of {} unique words in the testing set."
      .format(data.testing_set.N, len(data.testing_set.vocab)))
print("There are {} words in the test set that are missing in the training set."
      .format(len(data.testing_set.vocab - data.training_set.vocab)))

There are a total of 1161192 samples of 56057 unique words in the corpus.
There are 928458 samples of 50536 unique words in the training set.
There are 232734 samples of 25112 unique words in the testing set.
There are 5521 words in the test set that are missing in the training set.


In [5]:
tags = (tag for i, (word, tag) in enumerate(data.training_set.stream()))
words = (word for i, (word, tag) in enumerate(data.training_set.stream()))

In [6]:
def pair_counts(sequences_A, sequences_B):
    dict_tag_word = defaultdict(list)
    dict_word = defaultdict(list)
    
    for i, (tag, word) in  enumerate(zip(sequences_A, sequences_B)):
        dict_tag_word[tag].append(word)
        
    for k in dict_tag_word.keys():
        dict_word[k] = Counter(dict_tag_word[k])
        
    return dict_word

pair_counts(tags,words).keys()

dict_keys(['ADV', 'NOUN', '.', 'VERB', 'ADP', 'ADJ', 'CONJ', 'DET', 'PRT', 'NUM', 'PRON', 'X'])

In [7]:
tags = (tag for i, (word, tag) in enumerate(data.training_set.stream()))
words = (word for i, (word, tag) in enumerate(data.training_set.stream()))
word_counts = pair_counts(words, tags)

In [8]:
FakeState = namedtuple("FakeState", "name")

class MFCTagger:
    missing = FakeState(name="<MISSING>")
    def __init__(self, table):
        self.table = defaultdict(lambda: MFCTagger.missing)
        self.table.update({word: FakeState(name=tag) for word, tag in table.items()})
        
    def viterbi(self, seq):
        return 0., list(enumerate(["<start>"] + [self.table[w] for w in seq] + ["<end>"]))

In [9]:
mfc_table = dict()
for word, tags in word_counts.items():
    mfc_table[word] = max(tags.keys(), key = lambda key: tags[key])

mfc_model = MFCTagger(mfc_table)

In [10]:
def replace_unknown(sequence):
    return [w if w in data.training_set.vocab else 'nan' for w in sequence]

def simplify_decoding(X, model):
    _, state_path = model.viterbi(replace_unknown(X))
    return [state[1].name for state in state_path[1:-1]]

In [11]:
def accuracy(X, Y, model):
    correct = total_predictions = 0
    for observations, actual_tags in zip(X, Y):
        try:
            most_likely_tags = simplify_decoding(observations, model)
            correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        except:
            pass
        total_predictions += len(observations)
    return correct / total_predictions

In [12]:
mfc_training_acc = accuracy(data.training_set.X, data.training_set.Y, mfc_model)
print("training accuracy mfc_model: {:.2f}%".format(100 * mfc_training_acc))

mfc_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, mfc_model)
print("testing accuracy mfc_model: {:.2f}%".format(100 * mfc_testing_acc))

training accuracy mfc_model: 95.72%
testing accuracy mfc_model: 93.01%


In [13]:
def unigram_counts(sequences):
    dictionary = defaultdict(int)
    for i_seq in sequences:
        dictionary[i_seq] += 1
    return dictionary

def bigram_counts(sequences):
    dictionary = defaultdict(int)
    bigrams = list(nltk.bigrams(tags))
    for i_bigram in bigrams:
        dictionary[i_bigram] += 1
    return dictionary

In [14]:
tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
tag_unigrams = unigram_counts(tags)

In [15]:
tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
tag_bigrams = bigram_counts(tags)

In [16]:
def starting_counts(sequences):
    start_tags = [i_seq[0] for i_seq in sequences]
    dictionary = defaultdict(int)
    for i_tag in start_tags:
        dictionary[i_tag] += 1
    return dictionary

def ending_counts(sequences):
    start_tags = [i_seq[-1] for i_seq in sequences]
    dictionary = defaultdict(int)
    for i_tag in start_tags:
        dictionary[i_tag] += 1
    return dictionary


tag_starts = starting_counts(data.training_set.Y)
tag_ends = ending_counts(data.training_set.Y)

In [17]:
tags = (tag for i, (word, tag) in enumerate(data.training_set.stream()))
words = (word for i, (word, tag) in enumerate(data.training_set.stream()))

In [18]:
basic_model = HiddenMarkovModel(name="base-hmm-tagger")

count_tag_and_word = pair_counts(tags, words)
states = {}
for tag, word_dict in count_tag_and_word.items():
    p_words_given_tag_state = defaultdict(float)
    for word in word_dict.keys():
        p_words_given_tag_state[word] = count_tag_and_word[tag][word] / tag_unigrams[tag]
    emission = DiscreteDistribution(dict(p_words_given_tag_state))
    states[tag] = State(emission, name=tag)
    
basic_model.add_states(list(states.values()))

# Adding start states
for tag in data.training_set.tagset:
    state = states[tag]
    basic_model.add_transition(basic_model.start, state, tag_starts[tag]/len(data.training_set))

# Adding end states
for tag in data.training_set.tagset:
    state = states[tag]
    basic_model.add_transition(state, basic_model.end, tag_ends[tag]/tag_unigrams[tag])

# Adding pairs
for tag1 in data.training_set.tagset:
    state1 = states[tag1]
    for tag2 in data.training_set.tagset:
        state2 = states[tag2]
        basic_model.add_transition(state1, state2, tag_bigrams[(tag1, tag2)]/tag_unigrams[tag1])

# finalize the model
basic_model.bake()

In [19]:
hmm_training_acc = accuracy(data.training_set.X, data.training_set.Y, basic_model)
print("training accuracy basic hmm model: {:.2f}%".format(100 * hmm_training_acc))

hmm_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, basic_model)
print("testing accuracy basic hmm model: {:.2f}%".format(100 * hmm_testing_acc))

training accuracy basic hmm model: 97.52%
testing accuracy basic hmm model: 95.94%
