In [1]:
from itertools import chain

import numpy as np
import sklearn
from collections import defaultdict, Counter
import scipy.stats
from sklearn.metrics import make_scorer,classification_report
from sklearn.model_selection import cross_val_score, RandomizedSearchCV, train_test_split, cross_val_predict
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution

In [2]:
%%time

import pandas as pd

data = pd.read_csv('final_dataset.csv', encoding='latin1', low_memory=False, dtype={'Sentence #': str, 'Word': str, 'POS': str, 'Tag': str})
filled_data = data.fillna(method='ffill')

class SentenceGetter(object):
    def __init__(self, data):
        self.n_sent = 1
        self.data = data
        self.empty = False
        agg_func = lambda s: [(w, t) for w, t in zip(s["Word"].values.tolist(),
                                                    s["Tag"].values.tolist())]
        self.grouped = self.data.groupby("Sentence #").apply(agg_func)
        self.sentences = [s for s in self.grouped]
        self.length = int(len(self.sentences))
        
        # All Words group by sentence
        agg_func_word = lambda s: [w for w in s["Word"].values.tolist()]             
        self.grouped_word = self.data.groupby("Sentence #").apply(agg_func_word)
        self.sentences_words = [s for s in self.grouped_word]
        
        # All Tags group by sentence
        agg_func_tag = lambda s: [t for t in s["Tag"].values.tolist()]
        self.grouped_tag = self.data.groupby("Sentence #").apply(agg_func_tag)
        self.sentences_tags = [s for s in self.grouped_tag]
        
        # 80% Train 20% Test
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(self.sentences_words, self.sentences_tags, test_size=0.2)
        

    def get_next(self):     
        try:
             s = self.grouped["Sentence: {}".format(self.n_sent)]
             self.n_sent += 1
             return s
        except:
             return None       

getter = SentenceGetter(filled_data)
sentences = getter.sentences
words = getter.sentences_words
tags = getter.sentences_tags

# X = Words, y = Tags
X_train = getter.X_train
X_test = getter.X_test
y_train = getter.y_train
y_test = getter.y_test

y_train_stream = []
y_test_stream = []
for train_st, test_st in zip(y_train, y_test):
    for train_t, test_t in zip(train_st, test_st):
        y_train_stream.append(train_t)
        y_test_stream.append(test_t)

X_train_stream = []
X_test_stream = []
for train_st, test_st in zip(X_train, X_test):
    for train_t, test_t in zip(train_st, test_st):
        X_train_stream.append(train_t)
        X_test_stream.append(test_t)

train_vocab = frozenset(X_train_stream)
print(len(X_train), len(X_test), len(y_train), len(y_test))

3248 812 3248 812
Wall time: 4.44 s


In [3]:
def pair_counts(tags, words):
    d = defaultdict(lambda: defaultdict(int))
    for tag, word in zip(tags, words):
        d[tag][word] += 1
    
    return d


all_single_words = [] # X
all_single_tags = [] # Y
for sentence in sentences:
    for i, (word, tag) in enumerate(sentence):
        all_single_words.append(word)
        all_single_tags.append(tag)

data_wt = [*zip(all_single_words, all_single_tags)] # data_wt = data.stream()

# print(words[0])
# X_train, X_test, y_train, y_test = train_test_split(words, tags, test_size=0.2)
# print(len(X_train), len(X_test), len(y_train), len(y_test))

In [4]:
def unigram_counts(seqs):
    return Counter(seqs)

def bigram_counts(seqs):
    d = Counter(seqs)
    return d

# tag_uni = [tag for i, (w, t) in enumerate(train_set)]
tag_unigrams = unigram_counts(y_train_stream)

tag_bi = [t for i, (w, t) in enumerate(data_wt)]
bi = [(tag_bi[i], tag_bi[i+1]) for i in range(0, len(tag_bi)-2, 2)]
tag_bigrams = bigram_counts(bi)
# print(tag_bigrams)

In [5]:
def starting_counts(seqs):
    d = Counter(seqs)
    return d

def ending_counts(seqs):
    d = Counter(seqs)
    return d

starting_tag_list = [i[0] for i in tags]
tag_starts = starting_counts(starting_tag_list)
ending_tag_list = [i[-1] for i in tags]
tag_ends = ending_counts(ending_tag_list)
print(tag_starts)
print(tag_ends)

Counter({'O': 3850, 'B-MAL': 154, 'B-CAP': 20, 'B-BEH': 18, 'B-DVEC': 16, 'B-OS': 2})
Counter({'O': 4054, 'B-MAL': 6})


In [6]:
hmm = HiddenMarkovModel(name="base-hmm-tagger")

all_single_words = [w for i, (w, t) in enumerate(data_wt)]
all_single_tags = [t for i, (w, t) in enumerate(data_wt)]

tags_count = unigram_counts(all_single_tags)
tag_words_count = pair_counts(all_single_tags, all_single_words)

starting_tag_count = starting_counts(starting_tag_list)
ending_tag_count = ending_counts(ending_tag_list)

to_pass_states = []

for tag, words_dict in tag_words_count.items():
    total = float(sum(words_dict.values()))
    distribution = {word: count/total for word, count in words_dict.items()}
    tag_emissions = DiscreteDistribution(distribution)
    tag_state = State(tag_emissions, name=tag)
    to_pass_states.append(tag_state)

hmm.add_states()

start_prob={}

for tag in all_single_tags:
    start_prob[tag] = starting_tag_count[tag]/tags_count[tag]
print(start_prob)
for tag_state in to_pass_states:
    hmm.add_transition(hmm.start, tag_state, start_prob[tag_state.name])

end_prob = {}

for tag in all_single_tags:
    end_prob[tag] = ending_tag_count[tag]/tags_count[tag]
print(end_prob)
for tag_state in to_pass_states:
    hmm.add_transition(tag_state, hmm.end, end_prob[tag_state.name])
    
transition_prob_pair = {}

for key in tag_bigrams.keys():
    transition_prob_pair[key] = tag_bigrams.get(key)/tags_count[key[0]]

for tag_state in to_pass_states:
    for next_tag_state in to_pass_states:
        if (tag_state.name, next_tag_state.name) in tag_bigrams.keys():
#             print(*zip([tag_state.name], [next_tag_state.name]))
            hmm.add_transition(tag_state,next_tag_state,transition_prob_pair[(tag_state.name,next_tag_state.name)])

hmm.bake()

{'O': 0.038570970585878016, 'B-OS': 0.010752688172043012, 'B-DVEC': 0.043010752688172046, 'I-DVEC': 0.0, 'B-CAP': 0.062111801242236024, 'B-MAL': 0.11079136690647481, 'B-BEH': 0.021634615384615384, 'I-BEH': 0.0, 'I-CAP': 0.0, 'I-MAL': 0.0, 'I-OS': 0.0}
{'O': 0.04061473110523363, 'B-OS': 0.0, 'B-DVEC': 0.0, 'I-DVEC': 0.0, 'B-CAP': 0.0, 'B-MAL': 0.004316546762589928, 'B-BEH': 0.0, 'I-BEH': 0.0, 'I-CAP': 0.0, 'I-MAL': 0.0, 'I-OS': 0.0}


In [7]:
def replace_unknown(sequence):
    
    return [w if w in train_vocab else 'nan' for w in sequence]

def simplify_decoding(X, model):
    
    _, state_path = model.viterbi(replace_unknown(X))
    return [state[1].name for state in state_path[1:-1]]

def accuracy(X, Y, model):
    
    correct = total_predictions = 0
    for observations, actual_tags in zip(X, Y):
        
        # The model.viterbi call in simplify_decoding will return None if the HMM
        # raises an error (for example, if a test sentence contains a word that
        # is out of vocabulary for the training set). Any exception counts the
        # full sentence as an error (which makes this a conservative estimate).
        try:
            most_likely_tags = simplify_decoding(observations, model)
            correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        except:
            pass
        total_predictions += len(observations)
    return correct / total_predictions

In [8]:
hmm_training_acc = accuracy(X_train, y_train, hmm)
print("training accuracy basic hmm model: {:.2f}%".format(100 * hmm_training_acc))

hmm_testing_acc = accuracy(X_test, y_test, hmm)
print("testing accuracy basic hmm model: {:.2f}%".format(100 * hmm_testing_acc))

training accuracy basic hmm model: 97.61%
testing accuracy basic hmm model: 97.47%


In [9]:
print("Predicted labels:\n-----------------")
print(simplify_decoding(words[2], hmm))
print()
print("Actual labels:\n--------------")
print(tags[2])
print()
print("\n")

Predicted labels:
-----------------
['O', 'O', 'O', 'B-OS', 'O', 'O', 'B-DVEC', 'I-DVEC', 'O', 'O', 'O', 'O']

Actual labels:
--------------
['O', 'O', 'O', 'B-OS', 'O', 'O', 'B-DVEC', 'I-DVEC', 'O', 'O', 'B-CAP', 'O']



