In [11]:
import pickle
import numpy as np
from numpy import array, ones,zeros
import sys

class HMM:
    def __init__(self, 
               state_list,observation_list,
               transition_proba=None, 
               observation_proba=None,
               initial_state_proba=None,
               smoothing_obs=0.01):
        print "HMM creating with: "
        self.N = len(state_list)#number of states
        self.M = len(observation_list)#number of observation
        print str(self.N)+" states"
        print str(self.M)+" observations."
        self.omega_Y = state_list
        self.omega_X = observation_list
        if transition_proba is None:
            self.transition_proba = zeros((self.N,self.N), float)
        else:
            self.transition_proba = transition_proba
        if observation_proba is None:
            self.observation_proba = zeros((self.M, self.N),float)
        else:
            self.observation_proba = observation_proba
        if initial_state_proba is None:
            self.initial_state_proba = zeros((self.N,),float)
        self.make_indexes()
        self.smoothing_obs = smoothing_obs
    
    def make_indexes(self):
        self.Y_index = {}
        for i in range(self.N):
            self.Y_index[self.omega_Y[i]] = i
        self.X_index = {}
        for i in range(self.M):
            self.X_index[self.omega_X[i]] = i
            
    def observation_estimation(self, pair_counts):
        for pair in pair_counts:
            obs = pair[0]
            tag = pair[1]
            cpt = pair_counts[pair]
            k = 0
            if obs in self.X_index:
                k = self.X_index[obs]
            i = self.Y_index[tag]
            self.observation_proba[k,i] = cpt
        
        self.observation_proba = self.observation_proba + self.smoothing_obs
        self.observation_proba = self.observation_proba/self.observation_proba.sum(axis=0).reshape(1,self.N)
        
    def transition_estimation(self, trans_counts):
        for pair in trans_counts:
            i = self.Y_index[pair[1]]
            j = self.Y_index[pair[0]]
            self.transition_proba[i,j] = trans_counts[pair]
        self.transition_proba = self.transition_proba/self.transition_proba.sum(axis=0).reshape(1,self.N)
        
    def init_estimation(self, init_counts):
        for tag in init_counts:
            i = self.Y_index[tag]
            self.initial_state_proba[i] = init_counts[tag]
        self.initial_state_proba = self.initial_state_proba/sum(self.initial_state_proba)
            
    def supervised_training(self, pair_counts, trans_counts, init_counts):
        self.observation_estimation(pair_counts)
        self.transition_estimation(trans_counts)
        self.init_estimation(init_counts)
        
    def viterbi(self, obs):#obs is chars sequence
        V = [{}]
        path = {}

        # Initialize base cases (t == 0)
        for y in range(self.N):
            V[0][y] = self.initial_state_proba[y] * self.observation_proba[y,self.X_index[obs[0]]]
            path[y] = [y]

        # Run Viterbi for t > 0
        for t in range(1,len(obs)):
            V.append({})
            newpath = {}

            for y in range(self.N):
                (prob, state) = max([(V[t-1][y0] * self.transition_proba[y,y0] * self.observation_proba[self.X_index[obs[t]],y], y0) for y0 in range(self.N)])
                V[t][y] = prob
                newpath[y] = path[state] + [y]

            # Don't need to remember the old paths
            path = newpath

        (prob, state) = max([(V[len(obs) - 1][y], y) for y in range(self.N)])
        return path[state]
    
    def viterbi_2(self,c_trans_2,obs,c_obs_2,couple):
        l = len(obs)
        obs_list = self.omega_X
        if l < 3:
            ls_states = self.viterbi(obs)
            return ls_states
        else:
            V1 = []
            for i in couple:
                for j in range(l-1):
                    V1.append((j,i))
            V = dict.fromkeys(V1)
            path = dict.fromkeys(V1)
            for i in V:
                V[i] = 0
            for y in couple:
                mot0 = self.X_index[y[0]]
                mot1 = self.X_index[y[1]]
                obs0 = self.X_index[obs[0]]
                obs1 = self.X_index[obs[1]]
                V[0,y] = self.initial_state_proba[mot0]*self.observation_proba[obs0,mot0]*self.observation_proba[obs1,mot1]*self.transition_proba[mot1,mot0]
                path[0,y] = 0
            for t in range(1,l-1):
                for y in couple:
                    (j,k) = y
                    (proba,state) = max([(V[t-1,(i,j)]*c_trans_2[((i,j),k)]*c_obs_2[((j,k),obs[t+1])],i) for i in obs_list])
                    V[t,y] = proba
                    path[t,y] = state
            result = []
            for j in V:
                if j[0] == l-2:
                    result.append((V[j],j))
            last2,last1 = max(result)[1][1]
            list_state = []
            list_state.append(last1)
            list_state.append(last2)
            search_couple = (last2,last1)
            for i in reversed(range(l-2)):
                last = path[i+1,(search_couple)]
                list_state.append(last)
                search_couple = (list_state[-1],list_state[-2])
            list_state.reverse()
            ls_state = []
            for i in list_state:
                ls_state.append(hmm.X_index[i])
            return ls_state
    
with open('train10.pkl','rb') as f:
    data = pickle.load(f)

In [12]:
def make_counts(corpus):
    c_tags = dict()
    c_obs = dict()
    c_trans = dict()
    c_pairs = dict()
    c_inits = dict()
    for word in corpus:
        for i in range(len(word)):
            pair = word[i]
            tag = pair[1]
            obs = pair[0]
            if tag in c_tags:
                c_tags[tag]+=1
            else:
                c_tags[tag]=1
            if obs in c_obs:
                c_obs[obs]+=1
            else:
                c_obs[obs]=1
            if pair in c_pairs:
                c_pairs[pair]+=1
            else:
                c_pairs[pair]=1
            if i>0:
                trans = (word[i-1][1],tag)
                if trans in c_trans:
                    c_trans[trans]+=1
                else:
                    c_trans[trans]=1
            else:
                if tag in c_inits:
                    c_inits[tag]+=1
                else:
                    c_inits[tag]=1
    return c_obs, c_tags, c_pairs, c_trans, c_inits

In [13]:
c_obs, c_tags, c_pairs, c_trans, c_inits = make_counts(data)

In [14]:
hmm = HMM(c_tags.keys(),c_obs.keys(),transition_proba=None,
         observation_proba=None,
         initial_state_proba=None,smoothing_obs=0.001)
hmm.supervised_training(c_pairs, c_trans, c_inits)

HMM creating with: 
26 states
26 observations.


In [15]:
x = []
y = []
for i in c_tags:
    for j in c_tags:
        for k in c_tags:
            x.append(((i,j),k))
        y.append((i,j))
c_trans_2 = dict.fromkeys(x)
c_counter_2 = dict.fromkeys(y)
for i in c_trans_2:
    c_trans_2[i] = 0
for j in c_counter_2:
    c_counter_2[j] = 0
    

In [16]:
for i in data:
    if len(i)>2:
        for j in range(len(i)-2):
            c_counter_2[(i[j][1],i[j+1][1])] += 1
            c_trans_2[((i[j][1],i[j+1][1]),i[j+2][1])] += 1

In [17]:
from __future__ import division
for i in c_trans_2:
    if c_trans_2[i] !=0:
        c_trans_2[i] = c_trans_2[i] / c_counter_2[i[0]]

In [18]:
c_obs_2 = dict.fromkeys(x)
c_obs_counter = dict.fromkeys(y)
for i in c_obs_2:
    c_obs_2[i] = 0
for i in c_obs_counter:
    c_obs_counter[i] = 0
for i in data:
    if len(i)>2:
        for j in range(len(i)-1):
            c_obs_2[((i[j][1],i[j+1][1]),i[j+1][0])] +=1
            c_obs_counter[(i[j][1],i[j+1][1])] +=1
for i in c_obs_2:
    if c_obs_2[i] != 0:
        c_obs_2[i] = c_obs_2[i] / c_obs_counter[i[0]] 

In [19]:
with open('test20.pkl','rb') as f:
    data_test = pickle.load(f)
errors = 0
total = 0
errors_letters = 0

for word in data_test:
    obs = []
    true = []
    for pair in word:
        obs.append(pair[0])
        true.append(hmm.Y_index[pair[1]])
    ls_states = hmm.viterbi_2(c_trans_2, obs, c_obs_2, y)

    for i in  range(len(ls_states)):
        total+=1
        if ls_states[i]!=true[i]:
            errors_letters+=1
print "Error rate =", errors_letters*1.0/total

Error rate = 0.0821999880175
