# Assignment 2  
Implement Hidden Markov Model with supervised training and Viterbi algorithm for finding the most probable sequence of hidden states.  
Use [Laplace smoothing](https://en.wikipedia.org/wiki/Additive_smoothing) for estimation of probabilities.  
Apply the developed model to the problems:
* part of speech tagging
* spelling correction

In [187]:
import numpy as np
from scipy import sparse
from tqdm.notebook import tqdm
from IPython.display import Image
from collections import defaultdict

HMM model for 3-grams:

$P(x_1, .., x_T, y_1, .., y_T, y_{T+1}) = \prod_{t=1}^{T+1} q(y_t | y_{t-2}, y_{t-1}) \prod_{t=1}^T e(x_t | y_t)$

$x_1, .., x_T$ - sequence of observed states of length T  
$y_1, .., y_T$ - sequence of hidden states of length T  
$q(i | u, v) = \frac {count(u, v, i)} {count(u, v)} $ - transition probability   
$e(x_k | i) = \frac {count(i, x_k)} {count(i)}$ - emission probability  
$A_{i,j} = A_{(u,v), j} = q(i | u, v)$ - transition matrix  
$B_{j,k} = e(x_k | j)$ - emission matrix  

We assume, that $y_{T+1} = TERM$, and $y_0 = y_{-1} = START$

## Task1: Implement HMM (20%)

In [165]:
def flatten(list_of_lists):
    # converts 2d list to 1d list
    return [item for sublist in list_of_lists for item in sublist]

class HMM:
    START = '*'
    TERM = '$$'     # CHANGED IT, because '$' emerges in original tags
    REST = '$REST$' # to deal with observed states who have never appeared in train dataset.
    alpha = 0.01     # Laplace smoothing parameter for probabilities estimate
        
    def cond_idx(self, u, v):
        '''
        converts two indices of hidden state into 1. First index for 'A' matrix 
        '''
        return np.ravel_multi_index((u, v), (self.h_dim, self.h_dim))
        
    def fit(self, X, y):
        """
        X - list of lists, observed states
        y - list of lists, hidden states
        estimate elements of A, B matrices from train sequence. 
        """
        
        #######################        
        unique_hidden_states = set(flatten(y))
        unique_hidden_states.update([self.START, self.TERM])
        self.hidden_idx2state = list(unique_hidden_states) # list of unique hidden states in train sequence + [START, TERM]
        
        self.hidden_states = {state: idx for idx, state in enumerate(self.hidden_idx2state)} # from state name to state index in hidden_idx2state
        self.h_dim = len(self.hidden_idx2state) # number of hidden states

        unique_observed_states = set(flatten(X))
        self.observed_idx2state = list(unique_observed_states) # list of unique observed states in train sequence + [REST]
        self.observed_idx2state.append(self.REST)
        
        self.observed_states = {state: idx for idx, state in enumerate(self.observed_idx2state)}# from state name to state index in hidden_idx2state
        self.o_dim = len(self.observed_idx2state) # number of observed states
        
        #######################       
        
        
        #######################
        # estimate emission matrix
        count_emission_matrix = np.zeros((self.h_dim, self.o_dim))
        for o_st, h_st in zip(X, y):
            o_idx = [self.observed_states[el] for el in o_st]
            h_idx = [self.hidden_states[el] for el in h_st]
            
            for o, h in zip(o_idx, h_idx):
                count_emission_matrix[h, o] += 1
        
        # self.B = sparse.csr_matrix(count_emission_matrix) # sparse csr matrix of shape (h_dim, o_dim)
        self.B = count_emission_matrix # sparse csr matrix of shape (h_dim, o_dim)
        assert self.B.shape == (self.h_dim, self.o_dim)
        
        #######################
        
        self.B_rowsum = np.ravel(self.B.sum(axis=1)) 
        
        
        ########################
        # transition matrix
        count_trans_matrix = np.zeros((self.h_dim**2, self.h_dim))
        
        for h_st in y:
            h_idx = [self.hidden_states[el] for el in 
                     [self.START]*2 + h_st + [self.TERM]
                     ] 
            for i in range(len(h_idx)-2):
                count_trans_matrix[self.cond_idx(h_idx[i], h_idx[i+1]), h_idx[i+2]] += 1
        
        # self.A = sparse.csr_matrix(count_trans_matrix) # dense matrix of shape (h_dim **2, h_dim)
        self.A = count_trans_matrix # dense matrix of shape (h_dim **2, h_dim)
        # remember about padding for sequence of hidden states, eg {a, b} -> {START, START, a, b, TERM}
        ########################
        
        self.A_rowsum = np.ravel(self.A.sum(axis=1))
        
        return self
    
    def tr_prob(self, j, u, v):
        """
        A_ij = q(j | i) = q(j| u, v) with Laplace smoothing
        """
        return self._tr_prob(self.cond_idx(u, v), j)
    
    def _tr_prob(self, i, j):
        """
        A_ij = q(j | i) = q(j| u, v) with Laplace smoothing
        """
        ########################
        result = (self.A[i, j]+self.alpha)/(self.A_rowsum[i] + self.h_dim*self.alpha) # smoothed probability
        ########################
        return result
    
    def em_prob(self, k, j):
        """
        j - hidden index
        k - observable index
        B_jk = e(x_k| j) with Laplace smoothing
        """
        ########################
        result = (self.B[j, k]+self.alpha)/(self.B_rowsum[j] + self.o_dim*self.alpha) # smoothed probability
        ########################
        return result
        
    def o_state(self, x):
        """
        return index of observed state
        """
        return self.observed_states.get(x, self.observed_states[self.REST])
    
    
    def predict(self, X):
        """
        Predict the most probable sequence of hidden states for every sequence of observed states
        X - list of lists
        """
        y_pred = [self._viterbi(seq) for seq in tqdm(X)]
        return y_pred 
            
    def _viterbi(self, X):
        """
        X - list of observables
        product of probabilities usually is not numerically stable
        remember, that log(ab) = log(a) + log(b) and argmax[log(f(x))] = argmax[f(x)]
        """   
        T = len(X)
        
        # pi[t, u, v] - max probability for any state sequence ending with y_t = v and y_{t-1} = u.
        pi = np.zeros((T, self.h_dim, self.h_dim))
        # backpointers, bp[t, u, v] = argmax probability for any state sequence ending with y_t = v and y_{t-1} = u.
        bp = np.zeros((T, self.h_dim, self.h_dim), dtype=int)
        
        ###################
        # fill tables pi and bp
        # pi[t, u, v] = max_{w} [ pi[t-1, w, u] * q(v| w, u) * e(x_k| v) ]
        # bp[t, u, v] = argmax_{w} [ pi[t-1, w, u] * q(v| w, u) * e(x_k| v) ]
        
        for t in range(1):  # loop to keep variables 
            x_t = self.o_state(X[t])
            # start_idx = self.hidden_states[self.START]
            for u in range(self.h_dim):
                for v in range(self.h_dim):
                    # log_prob[w] - log of probabilities to have `w` as hidden value at t = 0. 
                    log_prob = [np.log(self.tr_prob(v, w, u)) + np.log(self.em_prob(x_t, v))
                                for w in range(self.h_dim)]
            
                    bp[t, u, v] = np.argmax(log_prob)
                    pi[t, u, v] = np.max(log_prob)
        for t in tqdm(range(1, T)):
            x_t = self.o_state(X[t])
            for u in range(self.h_dim):
                for v in range(self.h_dim):
                    # log_prob(w) = log( pi[t-1, w, u] * q(v| w, u) * e(x_k| v))
                    log_prob = [pi[t-1, w, u] + np.log(self.tr_prob(v, w, u)) + np.log(self.em_prob(x_t, v))
                                for w in range(self.h_dim)]
                    bp[t, u, v] = np.argmax(log_prob)
                    pi[t, u, v] = np.max(log_prob)  
                        
        ###################
                    
        term_idx = self.hidden_states[self.TERM]
        
        ###################
        # r(u,v) = pi[T, u, v] * q(TERM | u, v)
        # find argmax_{u, v} r(u, v)
        r = [[ 
              pi[T-1, u, v] 
              + np.log(self.tr_prob(term_idx, u, v))
            for v in range(self.h_dim)] for u in range(self.h_dim)]
        u, v = np.unravel_index(np.argmax(r), (self.h_dim, self.h_dim))
        ###################

        h_states = [v, u]
        ###################
        # rollback backpointers
        # y_{t-2} = bp[t, y_{t-1}, y_t]
        # h_states is a reversed sequence of hidden states
        for t in reversed(range(T)):
            h_states.append(bp[t, h_states[-1], h_states[-2]])
        ###################        
            
        return [self.hidden_idx2state[i] for i in reversed(h_states[:T])]

## Task 2. Part of speech tagging (20%)

Build Part-of-Speech tagging model using HMM. Estimate accuracy on test dataset.

In [84]:
import nltk
nltk.download('treebank')
from nltk.corpus import treebank
from sklearn import metrics

[nltk_data] Downloading package treebank to /home/amir/nltk_data...
[nltk_data]   Package treebank is already up-to-date!


In [154]:
data = treebank.tagged_sents()[:3000]
test_data = treebank.tagged_sents()[3000:3010]

X_train = [[x[0] for x in y] for y in data]
y_train = [[x[1] for x in y] for y in data]

X_test = [[x[0] for x in y] for y in test_data]
y_test = [[x[1] for x in y] for y in test_data]

print('sentence: ', " ".join(X_train[0]))
print('tags: ', " ".join(y_train[0]))

sentence:  Pierre Vinken , 61 years old , will join the board as a nonexecutive director Nov. 29 .
tags:  NNP NNP , CD NNS JJ , MD VB DT NN IN DT JJ NN NNP CD .


In [86]:
for y in y_train:
    if "$" in y:
        print(y)
## there are many "$" tags used in the dataset itself

['NNS', 'IN', 'DT', 'CD', 'JJ', 'NNS', 'VBD', 'IN', '$', 'CD', 'CD', '-NONE-', 'IN', 'DT', 'JJS', 'NN', ',', 'TO', '$', 'CD', 'CD', '-NONE-', '.']
['NNP', 'NNP', 'NNP', 'NNP', 'VBD', '-NONE-', 'NNS', 'VBD', 'PRP$', 'NN', 'IN', 'NNP', 'NNP', 'NNP', 'IN', 'NNP', 'IN', '$', 'CD', '-NONE-', 'DT', 'NN', ',', 'CC', '$', 'CD', 'CD', '-NONE-', '.']
['NNP', 'NNP', 'NNP', 'VBD', '-NONE-', 'PRP$', 'NNP', 'CC', 'NNP', 'NN', 'VBD', 'DT', 'NN', 'IN', 'PRP$', 'NNP', 'NNP', 'NNP', 'TO', 'NNP', 'NNP', '.', 'NNP', 'IN', '$', 'CD', 'CD', '-NONE-', '.']
['PRP', 'VBZ', 'CD', 'NNS', 'CC', 'VBZ', 'JJ', 'NN', 'IN', 'IN', '$', 'CD', 'CD', '-NONE-', '.']
['DT', 'NN', 'POS', 'NN', 'NN', 'VBD', 'IN', 'NN', 'NNP', 'TO', '$', 'CD', 'CD', '-NONE-', 'IN', '$', 'CD', 'CD', '-NONE-', '.']
['DT', 'NNP', 'VBZ', 'VBN', '-NONE-', 'TO', 'VB', 'DT', 'NN', 'TO', '$', 'CD', 'CD', '-NONE-', ',', 'CC', 'DT', 'NNP', 'VBZ', 'RB', 'VBN', '-NONE-', 'TO', 'VB', 'IN', 'JJ', 'NN', 'IN', 'DT', 'JJS', '.']
['NNP', 'NNP', 'VBD', 'DT', 'NN

In [155]:
def accuracy(y_true, y_pred):       
    y_true = np.concatenate(y_true)
    y_pred = np.concatenate(y_pred)
    
    return np.mean(y_true == y_pred)

In [156]:
%%time

hh = HMM().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/23 [00:00<?, ?it/s]

0.0
CPU times: user 1min 36s, sys: 369 ms, total: 1min 37s
Wall time: 1min 47s


  return np.mean(y_true == y_pred)


Your accuracy must be > 0.73

In [157]:
for i in range(len(y_pred[:3])):
    print("pred:", *y_pred[i], sep='\t')
    print("true:", *y_test[i], sep='\t')
    print()

pred:	IN	NNP	,	DT	JJ	NN	IN	CD	JJ	NNS	,	WDT	-NONE-	VBD	-NONE-	NNP	NNP	,	VBD	VBG	NNS	TO	NNS	.
true:	IN	NNP	,	DT	NNP	NN	IN	CD	VBN	NNS	,	WDT	-NONE-	VBD	CD	NNS	NNP	,	VBD	CD	NNS	TO	CD	.



## Task 3. Vectorized viterbi (20%)
Our currrent implementation of Viterbi is too slow. Let's make it vectorized. 

In [166]:
class HmmVectorized(HMM):
    
    def _viterbi(self, X):
        """
        Vectorized version of Viterbi. Let's speed up!
        X - list of observables
        """   
        T = len(X)
        
        # One may notice, at every step t we only need pi[t-1, u, v] = pi_prev[u,v] to compute pi[t, u, v] = pi_curr[u,v]
        pi_prev = np.zeros((self.h_dim, self.h_dim))
        
        # backpointers
        bp = np.zeros((T + 1, self.h_dim, self.h_dim), dtype=int)
        
        a_rowsum = self.A_rowsum.reshape(self.h_dim, self.h_dim, 1)
        
        ###################
        # fill pi and bp
        # pi_curr[u, v] = max_{w} [ pi_prev[w, u] * q(v| w, u) * e(x_k| v) ]
        # bp[t, u, v] = argmax_{w} [ pi_prev[w, u] * q(v| w, u) * e(x_k| v) ]
        
        # log_prob[w, u, v] (shape=(h_dim, h_dim, h_dim)) -- log of probabilities of (w, u, v) sequence of hidden states v time is t
        # pi_curr[:, :] =  max_{w} lop_prob[w, :, :]
        # bp[t, :, :] =  argmax_{w} lop_prob[w, :, :]
        # log_prob[w, u, v] = pi_prev[w, u] * q(v| w, u) * e(x_k| v)
        # log_prob[:(w), :(u), :(v)] = log( pi_prev[:(w), :(u), np.newaxis] * A_sm[:(w), :(u), :(v)] * e[np.newaxis, np.newaxis, :(v)] )
        
        
        # don't use tr_prob() and em_prob(), apply laplace smoothing directly here
        # laplace smoothing
        B_sm_log = np.log( (self.B+self.alpha)/(self.B_rowsum.reshape(-1, 1) + self.o_dim*self.alpha) )
        
        A = self.A.reshape((self.h_dim, self.h_dim, self.h_dim))
        A_sm_log = np.log( (A+self.alpha) / (a_rowsum + self.h_dim*self.alpha) )
        
        for t in range(1):
            # first element without pi_prev
            x_t = self.o_state(X[t])
            e_log = B_sm_log[:, x_t]
            log_prob = A_sm_log + e_log[np.newaxis, np.newaxis, :]
            pi_prev = np.max(log_prob, axis=0)
            bp[t] = np.argmax(log_prob, axis=0)
        
        for t in range(1, T):
            x_t = self.o_state(X[t])
            e_log = B_sm_log[:, x_t]
            log_prob = pi_prev[:, :, np.newaxis] + A_sm_log + e_log[np.newaxis, np.newaxis, :]
            pi_prev = np.max(log_prob, axis=0)
            bp[t] = np.argmax(log_prob, axis=0)
        
        ###################
        
        term_idx = self.hidden_states[self.TERM]
        
        ###################
        # r(u,v) = pi[T, u, v] * q(TERM | u, v)
        # find argmax_{u, v} r(u, v)
        # express r(u,v) as matrix additions
        r = sum([
            pi_prev, 
            A_sm_log[:, :, term_idx],
        ])
        u, v = np.unravel_index(np.argmax(r), (self.h_dim, self.h_dim))
        h_states = [v, u]
        ###################
        
        ###################
        # rollback backpointers
        # y_{t-2} = bp[t, y_{t-1}, y_t]
        # h_states is a reversed sequence of hidden states
        for t in reversed(range(T)):
            h_states.append(bp[t, h_states[-1], h_states[-2]])
        ###################
        
        return [self.hidden_idx2state[i] for i in reversed(h_states[:T])]

Let's take a larger test subset

In [169]:
len(treebank.tagged_sents())

3914

In [167]:
data = treebank.tagged_sents()[:3000]
test_data = treebank.tagged_sents()[3000:3300]

X_train = [[x[0] for x in y] for y in data]
y_train = [[x[1] for x in y] for y in data]

X_test = [[x[0] for x in y] for y in test_data]
y_test = [[x[1] for x in y] for y in test_data]

print('sentence: ', " ".join(X_train[0]))
print('tags: ', " ".join(y_train[0]))

sentence:  Pierre Vinken , 61 years old , will join the board as a nonexecutive director Nov. 29 .
tags:  NNP NNP , CD NNS JJ , MD VB DT NN IN DT JJ NN NNP CD .


In [168]:
%%time

hh = HmmVectorized().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))

  0%|          | 0/300 [00:00<?, ?it/s]

0.8993342210386152
CPU times: user 47.3 s, sys: 526 ms, total: 47.9 s
Wall time: 1min 39s


Your accuracy must be > 0.83 

In [170]:
for i in range(len(y_pred[:3])):
    print("pred:", *y_pred[i], sep='\t')
    print("true:", *y_test[i], sep='\t')
    print("text:", *X_test[i], sep='\t')
    print()

pred:	IN	NNP	,	DT	JJ	NN	IN	CD	JJR	NNS	,	WDT	-NONE-	VBD	-NONE-	NNP	NNP	,	VBD	VBG	NNS	TO	NNS	.
true:	IN	NNP	,	DT	NNP	NN	IN	CD	VBN	NNS	,	WDT	-NONE-	VBD	CD	NNS	NNP	,	VBD	CD	NNS	TO	CD	.
text:	At	Tokyo	,	the	Nikkei	index	of	225	selected	issues	,	which	*T*-1	gained	132	points	Tuesday	,	added	14.99	points	to	35564.43	.

pred:	IN	JJ	NN	IN	NNP	NNP	,	DT	JJS	NN	VBD	VBG	NNS	TO	NNS	.
true:	IN	RB	NN	IN	NNP	NNP	,	DT	NNP	NN	VBD	CD	NNS	TO	CD	.
text:	In	early	trading	in	Tokyo	Thursday	,	the	Nikkei	index	fell	63.79	points	to	35500.64	.

pred:	NNP	POS	NN	IN	DT	NNP	NN	VBD	VBN	-NONE-	IN	CD	CD	NNS	,	IN	NN	IN	NNP	POS	NNP	CD	.
true:	NNP	POS	NN	IN	DT	NNP	NNP	VBD	VBN	-NONE-	IN	CD	CD	NNS	,	IN	NN	IN	NNP	POS	CD	CD	.
text:	Wednesday	's	volume	on	the	First	Section	was	estimated	*-1	at	900	million	shares	,	in	line	with	Tuesday	's	909	million	.



Given data of true_char corrupted\_char build spelling correction model using HMM.    
There are 2 datatsets (spelling10.txt, spelling20.txt) with 10% and 20% corruption probability respectively.  
Each dataset contains 30556 words. Use first 28000 for training and the rest for testing. Estimate accuracy on the test subset.  

In [198]:
def load_dataset(file_path):
    '''
    returns X, y
    X - list of observable states (corrupted strings)
    y - list of hidden states (true strings)
    '''
    X = []
    y = []
    delimiter = '_'
    current_x = []
    current_y = []
    file = np.loadtxt(file_path, dtype=str)
    for line in file:
        l, r = line # l -- true, r - corrupted 
        if l == delimiter:
            X.append((current_x))
            y.append((current_y))
            current_x = []
            current_y = []
        else:
            current_x.append(r)
            current_y.append(l)
    X.append((current_x))
    y.append((current_y))
            
    return X, y
            

## Task4. Spelling correction (20%)
You should get > 92% accuracy on spelling10 dataset

In [199]:
X, y = load_dataset("data/spelling10.txt")
train_slice = slice(28_000)
test_slice = slice(28_000, None)
X_train = X[train_slice]
y_train = y[train_slice]
X_test = X[test_slice]
y_test = y[test_slice]

In [200]:
# YOUR CODE HERE
hh = HmmVectorized().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))

  0%|          | 0/2558 [00:00<?, ?it/s]

0.9349734989320465


In [203]:
accuracy(y_test, X_test)

0.900878095087414

In [211]:
def show_examples(y_true, y_predict, x, N=10):
    for data, label in zip([y_true, x, y_predict], 
                           ["true", "corrupted", "predict"]):
        print("{:<12}".format(label+":"), *[''.join(x) for x in data[:N]])

In [202]:
show_examples(y_test, y_pred, X_test)

true:        in the strictest sense but a whole spectrum of related
corrupted:   in the strictest ernse but a whole spectrum of re$$sted
corrected:   in the strictest wrnse but a whole spectrum of relsted


## Task5. Spelling correction (20%)
You should get > 88% accuracy on spelling20 dataset

In [204]:
X, y = load_dataset("data/spelling20.txt")
train_slice = slice(28_000)
test_slice = slice(28_000, None)
X_train = X[train_slice]
y_train = y[train_slice]
X_test = X[test_slice]
y_test = y[test_slice]

In [205]:
# YOUR CODE HERE
hh = HmmVectorized().fit(X_train, y_train)
y_pred = hh.predict(X_test)
print(accuracy(y_test, y_pred))

  0%|          | 0/2558 [00:00<?, ?it/s]

0.8888537299264299


In [206]:
accuracy(y_test, X_test)

0.803259235819951

In [212]:
show_examples(y_test, y_pred, X_test)

true:        in the strictest sense but a whole spectrum of related
corrupted:   in tge strictest xense nyt a whooe specfrum od relqtwc
predict:     in the strictest sense ny$$ s whole spectrum ld related
