In [2]:
import nltk
from numpy import array, ones, zeros
import sys
import pickle

# Some words in test could be unseen during training, or out of the vocabulary (OOV) even during the training. 
# To manage OOVs, all words out the vocabulary are mapped on a special token: UNK defined as follows: 
UNK = "<unk>" 
UNKid = 0 

class HMM:
        def __init__(self, state_list, observation_list,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None):
            """Builds a new Hidden Markov Model
            state_list is the list of state symbols [q_0...q_(N-1)]
            observation_list is the list of observation symbols [v_0...v_(M-1)]
            transition_proba is the transition probability matrix
                [a_ij] a_ij = Pr(Y_(t+1)=q_i|Y_t=q_j)
            observation_proba is the observation probablility matrix
                [b_ki] b_ki = Pr(X_t=v_k|Y_t=q_i)
            initial_state_proba is the initial state distribution
                [pi_i] pi_i = Pr(Y_0=q_i)"""
            print("HMM creating with: ")
            self.N = len(state_list) # The number of states
            self.M = len(observation_list) # The number of words in the vocabulary
            print(str(self.N)+" states")
            print(str(self.M)+" observations")
            self.omega_Y = state_list # Keep the vocabulary of tags
            self.omega_X = observation_list # Keep the vocabulary of words
            # Init. of the 3 distributions : observation, transition and initial states
            if transition_proba is None:
                self.transition_proba = zeros( (self.N, self.N, self.N), float) 
            else:
                self.transition_proba=transition_proba
            if observation_proba is None:
                self.observation_proba = zeros( (self.M, self.N), float) 
            else:
                self.observation_proba=observation_proba
            if initial_state_proba is None:
                self.initial_state_proba = zeros( (self.N, self.N), float ) # pi
            else:
                self.initial_state_proba=initial_state_proba
            # Since everything will be stored in numpy arrays, it is more convenient and compact to 
            # handle words and tags as indices (integer) for a direct access. However, we also need 
            # to keep the mapping between strings (word or tag) and indices. 
            self.make_indexes()
            self.compute_matrix()

        def make_indexes(self):
            """Creates the reverse table that maps states/observations names
            to their index in the probabilities arrays"""
            self.Y_index = {}
            for i in range(self.N):
                self.Y_index[self.omega_Y[i]] = i
            self.X_index = {}
            for i in range(self.M):
                self.X_index[self.omega_X[i]] = i
                
        def compute_matrix(self):
            trans = zeros( (self.N, self.N, self.N), float)
            for (y1, y2, y3), v in self.transition_proba.items():
                trans[self.Y_index[y1], self.Y_index[y2], self.Y_index[y3]] = v
            self.transition_proba = trans
            
            obs = zeros( (self.M, self.N), float) 
            
            for (x, y), v in self.observation_proba.items():
                obs[self.X_index[x], self.Y_index[y]] = v
            self.observation_proba = obs
            
            init = zeros( (self.N, self.N), float )
            for (y1, y2), v in self.initial_state_proba.items():
                init[self.Y_index[y1],self.Y_index[y2]] = v
            self.initial_state_proba = init

In [3]:
# nltk.download("brown")
# nltk.download('universal_tagset')
# data = nltk.corpus.brown.tagged_sents(tagset='universal')

data_train = pickle.load( open( "/home/biard/Documents/université/m2/s1/tc4_algo_inférence/typos-data/train10.pkl", "rb" ))
data_test = pickle.load(open("/home/biard/Documents/université/m2/s1/tc4_algo_inférence/typos-data/test10.pkl", "rb"))

In [4]:
def make_words_tags_count(data):
    count_words = {}
    tags = {}
    pairs = {}
    for sent in data:
        for w,t in sent:
            if w in count_words:
                count_words[w]+=1
            else:
                count_words[w]=1
            if t in tags:
                tags[t]+=1
            else:
                tags[t]=1
            if (w,t) in pairs:
                pairs[(w,t)]+=1
            else:
                pairs[(w,t)]=1
    return count_words, tags, pairs

In [5]:
def filter_dict(data, limit_occ):
    words,tags,_=make_words_tags_count(data)
    res = []
    for s in data:
        sent = []
        for w,t in s:
            if words[w] > limit_occ:
                sent.append((w,t))
            else:
                sent.append((UNK,t))
        res.append(sent)
    return res
data_train=filter_dict(data_train,10)

In [6]:
def proba_observation_Y_X(mot, tag, words, pairs):
    if (mot,tag) in pairs:
        return pairs[(mot,tag)] / words[mot]
    else:
        return 0

def make_all_proba_observation_Y_X(data):
    words,tags,pairs=make_words_tags_count(data)
    res = {}
    for (w,t),c in pairs.items():
        res[(w,t)]=proba_observation_Y_X(w,t,words,pairs)
    s = sum(v for k, v in res.items())
    res = {k: v / s for k, v in res.items()}
    return res

In [7]:
def proba_transition_Y2_Y1_Y0(tag2, tag1, tag0, triplets, pairs):
    if (tag2,tag1,tag0) in triplets:
        return triplets[(tag2, tag1, tag0)] / pairs[tag1, tag0]
    else:
        return 0

def make_tags_triplets(data):
    triplets = {}
    for s in data:
        for i in range(len(s) - 2):
            if (s[i+2][1], s[i+1][1], s[i][1]) in triplets:
                triplets[s[i+2][1], s[i+1][1], s[i][1]]+=1
            else:
                triplets[s[i+2][1], s[i+1][1], s[i][1]]=1
    return triplets

def make_tags_pairs(data):
    pairs = {}
    for s in data:
        for i in range(len(s) - 1):
            if (s[i+1][1], s[i][1]) in pairs:
                pairs[s[i+1][1], s[i][1]]+=1
            else:
                pairs[s[i+1][1], s[i][1]]=1
    return pairs

def make_all_proba_transition_Y1_Y0(data):
    triplets=make_tags_triplets(data)
    pairs=make_tags_pairs(data)
    res = {}
    for (y2,y1,y0),c in triplets.items():
        res[(y2,y1,y0)]=proba_transition_Y2_Y1_Y0(y2,y1,y0, triplets, pairs)
    s = sum(v for k, v in res.items())
    res = {k: v / s for k, v in res.items()}
    return res

In [8]:
def makePi(data):
    res = {}
    total = 0
    for s in data:
        if len(s) > 1 and (s[0][1],s[1][1]) in res:
            res[s[0][1],s[1][1]] += 1
        elif len(s) > 1:
            res[s[0][1],s[1][1]] = 1
        total += 1
    for k,v in res.items():
        res[k] /= total
    return res

In [9]:
def initHMM2(data):
    count_words, count_tags, pairs = make_words_tags_count(data)
    obs = make_all_proba_observation_Y_X(data)
    trans = make_all_proba_transition_Y1_Y0(data)
    return HMM([t for t,_ in count_tags.items()], [w for w,_ in count_words.items()], trans, obs, makePi(data))

In [10]:
HMM2 = initHMM2(data_train)

HMM creating with: 
26 states
26 observations


In [11]:
def viterbi(seq, hmm):
    if len(seq) <= 1:
        return [""] # il faut une phrases de longueur minimale 2
    pi_0 = hmm.initial_state_proba[hmm.Y_index[seq[0][1]], hmm.Y_index[seq[1][1]]]
    b_0 = hmm.observation_proba[hmm.X_index[seq[1][0]], hmm.Y_index[seq[1][1]]]
    prob = pi_0 * b_0
    res = ["",""] # on ne compte pas le 1er et le 2e
    y_t_moins_un = seq[0][1]
    y_t_moins_deux = seq[1][1]
    for x_t, y_to_pred in seq[2:]:
        tmp = 0
        pred = ""
        for y_t in hmm.omega_Y:
            curr = prob * \
                hmm.transition_proba[hmm.Y_index[y_t], hmm.Y_index[y_t_moins_un], hmm.Y_index[y_t_moins_deux]] * \
                hmm.observation_proba[hmm.X_index[x_t], hmm.Y_index[y_t]]
            if curr > tmp:
                tmp = curr
                pred = y_t
        y_t_moins_deux = y_t_moins_un
        y_t_moins_un = y_to_pred
        res.append(pred)
    return res

In [12]:
def test(hmm, data):
    error = 0
    total = 0
    for seq in data:
        res = viterbi(seq, hmm)
        for y_pred, (_, y) in list(zip(res, seq))[2:]: # on ne compte pas le 1er et le 2e
            error += 1 if y_pred != y else 0
            total += 1
    return error / total

In [13]:
e = test(HMM2, data_test)

In [14]:
print(e)

0.2992432928227471
