In [1]:
import matplotlib.pyplot as plt
import numpy as np

from itertools import chain
from collections import Counter, defaultdict
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution

from nltk import pos_tag, word_tokenize
from nltk.corpus import floresta
from sklearn.model_selection import train_test_split

In [2]:
def pair_counts(X, Y):
    """Count how often each value of X co-occurs with each value of Y.

    Both arguments are sequences of sequences. Each is flattened (top-level
    elements that are plain strings are skipped) and the flattened values are
    paired up positionally.

    Returns a dict of the form {y_value: {x_value: count}}. Every distinct
    flattened value of Y gets an entry, even if it was never paired.
    """
    flat_words = []
    for sentence in X:
        if type(sentence) != str:
            flat_words.extend(sentence)

    flat_tags = []
    for tag_sequence in Y:
        if type(tag_sequence) != str:
            flat_tags.extend(tag_sequence)

    # Start every tag with an empty word table so unpaired tags still appear
    counts_by_tag = {}
    for tag in set(flat_tags):
        counts_by_tag[tag] = {}

    for tag, word in zip(flat_tags, flat_words):
        word_table = counts_by_tag[tag]
        if word in word_table:
            word_table[word] += 1
        else:
            word_table[word] = 1

    return counts_by_tag


def unigram_counts(sequences):
    """Count the occurrences of every value in a 2-dimensional sequence list.

    Args:
        sequences: iterable of sequences (e.g. one tag sequence per sentence).
            Top-level elements that are plain strings are skipped.

    Returns:
        A plain dict mapping each distinct value to its number of occurrences
        across all sequences.
    """
    flattened = (
        value
        for sequence in sequences
        if type(sequence) != str
        for value in sequence
    )
    # A single Counter pass replaces one full list scan per distinct value.
    return dict(Counter(flattened))


def bigram_counts(sequences):
    """Count adjacent value pairs across a 2-dimensional sequence list.

    For every sequence, each element is paired with its immediate successor.
    Returns a Counter keyed by the (first, second) tuple. Sequences with fewer
    than two elements contribute nothing.
    """
    adjacent_pairs = (
        (left, right)
        for sequence in sequences
        for left, right in zip(sequence[:-1], sequence[1:])
    )
    return Counter(adjacent_pairs)


def starting_counts(sequences):
    """Count how often each value appears as the first element of a sequence.

    Args:
        sequences: iterable of sequences (e.g. one tag sequence per sentence).

    Returns:
        A Counter mapping each value to the number of sequences it starts.
        Empty sequences have no first element and are skipped.
    """
    return Counter(sequence[0] for sequence in sequences if len(sequence) > 0)


def ending_counts(sequences):
    """Count how often each value appears as the last element of a sequence.

    Args:
        sequences: iterable of sequences (e.g. one tag sequence per sentence).

    Returns:
        A Counter mapping each value to the number of sequences it ends.
        Empty sequences have no last element and are skipped.
    """
    return Counter(sequence[-1] for sequence in sequences if len(sequence) > 0)


# For accuracy testing

def my_replace_unknown(sequence, training_vocab):
    """Build a new list from `sequence` with out-of-vocabulary items masked.

    Every item found in `training_vocab` is kept as is; every other item is
    replaced by the literal string 'nan' (the value the original author notes
    Pomegranate ignores during computation). The input is not modified.
    """
    masked = []
    for token in sequence:
        masked.append(token if token in training_vocab else 'nan')
    return masked


def my_simplify_decoding(X, model, training_vocab):
    """Return the predicted state names for one observation sequence.

    X is a 1-D sequence of observations. Items missing from `training_vocab`
    are masked via my_replace_unknown before `model.viterbi` is called. The
    first and last entries of the returned path (the start/end states) are
    dropped from the result.
    """
    observations = my_replace_unknown(X, training_vocab)
    _, state_path = model.viterbi(observations)

    predicted_names = []
    # Each path entry is indexed at position 1 to reach the state object
    for path_entry in state_path[1:-1]:
        predicted_names.append(path_entry[1].name)
    return predicted_names


def my_accuracy(X, Y, model, training_vocab):
    """Compute per-token prediction accuracy of `model` over a set of sequences.

    Each sequence in X is decoded with my_simplify_decoding and the predicted
    labels are compared position by position with the true labels in Y.

    Args:
        X: iterable of observation sequences.
        Y: iterable of true label sequences, parallel to X.
        model: object passed through to my_simplify_decoding.
        training_vocab: collection of known observations; anything outside it
            is masked before decoding.

    Returns:
        The fraction of observations whose predicted label matches the true
        label, or 0.0 when X contains no observations at all.
    """
    # Build a hashed lookup once so each membership test during masking is not
    # a linear scan of a list; fall back to the original collection if its
    # items cannot be hashed.
    try:
        vocab = frozenset(training_vocab)
    except TypeError:
        vocab = training_vocab

    correct = total_predictions = 0
    for observations, actual_tags in zip(X, Y):

        # If decoding fails for any reason (for example, no usable path comes
        # back from the model), the whole sentence is counted as wrong.
        # Catching Exception rather than using a bare except keeps
        # KeyboardInterrupt/SystemExit working during long evaluations.
        try:
            most_likely_tags = my_simplify_decoding(observations, model, vocab)
            correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        except Exception:
            pass
        total_predictions += len(observations)

    if total_predictions == 0:
        # Nothing to evaluate; avoid ZeroDivisionError
        return 0.0
    return correct / total_predictions


In [3]:
# Define corpus: sentences from the Floresta treebank, each a sequence of (word, tag) pairs

corpus = floresta.tagged_sents()


# Separate words and tags in corpus.
# The corpus is walked once, producing for every sentence one tuple of words
# and one parallel tuple of tags. Empty sentences cannot be split into columns
# and are skipped.

words = []
tags = []
for tagged_sentence in corpus:
    if not tagged_sentence:
        continue
    columns = list(zip(*tagged_sentence))
    words.append(columns[0])
    tags.append(columns[1])

assert len(words) == len(tags)

In [4]:
# Hold out 20% of the sentences for testing; the fixed seed makes the split repeatable

X_train, X_test, y_train, y_test = train_test_split(
    words,
    tags,
    test_size=0.2,
    random_state=42,
)

In [5]:
# Distinct words and distinct tags that occur in the training split

train_words = list({token for sentence in X_train for token in sentence})
train_tags = list({label for tag_sequence in y_train for label in tag_sequence})

In [6]:
# Gather the count statistics from the training split

# per-tag word counts
emission_counts = pair_counts(X=X_train, Y=y_train)
# occurrences of each tag
tag_unigrams = unigram_counts(sequences=y_train)
# occurrences of each adjacent tag pair
tag_bigrams = bigram_counts(sequences=y_train)
# tags opening / closing a sentence
tag_starts = starting_counts(sequences=y_train)
tag_ends = ending_counts(sequences=y_train)

In [7]:
# Model training

model = HiddenMarkovModel(name="floresta-hmm-tagger")


# Create one state per tag with emission distribution P(word | tag) and add it to the model.
# The loop variables are deliberately not called `tags`/`words`: those names hold the
# notebook-level corpus lists that the train/test split cell reads, and reusing them here
# would silently overwrite that data for any later re-run of that cell.

states = {}

for tag, word_counts in emission_counts.items():
    tag_total = tag_unigrams[tag]
    emission_probs = {word: count / tag_total for word, count in word_counts.items()}
    tag_state = State(DiscreteDistribution(emission_probs), name=tag)
    states[tag] = tag_state
    model.add_states(tag_state)


# Add edges between states for the observed transition frequencies P(tag_i | tag_i-1)

# start -> tag, weighted by the share of sentences that begin with the tag
total_starts = sum(tag_starts.values())
for tag, start_count in tag_starts.items():
    model.add_transition(model.start, states[tag], start_count / total_starts)

# tag1 -> tag2, weighted by count(tag1, tag2) / count(tag1)
for (tag1, tag2), pair_count in tag_bigrams.items():
    model.add_transition(states[tag1], states[tag2], pair_count / tag_unigrams[tag1])

# tag -> end, weighted by the share of the tag's occurrences that close a sentence
for tag, end_count in tag_ends.items():
    model.add_transition(states[tag], model.end, end_count / tag_unigrams[tag])


# Smoothing for tag pairs never seen in training:
# each such pair gets weight 1 / (count(tag1) + number of training tags).
# Edges that were already added above are left untouched.
#
# NOTE(review): the pairs smoothed here are read from the TEST labels (y_test), so
# information from the evaluation set leaks into the model and the testing accuracy
# is optimistic. A leak-free variant would smooth over pairs of training tags only.
# Left as is so previously reported numbers stay comparable -- confirm before changing.

tag_bigrams_test = bigram_counts(y_test)

for tag1, tag2 in tag_bigrams_test:
    if (tag1, tag2) in tag_bigrams:
        continue
    # A tag that never occurred in training has no state to connect
    if tag1 not in states or tag2 not in states:
        continue
    denominator = len(train_tags) + tag_unigrams.get(tag1, 0)
    model.add_transition(states[tag1], states[tag2], 1 / denominator)


model.bake()

In [8]:
# Report per-token accuracy on the training split and on the held-out split

training_acc = my_accuracy(X=X_train, Y=y_train, model=model, training_vocab=train_words)
print("Training accuracy: {:.2f}%".format(training_acc * 100))

testing_acc = my_accuracy(X=X_test, Y=y_test, model=model, training_vocab=train_words)
print("Testing accuracy: {:.2f}%".format(testing_acc * 100))

Training accuracy: 96.19%
Testing accuracy: 73.36%


In [9]:
# TODO: improve accuracy for Portuguese by using a better training dataset