In [1]:
import matplotlib.pyplot as plt
import numpy as np
from IPython.core.display import HTML
from itertools import chain
from collections import Counter, defaultdict, namedtuple, OrderedDict
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution
import os
from io import BytesIO
from itertools import chain
import random

In [3]:
Sentence = namedtuple("Sentence", "words tags")

def read_data(filename):
    """Read tagged sentence data"""
    with open(filename, 'r') as f:
        sentence_lines = [line.split("\n") for line in f.read().split("\n\n")]
    return OrderedDict(((s[0], Sentence(*zip(*[line.strip().split("\t")
                        for line in s[1:]]))) for s in sentence_lines if s[0]))

def read_tags(filename):
    """Read a list of word tag classes"""
    with open(filename, 'r') as f:
        tags = f.read().split("\n")
    return frozenset(tags)

class Subset(namedtuple("BaseSet", "sentences keys vocab X tagset Y N stream")):
    def __new__(cls, sentences, keys):
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        wordset = frozenset(chain(*word_sequences))
        tagset = frozenset(chain(*tag_sequences))
        N = sum(1 for _ in chain(*(sentences[k].words for k in keys)))
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, {k: sentences[k] for k in keys}, keys, wordset, word_sequences,
                               tagset, tag_sequences, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())


class Dataset(namedtuple("_Dataset", "sentences keys vocab X tagset Y training_set testing_set N stream")):
    def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=112890):
        tagset = read_tags(tagfile)
        sentences = read_data(datafile)
        keys = tuple(sentences.keys())
        wordset = frozenset(chain(*[s.words for s in sentences.values()]))
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        N = sum(1 for _ in chain(*(s.words for s in sentences.values())))
        
        # split data into train/test sets
        _keys = list(keys)
        if seed is not None: random.seed(seed)
        random.shuffle(_keys)
        split = int(train_test_split * len(_keys))
        training_data = Subset(sentences, _keys[:split])
        testing_data = Subset(sentences, _keys[split:])
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, dict(sentences), keys, wordset, word_sequences, tagset,
                               tag_sequences, training_data, testing_data, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())

In [7]:
data = Dataset("tags_universal.txt", "brown-universal.txt", train_test_split=0.8)

print(f"There are {len(data)} sentences in the corpus.")
print(f"Train: {len(data.training_set)}, Test: {len(data.testing_set)}")

There are 57340 sentences in the corpus.
Train: 45872, Test: 11468


In [6]:
key = 'b100-38532'
print(f"Sentence: {key}")
print(f"words: {data.sentences[key].words}")
print(f"tags: {data.sentences[key].tags}")

Sentence: b100-38532
words: ('Perhaps', 'it', 'was', 'right', ';', ';')
tags: ('ADV', 'PRON', 'VERB', 'ADJ', '.', '.')


In [8]:
print(f"Samples: {data.N}, unique words: {len(data.vocab)}")
print(f"Samples-Train: {data.training_set.N}, unique words: {len(data.training_set.vocab)}")
print(f"Samples-Test: {data.testing_set.N}, unique words: {len(data.testing_set.vocab)}")

Samples: 1161192, unique words: 56057
Samples-Train: 928458, unique words: 50536
Samples-Test: 232734, unique words: 25112


In [14]:
# accessing words with Dataset.X and tags with Dataset.Y 
for i in range(2):    
    print(f"Sentence {i+1} :  {data.X[i]}:\n")
    print(f"Labels {i+1} : {data.Y[i]}:\n")

Sentence 1 :  ('Mr.', 'Podger', 'had', 'thanked', 'him', 'gravely', ',', 'and', 'now', 'he', 'made', 'use', 'of', 'the', 'advice', '.'):

Labels 1 : ('NOUN', 'NOUN', 'VERB', 'VERB', 'PRON', 'ADV', '.', 'CONJ', 'ADV', 'PRON', 'VERB', 'NOUN', 'ADP', 'DET', 'NOUN', '.'):

Sentence 2 :  ('But', 'there', 'seemed', 'to', 'be', 'some', 'difference', 'of', 'opinion', 'as', 'to', 'how', 'far', 'the', 'board', 'should', 'go', ',', 'and', 'whose', 'advice', 'it', 'should', 'follow', '.'):

Labels 2 : ('CONJ', 'PRT', 'VERB', 'PRT', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'ADP', 'ADV', 'ADV', 'DET', 'NOUN', 'VERB', 'VERB', '.', 'CONJ', 'DET', 'NOUN', 'PRON', 'VERB', 'VERB', '.'):



In [16]:
# (word, tag) stream
for i, pair in enumerate(data.stream()):
    print(pair)
    if i > 3: break

('Mr.', 'NOUN')
('Podger', 'NOUN')
('had', 'VERB')
('thanked', 'VERB')
('him', 'PRON')


### Build a MFC(Most Frequent Class) Tagger

In [17]:
def pair_counts(tags, words):
    d = defaultdict(lambda: defaultdict(int))
    for tag, word in zip(tags, words):
        d[tag][word] += 1
    return d

tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
words = [word for i, (word, tag) in enumerate(data.training_set.stream())]

In [18]:
FakeState = namedtuple('FakeState', 'name')

class MFCTagger:
    missing = FakeState(name = '<MISSING>')
    
    def __init__(self, table):
        self.table = defaultdict(lambda: MFCTagger.missing)
        self.table.update({word: FakeState(name=tag) for word, tag in table.items()})
        
    def viterbi(self, seq):
        """This method simplifies predictions by matching the Pomegranate viterbi() interface"""
        return 0., list(enumerate(["<start>"] + [self.table[w] for w in seq] + ["<end>"]))
    
tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
words = [word for i, (word, tag) in enumerate(data.training_set.stream())]

word_counts = pair_counts(words, tags)
mfc_table = dict((word, max(tags.keys(), key=lambda key: tags[key])) for word, tags in word_counts.items())

mfc_model = MFCTagger(mfc_table)

#### Make predictions

In [19]:
def replace_unknown(sequence):
    
    return [w if w in data.training_set.vocab else 'nan' for w in sequence]

def simplify_decoding(X, model):
    
    _, state_path = model.viterbi(replace_unknown(X))
    return [state[1].name for state in state_path[1:-1]]

In [41]:
def predict_pos_tags(key):
    print(f"Sentence Key: {key}")
    print("Predicted labels:\n----------------------------------")
    print(simplify_decoding(data.sentences[key].words, mfc_model))
    print("Actual labels:\n-------------------------------")
    print(data.sentences[key].tags)
    print()

In [42]:
for key in data.testing_set.keys[:2]:
    predict_pos_tags(key)

Sentence Key: b100-28144
Predicted labels:
----------------------------------
['CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.']
Actual labels:
-------------------------------
('CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.')

Sentence Key: b100-23146
Predicted labels:
----------------------------------
['PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.']
Actual labels:
-------------------------------
('PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.')



### Evaluate performance

The model.viterbi call in simplify_decoding will return None if the HMM raises an error (for example, if a test sentence contains a word that is out of vocabulary for the training set). Any exception counts the full sentence as an error (which makes this a conservative estimate).

In [28]:
def accuracy(X, Y, model):
    
    correct = total_predictions = 0
    for observations, actual_tags in zip(X, Y):
        try:
            most_likely_tags = simplify_decoding(observations, model)
            correct += sum( pred == tag for pred, tag in zip(most_likely_tags, actual_tags))
        except:
            pass
        total_predictions += len(observations)
    return correct / total_predictions

In [34]:
mfc_training_acc = accuracy(data.training_set.X, data.training_set.Y, mfc_model)
print(f"training accuracy mfc_model: {mfc_training_acc*100 :.2f}")

mfc_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, mfc_model)
print(f"testing accuracy mfc_model: {mfc_testing_acc*100 :.2f}")

training accuracy mfc_model: 95.72
testing accuracy mfc_model: 93.01


## Build HMM Tagger

In [36]:
def unigram_counts(sequences):
    return Counter(sequences)

tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
tag_unigrams = unigram_counts(tags)

def bigram_counts(sequences):
    d = Counter(sequences)
    return d

tags = [tag for i, (word, tag) in enumerate(data.stream())]
o = [(tags[i],tags[i+1]) for i in range(0,len(tags)-2,2)]
tag_bigrams = bigram_counts(o)

def starting_counts(sequences):
    d = Counter(sequences)
    return d

start_tag = [i[0] for i in data.Y]
tag_starts = starting_counts(start_tag)

def ending_counts(sequences):
    d = Counter(sequences)
    return d

end_tag = [i[len(i)-1] for i in data.Y]
tag_ends = ending_counts(end_tag)

In [37]:
basic_model = HiddenMarkovModel(name="base-hmm-tagger")

tags = [tag for i, (word, tag) in enumerate(data.stream())]
words = [word for i, (word, tag) in enumerate(data.stream())]

tags_count=unigram_counts(tags)
tag_words_count=pair_counts(tags,words)

starting_tag_list=[i[0] for i in data.Y]
ending_tag_list=[i[-1] for i in data.Y]

starting_tag_count=starting_counts(starting_tag_list)#the number of times a tag occured at the start
ending_tag_count=ending_counts(ending_tag_list)      #the number of times a tag occured at the end



to_pass_states = []
for tag, words_dict in tag_words_count.items():
    total = float(sum(words_dict.values()))
    distribution = {word: count/total for word, count in words_dict.items()}
    tag_emissions = DiscreteDistribution(distribution)
    tag_state = State(tag_emissions, name=tag)
    to_pass_states.append(tag_state)


basic_model.add_states()    
    

start_prob={}

for tag in tags:
    start_prob[tag]=starting_tag_count[tag]/tags_count[tag]

for tag_state in to_pass_states :
    basic_model.add_transition(basic_model.start,tag_state,start_prob[tag_state.name])    

end_prob={}

for tag in tags:
    end_prob[tag]=ending_tag_count[tag]/tags_count[tag]
for tag_state in to_pass_states :
    basic_model.add_transition(tag_state,basic_model.end,end_prob[tag_state.name])
    


transition_prob_pair={}

for key in tag_bigrams.keys():
    transition_prob_pair[key]=tag_bigrams.get(key)/tags_count[key[0]]
for tag_state in to_pass_states :
    for next_tag_state in to_pass_states :
        basic_model.add_transition(tag_state,next_tag_state,transition_prob_pair[(tag_state.name,next_tag_state.name)])

basic_model.bake()

### Make predictions

In [43]:
for key in data.testing_set.keys[:2]:
    predict_pos_tags(key)

Sentence Key: b100-28144
Predicted labels:
----------------------------------
['CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.']
Actual labels:
-------------------------------
('CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.')

Sentence Key: b100-23146
Predicted labels:
----------------------------------
['PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.']
Actual labels:
-------------------------------
('PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.')



### Evaluate Performance

In [39]:
hmm_training_acc = accuracy(data.training_set.X, data.training_set.Y, basic_model)
print(f"training accuracy basic hmm model: {hmm_training_acc*100 :.2f}")

hmm_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, basic_model)
print(f"testing accuracy basic hmm model: {hmm_testing_acc*100 :.2f}")

training accuracy basic hmm model: 97.49
testing accuracy basic hmm model: 96.09
