# Assignment 11  
Implement Hidden Markov Model with supervised training and Viterbi algorithm for finding the most probable sequence of hidden states. Use [Laplace smoothing](https://en.wikipedia.org/wiki/Additive_smoothing) for estimation of probabilities. Apply the developed model to the problems:
* part of speech tagging
* spelling correction

In [134]:
import numpy as np
from scipy import sparse
from scipy.sparse import csr_matrix
from scipy.sparse import lil_matrix
from tqdm import tqdm
from IPython.display import Image
from collections import defaultdict

HMM model for 3-grams:

$P(x_1, .., x_T, y_1, .., y_T, y_{T+1}) = \prod_{t=1}^{T+1} q(y_t | y_{t-2}, y_{t-1}) \prod_{t=1}^T e(x_t | y_t)$

$x_1, .., x_T$ - sequence of observed states of length T  
$y_1, .., y_T$ - sequence of hidden states of length T  
$q(i | u, v) = \frac {count(u, v, i)} {count(u, v)} $ - transition probability   
$e(x_k | i) = \frac {count(i, x_k)} {count(i)}$ - emission probability  
$A_{i,j} = A_{(u,v), j} = q(i | u, v)$ - transition matrix  
$B_{j,k} = e(x_k | j)$ - emission matrix  

We assume, that $y_{T+1} = TERM$, and $y_0 = y_{-1} = START$

In [210]:
y = [[1,2,3], [4,5,6]]
a = list(set(word for sent in y for word in sent)) + ['start', 'term']
print(len(a))
b = {j:i for i,j in enumerate(a)}
print(len(b))

8
8


In [211]:
class HMM:
    START = '*'
    TERM = '$$'
    REST = '$REST$' # to deal with observed states who have never appeared in train dataset.
        
    def cond_idx(self, u, v):
        return u + v*self.h_dim
        
    def fit(self, X, y):
        """
        X - list of lists, observed states
        y - list of lists, hidden states
        estimate elements of A, B matrices from train sequence. 
        """
        
        self.hidden_idx2state = list(set(word for sent in y for word in sent)) + [self.START, self.TERM]
        print(self.hidden_idx2state)
        self.hidden_states = {j:i for i,j in enumerate(self.hidden_idx2state)}
        print(len(self.hidden_states))
        self.h_dim = len(self.hidden_idx2state)
        
        self.observed_idx2state = list(set(word for sent in X for word in sent)) + [self.REST]
        self.observed_states = {j:i for i,j in enumerate(self.observed_idx2state)}
        self.o_dim = len(self.observed_idx2state)

        # lil matrix используется, когда матрица заполняется поэлементно из изначально нулевой матрицы, 
        # таким образом изменяется разреженность

        """
        B -  emission matrix
        B_ij = (# of transitions from state j to observation i)
        """
        self.B = lil_matrix((self.h_dim, self.o_dim))
        for j, y_sent in enumerate(y):
            for i, y_pos in enumerate(y_sent):
                index_1 = self.hidden_states[y_pos]
                index_2 = self.observed_states[X[j][i]]
                self.B[index_1, index_2] += 1

        self.B_rowsum = np.ravel(self.B.sum(axis=1))
        
        """
        A - transmition matrix
        A_ij = (# transitions from state q to state s)
        """
        self.A = lil_matrix((self.h_dim ** 2, self.h_dim)).todense()
        y_padded = [[self.START, self.START]+sent+[self.TERM] for sent in y]
        for sent in y_padded:
            for i in range(2, len(sent)):
                index_1 = HMM.cond_idx(self, u=self.hidden_states[sent[i-1]], v=self.hidden_states[sent[i-2]])
                index_2 = self.hidden_states[sent[i]]
                self.A[index_1, index_2] += 1
        
        self.A_rowsum = np.ravel(self.A.sum(axis=1))
        
        return self
    
    # Сглаживание Лапласа нужно, чтобы исключить случаи, 
    # когда какое-то количество перехода равно нулю и дробь не определена или равна нулю
    
    def tr_prob(self, i, j):
        """
        A_ij = q(j | i) = q(j| u, v) with Laplace smoothing
        i - previous, j - next
        """
        result = (self.A[i, j] + 1) / (self.A_rowsum[i] + self.h_dim)

        return result
    
    def em_prob(self, i, j):
        """
        B_ij = e(x_j| i) with Laplace smoothing
        i - hidden, j - observed
        """
        result = (self.B[i, j] + 1) / (self.B_rowsum[i] + self.o_dim)
        
        return result
        
    def o_state(self, x):
        """
        return index of observed state
        """
        return self.observed_states.get(x, self.observed_states[self.REST])
    
    
    def predict(self, X):
        """
        Predict the most probable sequence of hidden states for every sequence of observed states
        X - list of lists
        """
        y_pred = [self._viterbi(seq) for seq in tqdm(X)]
        return y_pred 
            
    def _viterbi(self, X):
        """
        X - list of observables
        product of probabilities usually is not numerically stable
        remember, that log(ab) = log(a) + log(b) and argmax[log(f(x))] = argmax[f(x)]
        
        """
        X = X + [self.TERM]
        T = len(X)
        
        # pi[t, u, v] - max probability for any state sequence ending with x_t = v and x_{t-1} = u.
        pi = np.zeros((T + 1, self.h_dim, self.h_dim))
        # backpointers, bp[t, u, v] = argmax probability for any state sequence ending with x_t = v and x_{t-1} = u.
        bp = np.zeros((T + 1, self.h_dim, self.h_dim), dtype=np.int)
        
        ###################
        # fill tables pi and bp
        # pi[t, u, v] = max_{w} [ pi[t-1, w, u] * q(v| w, u) * e(x_k| v) ]
        # bp[t, u, v] = argmax_{w} [ pi[t-1, w, u] * q(v| w, u) * e(x_k| v) ]
        
        x0 = self.o_state(X[0])
        for v in range(self.h_dim):
            log_b_smoothed = self.em_prob(v, x0)
            log_a_smoothed = self.tr_prob(v, self.h_states[self.START], self.h_states[self.START])
            pi[0, 0, 0] = log_a_smoothed * log_b_smoothed
        
        for k in range(1, T + 1):
            xk = self.o_state(X[k-1])
            print('xk = ' + str(xk))
            for v in range(self.h_dim):
                print('v = ' + str(v))
                log_b_smoothed = self.em_prob(v, xk)
                print(log_b_smoothed)
                for u in range(self.h_dim): 
                    print('u = ' + str(u))
                    r = np.zeros(self.h_dim)
                    for w in range(self.h_dim):
                        print('w = ' + str(w))
                        print('index = ' + str(self.cond_idx(w, u)))
                        log_a_smoothed = self.tr_prob(self.cond_idx(w, u), v)
                        print(log_a_smoothed)
                        r[w] = pi[k-1, w, u] * log_a_smoothed * log_b_smoothed
                        print('r[w] = ' + str(r[w]))
                    bp[k, u, v] = np.argmax(r)
                    pi[k, u, v] = np.max(r)
        ###################
                    
        term_idx = self.hidden_states[self.TERM]
        
        ###################
        # r(u,v) = pi[T, u, v] * q(TERM | u, v)
        # find argmax_{u, v} r(u, v)
        r = np.zeros((self.h_dim, self.h_dim))
        for v in range(self.h_dim):
            for u in range(self.h_dim):
                r[u, v] = pi[T, u, v] * self.A[self.cond_idx(u, v), self.hidden_states[self.TERM]]
        print(r)
        u, v = np.argmax(r)
        ###################

        h_states = [v, u]
        ###################
        # rollback backpointers
        # y_{t-2} = bp[t, y_{t-1}, y_t]
        # h_states is a reversed sequence of hidden states
        h_states = h_states + [bp[t-2, h_states[t-1], h_states[t-2]] for t in range(2, len(X))]
            
        ###################        
            
        return [self.hidden_idx2state[i] for i in reversed(h_states[:T])]

In [212]:
hh = HMM()
hh.fit(X_train, y_train)

['VBP', '#', 'VBZ', 'RBS', ',', 'WDT', 'RB', '.', 'TO', 'NNS', 'NN', 'EX', 'IN', 'SYM', 'PDT', 'POS', 'DT', 'RBR', '``', 'WP', "''", 'VBG', '-LRB-', 'PRP$', 'WP$', 'VBN', 'UH', 'WRB', '-NONE-', 'NNPS', ':', 'NNP', '-RRB-', 'LS', 'FW', 'JJR', 'VB', 'MD', 'CC', 'PRP', 'RP', 'JJ', '$', 'VBD', 'CD', 'JJS', '*', '$']
47


<__main__.HMM at 0x10c15c4a8>

In [202]:
hh._viterbi(['61', 'years', 'old'])

xk = 8256
v = 0
8.386447500838645e-05
u = 0
w = 0
index = 0
8.386447500838645e-05
r[w] = 0.0
w = 1
index = 1
9.26526452330214e-05
r[w] = 0.0
w = 2
index = 2
7.970032677133976e-05
r[w] = 0.0
w = 3
index = 3
9.25154963456379e-05
r[w] = 0.0
w = 4
index = 4
6.868131868131868e-05
r[w] = 0.0
w = 5
index = 5
9.004952723998198e-05
r[w] = 0.0
w = 6
index = 6
7.65872711955273e-05
r[w] = 0.0
w = 7
index = 7
7.265857734505559e-05
r[w] = 0.0
w = 8
index = 8
8.038585209003216e-05
r[w] = 0.0
w = 9
index = 9
6.453694740238787e-05
r[w] = 0.0
w = 10
index = 10
0.0002908949869097256
r[w] = 0.0
w = 11
index = 11
9.210647508519849e-05
r[w] = 0.0
w = 12
index = 12
5.452860025083156e-05
r[w] = 0.0
w = 13
index = 13
9.275577404693442e-05
r[w] = 0.0
w = 14
index = 14
9.258401999814833e-05
r[w] = 0.0
w = 15
index = 15
8.78966335589347e-05
r[w] = 0.0
w = 16
index = 16
5.84316933504733e-05
r[w] = 0.0
w = 17
index = 17
9.175995595522114e-05
r[w] = 0.0
w = 18
index = 18
8.774238834781083e-05
r[w] = 0.0
w = 19
index

IndexError: row index (48) out of bounds

## Problem 1. Part of speech tagging

Build Part-of-Speech tagging model using HMM. Estimate accuracy on test dataset.

In [1]:
import nltk
nltk.download('treebank')
from nltk.corpus import treebank
from sklearn import metrics

[nltk_data] Error loading treebank: <urlopen error [SSL:
[nltk_data]     CERTIFICATE_VERIFY_FAILED] certificate verify failed
[nltk_data]     (_ssl.c:777)>


In [12]:
data = treebank.tagged_sents()[:3000]
test_data = treebank.tagged_sents()[3000:3010]

X_train = [[x[0] for x in y] for y in data]
y_train = [[x[1] for x in y] for y in data]

X_test = [[x[0] for x in y] for y in test_data]
y_test = [[x[1] for x in y] for y in test_data]

print('sentence: ', " ".join(X_train[0]))
print('tags: ', " ".join(y_train[0]))

sentence:  Pierre Vinken , 61 years old , will join the board as a nonexecutive director Nov. 29 .
tags:  NNP NNP , CD NNS JJ , MD VB DT NN IN DT JJ NN NNP CD .


In [5]:
def accuracy(y_true, y_pred):       
    y_true = np.concatenate(y_true)
    y_pred = np.concatenate(y_pred)
    
    return np.mean(y_true == y_pred)

In [6]:
%%time

hh = HMM().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))

100%|██████████| 10/10 [01:06<00:00,  6.70s/it]

0.7488789237668162
CPU times: user 1min 7s, sys: 128 ms, total: 1min 7s
Wall time: 1min 7s





Your accuracy must be > 0.74

## Problem 2. Spelling correction

Given data of true_char corrupted\_char build spelling correction model using HMM.    
There are 2 datatsets (spelling10.txt, spelling20.txt) with 10% and 20% corruption probability respectively.  
Each dataset contains 30556 words. Use first 28000 for training and the rest for testing. Estimate accuracy on the test subset.  


In [18]:
# YOUR CODE HERE

You should get > 93% accuracy (+3%) on spelling10 dataset

In [19]:
# YOUR CODE HERE

You should get > 89% accuracy (+9%) on spelling20 dataset