# Read data from pickle file

In [2]:
def read_file(file):
    data = []
    with open(file, 'rb') as f:
        while True:
            try:
                data.append(pickle.load(f))
            except EOFError:
                break
    return data 

# Examiner les données

In [9]:
import operator
from functools import reduce
import pandas as pd
import numpy as np

def analyse_data(data_set):
    all_characters = reduce(operator.concat, data_set[0])
    df_train = pd.DataFrame(all_characters, columns=["obs", "state"])
    df_train['bigram'] = df_train['state']+df_train['state'].shift(-1)
    df_train['obs_state_pair'] = df_train['obs']+df_train['state']
    bigram_count = df_train.bigram.value_counts()
    observation_count = df_train.obs_state_pair.value_counts()
    alphabet = []
    for letter in range(97,123):
        alphabet.append(chr(letter))
    
    # counting transition probabilities
    M_transitions = np.zeros((26,26), dtype=float) # matrix of counting transition
    for bi in bigram_count.iteritems():
        M_transitions[alphabet.index(bi[0][0]), alphabet.index(bi[0][1])] = bi[1]
    M_probas_transitions = np.zeros((26,26), dtype=float)
    sum_by_row = np.expand_dims(np.sum(M_transitions, axis=1), axis=1)
    M_probas_transitions = M_transitions / np.tile(sum_by_row, (1, M_transitions.shape[1]))
    
    # counting observation probabilities
    M_observation = np.zeros((26,26), dtype=float)
    for ob in observation_count.iteritems():
        M_observation[alphabet.index(ob[0][0]), alphabet.index(ob[0][1])] = ob[1]
    M_probas_observation = np.zeros((26,26), dtype=float)
    sum_by_column = np.expand_dims(np.sum(M_observation, axis=0), axis=0)
    M_probas_observation = M_observation / np.tile(sum_by_column, (M_observation.shape[1], 1))
    
    # initial states probabilities
    initial_state_proba = zeros( (len(alphabet)), float )
    for i, phrase in enumerate(data_set[0]):
        initial_state_proba[alphabet.index(phrase[0][1])] +=1
    initial_state_proba = initial_state_proba / len(data_set[0])
    
    # for second-order modele: calculating two-state transition probabilities
    dict_trigram = {}
    for lettre in alphabet:
        dict_trigram[lettre] = np.zeros((26,26))
    df_train['combined'] = df_train['state']+df_train['state'].shift(-1)+df_train['state'].shift(-2)
    combined_counts = df_train.combined.value_counts()
    for tri in combined_counts.iteritems():
        M_temps = dict_trigram[tri[0][0]]
        M_temps[alphabet.index(tri[0][1]), alphabet.index(tri[0][2])] = tri[1]
    dict_trigram_probas = {}
    for lettre, matrix in dict_trigram.items():
        sum_by_row = np.expand_dims(np.sum(matrix, axis=1), axis=1) + 1e-5
        dict_trigram_probas[lettre] = matrix / np.tile(sum_by_row, (1, matrix.shape[1]))
    
    
    return (M_probas_transitions, M_probas_observation, initial_state_proba, dict_trigram_probas)

# Model HMM with Viterbi algorithm

In [10]:
import nltk
from numpy import array, ones, zeros
import sys

# Some words in test could be unseen during training, or out of the vocabulary (OOV) even during the training. 
# To manage OOVs, all words out the vocabulary are mapped on a special token: UNK defined as follows: 
UNK = "<unk>" 
UNKid = 0 

class HMM:
        def __init__(self, state_list, observation_list, dict_trigram_probas,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None):
            """Builds a new Hidden Markov Model
            state_list is the list of state symbols [q_0...q_(N-1)]
            observation_list is the list of observation symbols [v_0...v_(M-1)]
            transition_proba is the transition probability matrix
                [a_ij] a_ij = Pr(Y_(t+1)=q_i|Y_t=q_j)
            observation_proba is the observation probablility matrix
                [b_ki] b_ki = Pr(X_t=v_k|Y_t=q_i)
            initial_state_proba is the initial state distribution
                [pi_i] pi_i = Pr(Y_0=q_i)"""
            print ("HMM creating with: ")
            self.N = len(state_list) # The number of states
            self.M = len(observation_list) # The number of words in the vocabulary
            print (str(self.N)+" states")
            print (str(self.M)+" observations")
            self.omega_Y = state_list # Keep the vocabulary of tags
            self.omega_X = observation_list # Keep the vocabulary of tags
            # Init. of the 3 distributions : observation, transition and initial states
            if transition_proba is None:
                self.transition_proba = zeros( (self.N, self.N), float) 
            else:
                self.transition_proba=transition_proba
            if observation_proba is None:
                self.observation_proba = zeros( (self.M, self.N), float) 
            else:
                self.observation_proba=observation_proba
            if initial_state_proba is None:
                self.initial_state_proba = zeros( (self.N,), float ) 
            else:
                self.initial_state_proba=initial_state_proba
            self.dict_trigram_probas = dict_trigram_probas
            # Since everything will be stored in numpy arrays, it is more convenient and compact to 
            # handle words and tags as indices (integer) for a direct access. However, we also need 
            # to keep the mapping between strings (word or tag) and indices.
            self.alphabet = state_list
            self.make_indexes()

        def make_indexes(self):
            """Creates the reverse table that maps states/observations names
            to their index in the probabilities arrays"""
            self.Y_index = {}
            for i in range(self.N):
                self.Y_index[self.omega_Y[i]] = i
            self.X_index = {}
            for i in range(self.M):
                self.X_index[self.omega_X[i]] = i
        
        def first_order(self, X_test):
            # unpack test set to list of tuples
            test_tuples = reduce(operator.concat, X_test[0])
            accuracy = 0
            for word in X_test[0]:
                predited_list=[]
                backward = np.zeros((len(self.alphabet), len(word)))
                q_table = np.zeros((len(self.alphabet), len(word)))
                for t, pair in enumerate(word):
                    if t==0:
                        index = self.alphabet.index(pair[0])
                        q_table[:,0] = np.multiply(self.initial_state_proba, self.observation_proba[index,:])
                        backward[:,0] = 0
                        
                    else:
                        for s in range(26):
                            beta = np.multiply( self.transition_proba[:,s], q_table[:,t-1] )
                            index = self.alphabet.index(pair[0])
                            before_max = np.multiply(self.observation_proba[index, s], beta)
                            q_table[s,t] = np.max(before_max)
                            backward[s,t] = np.argmax(beta)
                    
                final_state_index = np.argmax(q_table[:,len(word)-1])
                predited_list.insert(0, self.alphabet[final_state_index])
                
                for i in range(1,len(word)):
                    if i==1:
                        index_previous_state = backward[final_state_index, len(word)-i]
                    else:
                        index_previous_state = backward[int(index_previous_state), len(word)-i]
                    predited_list.insert(0, self.alphabet[int(index_previous_state)])
                
                # calculate accuracy:
                for t, pair in enumerate(word):
                    if pair[1] == predited_list[t]:
                        accuracy +=1
                    #else:
                        #print("word=", word, "----true state:", pair[1], "--- predicted state:", predited_list[t])
            print("There are", len(test_tuples), "characters in", len(X_test[0]), "words from test set")
            print("Modele has predicted exaclty", accuracy, "states of typos")
            print("Accuracy is:", accuracy/len(test_tuples))
            
        def second_order(self, X_test):
            test_tuples = reduce(operator.concat, X_test[0])
            accuracy = 0
            #Step 1:
            for word in X_test[0]:
                predited_list=[]
                q_table = []
                backward = []
                for w in range(len(word)):
                    backward.append(np.zeros((26,26)))
                for w in range(len(word)):
                    if w == 0:
                        q_table.append(np.zeros((26)))
                    else:
                        q_table.append(np.zeros((26,26)))

                for i, pair in enumerate(word):
                    if i==0:
                        index0 = self.alphabet.index(pair[0])
                        for l in range(26):
                            q_table[i][l] = self.initial_state_proba[l] * self.observation_proba[index0, l]
                    
                    elif i==1:
                        index1 = self.alphabet.index(pair[0])
                        for l in range(26):
                            for m in range(26):
                                # d2(l,m) = q_table(l*26+m, 1)
                                q_table[i][l, m] = q_table[i-1][l] * self.transition_proba[l,m] *\
                                                                self.observation_proba[index1, m]
                    # step 2
                    else:
                        index = self.alphabet.index(pair[0])
                        for m in range(26):
                            for n in range(26):
                                array_temps_d = np.zeros((26))
                                array_temps_c = np.zeros((26))
                                for l in range(26):
                                    state_l = self.alphabet[l]
                                    d = q_table[i-1][l,m]*self.dict_trigram_probas[state_l][m,n]*\
                                                                    self.observation_proba[index,n]
                                    c = q_table[i-1][l,m]*self.dict_trigram_probas[state_l][m,n]
                                    array_temps_d[l] = d
                                    array_temps_c[l] = c
                                    
                                q_table[i][m,n] = np.max(array_temps_d)
                                backward[i][m,n] = np.argmax(array_temps_c)
                K = len(word)-1
                if len(word) == 1:
                    final_state_compound = np.argwhere(q_table[K].max() == q_table[K])
                    only_state = final_state_compound[0][0]
                    predited_list.insert(0, self.alphabet[only_state])
                else:    
                    
                    final_state_compound = np.argwhere(q_table[K].max() == q_table[K])
                    final_state_index = final_state_compound[0][1]
                    before_final_index = final_state_compound[0][0]
                    predited_list.insert(0, self.alphabet[final_state_index])
                    predited_list.insert(0, self.alphabet[before_final_index])
                
                for i in range(len(word)-1, 1, -1):
                    index_compound = backward[i][before_final_index, final_state_index]
                    index_previous_state = int(index_compound)
                    predited_list.insert(0, self.alphabet[index_previous_state])
                    final_state_index = before_final_index
                    before_final_index = index_previous_state
                
                # calculate accuracy:
                for t, pair in enumerate(word):
                    if pair[1] == predited_list[t]:
                        accuracy +=1
                    #else:
                        #print("word=", word, "----true state:", pair[1], "--- predicted state:", predited_list[t])
            print("There are", len(test_tuples), "characters in", len(X_test[0]), "words from test set")
            print("Modele has predicted exaclty", accuracy, "states of typos")
            print("Accuracy is:", accuracy/len(test_tuples))

# Load data and test model

In [11]:
import pickle
TRAIN_PATH_10 = "typos-data/train10.pkl"
TEST_PATH_10 = "typos-data/test10.pkl"
TRAIN_PATH_20 = "typos-data/train20.pkl"
TEST_PATH_20 = "typos-data/test20.pkl"

In [26]:
train_set_10 = read_file(TRAIN_PATH_10)
test_set_10 = read_file(TEST_PATH_10)
train_set_20 = read_file(TRAIN_PATH_20)
test_set_20 = read_file(TEST_PATH_20)

### Test with dataset of 10% typos

In [14]:
all_probas = analyse_data(train_set_10)
M_probas_transitions = all_probas[0]
M_probas_observation = all_probas[1]
initial_state_proba = all_probas[2]
dict_trigram_probas = all_probas[3]

In [15]:
state_list = []
for letter in range(97,123):
    state_list.append(chr(letter))
observation_list = state_list

HMM_model = HMM(state_list, observation_list, dict_trigram_probas,
                 transition_proba = M_probas_transitions,
                 observation_proba = M_probas_observation,
                 initial_state_proba = initial_state_proba)

HMM creating with: 
26 states
26 observations


#### first order viterbi algorithm

In [16]:
HMM_model.first_order(test_set_10)

There are 7320 characters in 1501 words from test set
Modele has predicted exaclty 6800 states of typos
Accuracy is: 0.9289617486338798


#### second order viterbi algorithm

In [17]:
HMM_model.second_order(test_set_10)

There are 7320 characters in 1501 words from test set
Modele has predicted exaclty 6909 states of typos
Accuracy is: 0.9438524590163935


### Test with dataset of 20% typos

In [23]:
all_probas_20 = analyse_data(train_set_20)
M_probas_transitions_20 = all_probas_20[0]
M_probas_observation_20 = all_probas_20[1]
initial_state_proba_20 = all_probas_20[2]
dict_trigram_probas_20 = all_probas_20[3]

In [24]:
state_list = []
for letter in range(97,123):
    state_list.append(chr(letter))
observation_list = state_list

HMM_model_20 = HMM(state_list, observation_list, dict_trigram_probas_20,
                 transition_proba = M_probas_transitions_20,
                 observation_proba = M_probas_observation_20,
                 initial_state_proba = initial_state_proba_20)

HMM creating with: 
26 states
26 observations


#### first order viterbi algorithm

In [27]:
HMM_model_20.first_order(test_set_20)

There are 16691 characters in 3374 words from test set
Modele has predicted exaclty 14403 states of typos
Accuracy is: 0.8629201366005632


#### second order viterbi algorithm

In [28]:
HMM_model_20.second_order(test_set_20)

There are 16691 characters in 3374 words from test set
Modele has predicted exaclty 15103 states of typos
Accuracy is: 0.9048589059972441
