# Project: Second-Order HMM for typo correction



The goal is to design a model that corrects typos in text without a dictionary.

In this problem, a state refers to the correct letter that should have been typed, and an observation refers to the actual letter that is typed. Given a sequence of outputs/observations (i.e., actually typed letters), the problem is to reconstruct the hidden state sequence (i.e., the intended sequence of letters). Thus, data for this problem looks like:

* [('t', 't'), ('h', 'h'), ('w', 'e'), ('k', 'm')]
* [('f', 'f'), ('o', 'o'), ('r', 'r'), ('m', 'm')] 

The first example is misspelled: the observation is thwk while the correct word is them. The second example is correctly typed.
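
As a small illustration of this format (the variable names below are illustrative and do not come from the project code), each pair is (typed letter, intended letter), so the observed and intended words can be recovered by splitting the pairs:

```python
# Each pair is (observed letter, intended letter).
example = [('t', 't'), ('h', 'h'), ('w', 'e'), ('k', 'm')]

observed = "".join(typed for typed, _ in example)
intended = "".join(correct for _, correct in example)
# Positions where the typed letter differs from the intended one.
typo_positions = [i for i, (typed, correct) in enumerate(example) if typed != correct]

print(observed)        # thwk
print(intended)        # them
print(typo_positions)  # [2, 3]
```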

Data for this problem was generated as follows. The starting point is a text document, in this case the Unabomber's Manifesto, chosen not for political reasons but because it is conveniently available online and of about the right length. All numbers and punctuation were converted to white space and all letters to lower case, so the remaining text is a sequence over the lower-case letters and the space character only; the space is represented in the data files by an underscore. Typos were then added artificially: with 90% probability the correct letter is transcribed, and with 10% probability a randomly chosen neighbor of that letter (on an ordinary physical keyboard) is transcribed instead. Space characters are always transcribed correctly. In a harder variant of the problem, the error rate is increased to 20%.
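
The corruption step described above can be sketched as follows. This is only an illustration under stated assumptions: the `NEIGHBORS` map is a hypothetical, partial QWERTY layout (the map actually used to build the dataset is not given here), and `add_typos` is a made-up helper name.

```python
import random

# Hypothetical, partial QWERTY neighbor map (assumption: the real map used to
# generate the dataset is not specified in the text).
NEIGHBORS = {
    "t": "ryfg",
    "h": "gjyunb",
    "e": "wrsd",
    "m": "njk",
}

def add_typos(text, error_rate=0.1, rng=None):
    """Returns (typed, intended) pairs with keyboard-neighbor typos injected."""
    rng = rng or random.Random()
    pairs = []
    for letter in text:
        neighbors = NEIGHBORS.get(letter, "")
        # Spaces (underscores) and letters missing from the map are copied as-is.
        if letter != "_" and neighbors and rng.random() < error_rate:
            pairs.append((rng.choice(neighbors), letter))
        else:
            pairs.append((letter, letter))
    return pairs

pairs = add_typos("them_them", error_rate=0.1, rng=random.Random(0))
print(pairs)
```

Setting `error_rate=0.2` would correspond to the harder variant.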

The dataset is distributed as an archive; see the shared drive to download it. The archive contains four pickles: train10 and test10 form the dataset with 10% spelling errors, while train20 and test20 form the one with 20% errors.


## Part 1: First-Order HMM

In [24]:
import numpy as np
from numpy import ones, zeros

class HMM:
        def __init__(self, state_list, observation_list,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None):
            """Builds a new Hidden Markov Model
            state_list is the list of state symbols [q_0...q_(N-1)]
            observation_list is the list of observation symbols [v_0...v_(M-1)]
            transition_proba is the transition probability matrix
                [a_ij] a_ij = Pr(Y_(t+1)=q_j|Y_t=q_i)
            observation_proba is the observation probability matrix
                [b_ki] b_ki = Pr(X_t=v_k|Y_t=q_i)
            initial_state_proba is the initial state distribution
                [pi_i] pi_i = Pr(Y_0=q_i)"""
            print("HMM created with: ")
            self.N = len(state_list) # The number of states
            self.M = len(observation_list) # The number of observation symbols
            print(str(self.N)+" states")
            print(str(self.M)+" observations")
            self.omega_Y = state_list # Keep the vocabulary of states (intended letters)
            self.omega_X = observation_list # Keep the vocabulary of observations (typed letters)
            # Init. of the 3 distributions : observation, transition and initial states
            if transition_proba is None:
                self.transition_proba = zeros( (self.N, self.N), float) 
            else:
                self.transition_proba=transition_proba
            if observation_proba is None:
                self.observation_proba = zeros( (self.M, self.N), float) 
            else:
                self.observation_proba=observation_proba
            if initial_state_proba is None:
                self.initial_state_proba = zeros( (self.N,), float ) 
            else:
                self.initial_state_proba=initial_state_proba
            # Since everything will be stored in numpy arrays, it is more convenient and compact to
            # handle observations and states as integer indices for direct access. However, we also
            # need to keep the mapping between symbols (typed or intended letters) and indices.
            self.make_indexes()

        def make_indexes(self):
            """Creates the reverse table that maps states/observations names
            to their index in the probabilities arrays"""
            self.Y_index = {}
            omega_Y_keys = [key for key in self.omega_Y.keys()]
            omega_X_keys = [key for key in self.omega_X.keys()]
            for i in range(self.N):
                self.Y_index[omega_Y_keys[i]] = i
            self.X_index = {}
            for i in range(self.M):
                self.X_index[omega_X_keys[i]] = i
                
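        def get_X_index(self, word):
            """Returns the observation symbol if it is in the vocabulary.
            Unknown symbols are not supported: no UNK entry exists in X_index."""
            if word in self.X_index:
                return word
            raise KeyError("Unknown observation symbol: {!r}".format(word))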
        
        def compute_init_state_proba(self, data):
            for sent in data:
                self.initial_state_proba[self.Y_index[sent[0][1]]]+=1
            self.initial_state_proba/=len(data)
            
        def compute_observation_probas(self, data):            
            for phr in data:
                for word in phr:
                    x = self.X_index[self.get_X_index(word[0])]
                    y = self.Y_index[word[1]]
                    self.observation_proba[x][y] += 1
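            # Rows are normalized over states (axis=1), so the stored values are
            # Pr(Y_t=q_i|X_t=v_k); normalizing over axis=0 instead would give the
            # emission probability Pr(X_t=v_k|Y_t=q_i) described in the docstring.
            self.observation_proba /= np.sum(self.observation_proba, axis=1)[:, np.newaxis]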
             
        def compute_transition_probas(self, data):            
            for phr in data:
                for i in range(len(phr) - 1):
                    yplus1 = self.Y_index[phr[i + 1][1]]
                    y = self.Y_index[phr[i][1]]
                    self.transition_proba[y][yplus1] += 1
            self.transition_proba /= np.sum(self.transition_proba, axis=1)[:, np.newaxis]
            
        def init_parameters(self, train_set):
            self.compute_init_state_proba(train_set)
            self.compute_observation_probas(train_set)
            self.compute_transition_probas(train_set)
            
        def forward(self, obs):
            alpha = np.zeros((len(obs), len(self.Y_index)))
            alpha[0] = self.initial_state_proba\
                        * self.observation_proba[self.X_index[self.get_X_index(obs[0][0])]]
            for i in range(1, len(alpha)):
                alpha[i] = self.observation_proba[self.X_index[self.get_X_index(obs[i][0])]] *\
                np.sum(self.transition_proba.T * alpha[i - 1], axis=1)
            return alpha
        
        def backward(self, obs):
            beta = np.zeros((len(obs), len(self.Y_index)))
            beta[-1] = ones(len(self.Y_index))
            for i in range(len(obs) - 2, -1, -1):
                beta[i] = np.sum(beta[i + 1]\
                                 * self.observation_proba[self.X_index[self.get_X_index(obs[i + 1][0])]]\
                                 * self.transition_proba, axis=1)
            return beta
        
        def decode(self, alpha, beta):
            prob = alpha * beta
            preds = prob.argmax(axis=1)
            keys = [key for key in self.omega_Y.keys()]
            return [keys[pred_ind] for pred_ind in preds]
        
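        def viterbi(self, obs):
            """Greedy left-to-right decoding without backpointers. The transition row at
            step i is selected with the reference state obs[i - 1][1], so this method
            needs labeled pairs; mu_max only rescales the scores."""
            mu_max = np.zeros(len(obs))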
            tmp = self.initial_state_proba * self.observation_proba[self.X_index[self.get_X_index(obs[0][0])]]
            index = [np.argmax(tmp)]
            mu_max[0] = max(tmp)
            for i in range(1, len(obs)):
                tmp = self.observation_proba[self.X_index[self.get_X_index(obs[i][0])]]\
                        * self.transition_proba[self.Y_index[obs[i - 1][1]]]\
                        * mu_max[i - 1]
                index.append(np.argmax(tmp))
                mu_max[i] = max(tmp)
            keys = [key for key in self.omega_Y.keys()]
            return [keys[ind] for ind in index]
            
        def score_eval(self, test):
            error = 0
            elements = 0
            errors_corrected = 0
            errors_added = 0
            for word in test:
                base = [letter for (letter, _) in word]
                truth = [tag for (_, tag) in word]
                alpha = self.forward(word)
                beta = self.backward(word)
                preds = self.decode(alpha, beta)
                elements += len(preds)
                for x, y, pred in zip(base, truth, preds):
                    if pred != x and pred == y:
                        errors_corrected += 1
                    if pred != y:
                        error += 1
                        if x == y:
                            errors_added += 1
            return error / elements, errors_corrected, errors_added
        
        def score_viterbi(self, test):
            error = 0
            elements = 0
            errors_corrected = 0
            errors_added = 0
            for word in test:
                base = [letter for (letter, _) in word]
                truth = [tag for (_, tag) in word]
                preds = self.viterbi(word)
                elements += len(preds)
                for x, y, pred in zip(base, truth, preds):
                    if pred != x and pred == y:
                        errors_corrected += 1
                    if pred != y:
                        error += 1
                        if x == y:
                            errors_added += 1
            return error / elements, errors_corrected, errors_added

        def score_dummy(self, test):
            error = 0
            elements = 0
            for word in test:
                base = [letter for (letter, _) in word]
                truth = [tag for (_, tag) in word]
                elements += len(truth)
                for x, y in zip(base, truth):
                    if x != y:
                        error += 1
            return error / elements
        
        def results_hmm(self, test):
            error_test, fb_corrected, fb_added = self.score_eval(test)
            viterbi_error_test, vit_corrected, vit_added = self.score_viterbi(test)
            error_dummy = self.score_dummy(test)
            print("Error forward-backward = {:.2%}, {} errors corrected, {} errors added"
                  .format(error_test, fb_corrected, fb_added))
            print("Error viterbi = {:.2%}, {} errors corrected, {} errors added"
                  .format(viterbi_error_test, vit_corrected, vit_added))
            print("Error with nothing changed = {:.2%}".format(error_dummy))
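
For comparison, a textbook first-order Viterbi decoder with backpointers can be sketched as below. This is a minimal standalone sketch, not the class method above: the function name `viterbi_decode` and the toy matrices are assumptions made for illustration, and `B` is assumed to hold emission probabilities Pr(X_t=v_k|Y_t=q_i). It works in log space and uses only the observation indices, never the reference states.

```python
import numpy as np

def viterbi_decode(pi, A, B, obs):
    """Most likely state path for a first-order HMM.

    pi[i]   = Pr(Y_0 = i)
    A[i, j] = Pr(Y_(t+1) = j | Y_t = i)
    B[k, i] = Pr(X_t = k | Y_t = i)
    obs     = sequence of observation indices
    """
    T, N = len(obs), len(pi)
    with np.errstate(divide="ignore"):   # log(0) = -inf is acceptable here
        log_pi, log_A, log_B = np.log(pi), np.log(A), np.log(B)
    delta = np.zeros((T, N))             # best log-score of a path ending in each state
    back = np.zeros((T, N), dtype=int)   # best predecessor of each state
    delta[0] = log_pi + log_B[obs[0]]
    for t in range(1, T):
        scores = delta[t - 1][:, np.newaxis] + log_A   # scores[i, j]: move from i to j
        back[t] = scores.argmax(axis=0)
        delta[t] = scores.max(axis=0) + log_B[obs[t]]
    path = [int(delta[-1].argmax())]
    for t in range(T - 1, 0, -1):        # follow the backpointers from the end
        path.append(int(back[t, path[-1]]))
    return path[::-1]

# Toy model with 2 states and 2 observation symbols (made-up numbers).
pi = np.array([0.6, 0.4])
A = np.array([[0.7, 0.3], [0.4, 0.6]])
B = np.array([[0.9, 0.2], [0.1, 0.8]])
print(viterbi_decode(pi, A, B, [0, 1, 1]))  # [0, 1, 1]
```

Unlike a greedy per-position choice, the backpointers let a later observation change the decision made for an earlier position.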

## Loading and splitting the data

In [10]:
import pickle
import numpy as np

In [11]:
with open("./typos-data/train10.pkl", "rb") as f:
    train10 = pickle.load(f)
with open("./typos-data/train20.pkl", "rb") as f:
    train20 = pickle.load(f)
with open("./typos-data/test10.pkl", "rb") as f:
    test10 = pickle.load(f)
with open("./typos-data/test20.pkl", "rb") as f:
    test20 = pickle.load(f)

## Building the vocabulary and the HMM

In [12]:
def distrib_x_y_data(data):
    """Counts how often each typed letter and each intended letter occurs.
    Returns (observation counts, state counts). The insertion order of the
    keys later defines the index of each symbol in the HMM arrays."""
    dict_tag = dict()
    dict_mot = dict()
    for phrase in data:
        for mot in phrase:
            # mot is a (typed letter, intended letter) pair
            dict_tag[mot[1]] = dict_tag.get(mot[1], 0) + 1
            dict_mot[mot[0]] = dict_mot.get(mot[0], 0) + 1
    return dict_mot, dict_tag
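
To make the link between such counts and an emission table concrete, here is a hedged sketch of a relative-frequency estimate under the convention b_ki = Pr(X_t=v_k|Y_t=q_i). The helper name `estimate_emissions` and the toy data are assumptions for illustration; each state column is normalized so that it sums to one.

```python
import numpy as np

def estimate_emissions(data, states, symbols):
    """Relative-frequency estimate of B[k, i] = Pr(X = symbols[k] | Y = states[i])."""
    x_index = {s: k for k, s in enumerate(symbols)}
    y_index = {s: i for i, s in enumerate(states)}
    counts = np.zeros((len(symbols), len(states)))
    for word in data:
        for typed, intended in word:
            counts[x_index[typed], y_index[intended]] += 1
    totals = counts.sum(axis=0, keepdims=True)   # one total per state (column)
    # States never seen in the data keep an all-zero column instead of dividing by zero.
    return counts / np.where(totals == 0, 1, totals)

# Toy data: 'a' is intended three times and mistyped as 's' once.
toy = [[('a', 'a'), ('s', 'a')], [('a', 'a'), ('s', 's')]]
B = estimate_emissions(toy, states=['a', 's'], symbols=['a', 's'])
print(B)
```

With this convention, `B[:, i]` is the distribution over typed letters given that letter `i` was intended.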

### Corpus with 10% errors

In [21]:
obs_list, state_list = distrib_x_y_data(train10)

hmm = HMM(state_list, obs_list)

HMM created with: 
26 states
26 observations


In [17]:
hmm.init_parameters(train10)

hmm.results_hmm(test10)

Error forward-backward = 8.29%, 332 errors corrected, 194 errors added
Error viterbi = 8.92%, 296 errors corrected, 204 errors added
Error baseline = 10.18%


### Corpus with 20% errors

In [22]:
obs_list, state_list = distrib_x_y_data(train20)

hmm_20 = HMM(state_list, obs_list)

HMM created with: 
26 states
26 observations


In [23]:
hmm_20.init_parameters(train20)

hmm_20.results_hmm(test20)

Error forward-backward = 15.04%, 1453 errors corrected, 724 errors added
Error viterbi = 15.85%, 1401 errors corrected, 808 errors added
Error baseline = 19.41%
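
One way to read these figures is as a relative reduction of the baseline error rate. The short computation below uses only the error rates printed above; the helper name `relative_reduction` is illustrative.

```python
def relative_reduction(baseline, model):
    """Fraction of the baseline error rate removed by the model."""
    return (baseline - model) / baseline

# Error rates (in %) copied from the outputs above.
print("{:.1%}".format(relative_reduction(10.18, 8.29)))   # 18.6% (forward-backward, 10% corpus)
print("{:.1%}".format(relative_reduction(10.18, 8.92)))   # 12.4% (viterbi, 10% corpus)
print("{:.1%}".format(relative_reduction(19.41, 15.04)))  # 22.5% (forward-backward, 20% corpus)
print("{:.1%}".format(relative_reduction(19.41, 15.85)))  # 18.3% (viterbi, 20% corpus)
```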


## Part 2: Second-Order HMM

In [315]:
import numpy as np
from numpy import ones, zeros

class HMM_2:
        def __init__(self, state_list, observation_list,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None):
            """Builds a new Hidden Markov Model
            state_list is the list of state symbols [q_0...q_(N-1)]
            observation_list is the list of observation symbols [v_0...v_(M-1)]
            transition_proba is the transition probability matrix
                [a_ijk] a_ijk = Pr(Y_(t+1)=q_k|Y_(t-1)=q_i, Y_t=q_j)
            observation_proba is the observation probability matrix
                [b_kji] b_kji = Pr(X_t=v_k|Y_(t-1)=q_j, Y_t=q_i)
            initial_state_proba is the initial state distribution
                [pi_i] pi_i = Pr(Y_0=q_i)"""
            print("HMM created with: ")
            self.N = len(state_list) # The number of states
            self.M = len(observation_list) # The number of observation symbols
            print(str(self.N)+" states")
            print(str(self.M)+" observations")
            self.omega_Y = state_list # Keep the vocabulary of states (intended letters)
            self.omega_X = observation_list # Keep the vocabulary of observations (typed letters)
            # Init. of the 3 distributions : observation, transition and initial states
            if transition_proba is None:
                self.transition_proba = zeros((self.N, self.N, self.N), float) 
            else:
                self.transition_proba=transition_proba
            if observation_proba is None:
                self.observation_proba = zeros((self.M, self.N, self.N), float) 
            else:
                self.observation_proba=observation_proba
            if initial_state_proba is None:
                self.initial_state_proba = zeros((self.N,), float) 
            else:
                self.initial_state_proba=initial_state_proba
            # First-order tables, used to decode the first two positions of a sequence
            self.transition_proba_order1 = zeros( (self.N, self.N), float) 
            self.observation_proba_order1 = zeros( (self.M, self.N), float) 
            # Since everything will be stored in numpy arrays, it is more convenient and compact to
            # handle observations and states as integer indices for direct access. However, we also
            # need to keep the mapping between symbols (typed or intended letters) and indices.
            self.make_indexes()

        def make_indexes(self):
            """Creates the reverse table that maps states/observations names
            to their index in the probabilities arrays"""
            self.Y_index = {}
            omega_Y_keys = [key for key in self.omega_Y.keys()]
            omega_X_keys = [key for key in self.omega_X.keys()]
            for i in range(self.N):
                self.Y_index[omega_Y_keys[i]] = i
            self.X_index = {}
            for i in range(self.M):
                self.X_index[omega_X_keys[i]] = i
                
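        def get_X_index(self, word):
            """Returns the observation symbol if it is in the vocabulary.
            Unknown symbols are not supported: no UNK entry exists in X_index."""
            if word in self.X_index:
                return word
            raise KeyError("Unknown observation symbol: {!r}".format(word))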
        
        def compute_init_state_proba(self, data):
            for sent in data:
                self.initial_state_proba[self.Y_index[sent[0][1]]]+=1
            self.initial_state_proba/=len(data)
        
        def compute_observation_probas_order1(self, data):            
            for phr in data:
                for word in phr:
                    x = self.X_index[self.get_X_index(word[0])]
                    y = self.Y_index[word[1]]
                    self.observation_proba_order1[x][y] += 1
            self.observation_proba_order1 /= np.sum(self.observation_proba_order1, axis=1)[:, np.newaxis]
             
        def compute_transition_probas_order1(self, data):            
            for phr in data:
                for i in range(len(phr) - 1):
                    yplus1 = self.Y_index[phr[i + 1][1]]
                    y = self.Y_index[phr[i][1]]
                    self.transition_proba_order1[y][yplus1] += 1
            self.transition_proba_order1 /= np.sum(self.transition_proba_order1, axis=1)[:, np.newaxis]
            
        def compute_observation_probas(self, data):            
            for phr in data:
                for i in range(1, len(phr)):
                    x = self.X_index[self.get_X_index(phr[i][0])]
                    y = self.Y_index[phr[i][1]]
                    yminus1 = self.Y_index[phr[i - 1][1]]
                    self.observation_proba[x][yminus1][y] += 1
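            # Normalize over the current state (axis=2) for each (observation, previous state)
            # pair, so the stored values are Pr(Y_t=q_i|X_t=v_k, Y_(t-1)=q_j); all-zero rows stay zero
            sumPlus1 = np.sum(self.observation_proba, axis=2)[:, :, np.newaxis]
            self.observation_proba /= np.where(sumPlus1 == 0, 1, sumPlus1)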
             
        def compute_transition_probas(self, data):  
            nb_trigrams = 0.0
            nb_bigrams = 0.0
            unigram = zeros(self.N)
            bigrams = zeros((self.N, self.N))
            trigrams = zeros((self.N, self.N, self.N))
            for phr in data:
                for i in range(2, len(phr)):
                    y = self.Y_index[phr[i][1]]
                    unigram[y] += 1
                    yminus1 = self.Y_index[phr[i - 1][1]]
                    bigrams[yminus1][y] += 1
                    nb_bigrams += 1.0
                    yminus2 = self.Y_index[phr[i - 2][1]]
                    trigrams[yminus2][yminus1][y] += 1
                    nb_trigrams += 1.0
            freq_bigrams = bigrams / nb_bigrams
            freq_trigrams = trigrams / nb_trigrams
            # Interpolation weights derived from the relative n-gram frequencies;
            # for every trigram the three weights sum to 1
            k3 = (np.log(freq_trigrams + 1) + 1) / (np.log(freq_trigrams + 1) + 2)
            k2 = (np.log(freq_bigrams + 1) + 1) / (np.log(freq_bigrams + 1) + 2)
            lambda1 = k3                   # weight of the trigram count
            lambda2 = (1 - k3) * k2        # weight of the bigram count
            lambda3 = (1 - k3) * (1 - k2)  # weight of the unigram count
            for phr in data:
                for i in range(2, len(phr)):
                    y = self.Y_index[phr[i][1]]
                    yminus1 = self.Y_index[phr[i - 1][1]]
                    yminus2 = self.Y_index[phr[i - 2][1]]
                    self.transition_proba[yminus2][yminus1][y] = lambda1[yminus2][yminus1][y] \
                                                * trigrams[yminus2][yminus1][y]\
                                                + lambda2[yminus2][yminus1][y] * bigrams[yminus1][y]\
                                                + lambda3[yminus2][yminus1][y] * unigram[y]
                    
            # Normalize over the next state; contexts never seen in training keep an all-zero row
            sumPlus1 = np.sum(self.transition_proba, axis=2)[:, :, np.newaxis]
            self.transition_proba /= np.where(sumPlus1 == 0, 1, sumPlus1)
            
        def init_parameters(self, train_set):
            self.compute_init_state_proba(train_set)
            self.compute_observation_probas(train_set)
            self.compute_transition_probas(train_set)
            self.compute_observation_probas_order1(train_set)
            self.compute_transition_probas_order1(train_set)
        
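        def viterbi(self, obs):
            """Greedy left-to-right decoding without backpointers. Position i is scored
            with the reference states obs[i - 1][1] and obs[i - 2][1] as context, so this
            method needs labeled pairs. The print calls trace every step for inspection."""
            keys = [key for key in self.omega_Y.keys()]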
            
            delta = np.zeros(len(obs))
            tmp = self.initial_state_proba * self.observation_proba_order1\
                                                    [self.X_index[self.get_X_index(obs[0][0])]]
            phi = [np.argmax(tmp)]
            delta[0] = max(tmp)
            
            if len(obs) < 2:
                return [keys[ind] for ind in phi]
            else:

                tmp2 = self.observation_proba[self.X_index[self.get_X_index(obs[1][0])]]\
                                                    [self.Y_index[obs[0][1]]]\
                            * self.transition_proba_order1[self.Y_index[obs[0][1]]]\
                            * delta[0]
                phi.append(np.argmax(tmp2))
                print("tag 0", obs[0][1])
                print("Letter 1", obs[1][0])
                print("Obs : curr_obs", obs[1][0],
                      "last tag", obs[0][1], 
                      ":", dict(zip(keys, self.observation_proba[self.X_index[self.get_X_index(obs[1][0])]]\
                                                    [self.Y_index[obs[0][1]]])))
                print("Transition :", "y - 1 :", obs[0][1], ":", 
                      dict(zip(keys, self.transition_proba_order1[self.Y_index[obs[0][1]]])))
                print("Tmp2", dict(zip(keys, tmp2)))
                print("Chosen letter :", keys[phi[-1]])
                delta[1] = max(tmp2)

                for i in range(2, len(obs)):
                    tmp = self.observation_proba[self.X_index[self.get_X_index(obs[i][0])]]\
                                                        [self.Y_index[obs[i - 1][1]]]\
                            * self.transition_proba[self.Y_index[obs[i - 2][1]]][self.Y_index[obs[i - 1][1]]]\
                            * delta[i - 1]
                    phi.append(np.argmax(tmp))
                    print("tag", (i - 1), obs[i - 1][1])
                    print("Letter", i, obs[i][0])
                    print("Obs : curr_obs", obs[i][0],
                      "last tag", obs[i - 1][1], 
                      ":", dict(zip(keys, self.observation_proba[self.X_index[self.get_X_index(obs[i][0])]]\
                                                    [self.Y_index[obs[i - 1][1]]])))
                    print("Transition :", "y - 2 :", obs[i - 2][1], ":",
                          "y - 1 :", obs[i - 1][1], ":",
                      dict(zip(keys, self.transition_proba[self.Y_index[obs[i - 2][1]]][self.Y_index[obs[i - 1][1]]])))
                    print("Tmp", tmp)
                    print("Chosen letter :", keys[phi[-1]])
                    delta[i] = max(tmp)
                return [keys[ind] for ind in phi]
        
        def score_viterbi(self, test):
            error = 0
            elements = 0
            errors_corrected = 0
            errors_added = 0
            for word in test:
                base = [letter for (letter, _) in word]
                truth = [tag for (_, tag) in word]
                preds = self.viterbi(word)
                elements += len(preds)
                for x, y, pred in zip(base, truth, preds):
                    if pred != x and pred == y:
                        errors_corrected += 1
                    if pred != y:
                        error += 1
                        if x == y:
                            errors_added += 1
            return error / elements, errors_corrected, errors_added

        def score_dummy(self, test):
            error = 0
            elements = 0
            for word in test:
                base = [letter for (letter, _) in word]
                truth = [tag for (_, tag) in word]
                elements += len(truth)
                for x, y in zip(base, truth):
                    if x != y:
                        error += 1
            return error / elements
        
        def results_hmm(self, test):
            viterbi_error_test, vit_corrected, vit_added = self.score_viterbi(test)
            error_dummy = self.score_dummy(test)
            print("Error viterbi = {:.2%}, {} errors corrected, {} errors added"
                  .format(viterbi_error_test, vit_corrected, vit_added))
            print("Error with nothing changed = {:.2%}".format(error_dummy))
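
For reference, a second-order Viterbi search that keeps backpointers and never looks at the reference states can be sketched by running the dynamic program over pairs of consecutive states. This is a simplified sketch, not the class method above: the name `viterbi_second_order` is hypothetical, and the emissions are assumed to depend on the current state only (B[k, i] = Pr(X_t=v_k|Y_t=q_i)), whereas `HMM_2` also conditions them on the previous state.

```python
import numpy as np

def viterbi_second_order(pi, A1, A2, B, obs):
    """Most likely state path for a second-order HMM.

    pi[i]       = Pr(Y_0 = i)
    A1[i, j]    = Pr(Y_1 = j | Y_0 = i)
    A2[i, j, k] = Pr(Y_t = k | Y_(t-2) = i, Y_(t-1) = j)
    B[x, i]     = Pr(X_t = x | Y_t = i)   (assumption: first-order emissions)
    """
    T = len(obs)
    with np.errstate(divide="ignore"):   # log(0) = -inf is acceptable here
        lpi, lA1, lA2, lB = np.log(pi), np.log(A1), np.log(A2), np.log(B)
    if T == 1:
        return [int((lpi + lB[obs[0]]).argmax())]
    # delta[j, k]: best log-score of a path whose last two states are (j, k)
    delta = lpi[:, None] + lB[obs[0]][:, None] + lA1 + lB[obs[1]][None, :]
    backs = []
    for t in range(2, T):
        scores = delta[:, :, None] + lA2        # scores[i, j, k]: extend (i, j) with k
        backs.append(scores.argmax(axis=0))     # best i for each pair (j, k)
        delta = scores.max(axis=0) + lB[obs[t]][None, :]
    j, k = np.unravel_index(delta.argmax(), delta.shape)
    path = [int(k), int(j)]                     # built backwards: last state first
    for back in reversed(backs):
        path.append(int(back[path[-1], path[-2]]))
    return path[::-1]

# Sanity check on a toy model whose second-order transitions ignore Y_(t-2),
# so the result must match a first-order decoder (made-up numbers).
pi = np.array([0.6, 0.4])
A = np.array([[0.7, 0.3], [0.4, 0.6]])
B = np.array([[0.9, 0.2], [0.1, 0.8]])
A2 = np.broadcast_to(A, (2, 2, 2))              # A2[i, j, k] = A[j, k]
print(viterbi_second_order(pi, A, A2, B, [0, 1, 1]))  # [0, 1, 1]
```

The pair-based table costs O(T·N³) time, which stays small for an alphabet of 26 letters.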

### Corpus with 10% errors

In [316]:
obs_list, state_list = distrib_x_y_data(train10)

hmm2 = HMM_2(state_list, obs_list)

HMM created with: 
26 states
26 observations


In [317]:
hmm2.init_parameters(train10)

In [318]:
word = train10[11]
print("Base :", word)
print("Predicted", hmm2.viterbi(word))

Base : [('p', 'l'), ('j', 'i'), ('b', 'b'), ('e', 'e'), ('r', 'r'), ('a', 'a'), ('t', 't'), ('i', 'i'), ('o', 'o'), ('n', 'n')]
tag 0 l
Letter 1 j
Obs : curr_obs j last tag l : {'b': 0.0, 'y': 0.0, 't': 0.0, 'h': 0.0, 'e': 0.0, 'i': 0.8095238095238095, 'r': 0.0, 'o': 0.0, 'w': 0.0, 'n': 0.0, 'a': 0.0, 'c': 0.0, 'u': 0.14285714285714285, 'v': 0.0, 'l': 0.0, 's': 0.0, 'f': 0.0, 'm': 0.0, 'd': 0.0, 'g': 0.0, 'k': 0.047619047619047616, 'p': 0.0, 'z': 0.0, 'j': 0.0, 'x': 0.0, 'q': 0.0}
Transition : y - 1 : l : {'b': 0.0, 'y': 0.12646416517768513, 't': 0.022632519356759976, 'h': 0.0, 'e': 0.22394282310899344, 'i': 0.11911852293031566, 'r': 0.003772086559459996, 'o': 0.11117728806829462, 'w': 0.002183839587055787, 'n': 0.0003970617431010522, 'a': 0.06829461981338097, 'c': 0.0011911852293031567, 'u': 0.03295612467738734, 'v': 0.01568393885249156, 'l': 0.1415525114155251, 's': 0.03216200119118523, 'f': 0.017470716696446297, 'm': 0.006750049632717888, 'd': 0.06948580504268413, 'g': 0.0, 'k': 0.0

In [300]:
hmm2.results_hmm(test10)

Error viterbi = 8.70%, 371 errors corrected, 263 errors added
Error with nothing changed = 10.18%


### Corpus with 20% errors

In [295]:
obs_list, state_list = distrib_x_y_data(train20)

hmm2_20 = HMM_2(state_list, obs_list)

HMM created with: 
26 states
26 observations


In [296]:
hmm2_20.init_parameters(train20)

hmm2_20.results_hmm(test20)

Error viterbi = 15.42%, 1636 errors corrected, 970 errors added
Error with nothing changed = 19.41%
