In [1]:
# HMM1
# Load the data  train10 & test10
# Set the correct path to the data

import pprint, pickle

train_set = pickle.load(open('train10.pkl', 'rb'))
test_set = pickle.load(open('test10.pkl', 'rb'))

In [2]:
import nltk
from numpy import array, ones, zeros, multiply
import numpy as np
import sys

UNK = "<unk>"  # token to map all out-of-vocabulary words (OOVs)
UNKid = 0      # index for UNK
epsilon=1e-100

In [3]:


class HMM:
        def __init__(self, state_list, observation_list,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None, smoothing_obs = 0.01):
            """
            Builds a Hidden Markov Model
            * state_list is the list of state symbols [q_0...q_(N-1)]
            * observation_list is the list of observation symbols [v_0...v_(M-1)]
            * transition_proba is the transition probability matrix
                [a_ij] a_ij = Pr(Y_(t+1)=q_i|Y_t=q_j)
            * observation_proba is the observation probablility matrix
                [b_ki] b_ki = Pr(X_t=v_k|Y_t=q_i)
            * initial_state_proba is the initial state distribution
                [pi_i] pi_i = Pr(Y_0=q_i)"""
            print "HMM creating with: "
            self.N = len(state_list)       # number of states
            self.M = len(observation_list) # number of possible emissions
            print str(self.N)+" states"
            print str(self.M)+" observations"
            self.omega_Y = state_list
            self.omega_X = observation_list
            if transition_proba is None:
                self.transition_proba = zeros( (self.N, self.N), float) 
            else:
                self.transition_proba=transition_proba
            if observation_proba is None:
                self.observation_proba = zeros( (self.M, self.N), float) 
            else:
                self.observation_proba=observation_proba
            if initial_state_proba is None:
                self.initial_state_proba = zeros( (self.N,), float ) 
            else:
                self.initial_state_proba=initial_state_proba
            self.make_indexes() # build indexes, i.e the mapping between token and int
            self.smoothing_obs = smoothing_obs 
            
        def make_indexes(self):
            """Creates the reverse table that maps states/observations names
            to their index in the probabilities array"""
            self.Y_index = {}
            for i in range(self.N):
                self.Y_index[self.omega_Y[i]] = i
            self.X_index = {}
            for i in range(self.M):
                self.X_index[self.omega_X[i]] = i
        #cobs,cstat,cpairs,ctrans,cinits
        #def get_observationIndices( self, cobs ):
           
        def data2indices(self, sent): 
            """From one tagged sentence of the brown corpus: 
            - extract the words and tags 
            - returns two list of indices, one for each
            -> (wordids, tagids)
            """
            wordids = list()
            tagids  = list()
            for couple in sent:
                wrd = couple[0]
                tag = couple[1]
                if wrd in self.X_index:
                     wordids.append(self.X_index[wrd])
                tagids.append(self.Y_index[tag])
            return wordids,tagids
            
        def observation_estimation(self,pair_counts):
            """ Build the observation distribution: 
                observation_proba is the observation probablility matrix
                    [b_ki],  b_ki = Pr(X_t=v_k|Y_t=q_i)"""
            # fill with counts
            for pair in pair_counts: 
                wrd=pair[0]
                tag=pair[1]
                cpt=pair_counts[pair]
                k = 0 # for <unk>
                if wrd in self.X_index: 
                    k=self.X_index[wrd]
                i=self.Y_index[tag]
                self.observation_proba[k,i]=cpt
            # normalize
            self.observation_proba=self.observation_proba+self.smoothing_obs
            self.observation_proba=self.observation_proba/self.observation_proba.sum(axis=0).reshape(1,self.N)
            
        
        def transition_estimation(self, trans_counts):
            """ Build the transition distribution: 
                transition_proba is the transition matrix with : 
                [a_ij] a[i,j] = Pr(Y_(t+1)=q_i|Y_t=q_j)
            """
            # fill with counts
            for pair in trans_counts:
                i=self.Y_index[pair[0]]
                j=self.Y_index[pair[1]]
                self.transition_proba[i,j]=trans_counts[pair]
            # normalize
            self.transition_proba=self.transition_proba/self.transition_proba.sum(axis=0).reshape(1,self.N)
        
        def init_estimation(self,  init_counts):
            """Build the init. distribution"""
            # fill with counts
            for tag in  init_counts:
                i=self.Y_index[tag]
                self.initial_state_proba[i]= init_counts[tag]
            # normalize
            self.initial_state_proba=self.initial_state_proba/sum(self.initial_state_proba)
             
        
        def supervised_training(self, pair_counts, trans_counts,init_counts):
            """ Train the HMM's parameters. This function wraps everything"""
            self.observation_estimation(pair_counts)
            self.transition_estimation(trans_counts)
            self.init_estimation(init_counts)
        
        def viterbi(self,wordids):
            """Viterbi algorithm: 
            Find the states corresponding to the observations. 
            The observations must be converted in a list of indices. 
            """
            # shortcuts
            
            B = self.observation_proba 
            A = self.transition_proba
            T = len(wordids)
            N = self.N
            
            # initialization
            
            delta = zeros( N, float )
            tmp = zeros( N, float )
            psi = zeros( (T, N), int )      
            delta_t = zeros( N, float )
            
            # apply initial_state probs to the first frame
            
            delta = B[wordids[0]] * self.initial_state_proba  
            
            
            for t in xrange(1, T):
                O_t = wordids[t]
                for j in range(N):
                    multiply( delta, A[:, j], tmp )
                    idx = psi[t, j] = tmp.argmax()       
                    delta_t[j] = tmp[idx] * B[O_t, j] 
                delta, delta_t = delta_t, delta
                
            # reconstruction
            
            i_star = [delta.argmax()]                        
            for psi_t in psi[-1:0:-1]:
                i_star.append( psi_t[i_star[-1]] )                 
            i_star.reverse()
            return i_star

In [4]:
# Count the observed letters and states
def make_counts(train_set):
    """ 
    Build different count tables to train the HMM. Each count table is a dictionnary. 
    Returns: 
    * c_words: word counts
    * c_tags: tag counts
    * c_pairs: count of pairs (word,tag)
    * c_transitions: count of tag bigram 
    * c_inits: count of tag found in the first position
    """
    c_words = dict()
    c_tags = dict()
    c_pairs= dict()
    c_transitions = dict()
    c_inits = dict()
    for sent in train_set:
        # we use i because of the transition counts
        for i in range(len(sent)):
            couple=sent[i]
            wrd = couple[0]
            tag = couple[1]
            # word counts
            if wrd in c_words:
                c_words[wrd]=c_words[wrd]+1
            else:
                c_words[wrd]=1
            # tag counts
            if tag in c_tags:
                c_tags[tag]=c_tags[tag]+1
            else:
                c_tags[tag]=1
            # observation counts
            if couple in c_pairs:
                c_pairs[couple]=c_pairs[couple]+1
            else:
                c_pairs[couple]=1
            # i >  0 -> transition counts
            if i > 0:
                trans = (sent[i-1][1],tag)
                if trans in c_transitions:
                    c_transitions[trans]=c_transitions[trans]+1
                else:
                    c_transitions[trans]=1
            # i == 0 -> counts for initial states
            else:
                if tag in c_inits:
                    c_inits[tag]=c_inits[tag]+1
                else:
                    c_inits[tag]=1
                    
    return c_words,c_tags,c_pairs, c_transitions, c_inits

In [5]:
c_words,c_tags,c_pairs, c_transitions, c_inits = make_counts(train_set)

print "Number of non corrected letter is  : "+str(len(c_words))
print "Number of corrected letter is  : "+str(len(c_tags))
print "Numer of pairs of tuples: "+str(len(c_pairs))
print "Number of transition : "+str(len(c_transitions))+ " / "+ str(26*26)
print "Number of tag in the first position : "+str(len(c_inits))

Number of non corrected letter is  : 26
Number of corrected letter is  : 26
Numer of pairs of tuples: 127
Number of transition : 403 / 676
Number of tag in the first position : 25


In [6]:
c_words1,c_tags1,c_pairs1, c_transitions1, c_inits1 = make_counts(test_set)

In [7]:
def make_vocab(c_words, threshold):
    """ 
    return a vocabulary by thresholding word counts. 
    inputs: 
    * c_words : a dictionnary that maps word to its counts
    * threshold: count must be >= to the threshold to be included
    
    returns: 
    * a word list
    """
    voc = list()
    voc.append(UNK)
    for w in c_words:
        if c_words[w] >= threshold:
            voc.append(w)
    return voc

In [8]:
vocab = make_vocab(c_words,0)
print "Vocabulary:"+str(len(vocab))

Vocabulary:27


In [9]:
vocab = make_vocab(c_words1,0)
print "Vocabulary:"+str(len(vocab))

Vocabulary:27


In [10]:
hmm = HMM(state_list=c_tags.keys(), observation_list=c_words.keys(),
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None)

HMM creating with: 
26 states
26 observations


In [11]:
hmm.observation_estimation(c_pairs)
print hmm.observation_proba.sum(axis=0)

[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.
  1.  1.  1.  1.  1.  1.  1.  1.]


In [12]:
# Learning step by step
hmm.transition_estimation(c_transitions)

#Print hmm.transition_proba
print hmm.transition_proba.sum(axis=0)

[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.
  1.  1.  1.  1.  1.  1.  1.  1.]


In [13]:
hmm.init_estimation(c_inits)
print sum(hmm.initial_state_proba)

1.0


In [14]:
# One time learning
hmm = HMM(state_list=c_tags.keys(), observation_list=vocab,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None,
                 smoothing_obs = 0.001)
hmm.supervised_training(c_pairs,c_transitions,c_inits)

HMM creating with: 
26 states
27 observations


In [15]:
# Viterbi
import time

n_correct=0
tot=0
ccount=0
start = time.time()
for sent in test_set:
    wordids,tagids = hmm.data2indices(sent)

    viterbi= hmm.viterbi(wordids)
                            
    n_correct+=sum(np.array(viterbi)==np.array(tagids))
                                            
    tot+=len(tagids)
end = time.time()
    
    
precision=n_correct*100.0/tot
print "precision  = "+str(precision)+"%"
error_rate=100.0-precision
print"error rate  = "+str(error_rate)+"%"

precision  = 92.5819672131%
error rate  = 7.41803278689%
