# Modèle de Markov Caché du second ordre

### Application à la correction de typos dans des textes

#### Les typos sont maintenant des insertions de caractères au lieu de substitutions

In [3]:
import nltk
from numpy import array, ones, zeros, multiply
import numpy as np
import sys
from __future__ import division

#### Implementation de la classe HMM

In [4]:
UNK = "<unk>"  # token to map all out-of-vocabulary words (OOVs)
INS = "<ins>"
UNKid = 0  # index for UNK
#INSid = 1
epsilon = 1e-100


class HMM:
    def __init__(self,
                 state_list,
                 observation_list,
                 head_transition_proba=None,
                 transition_proba=None,
                 observation_proba=None,
                 initial_state_proba=None,
                 transition_head_proba=None,
                 smoothing_obs=0.01):
        """
            Builds a Hidden Markov Model
            * state_list is the list of state symbols [q_0...q_(N-1)]
            * observation_list is the list of observation symbols [v_0...v_(M-1)]
            * transition_proba is the transition probability matrix
                [a_ij] a_ij = Pr(Y_(t+1)=q_i|Y_t=q_j)
            * observation_proba is the observation probablility matrix
                [b_ki] b_ki = Pr(X_t=v_k|Y_t=q_i)
            * initial_state_proba is the initial state distribution
                [pi_i] pi_i = Pr(Y_0=q_i)"""

        print ("HMM creating with: ")

        self.N = len(state_list)  # number of states
        self.M = len(observation_list)  # number of possible emissions

        print (str(self.N) + " states")
        print (str(self.M) + " observations")

        self.omega_Y = state_list
        self.omega_X = observation_list

        if head_transition_proba is None:
            self.head_transition_proba = zeros((self.N, self.N))
        else:
            self.head_transition_proba = head_transition_proba

        if transition_proba is None:
            self.transition_proba = zeros((self.N, self.N**2))
        else:
            self.transition_proba = transition_proba

        if observation_proba is None:
            self.observation_proba = zeros((self.M, self.N))
        else:
            self.observation_proba = observation_proba

        if initial_state_proba is None:
            self.initial_state_proba = zeros((self.N, ))
        else:
            self.initial_state_proba = initial_state_proba

        self.make_indexes(
        )  # build indexes, i.e the mapping between token and int
        self.smoothing_obs = smoothing_obs

    def make_indexes(self):
        """Creates the reverse table that maps states/observations names
            to their index in the probabilities array"""
        self.Y_index = {}
        for i in range(self.N):
            self.Y_index[self.omega_Y[i]] = i
        self.X_index = {}
        for i in range(self.M):
            self.X_index[self.omega_X[i]] = i

    def get_observationIndices(self, observations):
        """return observation indices, i.e 
            return [self.O_index[o] for o in observations]
            and deals with OOVs
            """
        indices = zeros(len(observations), int)
        k = 0
        for o in observations:
            if o in self.X_index:
                indices[k] = self.X_index[o]
            else:
                indices[k] = UNKid
            k += 1
        return indices

    def data2indices(self, sent):
        """From one tagged sentence of the brown corpus: 
            - extract the words and tags 
            - returns two list of indices, one for each
            -> (wordids, tagids)
            """
        wordids = list()
        tagids = list()
        for couple in sent:
            wrd = couple[0]
            tag = couple[1]
            if wrd in self.X_index:
                wordids.append(self.X_index[wrd])
            else:
                wordids.append(UNKid)
            tagids.append(self.Y_index[tag])
        return wordids, tagids

    def observation_estimation(self, pair_counts):
        """ Build the observation distribution: 
                observation_proba is the observation probablility matrix
                    [b_ki],  b_ki = Pr(X_t=v_k|Y_t=q_i)
                
                pair_counts is dictionary with 
                - key : a tuple (word,tag)
                - value: the associated count
                
                We just need to fill the matrix and normalize the count in the right way: 
                - one column (i constant) is the distrib. of word given a tag
                - just normalize the column, i.e sum over the row (axis=0)
            """
        # fill with counts
        for pair in pair_counts:
            wrd = pair[0]  # get word
            tag = pair[1]  # get tag
            cpt = pair_counts[pair]  # get the count
            # get word index (row), deal with OOV
            k = 0  # for <unk>
            if wrd in self.X_index:
                k = self.X_index[wrd]
            # get tag  index (column)
            i = self.Y_index[tag]
            # fill the matrix
            self.observation_proba[k, i] = cpt
        # normalize
        self.observation_proba = self.observation_proba + self.smoothing_obs
        self.observation_proba = self.observation_proba / self.observation_proba.sum(
            axis=0).reshape(1, self.N)

    def head_transition_estimation(self, head_trans_counts):
        """ Buid the transition distribution """
        # fill with counts
        for pair in head_trans_counts:
            i = self.Y_index[pair[0]]
            j = self.Y_index[pair[1]]
            self.head_transition_proba[j, i] = head_trans_counts[pair]
        # normalize
        self.head_transition_proba = self.head_transition_proba + self.smoothing_obs
        self.head_transition_proba = self.head_transition_proba / self.head_transition_proba.sum(
            axis=0).reshape(1, self.N)

    def transition_estimation(self, trans_counts):
        """ Build the transition distribution """
        # fill with counts
        for pair in trans_counts:
            i = self.Y_index[pair[0][0]]
            j = self.Y_index[pair[0][1]]
            k = self.Y_index[pair[1]]
            self.transition_proba[k, (i * self.N + j)] = trans_counts[pair]
        # normalize
        self.transition_proba = self.transition_proba + self.smoothing_obs
        self.transition_proba = self.transition_proba / self.transition_proba.sum(
            axis=0).reshape(1, self.N**2)

    def init_estimation(self, init_counts):
        """Build the init. distribution"""
        # fill with counts
        for tag in init_counts:
            i = self.Y_index[tag]
            self.initial_state_proba[i] = init_counts[tag]
        # normalize
        self.initial_state_proba = self.initial_state_proba / sum(
            self.initial_state_proba)

    def supervised_training(self, pair_counts, head_trans_counts, trans_counts,
                            init_counts):
        """ Train the HMM's parameters. This function wraps everything """
        self.observation_estimation(pair_counts)
        self.head_transition_estimation(head_trans_counts)
        self.transition_estimation(trans_counts)
        self.init_estimation(init_counts)

    def viterbi(self, obsids):
        """ Viterbi Algorithm : 
            Finding the most likely sequence of hidden states. 
            """

        T = len(obsids)

        # Initialisation
        delta = zeros(self.N, float)
        tmp = zeros(self.N, float)
        psi = zeros((T, self.N), int)

        # Delta update
        delta_t = zeros(self.N, float)

        # Apply initial_state probabilities to the first frame
        delta = self.observation_proba[obsids[0]] * self.initial_state_proba

        # Recursion
        for t in range(1, T):
            if t == 1:
                for i in range(self.N):
                    for j in range(self.N):
                        # Head
                        tmp[j] = delta[j] * self.head_transition_proba[i, j]
                    psi[t, i] = tmp.argmax()
                    delta_t[i] = tmp.max() * self.observation_proba[obsids[t],
                                                                    i]
            else:
                for i in range(self.N):
                    for j in range(self.N):
                        # Second
                        tmp[j] = delta[j] * self.transition_proba[
                            i, psi[t - 1, j] * self.N + j]
                    psi[t, i] = tmp.argmax()
                    delta_t[i] = tmp.max() * self.observation_proba[obsids[t],
                                                                    i]

            delta, delta_t = delta_t, delta

        # Reconstruction
        i_star = [delta.argmax()]
        for psi_t in psi[-1:0:-1]:
            i_star.append(psi_t[i_star[-1]])
        i_star.reverse()

        return i_star

        
    def print_error_rate(self, test):
        """ Compute and print error rate on test
            """
        nb_correct_before_hmm = 0
        nb_correct_after_hmm = 0
        total = 0
        
        for word in test:
            obsids,statids = hmm.data2indices(word)
            best_sequence = hmm.viterbi(obsids)

            for (i,j) in zip(best_sequence,statids):
                if i==j:
                    nb_correct_after_hmm += 1
                    
            for(i,j) in zip(statids,obsids):
                if i==j:
                    nb_correct_before_hmm += 1

            total += len(statids)


        error_before_hmm = 100 - nb_correct_before_hmm * 100.0 / total
        error_after_hmm = 100 - nb_correct_after_hmm * 100.0 / total

        print("Error rate before HMM : {}%".format(error_before_hmm))
        print("Error rate after HMM : {}%".format(error_after_hmm))

# Compter les mots et les tags

In [5]:
def make_counts(corpus):
    """ 
    Build different count tables to train a HMM. Each count table is a dictionnary. 
    Returns: 
    * c_words: word counts
    * c_tags: tag counts
    * c_pairs: count of pairs (word,tag)
    * c_transitions: count of tag bigram 
    * c_inits: count of tag found in the first position
    """
    c_words = dict()
    c_tags = dict()
    c_pairs= dict()
    c_transitions1 = dict()
    c_transitions2 = dict()
    c_inits = dict()
    
    for sent in corpus:
        # we use i because of the transition counts
        for i in range(len(sent)):
            couple = sent[i]
            wrd = couple[0]
            tag = couple[1]
            # word counts
            if wrd in c_words:
                c_words[wrd] = c_words[wrd] + 1
            else:
                c_words[wrd] = 1
                
            # tag counts
            if tag in c_tags:
                c_tags[tag] = c_tags[tag] + 1
            else:
                c_tags[tag] = 1
                
            # observation counts
            if couple in c_pairs:
                c_pairs[couple] = c_pairs[couple] + 1
            else:
                c_pairs[couple] = 1
            
            # i >  1 -> transition counts
            if i > 1:
                trans = ((sent[i-2][1], sent[i-1][1]), tag) #(tag at t-2, tag at t-1, tag at t)
                if trans in c_transitions2:
                    c_transitions2[trans] = c_transitions2[trans] + 1
                else:
                    c_transitions2[trans] = 1
                    
            elif i == 1:
                trans = (sent[i-1][1], tag)
                if trans in c_transitions1:
                    c_transitions1[trans] = c_transitions1[trans] + 1
                else:
                    c_transitions1[trans] = 1
                    
            # i == 0 -> counts for initial states
            else:
                if tag in c_inits:
                    c_inits[tag] = c_inits[tag] + 1
                else:
                    c_inits[tag] = 1
                    
    return c_words, c_tags, c_pairs, c_transitions1, c_transitions2, c_inits

# Création du vocabulaire

In [6]:
def make_vocab(c, threshold):
    """ 
    return a vocabulary by thresholding word counts. 
    inputs: 
    * c_words : a dictionnary that maps word to its counts
    * threshold: count must be >= to the threshold to be included
    
    returns: 
    * a word list
    """
    voc = list()
    voc.append(UNK)
    voc.append(INS)
    for w in c:
        if c[w] >= threshold:
            voc.append(w)
    return voc


# Les données


In [7]:
import pickle

path = ''

# Données avec 10% de typos
train10 = pickle.load(open(path+'train10.pkl', 'rb'))
test10 = pickle.load(open(path+'test10.pkl', 'rb'))

# Données avec 20% de typos
train20 = pickle.load(open(path+'train20.pkl', 'rb'))
test20 = pickle.load(open(path+'test20.pkl', 'rb'))

# Les insertions

In [8]:
from random import *
letters = 'abcdefghijklmnopqrstuvwxyz'

def insert_char(train, percentage, with_substitution=False):
    newtext = []
    for word in train:
        newword=[]
        for i in range(len(word)):
            # Insertion
            if (random()<percentage/100):
                newword.append((choice(letters), INS))
            else:
                if(with_substitution):
                    newword.append(word[i])
                else:
                    newword.append((word[i][1],word[i][1]))
        newtext.append(newword)
    return newtext

In [9]:
# Seulement des insertions
train10 = insert_char(train10, 10)
test10 = insert_char(test10, 10)
train20 = insert_char(train20, 20)
test20 = insert_char(test20, 20)

In [10]:
print(train10[:10])

[[('b', 'b'), ('y', 'y')], [('t', 't'), ('h', 'h'), ('e', 'e'), ('i', 'i'), ('r', 'r')], [('o', 'o'), ('w', 'w'), ('n', 'n')], [('m', '<ins>'), ('c', 'c'), ('c', 'c'), ('o', 'o'), ('u', 'u'), ('n', 'n'), ('t', 't')], [('v', 'v'), ('i', 'i'), ('d', '<ins>'), ('l', 'l'), ('e', 'e'), ('n', 'n'), ('c', 'c'), ('e', 'e')], [('i', 'i'), ('d', '<ins>')], [('f', 'f'), ('o', 'o'), ('r', 'r')], [('a', '<ins>'), ('h', 'h'), ('e', 'e'), ('m', 'm')], [('a', 'a')], [('f', 'f'), ('o', 'o'), ('r', 'r'), ('m', 'm')]]


In [13]:
train = train10
test = test10

tot = len(train + test)
print ("Nombre de phrases totale = " + str(tot))
print ("Nombre de phrases de train = " + str(len(train)))
print ("Nombre de phrases de test  = " + str(len(test)))

cwords,ctags,cpairs,ctrans1,ctrans2,cinits = make_counts(train)
print ("Nombre de lettres  : "+str(len(cwords)))
print ("Nombre de tags  : "+str(len(ctags)))
print ("Nombre de paires: "+str(len(cpairs)))
print ("Nombre de init. : "+str(len(cinits)))
print (ctags)
vocab = make_vocab(cwords,10)
print ("Vocabulaire :"+str(len(vocab)))

Nombre de phrases totale = 30558
Nombre de phrases de train = 29057
Nombre de phrases de test  = 1501
Nombre de lettres  : 26
Nombre de tags  : 27
Nombre de paires: 52
Nombre de init. : 26
{'b': 1890, 'y': 2723, 't': 12492, 'h': 6012, 'e': 16241, 'i': 9874, 'r': 7428, 'o': 10766, 'w': 2015, 'n': 8815, '<ins>': 14368, 'c': 4326, 'u': 3529, 'v': 1717, 'l': 5770, 'f': 3052, 'm': 3392, 'a': 9494, 'd': 4061, 's': 8731, 'g': 2466, 'k': 529, 'z': 109, 'j': 98, 'p': 2886, 'x': 255, 'q': 129}
Vocabulaire :28


# Création du HMM et apprentissage

In [14]:
hmm = HMM(state_list=vocab, 
          observation_list=vocab,
          smoothing_obs = 0.001)

hmm.supervised_training(cpairs,ctrans1,ctrans2,cinits)

HMM creating with: 
28 states
28 observations


# Tests sur test10 avec seulement des insertions

In [13]:
hmm.print_error_rate(test)

Error rate before HMM : 9.767759562841533%
Error rate after HMM : 5.177595628415304%


# Tests sur test20 avec seulement des insertions

In [14]:
cwords,ctags,cpairs,ctrans1,ctrans2,cinits = make_counts(train20)

hmm = HMM(state_list=vocab, 
          observation_list=vocab, 
          smoothing_obs = 0.001)

hmm.supervised_training(cpairs,ctrans1,ctrans2,cinits)
hmm.print_error_rate(test20)

HMM creating with: 
28 states
28 observations
Error rate before HMM : 19.89695045233958%
Error rate after HMM : 11.389371517584323%
