In [1]:
import sys
import time
from math import log10

# HMM tagger 2

This notebook follows https://github.com/RahulGuptaIIITA/HMM-POS-Tagger

In [8]:
class HMMLearn:
    """Estimate HMM emission and transition probabilities from a tagged corpus.

    The corpus is read one sentence per line; each sentence is a sequence of
    space-separated ``word/TAG`` tokens (the tag is whatever follows the last
    ``/``).  The resulting model is written to ``model_path`` as text lines:

    * ``em <word/TAG> <probability>`` -- emission probability,
    * ``tr <PREV/TAG> <probability>`` -- add-one smoothed transition probability,
    * ``tg <TAG> <count>``            -- tag count minus its sentence-final uses.

    ``<s>`` marks the start of a sentence and ``<~s>`` the end of one.
    """

    def __init__(self, model_path = "hmmmodel.txt"):
        """Create an empty learner that will write its model to ``model_path``."""
        self.model_path = model_path

        # raw counts: "PREV/TAG" -> n and "word/TAG" -> n
        self.tag_tag_dict = dict()
        self.word_tag_dict = dict()

        # TAG -> number of occurrences ("<s>" counts the parsed sentences)
        self.unique_tags_count_dict = dict()

        # probabilities derived from the counts by calculate_probabilities()
        self.emission_probabilities = dict()
        self.transition_probabilities = dict()

        return

    def calculate_probabilities(self):
        """Turn the collected counts into emission and transition probabilities."""
        tag_counts = self.unique_tags_count_dict

        # emission: count(word/TAG) / count(TAG)
        for word_tag, count in self.word_tag_dict.items():
            # strip() mirrors parse_sentence, which counts the stripped tag
            tag = word_tag.split("/")[-1].strip()
            self.emission_probabilities[word_tag] = count / float(tag_counts[tag])

        # transition: (1 + count(PREV/TAG)) / (#tags + count(PREV) - count(PREV/<~s>))
        number_of_tags = len(tag_counts)
        for tag_pair, count in self.tag_tag_dict.items():
            prev_tag = tag_pair.split("/")[0]
            sentence_final = self.tag_tag_dict.get(prev_tag + "/<~s>", 0)
            denominator = float(number_of_tags + tag_counts[prev_tag] - sentence_final)
            self.transition_probabilities[tag_pair] = (1 + count) / denominator

        return

    def save_model(self):
        """Write the ``em``/``tr``/``tg`` lines to ``self.model_path``.

        The file is opened in a ``with`` block so it is closed (and flushed)
        even when a write fails part-way through.
        """
        with open(self.model_path, "w") as file_object:
            for string, prob in self.emission_probabilities.items():
                file_object.write("em " + string + " " + str(prob) + "\n")

            for string, prob in self.transition_probabilities.items():
                file_object.write("tr " + string + " " + str(prob) + "\n")

            for tag, count in self.unique_tags_count_dict.items():
                # sentence-final occurrences are excluded from the stored count
                exclude_count = self.tag_tag_dict.get(tag + "/<~s>", 0)
                file_object.write("tg " + tag + " " + str(count - exclude_count) + "\n")

        return

    def parse_sentence(self, sentence):
        """Add the counts of one tagged sentence to the running totals.

        Empty tokens (produced by blank lines, repeated spaces, or a space
        before the trailing newline) are ignored; a sentence with no tokens
        leaves every counter untouched, so no empty-string tag or word ever
        reaches the model.
        """
        word_tags = [token.strip() for token in sentence.split(" ")]
        word_tags = [token for token in word_tags if token]
        if not word_tags:
            return

        tag_counts = self.unique_tags_count_dict
        previous = "<s>"
        tag_counts[previous] = tag_counts.get(previous, 0) + 1

        for word_tag in word_tags:
            tag = word_tag.split("/")[-1].strip()
            transition = previous + "/" + tag

            tag_counts[tag] = tag_counts.get(tag, 0) + 1
            self.word_tag_dict[word_tag] = self.word_tag_dict.get(word_tag, 0) + 1
            self.tag_tag_dict[transition] = self.tag_tag_dict.get(transition, 0) + 1

            previous = tag

        closing = previous + "/<~s>"
        self.tag_tag_dict[closing] = self.tag_tag_dict.get(closing, 0) + 1

        return

    def run(self, infile):
        """Train on ``infile`` and save the model.

        Any exception is caught and printed rather than propagated, so a
        failed run leaves no model (or a partial one) without raising.
        """
        try:
            with open(infile) as file:
                for sentence in file:
                    self.parse_sentence(sentence)

            self.calculate_probabilities()
            self.save_model()

        except Exception as e:
            print (e)

        return

# Train the tagger: read the tagged corpus, compute the probabilities and
# write the model file that the decoder loads later.
model_path = './github/HMM-POS-Tagger/hmmmodel.txt'
training_file = './github/HMM-POS-Tagger/data/en_train_tagged.txt'
hmm_learn_object = HMMLearn(model_path=model_path)
hmm_learn_object.run(infile=training_file)

In [27]:
class HMMDecode:
    """Tag sentences with the Viterbi algorithm using a saved HMM model.

    The model file is read line by line; fields are separated by single
    spaces and the first field selects the record type:

    * ``em <word/TAG> <probability>`` -- emission probability,
    * ``tr <PREV/TAG> <probability>`` -- transition probability,
    * ``tg <TAG> <count>``            -- tag and its count (used for smoothing).
    """

    def __init__(self, model_path = "hmmmodel.txt"):
        """Load the model stored at ``model_path``."""
        self.model_path = model_path

        # every tag and every word seen in the model file
        self.possible_tags = set()
        self.possible_words = set()

        # TAG -> count, used to smooth unseen transitions
        self.possible_tags_count = dict()

        # "word/TAG" -> probability and "PREV/TAG" -> probability
        self.emission_probabilities = dict()
        self.transition_probabilities = dict()

        with open(self.model_path) as file:
            for line in file:
                var_array = line.split(" ")
                if var_array[0] == "em":
                    word_tag = var_array[1]
                    self.emission_probabilities[word_tag] = float(var_array[2].strip())
                    # Only the final "/TAG" is the tag: a word may itself
                    # contain "/" (e.g. "1/2/CD"), so split once from the right.
                    self.possible_words.add(word_tag.rsplit("/", 1)[0])

                elif var_array[0] == "tr":
                    self.transition_probabilities[var_array[1]] = float(var_array[2].strip())

                elif var_array[0] == "tg":
                    self.possible_tags.add(var_array[1].strip())
                    self.possible_tags_count[var_array[1]] = int(var_array[2].strip())
        return

    @staticmethod
    def _log_probability(probability):
        """Return log10 of ``probability``, or -inf when it is not positive."""
        return log10(probability) if probability > 0 else float("-inf")

    def smooth_probabilities(self, word, prev_tag, cur_tag):
        """Return ``(emission, transition)`` probabilities for one Viterbi step.

        * A transition missing from the model gets
          ``1 / (number of tags + count of prev_tag)``.
        * A word missing from the model gets emission 1 for every tag; a
          known word never seen with ``cur_tag`` gets emission 0.
        """
        transition_key = prev_tag + "/" + cur_tag
        if transition_key not in self.transition_probabilities:
            tr_prob = 1 / float(len(self.possible_tags) + self.possible_tags_count[prev_tag])
        else:
            tr_prob = self.transition_probabilities[transition_key]

        emission_key = word + "/" + cur_tag
        if word not in self.possible_words:
            em_prob = 1
        elif emission_key not in self.emission_probabilities:
            em_prob = 0
        else:
            em_prob = self.emission_probabilities[emission_key]

        return em_prob, tr_prob

    def viterbi_algorithm(self, sentence):
        """Return the most probable tagging of ``sentence``, last word first.

        The sentence is split on single spaces and each token is stripped.
        Scores are accumulated as sums of log10 probabilities rather than
        products, so long sentences no longer underflow to zero and lose
        their tagging.  The result is a list of ``(word, tag)`` tuples in
        reverse sentence order; tags are ``None`` only when no tag sequence
        has a non-zero probability.
        """
        words = [word.strip() for word in sentence.split(" ")]
        impossible = float("-inf")
        to_log = self._log_probability

        # best_score[i][tag]: log10 probability of the best path ending in tag at words[i]
        # best_edge[i][tag]:  the tag preceding `tag` on that path
        best_score = [dict() for _ in words]
        best_edge = [dict() for _ in words]

        for tag in self.possible_tags:
            em_prob, tr_prob = self.smooth_probabilities(words[0], "<s>", tag)
            best_score[0][tag] = to_log(em_prob) + to_log(tr_prob)
            best_edge[0][tag] = "<s>"

        for i in range(1, len(words)):
            word = words[i]
            known_word = word in self.possible_words
            for cur_tag in self.possible_tags:
                best_score[i][cur_tag] = impossible
                # a known word never seen with this tag cannot take it
                if known_word and (word + "/" + cur_tag) not in self.emission_probabilities:
                    continue

                for prev_tag in self.possible_tags:
                    prev_score = best_score[i - 1][prev_tag]
                    if prev_score == impossible:
                        # an impossible prefix can never become the best path
                        continue
                    em_prob, tr_prob = self.smooth_probabilities(word, prev_tag, cur_tag)
                    score = prev_score + to_log(em_prob) + to_log(tr_prob)
                    if score > best_score[i][cur_tag]:
                        best_score[i][cur_tag] = score
                        best_edge[i][cur_tag] = prev_tag

        last = len(words) - 1
        best_tag = None
        best_final = impossible
        for tag in self.possible_tags:
            if best_score[last][tag] > best_final:
                best_final = best_score[last][tag]
                best_tag = tag

        # follow the back-pointers from the last word to the first
        tagged_sentence = [(words[last], best_tag)]
        for i in range(last - 1, -1, -1):
            best_tag = best_edge[i + 1].get(best_tag)
            tagged_sentence.append((words[i], best_tag))

        return tagged_sentence

    def tag_sentence(self, sentence, file_object):
        """Tag ``sentence`` and write it to ``file_object`` as one ``word/TAG`` line."""
        tagged_sentence = self.viterbi_algorithm(sentence)[::-1]
        file_object.write(" ".join(word + "/" + tag for word, tag in tagged_sentence))
        file_object.write("\n")

        return

    def run(self, infile, outfile):
        """Tag every line of ``infile`` and write the results to ``outfile``.

        Both files are managed by ``with`` so the output is flushed and
        closed even if tagging fails.  Exceptions are caught and printed
        rather than propagated.
        """
        try:
            with open(outfile, "w") as file_object, open(infile) as file:
                for sentence in file:
                    self.tag_sentence(sentence, file_object)

        except Exception as e:
            print (e)

        return

    def predict(self, sentence):
        """Return the tagging of ``sentence`` as ``(word, tag)`` tuples in sentence order."""
        return self.viterbi_algorithm(sentence)[::-1]
    
# Raw dev sentences to tag, and the file that receives the tagged output.
testing_file = './github/HMM-POS-Tagger/data/en_dev_raw.txt'
output_file = './github/HMM-POS-Tagger/en_dev_predict.txt'

# Load the model written by the training cell.
hmm_decode_object = HMMDecode(model_path=model_path)

# Time the full decoding pass over the dev file.
start_time = time.time()
hmm_decode_object.run(infile=testing_file, outfile=output_file)
print ("total time it took {}s".format(time.time() - start_time))



total time it took 7.179669380187988s


In [28]:
# The decoder splits on spaces only, so 'great.' is looked up as a single word.
hmm_decode_object.predict(sentence='He is great.')

[('He', 'PRP'), ('is', 'VBZ'), ('great.', 'RB')]

In [15]:
testing_ground_truth = './github/HMM-POS-Tagger/data/en_dev_tagged.txt'
# Flatten the gold-standard file into one space-separated string.  A single
# join replaces repeated string concatenation (which re-copies the growing
# string on every line); leading spaces are stripped exactly as before.
with open(testing_ground_truth) as f:
    testing_ground_truth_seq = ' '.join(line.rstrip('\n') for line in f).lstrip(' ')

In [16]:
testing_predict_file = './github/HMM-POS-Tagger/en_dev_predict.txt'
# Flatten the predicted file into one space-separated string.  A single join
# replaces repeated string concatenation (which re-copies the growing string
# on every line); leading spaces are stripped exactly as before.
with open(testing_predict_file) as f:
    testing_predict_seq = ' '.join(line.rstrip('\n') for line in f).lstrip(' ')

In [19]:
# Split both flattened sequences on single spaces so the gold and predicted
# tokens can be compared position by position.
testing_ground_truth_list = testing_ground_truth_seq.split(sep=' ')
testing_predict_list = testing_predict_seq.split(sep=' ')

In [22]:
# Collect the gold tokens whose predicted token at the same position is
# identical.  Despite its name, correct_idx holds tokens, not indices.
# zip() stops at the shorter list, while the denominator below is always
# the gold list's length.
correct_idx = [
    truth
    for truth, predicted in zip(testing_ground_truth_list, testing_predict_list)
    if truth == predicted
]

accuracy = len(correct_idx)/len(testing_ground_truth_list)
print('Accuracy:',accuracy*100)

Accuracy: 87.81215205980595


In [None]:
# NOTE(review): `train` is not defined in any cell visible in this notebook, and
# this cell has no execution count -- it raises NameError on a fresh
# "Restart & Run All".  Confirm what was intended, or remove the cell.
len(train)

In [25]:
training_file = './github/HMM-POS-Tagger/data/en_train_tagged.txt'
# Flatten the training file into one space-separated string.  A single join
# replaces repeated string concatenation (which re-copies the growing string
# on every line); leading spaces are stripped exactly as before.
with open(training_file) as f:
    training_seq = ' '.join(line.rstrip('\n') for line in f).lstrip(' ')
# Length in characters of the flattened string (not a token count).
len(training_seq)

1704539

In [24]:
# Number of entries in the gold-standard token list used as the accuracy denominator.
len(testing_ground_truth_list)

25148