In [1]:
import nltk
from numpy import array, ones, zeros, multiply
import numpy as np
import sys
from pprint import pprint
import operator

epsilon=1e-1
class HMM:
        def __init__(self, state_list, observation_list,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None, smoothing_obs = 0.01):
            print "HMM creating with: "
            self.N = len(state_list)       # number of states
            self.M = len(observation_list) # number of possible emissions
            print str(self.N)+" states"
            print str(self.M)+" observations"
            #contient les differents labels observes
            self.omega_Y = state_list
            #contient les differentes lettrs observers
            self.omega_X = observation_list
            
            #initialisation des transitions
            if transition_proba is None:
                self.transition_proba = zeros( (self.N, self.N), float) 
            else:
                self.transition_proba=transition_proba
            
            #intialisation des observations
            if observation_proba is None:
                self.observation_proba = zeros( (self.M, self.N), float) 
            else:
                self.observation_proba=observation_proba
            
            #initialisation des 
            if initial_state_proba is None:
                self.initial_state_proba = zeros( (self.N,), float ) 
            else:
                self.initial_state_proba=initial_state_proba
            self.make_indexes() # build indexes, i.e the mapping between token and int
            self.smoothing_obs = smoothing_obs 
            
        def make_indexes(self):
            """Creates the reverse table that maps states/observations names
            to their index in the probabilities array"""
            self.Y_index = {}
            for i in range(self.N):
                self.Y_index[self.omega_Y[i]] = i
            self.X_index = {}
            for i in range(self.M):
                self.X_index[self.omega_X[i]] = i
      
        def get_observationIndices( self, observations ):
            """return observation indices, i.e 
            return [self.O_index[o] for o in observations]
            and deals with OOVs
            """
            indices = zeros( len(observations), int )
            k = 0
            for o in observations:
                if o in self.X_index:
                    indices[k] = self.X_index[o]
                else:
                    raise("Erreur")
                k += 1
            return indices

    
        def data2indices(self, sent): 
            """From one tagged sentence of the brown corpus: 
            - extract the words and tags 
            - returns two list of indices, one for each
            -> (wordids, tagids)
            """
            wordids = list()
            tagids  = list()
            for couple in sent:
                wrd = couple[0]
                tag = couple[1]
                if wrd in self.X_index:
                    wordids.append(self.X_index[wrd])
                else:
                    raise("Erreur data2indices")
                tagids.append(self.Y_index[tag])
            return wordids,tagids
            
        def observation_estimation(self, pair_counts):
            """ Build the observation distribution: 
                observation_proba is the observation probablility matrix
                    [b_ki],  b_ki = Pr(X_t=v_k|Y_t=q_i)"""
            # fill with counts
            for pair in pair_counts:
                wrd=pair[0]
                tag=pair[1]
                cpt=pair_counts[pair]
                k = 0 # for <unk>
                if wrd in self.X_index: 
                    k=self.X_index[wrd]
                i=self.Y_index[tag]
                self.observation_proba[k,i]=cpt
            # normalize
            self.observation_proba=self.observation_proba+self.smoothing_obs
            self.observation_proba=self.observation_proba/self.observation_proba.sum(axis=0).reshape(1,self.N)
            
        
        def transition_estimation(self, trans_counts):
            """ Build the transition distribution: 
                transition_proba is the transition matrix with : 
                [a_ij] a[i,j] = Pr(Y_(t+1)=q_i|Y_t=q_j)
            """
            # fill with counts
            for pair in trans_counts:
                i=self.Y_index[pair[1]]
                j=self.Y_index[pair[0]]
                self.transition_proba[j,i]=trans_counts[pair]
            # normalize
            self.transition_proba=self.transition_proba/self.transition_proba.sum(axis=0).reshape(1,self.N)
        
        def init_estimation(self, init_counts):
            """Build the init. distribution"""
            # fill with counts
            for tag in init_counts:
                i=self.Y_index[tag]
                self.initial_state_proba[i]=init_counts[tag]
            # normalize
            self.initial_state_proba=self.initial_state_proba/sum(self.initial_state_proba)
             
        
        def supervised_training(self, pair_counts, trans_counts,init_counts):
            """ Train the HMM's parameters. This function wraps everything"""
            self.observation_estimation(pair_counts)
            self.transition_estimation(trans_counts)
            self.init_estimation(init_counts)
        
        def viverbit(self,mots):
            #alpha : contient la classe la plus probable pour chaque mot en fonction des lettres precedentes
            alpha = np.zeros((self.N,len(mots)))
            #contient le chemin parcouru pour arriver dans chaque etat pour chaque lettre du mot d entree
            xi = np.zeros((self.N,len(mots)))
            
            #initialisation de la premiere colonne
            i = 0
            if mots[0] in self.X_index:
                i = self.X_index[mots[0]]
            alpha[:,0] = self.initial_state_proba*self.observation_proba[i]
            
            #calcul de la deuxieme lettre a la dernier de la proba de chaque classe
            for i in range(1,len(mots)):
                for j in range(self.N):
                    indi = 0
                    #indice de la ieme lettre
                    if mots[i] in self.X_index:
                        indi = self.X_index[mots[i]]
                    #calcul de la proba en utilisant les calculs precedent
                    liste = [alpha[k,i-1] * self.transition_proba[k,j]* self.observation_proba[indi,j] for k in range(self.N)]
                    #la lettre la plus probable qui a permis d arriver dans l etat j pour la lettre i
                    alpha[j,i] = np.max(liste)
                    xi[j,i] = np.argmax(liste)
                    
            #le backtrack
            tags = []
            debut = len(xi)
            starting = np.argmax(alpha[:,len(xi[0])-1])
            tags.append(starting)
            count = len(xi[0])
            #on remonte le tableau contenant les chemins
            while len(tags) != len(mots):
                count -= 1
                new_index = xi[starting,count]
                tags.append(new_index)
                starting = new_index
            tags = tags[::-1]
            to_return  = []
            count = 0
            #on renvoie une liste de pairs (bonne lettre, lettre calculee)
            for i in tags:
                to_return.append((mots[count],self.Y_index.keys()[self.Y_index.values().index(i)]))
                count +=1
            return to_return
                 
        def evaluate(self,test_data):
            #fonction d'evaluation
            #calcul differentes valeurs comme le taux de correction
            #le taux d'erreurs...
            
            #taux d'erreur brut
            errors = 0
            #nombre total de caracteres
            total = 0
            #taux d'erreur sur l'ensemble de test
            erreur_2 = 0
            
            #nombre de correction
            correction = 0
            correction_totale = 0
            for i in range(len(test_data)):
                res = self.viverbit(map(operator.itemgetter(0), test_data[i]))
                for a,b in zip(res,test_data[i]):
                    if(b[0] != b[1] and b[1]==a[1]):
                        correction +=1
                    correction_totale +=1
                erreur_2 += sum([a!=b for a,b in test_data[i]])
                errors += sum([a[1]!=b[1] for a,b in zip(res,test_data[i])])
                total += len(res)
            print("################# Resultats du HMM d'ordre 1 #################")
            print("Percentage of errors : {0:.2f}%".format(((errors/float(total))*100.0)))
            print("Taux d erreur brut : {0:.2f}%".format(((erreur_2/float(total))*100.0)))
            print("Taux de correction : {0:.2f}%".format(((correction/float(correction_totale))*100.0)))
            print("Nombre de corrections {} vs nombre de bonnes corrections {}".format(correction_totale,correction))
            



# Compter les mots et les tags

In [2]:
def make_counts(corpus):
    """ 
    Build different count tables to train a HMM. Each count table is a dictionnary. 
    Returns: 
    * c_words: word counts
    * c_tags: tag counts
    * c_pairs: count of pairs (word,tag)
    * c_transitions: count of tag bigram 
    * c_inits: count of tag found in the first position
    """
    c_words = dict()
    c_tags = dict()
    c_pairs= dict()
    c_transitions = dict()
    c_inits = dict()
    for sent in corpus:
        # we use i because of the transition counts
        for i in range(len(sent)):
            couple=sent[i]
            wrd = couple[0]
            tag = couple[1]
            # word counts
            if wrd in c_words:
                c_words[wrd]=c_words[wrd]+1
            else:
                c_words[wrd]=1
            # tag counts
            if tag in c_tags:
                c_tags[tag]=c_tags[tag]+1
            else:
                c_tags[tag]=1
            # observation counts
            if couple in c_pairs:
                c_pairs[couple]=c_pairs[couple]+1
            else:
                c_pairs[couple]=1
            # i >  0 -> transition counts
            if i > 0:
                trans = (sent[i-1][1],tag)
                if trans in c_transitions:
                    c_transitions[trans]=c_transitions[trans]+1
                else:
                    c_transitions[trans]=1
            # i == 0 -> counts for initial states
            else:
                if tag in c_inits:
                    c_inits[tag]=c_inits[tag]+1
                else:
                    c_inits[tag]=1
                    
    return c_words,c_tags,c_pairs, c_transitions, c_inits


# Création du vocabulaire (filtrage selon le nombre d'occurence)

In [3]:
def make_vocab(c_words, threshold):
    """ 
    return a vocabulary by thresholding word counts. 
    inputs: 
    * c_words : a dictionnary that maps word to its counts
    * threshold: count must be >= to the threshold to be included
    * dans le cas de la correction de typo, ce treshold est quasi toujours depasse pour toutes les classes
    returns: 
    * a word list
    """
    voc = list()
    for w in c_words:
        if c_words[w] >= threshold:
            voc.append(w)
        else:
            print("Ereur dans le lectue des donnees")
    return voc


# les données
* Charge les donnees de test et d'entrainement

In [4]:
import cPickle

with open("typos-data/test10.pkl", "rb") as input_file:
    test = cPickle.load(input_file)
    
with open("typos-data/train10.pkl", "rb") as input_file:
    train = cPickle.load(input_file)
print "Nombre de phrases de train = "+str(len(train))
print "Nombre de phrases de test  = "+str(len(test))

Nombre de phrases de train = 29057
Nombre de phrases de test  = 1501


In [5]:
#cree les differents comptes utilent pour notre hmm

cwords,ctags,cpairs,ctrans,cinits = make_counts(train)
vocab = make_vocab(cwords,10)
print("Taille du vocabulaire : ", len(cwords))
print("Nombre de tags : ",len(ctags))
print("Nombre de pairs differentes lettre/label : ", len(cpairs))
print("Nombre de transitions : ", len(ctrans))
print("Nombre de labels differents pour l'initialisation : ", len(cinits))

('Taille du vocabulaire : ', 26)
('Nombre de tags : ', 26)
('Nombre de pairs differentes lettre/label : ', 127)
('Nombre de transitions : ', 403)
("Nombre de labels differents pour l'initialisation : ", 25)


# Création du HMM

In [6]:
hmm = HMM(state_list=ctags.keys(), observation_list=vocab,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None,
                 smoothing_obs = 1e-2)

HMM creating with: 
26 states
26 observations


# Apprentissage pas à pas 

In [7]:
print("Entrainement du hmm d'ordre 1 :")
hmm.supervised_training(cpairs,ctrans,cinits)

print("Test sur toutes les donnees de test : ")
hmm.evaluate(test)

Entrainement du hmm d'ordre 1 :
Test sur toutes les donnees de test : 




################# Resultats du HMM d'ordre 1 #################
Percentage of errors : 7.42%
Taux d erreur brut : 10.18%
Taux de correction : 3.98%
Nombre de corrections 7320 vs nombre de bonnes corrections 291


In [8]:
print("Exemple d'appel de viterbi pour les 10 premieres donnees de test : ")
print("###############################################################")
for i in range(10):
    entree = "".join([a[0] for a in test[i]])
    solution = "".join([a[1] for a in hmm.viverbit(map(operator.itemgetter(0), test[i]))])
    print("{} ------------------> {}".format(entree,solution))

Exemple d'appel de viterbit pour les 10 premieres donnees de test : 
###############################################################
the ------------------> the
leftist ------------------> leftist
is ------------------> is
too ------------------> too
far ------------------> far
gone ------------------> tone
for ------------------> for
that ------------------> that
his ------------------> his
reeljhgs ------------------> reelings


