In [33]:
import nltk
from numpy import array, ones, zeros, multiply
import numpy as np
import sys

# construc the first order HMM Model

UNK = "<unk>"
UNKid = 0
epsilon = 1e-100

class HMM:
        def __init__(self, state_list, observation_list,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None, smoothing_obs = 0.01):
            """
            Builds a Hidden Markov Model
            * state_list is the list of state symbols [q_0...q_(N-1)]
            * observation_list is the list of observation symbols [v_0...v_(M-1)]
            * transition_proba is the transition probability matrix
                [a_ij] a_ij = Pr(Y_(t+1)=q_i|Y_t=q_j)
            * observation_proba is the observation probablility matrix
                [b_ki] b_ki = Pr(X_t=v_k|Y_t=q_i)
            * initial_state_proba is the initial state distribution
                [pi_i] pi_i = Pr(Y_0=q_i)"""
            print("HMM creating with: ")
            self.N = len(state_list)       # number of states
            self.M = len(observation_list) # number of possible emissions
            print(str(self.N)+" states")
            print(str(self.M)+" observations")
            self.omega_Y = state_list
            self.omega_X = observation_list
            if transition_proba is None:
                self.transition_proba = zeros( (self.N, self.N), float) 
            else:
                self.transition_proba=transition_proba
            if observation_proba is None:
                self.observation_proba = zeros( (self.M, self.N), float) 
            else:
                self.observation_proba=observation_proba
            if initial_state_proba is None:
                self.initial_state_proba = zeros( (self.N,), float ) 
            else:
                self.initial_state_proba=initial_state_proba
            self.make_indexes() # build indexes, i.e the mapping between token and int
            self.smoothing_obs = smoothing_obs 
            
        def make_indexes(self):
            """Creates the reverse table that maps states/observations names
            to their index in the probabilities array"""
            self.Y_index = {}
            for i in range(self.N):
                self.Y_index[self.omega_Y[i]] = i
            self.X_index = {}
            for i in range(self.M):
                self.X_index[self.omega_X[i]] = i
      
        def get_observationIndices( self, observations ):
            """return observation indices, i.e 
            return [self.O_index[o] for o in observations]
            and deals with OOVs
            """
            indices = zeros( len(observations), int )
            k = 0
            for o in observations:
                if o in self.X_index:
                    indices[k] = self.X_index[o]
                else:
                    indices[k] = UNKid
                k += 1
            return indices

    
        def data2indices(self, sent): 
            """From one tagged sentence of the brown corpus: 
            - extract the words and tags 
            - returns two list of indices, one for each
            -> (wordids, tagids)
            """
            wordids = list()
            tagids  = list()
            for couple in sent:
                wrd = couple[0]
                tag = couple[1]
                if wrd in self.X_index:
                    wordids.append(self.X_index[wrd])
                else:
                    wordids.append(UNKid)
                tagids.append(self.Y_index[tag])
            return wordids,tagids
            
        def observation_estimation(self, pair_counts):
            """ Build the observation distribution: 
                observation_proba is the observation probablility matrix
                    [b_ki],  b_ki = Pr(X_t=v_k|Y_t=q_i)"""
            # fill with counts
            for pair in pair_counts:
                wrd=pair[0]
                tag=pair[1]
                cpt=pair_counts[pair]
                k = 0 # for <unk>
                if wrd in self.X_index: 
                    k=self.X_index[wrd]
                i=self.Y_index[tag]
                self.observation_proba[k,i]=cpt
            # normalize
            self.observation_proba=self.observation_proba+self.smoothing_obs
            self.observation_proba=self.observation_proba/self.observation_proba.sum(axis=0).reshape(1,self.N)
            
        
        def transition_estimation(self, trans_counts):
            """ Build the transition distribution: 
                transition_proba is the transition matrix with : 
                [a_ij] a[i,j] = Pr(Y_(t+1)=q_i|Y_t=q_j)
            """
            # fill with counts
            for pair in trans_counts:
                i=self.Y_index[pair[1]]
                j=self.Y_index[pair[0]]
                self.transition_proba[i,j]=trans_counts[pair]
            # normalize
            self.transition_proba=self.transition_proba/self.transition_proba.sum(axis=0).reshape(1,self.N)
        
        def init_estimation(self, init_counts):
            """Build the init. distribution"""
            # fill with counts
            for tag in init_counts:
                i=self.Y_index[tag]
                self.initial_state_proba[i]=init_counts[tag]
            # normalize
            self.initial_state_proba=self.initial_state_proba/sum(self.initial_state_proba)
             
        
        def supervised_training(self, pair_counts, trans_counts,init_counts):
            """ Train the HMM's parameters. This function wraps everything"""
            self.observation_estimation(pair_counts)
            self.transition_estimation(trans_counts)
            self.init_estimation(init_counts)
            
        # Algorithme Viterbi
        def viterbi (self, obs):
            """
            @obs: phrase
            """
            B = self.observation_proba
            A = self.transition_proba
            T = len(obs)
            N = self.N
            # init
            delta = zeros(N, float)
            tmp = zeros(N, float)
            psi = zeros((T,N), int)
            delta_t = zeros(N, float)
            # apply initial_state probs to the first frame
            delta = B[obs[0]] * self.initial_state_proba
            # recursion
            for t in range(1, T):
                O_t = obs[t]
                for j in range(N):
                    multiply(delta, A[j,:], tmp)
                    idx = psi[t,j] = tmp.argmax()
                    delta_t[j] = tmp[idx] * B[O_t,j]
                delta, delta_t = delta_t, delta
            # reconstruction
            i_star = [delta.argmax()]
            for psi_t in psi[-1:0:-1]:
                i_star.append(psi_t[i_star[-1]])
            i_star.reverse()
            return i_star

In [19]:
def make_counts(corpus):
    """ 
    Build different count tables to train a HMM. Each count table is a dictionnary. 
    Returns: 
    * c_words: word counts
    * c_tags: tag counts
    * c_pairs: count of pairs (word,tag)
    * c_transitions: count of tag bigram 
    * c_inits: count of tag found in the first position
    """
    c_words = dict()
    c_tags = dict()
    c_pairs= dict()
    c_transitions = dict()
    c_inits = dict()
    for sent in corpus:
        # we use i because of the transition counts
        for i in range(len(sent)):
            couple=sent[i]
            wrd = couple[0]
            tag = couple[1]
            # word counts
            if wrd in c_words:
                c_words[wrd]=c_words[wrd]+1
            else:
                c_words[wrd]=1
            # tag counts
            if tag in c_tags:
                c_tags[tag]=c_tags[tag]+1
            else:
                c_tags[tag]=1
            # observation counts
            if couple in c_pairs:
                c_pairs[couple]=c_pairs[couple]+1
            else:
                c_pairs[couple]=1
            # i >  0 -> transition counts
            if i > 0:
                trans = (sent[i-1][1],tag)
                if trans in c_transitions:
                    c_transitions[trans]=c_transitions[trans]+1
                else:
                    c_transitions[trans]=1
            # i == 0 -> counts for initial states
            else:
                if tag in c_inits:
                    c_inits[tag]=c_inits[tag]+1
                else:
                    c_inits[tag]=1
                    
    return c_words,c_tags,c_pairs, c_transitions, c_inits

In [21]:
def make_vocab(c_words, threshold):
    """ 
    return a vocabulary by thresholding word counts. 
    inputs: 
    * c_words : a dictionnary that maps word to its counts
    * threshold: count must be >= to the threshold to be included
    
    returns: 
    * a word list
    """
    voc = list()
    voc.append(UNK)
    for w in c_words:
        if c_words[w] >= threshold:
            voc.append(w)
    return voc

In [22]:
# read train data
import pickle
fr10 = open('./data/train10.pkl','rb')
fr20 = open('./data/train20.pkl','rb')
train10 = pickle.load(fr10)
train20 = pickle.load(fr20)
fr10.close()
fr20.close()

In [23]:
cwords,ctags,cpairs,ctrans,cinits = make_counts(train20)
print("Nombre de mots  : "+str(len(cwords)))
print("Nombre de tags  : "+str(len(ctags)))
print("Nombre de paires: "+str(len(cpairs)))
print("Nombre de trans : "+str(len(ctrans))+ " / "+ str(12*12))
print("Nombre de init. : "+str(len(cinits)))
print(ctags)
vocab = make_vocab(cwords,10)
print("Vocabulaire: " + str(len(vocab)))

Nombre de mots  : 26
Nombre de tags  : 26
Nombre de paires: 128
Nombre de trans : 401 / 144
Nombre de init. : 25
{'b': 1947, 'y': 2771, 't': 12933, 'h': 6266, 'e': 16899, 'i': 10197, 'r': 7730, 'o': 11205, 'w': 2080, 'n': 9155, 'a': 9880, 'c': 4511, 'u': 3713, 'v': 1792, 'l': 5964, 's': 9072, 'f': 3104, 'm': 3543, 'd': 4265, 'g': 2549, 'k': 559, 'p': 3043, 'z': 117, 'j': 99, 'x': 260, 'q': 143}
Vocabulaire: 27


In [30]:
hmm = HMM(state_list=list(ctags.keys()), observation_list=vocab,
         transition_proba = None,
         observation_proba = None,
         initial_state_proba = None)
hmm.supervised_training(cpairs,ctrans,cinits)

HMM creating with: 
26 states
27 observations


In [31]:
# read test data
fr10 = open('./data/test10.pkl','rb')   
fr20 = open('./data/test20.pkl','rb')
test10 = pickle.load(fr10) 
test20 = pickle.load(fr20)
fr10.close()
fr20.close()

In [34]:
tot=0.0
correct=0.0
for t in test10:
    letter_index, tag_index = hmm.data2indices(t)
    predict_tag = hmm.viterbi(letter_index)
    correct += np.count_nonzero(np.array(tag_index) == np.array(predict_tag))
    tot+=len(t)
print("The accuracy(%) : "+str(correct)+" / "+str(tot)+ " -> "+ str(correct/tot))

NameError: name 'xrange' is not defined

In [None]:
tot=0.0
correct=0.0
for t in test20:
    letter_index, tag_index = hmm.data2indices(t)
    predict_tag = hmm.viterbi(letter_index)
    correct += np.count_nonzero(np.array(tag_index) == np.array(predict_tag))
    tot+=len(t)
print "The accuracy(%) : "+str(correct)+" / "+str(tot)+ " -> "+ str(correct/tot)