In [1]:
import matplotlib.pyplot as plt
import numpy as np
from IPython.core.display import HTML
from itertools import chain
from collections import Counter, defaultdict, namedtuple, OrderedDict
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution
import os
from io import BytesIO
from itertools import chain
import random

In [3]:
Test_Sentence = namedtuple("Sentence", "words")

def getMeTestSentences(data):
    sentences = []
    for key in data:
        sentence = []
        for val in zip(data[key].words):
            sentence.append(val)
        sentences.append(sentence)
    return sentences

def read_test_data(filename):
    """Read tagged sentence data"""
    with open(filename, 'r') as f:
        sentence_lines = [l.split("\n") for l in f.read().split("\n\n")]
        index = 1
        a = OrderedDict()
        for s in sentence_lines:
            temp = []
            for l in s:
                temp.append(l.strip().split("\t")[1:])   
            temp2 = []
            for val in temp:
                if len(val) == 1:
                    temp2.append(val[0])
                          
            a[index] = Test_Sentence(tuple(temp2))
            index += 1
        return a
        
class TestDataset(namedtuple("_TDataset", "sentences keys vocab X N")):
    def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=112890):
        sentences = read_test_data(datafile)
        keys = tuple(sentences.keys())
        wordset = frozenset(chain(*[s.words for s in sentences.values()]))
        word_sequences = tuple([sentences[k].words for k in keys])
        N = sum(1 for _ in chain(*(s.words for s in sentences.values())))
        
        return super().__new__(cls, dict(sentences), keys, wordset, word_sequences,N)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())

In [5]:
Sentence = namedtuple("Sentence", "words tags")

def read_data(filename):
    """Read tagged sentence data"""
    with open("S21-gene-train.txt", 'r') as f:
        sentence_lines = [l.split("\n") for l in f.read().split("\n\n")]
        index = 1
        a = OrderedDict()
        for s in sentence_lines:
            # print(s)
            temp = []
            for l in s:
                temp.append(l.strip().split("\t")[1:])
            
            temp2 = []
            temp3 = []
            for val in temp:
                if len(val) == 2:
                    temp2.append(val[0])
                    temp3.append(val[1])
                          
            a[index] = Sentence(tuple(temp2),tuple(temp3))
            index += 1
        return a

def read_tags(filename):
    """Read a list of word tag classes"""
    with open(filename, 'r') as f:
        tags = f.read().split("\n")
    return frozenset(tags)

class Subset(namedtuple("BaseSet", "sentences keys vocab X tagset Y N stream")):
    def __new__(cls, sentences, keys):
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        wordset = frozenset(chain(*word_sequences))
        tagset = frozenset(chain(*tag_sequences))
        N = sum(1 for _ in chain(*(sentences[k].words for k in keys)))
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, {k: sentences[k] for k in keys}, keys, wordset, word_sequences,
                               tagset, tag_sequences, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())

class Dataset(namedtuple("_Dataset", "sentences keys vocab X tagset Y training_set testing_set N stream")):
    def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=112890):
        tagset = read_tags(tagfile)
        sentences = read_data(datafile)
        keys = tuple(sentences.keys())
        wordset = frozenset(chain(*[s.words for s in sentences.values()]))
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        N = sum(1 for _ in chain(*(s.words for s in sentences.values())))
        
        # split data into train/test sets
        _keys = list(keys)
        if seed is not None: random.seed(seed)
        random.shuffle(_keys)
        split = int(train_test_split * len(_keys))
        training_data = Subset(sentences, _keys[:split])
        testing_data = Subset(sentences, _keys[split:])
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, dict(sentences), keys, wordset, word_sequences, tagset,
                               tag_sequences, training_data, testing_data, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())

In [6]:
data = Dataset("tags-universal.txt", "brown-universal.txt", train_test_split=0.8)

print("There are {} sentences in the corpus.".format(len(data)))
print("There are {} sentences in the training set.".format(len(data.training_set)))
print("There are {} sentences in the testing set.".format(len(data.testing_set)))

assert len(data) == len(data.training_set) + len(data.testing_set), \
       "The number of sentences in the training set + testing set should sum to the number of sentences in the corpus"

There are 13796 sentences in the corpus.
There are 11036 sentences in the training set.
There are 2760 sentences in the testing set.


In [7]:
key = 10
print("Sentence: {}".format(key))
print("words:\n\t{!s}".format(data.sentences[key].words))
print("tags:\n\t{!s}".format(data.sentences[key].tags))

Sentence: 10
words:
	('The', 'variable', 'HMG', 'dosage', 'regimen', 'was', 'found', 'to', 'offer', 'no', 'advantages', 'when', 'compared', 'with', 'our', 'standard', 'daily', 'dosage', 'regimen', '.')
tags:
	('O', 'O', 'B', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O')


In [8]:
print("There are a total of {} samples of {} unique words in the corpus."
      .format(data.N, len(data.vocab)))
print("There are {} samples of {} unique words in the training set."
      .format(data.training_set.N, len(data.training_set.vocab)))
print("There are {} samples of {} unique words in the testing set."
      .format(data.testing_set.N, len(data.testing_set.vocab)))
print("There are {} words in the test set that are missing in the training set."
      .format(len(data.testing_set.vocab - data.training_set.vocab)))

assert data.N == data.training_set.N + data.testing_set.N, \
       "The number of training + test samples should sum to the total number of samples"

There are a total of 386201 samples of 31328 unique words in the corpus.
There are 309830 samples of 27563 unique words in the training set.
There are 76371 samples of 11825 unique words in the testing set.
There are 3765 words in the test set that are missing in the training set.


In [9]:
# accessing words with Dataset.X and tags with Dataset.Y 
for i in range(2):    
    print("Sentence {}:".format(i + 1), data.X[i])
    print()
    print("Labels {}:".format(i + 1), data.Y[i])
    print()

Sentence 1: ('Comparison', 'with', 'alkaline', 'phosphatases', 'and', '5', '-', 'nucleotidase', '.')

Labels 1: ('O', 'O', 'B', 'I', 'O', 'B', 'I', 'I', 'O')

Sentence 2: ('Pharmacologic', 'aspects', 'of', 'neonatal', 'hyperbilirubinemia', '.')

Labels 2: ('O', 'O', 'O', 'O', 'O', 'O')



In [10]:
# use Dataset.stream() (word, tag) samples for the entire corpus
print("\nStream (word, tag) pairs:\n")
for i, pair in enumerate(data.stream()):
    print("\t", pair)
    if i > 3: break


Stream (word, tag) pairs:

	 ('Comparison', 'O')
	 ('with', 'O')
	 ('alkaline', 'B')
	 ('phosphatases', 'I')
	 ('and', 'O')


### Making Predictions with a Model

In [11]:
def replace_unknown(sequence):
    
    return [w if w in data.training_set.vocab else 'nan' for w in sequence]

def simplify_decoding(X, model):
    
    _, state_path = model.viterbi(replace_unknown(X))
    return [state[1].name for state in state_path[1:-1]]

### Build an HMM tagger

In [12]:
def unigram_counts(sequences):

    return Counter(sequences)

tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
tag_unigrams = unigram_counts(tags)

### Bigram Counts

In [13]:
def bigram_counts(sequences):

    d = Counter(sequences)
    return d

tags = [tag for i, (word, tag) in enumerate(data.stream())]
o = [(tags[i],tags[i+1]) for i in range(0,len(tags)-2,2)]
tag_bigrams = bigram_counts(o)

### Sequence Starting Counts

In [14]:
def starting_counts(sequences):
    
    d = Counter(sequences)
    return d

tags = [tag for i, (word, tag) in enumerate(data.stream())]
starts_tag = [i[0] for i in data.Y]
tag_starts = starting_counts(starts_tag)

### Sequence Ending Counts

In [15]:
def ending_counts(sequences):
    
    d = Counter(sequences)
    return d

end_tag = [i[len(i)-1] for i in data.Y]
tag_ends = ending_counts(end_tag)

In [17]:
def pair_counts(tags, words):
    d = defaultdict(lambda: defaultdict(int))
    for tag, word in zip(tags, words):
        d[tag][word] += 1
        
    return d

### Basic HMM Tagger

In [18]:
basic_model = HiddenMarkovModel(name="base-hmm-tagger")

tags = [tag for i, (word, tag) in enumerate(data.stream())]
words = [word for i, (word, tag) in enumerate(data.stream())]

tags_count=unigram_counts(tags)
tag_words_count=pair_counts(tags,words)

starting_tag_list=[i[0] for i in data.Y]
ending_tag_list=[i[-1] for i in data.Y]

starting_tag_count=starting_counts(starting_tag_list)
ending_tag_count=ending_counts(ending_tag_list)      



to_pass_states = []
for tag, words_dict in tag_words_count.items():
    total = float(sum(words_dict.values()))
    distribution = {word: count/total for word, count in words_dict.items()}
    tag_emissions = DiscreteDistribution(distribution)
    tag_state = State(tag_emissions, name=tag)
    to_pass_states.append(tag_state)


basic_model.add_states()    
    

start_prob={}

for tag in tags:
    start_prob[tag]=starting_tag_count[tag]/tags_count[tag]

for tag_state in to_pass_states :
    basic_model.add_transition(basic_model.start,tag_state,start_prob[tag_state.name])    

end_prob={}

for tag in tags:
    end_prob[tag]=ending_tag_count[tag]/tags_count[tag]
for tag_state in to_pass_states :
    basic_model.add_transition(tag_state,basic_model.end,end_prob[tag_state.name])
    


transition_prob_pair={}

for key in tag_bigrams.keys():
    transition_prob_pair[key]=tag_bigrams.get(key)/tags_count[key[0]]
for tag_state in to_pass_states :
    for next_tag_state in to_pass_states :
        if (tag_state.name,next_tag_state.name) in transition_prob_pair:
            basic_model.add_transition(tag_state,next_tag_state,transition_prob_pair[(tag_state.name,next_tag_state.name)])
        else:
            basic_model.add_transition(tag_state,next_tag_state,0)


basic_model.bake()

In [20]:
def accuracy(X, Y, model):
    
    correct = total_predictions = 0
    for observations, actual_tags in zip(X, Y):
        
        # The model.viterbi call in simplify_decoding will return None if the HMM
        # raises an error (for example, if a test sentence contains a word that
        # is out of vocabulary for the training set).
        try:
            most_likely_tags = simplify_decoding(observations, model)
            correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        except:
            pass
        total_predictions += len(observations)
    return correct / total_predictions

In [21]:
hmm_training_acc = accuracy(data.training_set.X, data.training_set.Y, basic_model)
print("training accuracy basic hmm model: {:.2f}%".format(100 * hmm_training_acc))

hmm_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, basic_model)
print("testing accuracy basic hmm model: {:.2f}%".format(100 * hmm_testing_acc))



training accuracy basic hmm model: 96.41%
testing accuracy basic hmm model: 93.99%


### Example Decoding Sequences with the HMM Tagger

In [None]:
for key in data.testing_set.keys[:2]:
    print("Sentence Key: {}\n".format(key))
    print("Predicted labels:\n-----------------")
    print(simplify_decoding(data.sentences[key].words, basic_model))
    print()
    print("Actual labels:\n--------------")
    print(data.sentences[key].tags)
    print("\n")

In [24]:
with open('yoursystemoutput.txt', 'w') as f:
    for key in data.testing_set.sentences:
        for i,val in enumerate(zip(data.testing_set.sentences[key].words,simplify_decoding(data.sentences[key].words, basic_model))):
            f.write("\t".join([str(i+1),val[0],val[1]]) + "\n")
        f.write("\n")
        
with open('goldstandardfile.txt', 'w') as f:
    for key in data.testing_set.sentences:
        for i,val in enumerate(zip(data.testing_set.sentences[key].words,data.testing_set.sentences[key].tags)):
            f.write("\t".join([str(i+1),val[0],val[1]]) + "\n")
        f.write("\n")

In [25]:
test_data = TestDataset("tags-universal.txt", "S21-gene-test.txt")

In [28]:

with open('testFinal.txt', 'w') as f:
    k = 0
    for key in test_data.sentences:
        for i,val in enumerate(zip(test_data.sentences[key].words,simplify_decoding(test_data.sentences[key].words, basic_model))):
            f.write("\t".join([str(i+1),val[0],val[1]]) + "\n")
        f.write("\n")