The goal is to design an hidden markov first order model to correct typos in texts without a dictionary.

Participants: TARGHI Amal et MOHAMED Mohamed


targhiamal@gmail.com , mohmed.abdelkhaleq@gmail.com

Question 1:
Dry run: Train a first-order HMM using the training data. This is basically what we did in
    lab sessions for POS-tagging. Compute the error rate (at the character level) and
    compare this results with the dummiest classifier that just do nothing.
    You can also compute  the number of errors your model can correct and the number of errors your model creates. 

In [12]:
import nltk
from numpy import array, ones, zeros, multiply
import numpy as np
import sys
import pprint, pickle

UNK = "<unk>"  # token to map all out-of-vocabulary words (OOVs)
UNKid = 0      # index for UNK



In [13]:
#Data

train10 =   pickle.load(open('/home/amal/TC4-Second-Order-HMM-for-typos-correction-/typos-data /train10.pkl', 'rb'))

train20 =   pickle.load(open('/home/amal/TC4-Second-Order-HMM-for-typos-correction-/typos-data /train20.pkl', 'rb'))

test10 =  pickle.load(open('/home/amal/TC4-Second-Order-HMM-for-typos-correction-/typos-data /test10.pkl', 'rb'))

test20 =  pickle.load(open('/home/amal/TC4-Second-Order-HMM-for-typos-correction-/typos-data /test20.pkl', 'rb'))

In [14]:
class HMM:
        def __init__(self, state_list, observation_list,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None, smoothing_obs = 0.01):
            """
            Builds a Hidden Markov Model
            * state_list is the list of state symbols [q_0...q_(N-1)]
            * observation_list is the list of observation symbols [v_0...v_(M-1)]
            * transition_proba is the transition probability matrix
                [a_ij] a_ij = Pr(Y_(t+1)=q_i|Y_t=q_j)
            * observation_proba is the observation probablility matrix
                [b_ki] b_ki = Pr(X_t=v_k|Y_t=q_i)
            * initial_state_proba is the initial state distribution
                [pi_i] pi_i = Pr(Y_0=q_i)"""
            print "HMM creating with: "
            
            self.N = len(state_list)       # number of states
            self.M = len(observation_list) # number of possible emissions
            print str(self.N)+" states"
            print str(self.M)+" observations"
            self.omega_Y = state_list
            self.omega_X = observation_list
            if transition_proba is None:
                self.transition_proba = zeros( (self.N, self.N), float) 
            else:
                self.transition_proba=transition_proba
            if observation_proba is None:
                self.observation_proba = zeros( (self.M, self.N), float) 
            else:
                self.observation_proba=observation_proba
            if initial_state_proba is None:
                self.initial_state_proba = zeros( (self.N,), float ) 
            else:
                self.initial_state_proba=initial_state_proba
            self.make_indexes() # build indexes, i.e the mapping between token and int
            self.smoothing_obs = smoothing_obs 
 
        def make_indexes(self):
            """Creates the reverse table that maps states/observations names
            to their index in the probabilities array"""
            self.Y_index = {}
            for i in range(self.N):
                self.Y_index[self.omega_Y[i]] = i
            self.X_index = {}
            for i in range(self.M):
                self.X_index[self.omega_X[i]] = i
                
            print "States:\n", self.Y_index
            print "Observations:\n", self.X_index
        #cobs,cstat,cpairs,ctrans,cinits
        #def get_observationIndices( self, cobs ):

        def get_observationIndices( self, observations ):
            """return observation indices, i.e 
            return [self.O_index[o] for o in observations]
            and deals with OOVs
            """
            indices = zeros( len(observations), int )
            k = 0
            for o in observations:
                if o in self.X_index:
                    indices[k] = self.X_index[o]
                else:
                    indices[k] = UNKid
                k += 1
            return indices
        
        """From one each word
        - extract the letters and correction of rache one
        - (letterObservation,LetterState)
        """
        def data2indices(self, word): 
    
            letterObservation = list()
            letterState  = list()
            for letter in word:
                observation = letter[0]
                state = letter[1]
                if observation in self.X_index:
                    letterObservation.append(self.X_index[observation])
                else:
                    letterObservation.append(UNKid)
                letterState.append(self.Y_index[state])
            return letterObservation,letterState
        
        def observation_estimation(self, cpairs):
            """ Build the observation distribution: 
                observation_proba is the observation probablility matrix
                [b_ki],  b_ki = Pr(X_t=v_k|Y_t=q_i)"""
            # fill with counts
            for pair in cpairs: 
                observation=pair[0]
                state=pair[1]
                cpt=cpairs[pair]
                k = 0 # for <unk>
                if observation in self.X_index: 
                    k=self.X_index[observation]
                i=self.Y_index[state]
                self.observation_proba[k,i]=cpt
            # normalize
            self.observation_proba=self.observation_proba+self.smoothing_obs
            self.observation_proba=self.observation_proba/self.observation_proba.sum(axis=0).reshape(1,self.N)
        
        def transition_estimation(self, ctrans):
            """ Build the transition distribution: 
                transition_proba is the transition matrix with : 
                [a_ij] a[i,j] = Pr(Y_(t+1)=q_i|Y_t=q_j)
            """
            # fill with counts
            for pair in ctrans:
                i=self.Y_index[pair[0]]
                j=self.Y_index[pair[1]]
                self.transition_proba[i,j]=ctrans[pair]
            # normalize
            self.transition_proba=self.transition_proba/self.transition_proba.sum(axis=0).reshape(1,self.N)
     
            
        def init_estimation(self, cinits):
            """Build the init. distribution"""
            # fill with counts
            for state in cinits:
                i=self.Y_index[state]
                self.initial_state_proba[i]=cinits[state]
            # normalize
            self.initial_state_proba=self.initial_state_proba/sum(self.initial_state_proba)
        
        def supervised_training(self, cpairs, ctrans,cinits):
            """ Train the HMM's parameters. This function wraps everything"""
            self.observation_estimation(cpairs)
            self.transition_estimation(ctrans)
            self.init_estimation(cinits)
            
            # Viterbi Algortihm
            # Source: http://stackoverflow.com/questions/34219766/need-help-implementing-viterbi-for-a-second-order-hidden-markov-model-with-pytho
        def viterbi( self, obs ):
            """Find the most probable state sequence
            """

            # shortcuts
            B = self.observation_proba 
            A = self.transition_proba
            T = len(obsids)
            N = self.N
            # initialisation
            delta = zeros( N, float )
            tmp = zeros( N, float )
            psi = zeros( (T, N), int )      
            delta_t = zeros( N, float )
            # apply initial_state probs to the first frame
            delta = B[obsids[0]] * self.initial_state_proba   
            # recursion
            for t in xrange(1, T):
                O_t = obsids[t]
                for j in range(N):
                    multiply( delta, A[:, j], tmp )
                    idx = psi[t, j] = tmp.argmax()       
                    delta_t[j] = tmp[idx] * B[O_t, j] 
                delta, delta_t = delta_t, delta
            # reconstruction
            i_star = [delta.argmax()]                        
            for psi_t in psi[-1:0:-1]:
                i_star.append( psi_t[i_star[-1]] )                 
            i_star.reverse()
            return i_star

# Compter les lettres et les tags

In [15]:
def make_counts(train):
    
    """
    Build different count tables to train a HMM1. Each count table is a dictionnary.
    Returns:
    * c_observation: No correct Letter Count (Observation Letter)
    * c_state: Correct Letters  ( State Letters)
    * c_pairs: count of pairs (nncorrectedletr,correctedletr)
    * c_transitions: count bigram
    * c_inits: count of tag found in the first position
    """
        
    c_letterObservation = dict()
    c_letterState = dict()
    c_pairs= dict()
    c_transitions = dict()
    c_inits = dict()
    
    
    for word in train:
        # we use i because of the transition counts
        for i in range(len(word)):
            couple=word[i]
            observation = couple[0]
            state = couple[1]
            # letter counts
            if observation in c_letterObservation:
                c_letterObservation[observation]=c_letterObservation[observation]+1
            else:
                c_letterObservation[observation]=1
            # letter  counts
            if state in c_letterState:
                c_letterState[state] = c_letterState[state]+1
            else:
                c_letterState[state]=1
            # observation counts
            if couple in c_pairs:
                c_pairs[couple]=c_pairs[couple]+1
            else:
                c_pairs[couple]=1
            # i >  0 -> transition counts
            if i > 0:
                trans = (word[i-1][1],state)
                if trans in c_transitions:
                    c_transitions[trans]=c_transitions[trans]+1
                else:
                    c_transitions[trans]=1
            # i == 0 -> counts for initial states
            else:
                if state in c_inits:
                    c_inits[state]=c_inits[state]+1
                else:
                    c_inits[state]=1
                    
    return c_letterObservation ,c_letterState,c_pairs, c_transitions, c_inits


# Creation of vocabulary according to the number of occurence for each letter.

In [16]:
def make_vocab(c_letterObservation, threshold):
    """ 
    return a vocabulary by thresholding word counts. 
    inputs: 
    * c_words : a dictionnary that maps word to its counts
    * threshold: count must be >= to the threshold to be included
    
    returns: 
    * a word list
    """
    voc = list()
    voc.append(UNK)
    for letter in c_letterObservation:
        if c_letterObservation[letter] >= threshold:
            voc.append(letter)
    return voc

In [17]:
clettero,cletterS,cpairs,ctrans,cinits = make_counts(train10)
print "Nombre de mots  : "+str(len(clettero))
print "Nombre de tags  : "+str(len(cletterS))
print "Nombre de paires: "+str(len(cpairs))
print "Nombre de trans : "+str(len(ctrans))+ " / "+ str(len(cletterS)*len(cletterS))
print "Nombre de init. : "+str(len(cinits))
vocab = make_vocab(clettero,10)
print "Vocabulaire :"+str(len(vocab))

Nombre de mots  : 26
Nombre de tags  : 26
Nombre de paires: 127
Nombre de trans : 403 / 676
Nombre de init. : 25
Vocabulaire :27


# Creation du HMM1

In [18]:
hmm = HMM(state_list=cletterS.keys(), observation_list=vocab,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None)

hmm.supervised_training(cpairs, ctrans, cinits)

HMM creating with: 
26 states
27 observations
States:
{'a': 0, 'c': 1, 'b': 2, 'e': 3, 'd': 4, 'g': 5, 'f': 6, 'i': 7, 'h': 8, 'k': 9, 'j': 10, 'm': 11, 'l': 12, 'o': 13, 'n': 14, 'q': 15, 'p': 16, 's': 17, 'r': 18, 'u': 19, 't': 20, 'w': 21, 'v': 22, 'y': 23, 'x': 24, 'z': 25}
Observations:
{'a': 1, 'c': 2, 'b': 3, 'e': 4, 'd': 5, 'g': 6, 'f': 7, 'i': 8, 'h': 9, 'k': 10, 'j': 11, 'm': 12, 'l': 13, 'o': 14, 'n': 15, 'q': 16, 'p': 17, 's': 18, 'r': 19, 'u': 20, 't': 21, 'w': 22, 'v': 23, 'y': 24, 'x': 25, 'z': 26, '<unk>': 0}


# Propabibilities

In [24]:
hmm.observation_estimation(cpairs)
hmm.transition_estimation(ctrans)
hmm.init_estimation(cinits)
#print hmm.observation_proba.sum(axis=1)
 

# TEST

In [25]:
#test 10 
correct=0
total=0

for word in test10:
    obsids,statids = hmm.data2indices(word)
    best_sequence = hmm.viterbi(obsids)
    correct+=sum(np.array(best_sequence)==np.array(statids))
    total+=len(statids)
    
    
    #print statids
    #print obsids
    #print best_sequence
    
print('correct ones are ' +str(correct))
print('total is ' +str(total))

print "Accuarcy is "+str(correct*100.0/total ) + " %"




correct ones are 6777
total is 7320
Accuarcy is 92.5819672131 %


In [26]:
#test 20 
correct=0
total=0

for word in test20:
    obsids,statids = hmm.data2indices(word)
    best_sequence = hmm.viterbi(obsids)
    correct+=sum(np.array(best_sequence)==np.array(statids))
    total+=len(statids)
    
    
    #print statids
    #print obsids
    #print best_sequence
    
print('correct ones are ' +str(correct))
print('total is ' +str(total))

print "Accuarcy is "+str(correct*100.0/total ) + " %"


correct ones are 14433
total is 16691
Accuarcy is 86.4717512432 %
