In [92]:
import pickle
import numpy as np
from collections import defaultdict

In [93]:
with open("./data/train10.pkl", "rb") as f:
    train_data = pickle.load(f)

with open("./data/test10.pkl", "rb") as f:
    test_data = pickle.load(f)   

idx=3
print("Observations (token 7): %s" % "".join([t for t, _ in train_data[idx]]))
print("Hidden states (token 7): %s" % "".join([t for _, t in train_data[idx]]))

Observations (token 7): acvount
Hidden states (token 7): account


In [207]:
len(train_data)

29057

In [95]:
class Dict:
    def __init__(self, chars, unk=None):
        self._unk = unk
        self._char_to_id = dict()
        self._id_to_char = list()

        if unk in chars:
            raise RuntimeError("UNK word exists in vocabulary")

        if unk is not None:
            self.unk_index = self._add_char(unk)

        for char in chars:
            self._add_char(char)

    # for internal use only!
    def _add_char(self, char):
        if char not in self._char_to_id:
            id = len(self._id_to_char)
            self._char_to_id[char] = id
            self._id_to_char.append(char)
            return id
        else:
            return self._char_to_id[char]

    def char_to_id(self, char):
        if self._unk is not None:
            return self._char_to_id.get(char, self.unk_index)
        else:
            return self._char_to_id[char]

    def id_to_char(self, id):
        return self._id_to_char[id]

    def __len__(self):
        return len(self._char_to_id)

    def has_unk(self):
        return self._unk is not None
    
    def unk(self):
        return self.unk_index

In [244]:
distribution_per_char = {}
distribution_per_char_correct = {}
for w in train_data:
    for typed_c, correct_c in w:
        if typed_c in distribution_per_char : 
            if correct_c in distribution_per_char[typed_c]:
                distribution_per_char[typed_c][correct_c]+=1
            else:
                distribution_per_char[typed_c][correct_c] = 1
        else:
            distribution_per_char[typed_c]={}
            distribution_per_char[typed_c][correct_c] = 1

In [246]:
tmp = [list(x.keys()) for x in distribution_per_char.values()]

correct_c = [item for sublist in tmp for item in sublist]

typed_c_dict = Dict(distribution_per_char.keys())
correct_c_dict = Dict(correct_c)
c_dict = Dict(correct_c)

In [256]:
print(correct_c_dict.__len__())

26


In [148]:
class HMM:
    def __init__(self, y_dict, x_dict):
        if not isinstance(y_dict, Dict) or not isinstance(x_dict, Dict):
            raise RuntimeError("Arguments must be of type Dict")

        self.y_dict = y_dict
        self.x_dict = x_dict

        n_y = len(y_dict)
        n_x = len(x_dict)
        self.init_prob = np.zeros((n_y,), float) 
        self.transition_prob = np.zeros((n_y, n_y), float) 
        self.observation_prob = np.zeros((n_y, n_x), float) 

In [265]:
hmm = HMM(correct_c_dict, correct_c_dict)

###init prob###
#computing the frequency of each tag to be the first tag of the sequence
for sent in train_data:
    tag = sent[0][1]
    id_tag = correct_c_dict.char_to_id(tag)
    hmm.init_prob[id_tag]+=1
#smooting
hmm.init_prob+=1
hmm.init_prob/=(len(correct_c_dict)+len(train_data))


###transition prob###
#computing p(yi|y(i-1)) 
d_tag = defaultdict(int)
for sent in train_data:
    for i in range(1,len(sent)):
        cur_tag = sent[i][1]
        pred_tag = sent[i-1][1]
        id_cur_tag = correct_c_dict.char_to_id(cur_tag)
        id_pred_tag = correct_c_dict.char_to_id(pred_tag)
        hmm.transition_prob[id_pred_tag][id_cur_tag]+=1
        d_tag[id_pred_tag]+=1
#smoothing
hmm.transition_prob+=1
for id_tag in d_tag:
    hmm.transition_prob[id_tag,:]/=(d_tag[id_tag]+len(correct_c_dict))   
    
###observation prob###
d_tag = defaultdict(int)
for sent in train_data:
    for i in range(len(sent)):
        cur_tag = sent[i][1]
        cur_w = sent[i][0]
        id_cur_tag = correct_c_dict.char_to_id(cur_tag)
        id_cur_w = correct_c_dict.char_to_id(cur_w)
        hmm.observation_prob[id_cur_tag][id_cur_w]+=1
        d_tag[id_cur_tag]+=1
#smoothing
hmm.observation_prob+=1
for id_tag in d_tag:
    hmm.observation_prob[id_tag,:]/=(d_tag[id_tag]+len(typed_c_dict))

In [266]:
print(sum(hmm.init_prob))
print(hmm.transition_prob.sum(1))
print(hmm.observation_prob.sum(1))

1.0000000000000002
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1.]


In [278]:
print((hmm.observation_prob[21]))

[2.63227165e-04 3.47459858e-02 2.63227165e-04 2.63227165e-04
 2.63227165e-04 2.63227165e-04 2.63227165e-04 2.63227165e-04
 2.63227165e-04 2.63227165e-04 2.84285338e-02 2.63227165e-04
 2.63227165e-04 2.63227165e-04 2.63227165e-04 2.63227165e-04
 3.84311661e-02 2.63227165e-04 2.63227165e-04 2.63227165e-04
 2.63227165e-04 8.92603317e-01 2.63227165e-04 2.63227165e-04
 2.63227165e-04 2.63227165e-04]


In [279]:
def viterbi(hmm, chars):
    """
    Input:
    - hmm: an HMM object
    - words: a list of words (ie a sentence)
    Return:
    - a list of POS tags
    """

    #DEFINING THE CHART AND BACKPOINTER TABLES
    chart = np.zeros((len(hmm.y_dict), len(chars)), float)
    backpointer = np.zeros((len(hmm.y_dict), len(chars)), float)
    
    #FILLING THE FIRST LINE OF THE CHART TABLE 
    for i in range(len(hmm.y_dict)):
        id_c0 = correct_c_dict.char_to_id(chars[0])
        chart[i,0] = hmm.init_prob[i]*hmm.observation_prob[i,id_c0]
        
    #FILLING BACKPOINTER TABLE AND THE REST OF THE CHART TABLE
    #for each char
    for i in range(1,len(chars)):
        id_c = correct_c_dict.char_to_id(chars[i])
        #for each possible char 
        for j in range(len(hmm.y_dict)):
            b_score = -1.0
            #for each possible (char, char') we want the maximum of the equation below
            for k in range(len(hmm.y_dict)):
                score = hmm.transition_prob[k,j]*hmm.observation_prob[j,id_c]*chart[k,i-1]
                #if the score is superior, we update because it wasn't the maximun
                if(score>b_score):
                    chart[j,i] = score
                    b_score = score
                    backpointer[j,i] = k
    
    #FILLING THE TABLE CONTAINING THE ID OF THE GOOD TAGS
    y = np.zeros(len(chars))
    y[len(chars)-1] = np.argmax(chart[:,len(chars)-1], axis=0)

    for j in range(1,len(chars))[::-1]:
        y[j-1] = backpointer[int(y[j-2]),j]
    #MAPPING EACH ID TAG TO THE TAG
    pred = [correct_c_dict.id_to_char(int(i)) for i in (y)]

    return pred

In [280]:
print(train_data[idx])
viterbi(hmm,['y','o','o'])

[('a', 'a'), ('c', 'c'), ('v', 'c'), ('o', 'o'), ('u', 'u'), ('n', 'n'), ('t', 't')]


['t', 'o', 'o']

In [270]:
# Evaluate the HMM using the viterbi
n_chars = 0
n_correct_chars = 0
n_correct_chars2 = 0

n_words = 0
n_correct_words = 0
n_correct_words2 = 0
for w in test_data:
    
    typed_chars = [c for c,_ in w]
    correct_chars = [c for _,c in w]
    pred = viterbi(hmm, typed_chars)
    
    n_chars += len(typed_chars)
    
    correct_char_hmm = sum(1 for c in range(len(typed_chars))  if correct_chars[c] == pred[c])
    correct_char_nothing =  sum(1 for c in range(len(typed_chars))  if correct_chars[c] == typed_chars[c])
    
    n_correct_chars += correct_char_hmm
    n_correct_chars2 += correct_char_nothing

    n_words +=1
    if  len(typed_chars) == correct_char_hmm:
        n_correct_words += 1 
    if  len(typed_chars) == correct_char_nothing:
        n_correct_words2 += 1
    
print("Char Tagging accuracy for HMM Order 1: %.2f" % (100 * n_correct_chars / n_chars))
print("Word Tagging accuracy for HMM Order 1: %.2f" % (100 * n_correct_words / n_words))

print("Char Tagging accuracy before HMM Order 1: %.2f" % (100 * n_correct_chars2 / n_chars))
print("Word Tagging accuracy before HMM Order 1: %.2f" % (100 * n_correct_words2 / n_words))

Char Tagging accuracy for HMM Order 1: 83.73
Word Tagging accuracy for HMM Order 1: 52.70
Char Tagging accuracy before HMM Order 1: 89.82
Word Tagging accuracy before HMM Order 1: 62.89
