# Translation models using MFC and HMM Models

In [124]:
!pip install pomegranate



In [125]:
import matplotlib.pyplot as plt
import numpy as np
from IPython.core.display import HTML
from itertools import chain
from collections import Counter, defaultdict, namedtuple, OrderedDict
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution
import os
from io import BytesIO
from itertools import chain
import random

# Functions to read data sets

In [126]:
def read_data(filename):
    """Read tagged sentence data"""
    with open(filename, 'r', encoding="UTF-8") as f:
        sentence_lines = [l.split("\n") for l in f.read().split("\n\n")]
    return OrderedDict(((s[0], Sentence(*zip(*[l.strip().split("\t")
                        for l in s[1:]]))) for s in sentence_lines if s[0]))

def read_tags(filename):
    """Read a list of word tag classes"""
    with open(filename, 'r', encoding="UTF-8") as f:
        tags = f.read().split("\n")
    return frozenset(tags)

Sentence = namedtuple("Sentence", "words tags")

# Read text files containing data

In [127]:
tagset = read_tags("data/Final-Tags.txt")
sentences = read_data("data/final-docs.txt")

In [128]:
class Dataset(namedtuple("_Dataset", "sentences keys vocab X tagset Y training_set testing_set N stream")):
    def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=112890):
        tagset = read_tags(tagfile)
        sentences = read_data(datafile)
        keys = tuple(sentences.keys())
        wordset = frozenset(chain(*[s.words for s in sentences.values()]))
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        N = sum(1 for _ in chain(*(s.words for s in sentences.values())))
        
        # split data into train/test sets
        _keys = list(keys)
        if seed is not None: random.seed(seed)
        random.shuffle(_keys)
        split = int(train_test_split * len(_keys))
        training_data = Subset(sentences, _keys[:split])
        testing_data = Subset(sentences, _keys[split:])
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, dict(sentences), keys, wordset, word_sequences, tagset,
                               tag_sequences, training_data, testing_data, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())
    
    
class Subset(namedtuple("BaseSet", "sentences keys vocab X tagset Y N stream")):
    def __new__(cls, sentences, keys):
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        wordset = frozenset(chain(*word_sequences))
        tagset = frozenset(chain(*tag_sequences))
        N = sum(1 for _ in chain(*(sentences[k].words for k in keys)))
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, {k: sentences[k] for k in keys}, keys, wordset, word_sequences,
                               tagset, tag_sequences, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())

In [129]:
data = Dataset("data/Final-Tags.txt", "data/final-docs.txt", train_test_split=0.8)

print("There are {} sentences in the corpus.".format(len(data)))
print("There are {} sentences in the training set.".format(len(data.training_set)))
print("There are {} sentences in the testing set.".format(len(data.testing_set)))

assert len(data) == len(data.training_set) + len(data.testing_set), \
       "The number of sentences in the training set + testing set should sum to the number of sentences in the corpus"

There are 116 sentences in the corpus.
There are 92 sentences in the training set.
There are 24 sentences in the testing set.


In [130]:
print("There are a total of {} samples of {} unique words in the corpus."
      .format(data.N, len(data.vocab)))
print("There are {} samples of {} unique words in the training set."
      .format(data.training_set.N, len(data.training_set.vocab)))
print("There are {} samples of {} unique words in the testing set."
      .format(data.testing_set.N, len(data.testing_set.vocab)))
print("There are {} words in the test set that are missing in the training set."
      .format(len(data.testing_set.vocab - data.training_set.vocab)))

assert data.N == data.training_set.N + data.testing_set.N, \
       "The number of training + test samples should sum to the total number of samples"

There are a total of 236 samples of 55 unique words in the corpus.
There are 188 samples of 50 unique words in the training set.
There are 48 samples of 20 unique words in the testing set.
There are 5 words in the test set that are missing in the training set.


In [131]:
# accessing words with Dataset.X and tags with Dataset.Y 
for i in range(2):    
    print("Sentence {}:".format(i + 1), data.X[i])
    print()
    print("Labels {}:".format(i + 1), data.Y[i])
    print()

Sentence 1: ('वाह!', '.')

Labels 1: ('Wow!', '.')

Sentence 2: ('बचाओ!', '.')

Labels 2: ('Help!', '.')



In [132]:
class Dataset(namedtuple("_Dataset", "sentences keys vocab X tagset Y training_set testing_set N stream")):
    def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=112890):
        tagset = read_tags(tagfile)
        sentences = read_data(datafile)
        keys = tuple(sentences.keys())
        wordset = frozenset(chain(*[s.words for s in sentences.values()]))
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        N = sum(1 for _ in chain(*(s.words for s in sentences.values())))
        
        # split data into train/test sets
        _keys = list(keys)
        if seed is not None: random.seed(seed)
        random.shuffle(_keys)
        split = int(train_test_split * len(_keys))
        training_data = Subset(sentences, _keys[:split])
        testing_data = Subset(sentences, _keys[split:])
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, dict(sentences), keys, wordset, word_sequences, tagset,
                               tag_sequences, training_data, testing_data, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())
    
    
class Subset(namedtuple("BaseSet", "sentences keys vocab X tagset Y N stream")):
    def __new__(cls, sentences, keys):
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        wordset = frozenset(chain(*word_sequences))
        tagset = frozenset(chain(*tag_sequences))
        N = sum(1 for _ in chain(*(sentences[k].words for k in keys)))
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, {k: sentences[k] for k in keys}, keys, wordset, word_sequences,
                               tagset, tag_sequences, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())

In [133]:
data = Dataset("data/Final-Tags.txt", "data/final-docs.txt", train_test_split=0.8)

print("There are {} sentences in the corpus.".format(len(data)))
print("There are {} sentences in the training set.".format(len(data.training_set)))
print("There are {} sentences in the testing set.".format(len(data.testing_set)))

assert len(data) == len(data.training_set) + len(data.testing_set), \
       "The number of sentences in the training set + testing set should sum to the number of sentences in the corpus"

There are 116 sentences in the corpus.
There are 92 sentences in the training set.
There are 24 sentences in the testing set.


In [134]:
print("There are a total of {} samples of {} unique words in the corpus."
      .format(data.N, len(data.vocab)))
print("There are {} samples of {} unique words in the training set."
      .format(data.training_set.N, len(data.training_set.vocab)))
print("There are {} samples of {} unique words in the testing set."
      .format(data.testing_set.N, len(data.testing_set.vocab)))
print("There are {} words in the test set that are missing in the training set."
      .format(len(data.testing_set.vocab - data.training_set.vocab)))

assert data.N == data.training_set.N + data.testing_set.N, \
       "The number of training + test samples should sum to the total number of samples"

There are a total of 236 samples of 55 unique words in the corpus.
There are 188 samples of 50 unique words in the training set.
There are 48 samples of 20 unique words in the testing set.
There are 5 words in the test set that are missing in the training set.


In [135]:
for i in range(2):    
    print("Sentence {}:".format(i + 1), data.X[i])
    print()
    print("Labels {}:".format(i + 1), data.Y[i])
    print()

Sentence 1: ('वाह!', '.')

Labels 1: ('Wow!', '.')

Sentence 2: ('बचाओ!', '.')

Labels 2: ('Help!', '.')



In [136]:
print("\nStream (word, tag) pairs:\n")
for i, pair in enumerate(data.stream()):
    print("\t", pair)
    if i > 3: break


Stream (word, tag) pairs:

	 ('वाह!', 'Wow!')
	 ('.', '.')
	 ('बचाओ!', 'Help!')
	 ('.', '.')
	 ('उछलो.', 'Jump.')


In [137]:
words = [word for i, (word, tag) in enumerate(data.training_set.stream())]
tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
words[0:4], tags[0:4]

(['बहुत बढ़िया!', '.', 'भयानक', '.'], ['Awesome!', '.', 'Awesome!', '.'])

In [138]:
def pair_counts(tags, words):
    d = defaultdict(lambda: defaultdict(int))
    for tag, word in zip(tags, words):
        d[tag][word] += 1
    return d
        
word_counts = pair_counts(words, tags)

In [139]:
mfc_table = dict((word, max(tags.keys(), key=lambda key: tags[key])) for word, tags in word_counts.items())

In [140]:
i = 0
for key, value in mfc_table.items():
    print(key, value)
    i += 1
    if i > 3: break

बहुत बढ़िया! Awesome!
. .
भयानक Awesome!
कूद. Jump.


In [141]:
FakeState = namedtuple('FakeState', 'name')

class MFCTagger:
    missing = FakeState(name = '<MISSING>')
    
    def __init__(self, table):
        self.table = defaultdict(lambda: MFCTagger.missing)
        self.table.update({word: FakeState(name=tag) for word, tag in table.items()})
        
    def viterbi(self, seq):
        """This method simplifies predictions by matching the Pomegranate viterbi() interface"""
        return 0., list(enumerate(["<start>"] + [self.table[w] for w in seq] + ["<end>"]))

In [142]:
mfc_model = MFCTagger(mfc_table)

In [143]:
def replace_unknown(sequence):
    return [w if w in data.training_set.vocab else 'nan' for w in sequence]

def simplify_decoding(X, model):    
    _, state_path = model.viterbi(replace_unknown(X))
    return [state[1].name for state in state_path[1:-1]]

In [144]:
for key in data.testing_set.keys[:3]:
    print("Sentence Key: {}\n".format(key))
    print("Sentence: {}\n".format(data.sentences[key].words))
    print("Predicted labels:\n-----------------")
    print(simplify_decoding(data.sentences[key].words, mfc_model))
    print()
    print("Actual labels:\n--------------")
    print(data.sentences[key].tags)
    print("\n")

Sentence Key: B100-0020

Sentence: ('कूद.', '.')

Predicted labels:
-----------------
['Jump.', '.']

Actual labels:
--------------
('Jump.', '.')


Sentence Key: B100-00902

Sentence: ('समझे कि नहीं?', '.')

Predicted labels:
-----------------
['Got it?', '.']

Actual labels:
--------------
('Got it?', '.')


Sentence Key: B100-0152

Sentence: ('ख़ुदा हाफ़िज़।', '.')

Predicted labels:
-----------------
['Goodbye!', '.']

Actual labels:
--------------
('Goodbye!', '.')




In [145]:
def accuracy(X, Y, model):
    
    correct = total_predictions = 0
    for observations, actual_tags in zip(X, Y):
        
        # The model.viterbi call in simplify_decoding will return None if the HMM
        # raises an error (for example, if a test sentence contains a word that
        # is out of vocabulary for the training set). Any exception counts the
        # full sentence as an error (which makes this a conservative estimate).
        try:
            most_likely_tags = simplify_decoding(observations, model)
            correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        except:
            pass
        total_predictions += len(observations)
    return correct / total_predictions

# MFC Model Accuracy

In [146]:
mfc_training_acc = accuracy(data.training_set.X, data.training_set.Y, mfc_model)
print("training accuracy mfc_model: {:.2f}%".format(100 * mfc_training_acc))

mfc_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, mfc_model)
print("testing accuracy mfc_model: {:.2f}%".format(100 * mfc_testing_acc))

training accuracy mfc_model: 98.94%
testing accuracy mfc_model: 89.58%


# Hidden Markov Model (HMM)

In [None]:
def unigram_counts(sequences):
    return Counter(sequences)

tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
tag_unigrams = unigram_counts(tags)
tag_unigrams

In [148]:
def bigram_counts(sequences):
    return Counter(sequences)

tags = [tag for i, (word, tag) in enumerate(data.stream())]
o = [(tags[i],tags[i+1]) for i in range(0,len(tags)-2,2)]
tag_bigrams = bigram_counts(o)
tag_bigrams 

Counter({('Wow!', '.'): 2,
         ('Help!', '.'): 2,
         ('Jump.', '.'): 8,
         ('Hello!', '.'): 5,
         ('Cheers!', '.'): 4,
         ('Got it?', '.'): 3,
         ("I'm OK.", '.'): 3,
         ('Awesome!', '.'): 2,
         ('Come in.', '.'): 3,
         ('Get out!', '.'): 2,
         ('Go away!', '.'): 2,
         ('Goodbye!', '.'): 2,
         ("What's your name?", '.'): 1,
         ("Don't touch it.", '.'): 1,
         ('Happy New Year!', '.'): 1,
         ('Clean the room.', '.'): 1,
         ("Don't touch it.", 'Get out of bed!'): 1,
         ('.', 'I like history.'): 1,
         ('I like the dog.', 'I must buy one.'): 1,
         ('.', 'Wow!'): 4,
         ('.', 'Help!'): 4,
         ('.', 'Jump.'): 10,
         ('.', 'Hello!'): 7,
         ('.', 'Cheers!'): 8,
         ('.', 'I feel nauseous.'): 1,
         ('I go every year.', '.'): 1,
         ('Congratulations!', 'Congratulations!'): 1,
         ('.', 'Got it?'): 3,
         ('.', "I'm OK."): 3,
         ('.

In [149]:
def starting_counts(sequences):
    return Counter(sequences)

tags = [tag for i, (word, tag) in enumerate(data.stream())]
starts_tag = [i[0] for i in data.Y]
tag_starts = starting_counts(starts_tag)
tag_starts

Counter({'Wow!': 6,
         'Help!': 6,
         'Jump.': 18,
         'Hello!': 12,
         'Cheers!': 12,
         'Got it?': 6,
         "I'm OK.": 6,
         'Awesome!': 6,
         'Come in.': 6,
         'Get out!': 6,
         'Go away!': 6,
         'Goodbye!': 6,
         "What's your name?": 1,
         "Don't touch it.": 2,
         'Happy New Year!': 1,
         'Clean the room.': 1,
         'I like history.': 1,
         'I feel nauseous.': 1,
         'Congratulations!': 1,
         'I want a guitar.': 1,
         "I'm a good cook.": 2,
         "I'm lucky today.": 1,
         'Is anybody here?': 1,
         "I don't know him.": 1,
         'I have a problem.': 1,
         'I have to go now.': 1,
         'I saw him running.': 1,
         'I truly loved her.': 1,
         'I listen to music.': 1,
         "I haven't met him.": 1})

In [150]:
def ending_counts(sequences):    
    return Counter(sequences)

end_tag = [i[len(i)-1] for i in data.Y]
tag_ends = ending_counts(end_tag)
tag_ends

Counter({'.': 114, 'I have to go now.': 1, "I haven't met him.": 1})

In [151]:
end_tag = [i[len(i)-2] for i in data.Y]
tag_ends = ending_counts(end_tag)
tag_ends

Counter({'Wow!': 6,
         'Help!': 6,
         'Jump.': 18,
         'Hello!': 12,
         'Cheers!': 12,
         'Got it?': 6,
         "I'm OK.": 6,
         'Awesome!': 6,
         'Come in.': 6,
         'Get out!': 6,
         'Go away!': 6,
         'Goodbye!': 6,
         "What's your name?": 1,
         "Don't touch it.": 1,
         'Happy New Year!': 1,
         'Clean the room.': 1,
         'Get out of bed!': 1,
         'I must buy one.': 1,
         'I go every year.': 1,
         'Congratulations!': 1,
         'I was assaulted.': 1,
         "I'm a good cook.": 2,
         "I'm lucky today.": 1,
         'Is anybody here?': 1,
         "I don't know him.": 1,
         'I have a problem.': 1,
         'I have to go now.': 1,
         'I saw him running.': 1,
         'I truly loved her.': 1,
         'I listen to music.': 1,
         "I haven't met him.": 1})

In [152]:
hmm_model = HiddenMarkovModel(name="base-hmm-tagger")

tags = [tag for i, (word, tag) in enumerate(data.stream())]
words = [word for i, (word, tag) in enumerate(data.stream())]

tags_count = unigram_counts(tags)
tag_words_count = pair_counts(tags, words)

starting_tag_list = [i[0] for i in data.Y]
#ending_tag_list = [i[-1] if len(i)==1 else i[-2] for i in data.Y]
#ending_tag_list = [i[-1] for i in data.Y]
ending_tag_list = [i[len(i)-2] for i in data.Y]

starting_tag_count = starting_counts(starting_tag_list) #the number of times a tag occured at the start
ending_tag_count = ending_counts(ending_tag_list)       #the number of times a tag occured at the end

tag_words_count

defaultdict(<function __main__.pair_counts.<locals>.<lambda>()>,
            {'Wow!': defaultdict(int, {'वाह!': 4, 'सनसनीखेज कामयाबी': 2}),
             '.': defaultdict(int, {'.': 114}),
             'Help!': defaultdict(int, {'बचाओ!': 4, 'मदद!': 2}),
             'Jump.': defaultdict(int,
                         {'उछलो.': 2,
                          'कूदो.': 4,
                          'छलांग.': 4,
                          'कूद.': 2,
                          'जम्प.': 2,
                          'एकाएक दाम की वृद्धि.': 2,
                          'लपक.': 2}),
             'Hello!': defaultdict(int,
                         {'नमस्ते।': 4,
                          'नमस्कार।': 4,
                          'हलो!': 2,
                          'नमस्कार!': 2}),
             'Cheers!': defaultdict(int, {'वाह-वाह!': 6, 'चियर्स!': 6}),
             'Got it?': defaultdict(int, {'समझे कि नहीं?': 4, 'समझ गया?': 2}),
             "I'm OK.": defaultdict(int, {'मैं ठीक हूँ।': 6}),
          

In [153]:
ending_tag_list

['Wow!',
 'Help!',
 'Jump.',
 'Jump.',
 'Jump.',
 'Hello!',
 'Hello!',
 'Cheers!',
 'Cheers!',
 'Got it?',
 "I'm OK.",
 'Awesome!',
 'Come in.',
 'Get out!',
 'Go away!',
 'Goodbye!',
 "What's your name?",
 "Don't touch it.",
 'Happy New Year!',
 'Clean the room.',
 'Get out of bed!',
 'I must buy one.',
 'Wow!',
 'Help!',
 'Jump.',
 'Jump.',
 'Jump.',
 'Hello!',
 'Hello!',
 'Cheers!',
 'Cheers!',
 'Cheers!',
 'I go every year.',
 'Got it?',
 "I'm OK.",
 'Awesome!',
 'Come in.',
 'Get out!',
 'Go away!',
 'Goodbye!',
 'Wow!',
 'Help!',
 'Jump.',
 'Jump.',
 'Jump.',
 'Hello!',
 'Congratulations!',
 'Hello!',
 'Cheers!',
 'Got it?',
 "I'm OK.",
 'Awesome!',
 'Awesome!',
 'Come in.',
 'Get out!',
 'Get out!',
 'Go away!',
 'Goodbye!',
 'Wow!',
 'Help!',
 'Jump.',
 'I was assaulted.',
 'Jump.',
 'Jump.',
 'Hello!',
 'Hello!',
 "I'm a good cook.",
 "I'm a good cook.",
 'Cheers!',
 'Cheers!',
 'Got it?',
 "I'm lucky today.",
 'Is anybody here?',
 "I'm OK.",
 'Come in.',
 "I don't know him.",

In [158]:
to_pass_states = []
for tag, words_dict in tag_words_count.items():
    total = float(sum(words_dict.values()))
    distribution = {word: count/total for word, count in words_dict.items()}
    tag_emissions = DiscreteDistribution(distribution)
    tag_state = State(tag_emissions, name=tag)
    to_pass_states.append(tag_state)

In [159]:
to_pass_states

[{
     "class" : "State",
     "distribution" : {
         "class" : "Distribution",
         "dtype" : "str",
         "name" : "DiscreteDistribution",
         "parameters" : [
             {
                 "\u0935\u093e\u0939!" : 0.6666666666666666,
                 "\u0938\u0928\u0938\u0928\u0940\u0916\u0947\u091c \u0915\u093e\u092e\u092f\u093e\u092c\u0940" : 0.3333333333333333
             }
         ],
         "frozen" : false
     },
     "name" : "Wow!",
     "weight" : 1.0
 }, {
     "class" : "State",
     "distribution" : {
         "class" : "Distribution",
         "dtype" : "str",
         "name" : "DiscreteDistribution",
         "parameters" : [
             {
                 "." : 1.0
             }
         ],
         "frozen" : false
     },
     "name" : ".",
     "weight" : 1.0
 }, {
     "class" : "State",
     "distribution" : {
         "class" : "Distribution",
         "dtype" : "str",
         "name" : "DiscreteDistribution",
         "parameters" : [
 

In [160]:
hmm_model.add_states() 

In [161]:
start_prob={}

for tag in tags:
    start_prob[tag] = starting_tag_count[tag] / tags_count[tag]

for tag_state in to_pass_states :
    hmm_model.add_transition(hmm_model.start, tag_state, start_prob[tag_state.name])  

In [162]:
end_prob={}

for tag in tags:
    end_prob[tag] = ending_tag_count[tag]/tags_count[tag]
    
for tag_state in to_pass_states :
    hmm_model.add_transition(tag_state, hmm_model.end, end_prob[tag_state.name])

In [None]:
transition_prob_pair={}

for key in tag_bigrams.keys():
    transition_prob_pair[key] = tag_bigrams.get(key)/tags_count[key[0]]
    
for tag_state in to_pass_states:
    for next_tag_state in to_pass_states:
        hmm_model.add_transition(tag_state, next_tag_state, transition_prob_pair[(tag_state.name, next_tag_state.name)])

# Bake the HMM

In [164]:
hmm_model.bake()

# HMM Accuracy

In [181]:
hmm_model

{
    "class" : "HiddenMarkovModel",
    "name" : "base-hmm-tagger",
    "start" : {
        "class" : "State",
        "distribution" : null,
        "name" : "base-hmm-tagger-start",
        "weight" : 1.0
    },
    "end" : {
        "class" : "State",
        "distribution" : null,
        "name" : "base-hmm-tagger-end",
        "weight" : 1.0
    },
    "states" : [
        {
            "class" : "State",
            "distribution" : {
                "class" : "Distribution",
                "dtype" : "str",
                "name" : "DiscreteDistribution",
                "parameters" : [
                    {
                        "." : 1.0
                    }
                ],
                "frozen" : false
            },
            "name" : ".",
            "weight" : 1.0
        },
        {
            "class" : "State",
            "distribution" : {
                "class" : "Distribution",
                "dtype" : "str",
                "name" : "DiscreteDistrib

In [None]:
hmm_training_acc = accuracy(data.training_set.X, data.training_set.Y, hmm_model)
print("training accuracy basic hmm model: {:.2f}%".format(100 * hmm_training_acc))

hmm_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, hmm_model)
print("testing accuracy basic hmm model: {:.2f}%".format(100 * hmm_testing_acc))