In [None]:
import numpy as np
import math
import time


class HMM:
    '''
    Implement an HMM
    
    Use notation and formulae from Jurafsky--Martin 
    (https://web.stanford.edu/~jurafsky/slp3/9.pdf)
    
    Assume the hidden states are from 1, 2, ..., to N.  State 0 and 
    State N+1 are reserved from the START and STOP state, which occur 
    at exactly time 0 and time T+1 of a T long observation sequence.
    
    Assume observation values are from 1, 2, ..., to some max values 
    M. Observation 0 and Observation M+1 are reserved and are seen
    at exactly time 0 and time T+1 of a T long observation sequence.
    '''
    
    def __init__(self, a, b):
        '''
        Initialize the HMM with given parameters
        
        Initialize any missing parameter with random probability
        values.
        
        "a", the transmission probabilities, is a (N+2) x (N+2) matrix 
        such that a(i, j) is the 
        probability of moving from hidden state i to hidden state j. 
        Since one cannot move to State 0 from any state, Column 0 
        has 0 values.  Similarly Row N+1 has 0 values.
        
        "b", the emission probabilities, is a (N+2) x (M+2) matrix such
        that b(i, j) is the probability of emitting output j in hidden
        state i.  Since
        Output 0 is only emitted in State 0, Row 0 has a 1.0 in 
        Column 0, and 0 values otherwise.  Similarly, Row N+1 has
        a 1.0 in Column M+1 and 0 values otherwise.
        '''
        
        
        # Check input
        if (a.shape[0] != a.shape[1] or
            a.shape[0] != b.shape[0]):
            raise Exception("Improper a or b matrix shapes.")
            
        # N is number of regular states     
        self.N = a.shape[0] - 2
        # M is number of regular outputs
        self.M = b.shape[1] - 2
        
        self.a = a
        self.b = b
        
        # Useful macros
        self.START = 0 # start state
        self.FINISH = self.N+1 # finish state
        self.STATES = range(1, self.N+1) # set of regular states
        
        # placeholders
        self.O = [] # observation sequence
        self.T = 0 # number of observations
        self.alpha_table = None
        self.beta_table = None
        self.viterbi_table = None
        self.viterbi_bpointers = None
        
    def print(self):
        print(f"N {self.N} M {self.M} T {self.T}")
        print("Transmission probabilities 'a'")
        print(self.a)
        print("Emission probabilities 'b'")
        print(self.b)
        
    def set_a_b(self, a, b):
        
        if (a.shape[0] != a.shape[1] or a.shape[0] != b.shape[0] or
            a.shape[0] != self.N+2 or b.shape[1] != self.M+2):
            raise Exception('Incompatible a, b shapes')
        
        self.a = a
        self.b = b
        self.reset_alpha_beta_tables()
        
    def reset_alpha_beta_tables(self):
        
        # intialize alpha values to save recursive calls
        self.alpha_table = np.array(
            [[-1.0] * (self.N+2)] * (self.T+2))
        self.alpha_table[self.T+1, :self.N+1] = 0.0
        self.alpha_table[0, 0] = 1.0
        self.alpha_table[0, 1:] = 0.0
        
        # initialize beta values
        self.beta_table = np.array(
            [[-1.0] * (self.N+2)] * (self.T+1)) 
        # Recall beta(T+1, i) is not defined.
        
    def set_observations(self, O):
        '''
        Set the observation sequence
        
        Assume the first observation is 0 and the last M+1, and
        the rest are in [1,...,M]. 
        '''
        
        # Check that observations are valid
        if (not set(O[1:-1]).issubset(set(range(1, self.M+1))) or
            O[0] != 0 or O[-1] != self.M+1):
            print(O)
            raise Exception("Invalid observation sequence.")
        self.O = O
        self.T = len(O)-2
        self.reset_alpha_beta_tables()
        
    
    def forward(self):
        '''Fill alpha_table, i.e., alpha_t(j)'s'''
        
        # This should be called after set_observation, 
        # else self.T == 0.
        
        if (self.T == 0):
            raise Exception("Premature call to alpha().")
        
        ##################################################
        # WRITE YOUR CODE HERE ###########################
        ##################################################
        # Please follow the algorithm as described in 
        # https://web.stanford.edu/~jurafsky/slp3/9.pdf
        # from 9.15 to 9.17
        # First fill for t = 0
        # Fill remaining values "recursively"
        
        self.alpha_table[0, self.START] = 1.0
#         for j in range(1, self.N+1):
#             self.alpha_table[0, j] = 0.0
        self.alpha_table[0, 1:self.N+1] = 0.0
            
        # Fill remaining values "recursively"
        for t in range(1, self.T+2):
#             for j in range(self.N+2):
#                 self.alpha_table[t, j] = sum(
#                     [self.alpha_table[t-1, i] * self.a[i, j]  
#                      for i in range(self.N+2)]
#                     ) * self.b[j, self.O[t]]
            self.alpha_table[t, :self.N+2] = np.matmul(
                self.alpha_table[t-1, :], self.a[:, :self.N+2]) * self.b[:self.N+2, self.O[t]]
                
        
    def viterbi(self, t, j):
                
        if (self.T==0 or t < 1 or t > self.T+1 or j < 0 or
            j > self.N + 1 or 
            (t == self.T+1 and j != self.FINISH)):
            raise Exception("Invalid call to viterbi().")
            
        # Check if value already available
        rv = self.viterbi_table[t, j]
        if rv <= -1:
            if t == 1:
                rv = self.a[self.START, j] * self.b[j, self.O[1]]
                self.viterbi_bpointers[t, j] = self.START
            else:
                if t == self.T+1:
                    possibles = [(self.viterbi(t-1, i) * 
                                  self.a[i,j] , i) 
                                 for i in self.STATES]
                else:             
                    possibles = [(self.viterbi(t-1, i) * 
                                  self.a[i,j] * self.b[j, self.O[t]],
                                  i)
                                 for i in self.STATES]
                    
                rv, max_i = max(possibles, key=lambda tup: tup[0])
                self.viterbi_bpointers[t, j] = max_i
            self.viterbi_table[t, j] = rv
        return rv
    
    def decode(self, O=None):
        '''Return the most likely hidden state sequenence
           for observation sequence obs based 
           on self.a and self.b, and its probability.
           O is the observations'''
        
        ##################################################
        # WRITE YOUR CODE HERE ###########################
        ##################################################
        # Your code should implement the VITERBI function
        # in Figure 9.11 on Jurafsky-Martin 
        # https://web.stanford.edu/~jurafsky/slp3/9.pdf.
        # In particular you should store probabilities in
        # a table and store backpointers in another table.
        # You need to construct the best hidden state sequence
        # using the backpointers and return it along with its
        # probability.  For this you will need to use self.a
        # and self.b, and values self.T, but you don't need to
        # use the alpha or beta tables.
        
        if O is not None:
            self.set_observations(O)
        self.viterbi_table = np.ones((self.T+2, self.N+2)) * -1.0
        self.viterbi_bpointers = np.ones((self.T+2, self.N+2),
                                         dtype = int) * -1
        
        p = self.viterbi(self.T+1, self.FINISH)
        bp_sequence = [self.FINISH]
        j = self.FINISH
        for t in range(self.T+1,0,-1):
                j = self.viterbi_bpointers[t, j]
                bp_sequence.append(j)
        return (list(reversed(bp_sequence)), p)
    
    def backward(self):
        '''Fill beta_table, i.e., beta_t(i)'s'''
               
        # Unlike alpha we don't define beta(T+1, i).
        # This should be called after set_observation, 
        # else self.T == 0.
        
        if (self.T == 0):
            raise Exception("Premature call to beta()")
            
        
        ##################################################
        # WRITE YOUR CODE HERE ###########################
        ##################################################
        # Please follow the algorithm as described in 
        # https://web.stanford.edu/~jurafsky/slp3/9.pdf
        # from 9.27 to 9.30
        # First fill for t = T
        # Fill for t=T-1 to 1 "recursively"
        # Fill in for t = 0 and i = START
        
        # First fill for t = T
        for i in range(1, self.N+1):
            self.beta_table[self.T, i] = self.a[i, self.FINISH]
            
        # Fill for t=1 to T-1 "recursively"
        for t in range(self.T-1, 0, -1):
#             for i in range(1, self.N+1):
#                 self.beta_table[t, i] = sum(
#                     [self.a[i, j] * self.b[j,  self.O[t+1]] * 
#                      self.beta_table[t+1, j] for j in self.STATES])
            for i in range(1, self.N+1):
                self.beta_table[t, i] = np.sum(
                    self.a[i, 1:self.N+1] * self.b[1:self.N+1,  self.O[t+1]] * 
                     self.beta_table[t+1, 1:self.N+1])
        
        # Fill in for t = 0 and i = START
#         self.beta_table[0, self.START] = sum(
#                 [self.a[self.START, j] * self.b[j,  self.O[1]] * 
#                  self.beta_table[1, j] for j in self.STATES])
        self.beta_table[0, self.START] = np.sum(
                self.a[self.START, 1:self.N+1] * self.b[1:self.N+1,  self.O[1]] * 
                 self.beta_table[1, 1:self.N+1])
        
        
        
    def xi(self, t, i, j):
        '''
        Return xi_t(i, j)
        '''        

        # The basic formula works for t=0 as well. For t=T we need
        # a special case because beta(t+1, j) is not defined for all j.
                
        if self.alpha_table[self.T+1, self.FINISH] <= 0.0:
            raise Exception("Impossible observation sequence")
    
        ##################################################
        # WRITE YOUR CODE HERE ###########################
        ##################################################
        # Please follow formula 9.37 in 
        # https://web.stanford.edu/~jurafsky/slp3/9.pdf
        # to compute the value of xi. 
        # You may need to take care of some corner cases carefully. 
        
        if t >= 0 and t < self.T:
            rv = (self.alpha_table[t, i] * self.a[i, j] * 
                   self.b[j, self.O[t+1]] * self.beta_table[t+1, j]
                 )/ self.alpha_table[self.T+1, self.FINISH]            
        elif t == self.T: # non-zero only for j == FINISH
            if j != self.FINISH:
                rv = 0.0
            else:
                rv = (self.alpha_table[t, i] * self.a[i, j] / 
                  self.alpha_table[self.T+1, self.FINISH])
        else:
            raise Exception("Invalid call to xi().")
        
        return rv
    
    def gamma(self, t, j):
        '''
        Return gamma_t(j)
        '''  
        
        rv = 0
        
        ##################################################
        # WRITE YOUR CODE HERE ###########################
        ##################################################
        # Your code should compute gamma_t(j) as specified
        # in Figure 9.16 of Jurafsky-Martin 
        # https://web.stanford.edu/~jurafsky/slp3/9.pdf. 
        # For this you wil need to use self.alpha_table, 
        # self.beta_table, self.T, and values such as self.START
        # (the number corresponding to start state, set at 0),
        # and self.FINISH (the number corresponding to the
        # the end state, set at self.N+1).
        
        if self.alpha_table[self.T+1, self.FINISH] <= 0.0:
            raise Exception("Impossible observation sequence")
        
        if t >0 and t < self.T+1:
            rv = (self.alpha_table[t, j] * self.beta_table[t, j] / 
                  self.alpha_table[self.T+1, self.FINISH])
        elif t == 0:
            if j == self.START:
                rv = 1.0
            else:
                rv = 0.0
        elif t == self.T+1:
            if j == self.FINISH:
                rv = 1.0
            else:
                rv = 0.0
        else:
            raise Exception("Invalid call to gamma()")
            
        return rv

    def a_from_xi(self, i, j, rtype='ratio'):
        '''
        Return new a_{ij} computed from xi
        rtype: we define two return types, "ratio" or "separate", 
        the separate return type will be used in later functions. 
        '''
    
        ##################################################
        # WRITE YOUR CODE HERE ###########################
        ##################################################
        # Your code should compute a(hat) as described
        # in Figure 9.16 of Jurafsky-Martin 
        # https://web.stanford.edu/~jurafsky/slp3/9.pdf. 
        
        if j == self.START or i == self.FINISH:
            numerator = 0.0
            denominator = 1.0
        else:
            numerator = sum([self.xi(t, i, j) for t in 
                                range(0, self.T+1)])
                
            # Denominator in Jurafsky-Martin can be improved.    
            #denominator = sum([self.xi(t, i, k) for t in 
            #                   range(0, self.T+1) for k in 
            #               range(self.N+2)])
            denominator = sum([self.gamma(t, i) for t in
                               range(self.T+1)])

        if rtype == 'separate':
            rv = (numerator, denominator)
        elif math.isclose(denominator, 0.0):
            print("Divide by zero in i={} j={}".format(i, j))
            rv = np.nan
        else:
            rv = numerator/denominator
        return rv
        
        return rv
        
    def b_from_gamma(self, j, v, rtype='ratio'):
        '''
        Return new b_{ij} computed from xi
        rtype: we define two return types, "ratio" or "separate", 
        the separate return type will be used in later functions. 
        '''
    
        ##################################################
        # WRITE YOUR CODE HERE ###########################
        ##################################################
        # Your code should compute b(hat) as described
        # in Figure 9.16 of Jurafsky-Martin 
        # https://web.stanford.edu/~jurafsky/slp3/9.pdf. 
        
        numerator = sum([self.gamma(t, j) for t in 
                         range(0, self.T+2) if self.O[t] == v])
        denominator = sum([self.gamma(t, j) for t in 
                           range(0, self.T+2)])

        if rtype == 'separate':
            rv = (numerator, denominator)    
        elif math.isclose(denominator, 0.0):
            print("Divide by zero in j={}, v={}".format(j, v))
            rv = np.nan
        else:
            rv = numerator/denominator
        return rv
    
    def new_a(self):
        rv = np.array([[-1.0] * (self.N+2)] * (self.N+2))
        for i in range(self.N+2):
            for j in range(self.N+2):
                rv[i,j] = self.a_from_xi(i, j)
        return rv
    
    def new_b(self):
        rv = np.array([[-1.0] * (self.M+2)] * (self.N+2))
        for j in range(self.N+2):
            for v in range(self.M+2):
                rv[j,v] = self.b_from_gamma(j,v)
        return rv
    
    def fit(self, O, max_iter=10, verbose=False):
        '''
        Run the forward-backward algorithm for a single observation
        '''
        
        self.set_observations(O)
        converged = False
        i = 0
        while not converged and i < max_iter:
            i += 1
            
            self.forward()
            self.backward()
            
            new_a = self.new_a()
            new_b = self.new_b()
            
            if verbose:
                print(f'Iteration {i}')
                print(new_b.T)
                print(new_a.T)
        
            if (np.allclose(new_a, self.a) and
                    np.allclose(new_b, self.b)):
                converged = True
            else:
                self.set_a_b(new_a, new_b)
        return(i)

In [None]:
# A toy example for testing your code, based on Jason Eisner's
# Workbook

transmission_prob = np.array([
    [0. , 0.5, 0.5, 0. ],
    [0. , 0.8, 0.1, 0.1],
    [0. , 0.2, 0.7, 0.1],
    [0. , 0. , 0. , 0. ]])

emission_prob = np.array([
    [1. , 0. , 0. , 0. , 0. ],
    [0. , 0.7, 0.2, 0.1, 0. ],
    [0. , 0. , 0.3, 0.7, 0. ],
    [0. , 0. , 0. , 0. , 1. ]])

observation_short = [
    0,
    2, 3, 3, 2,
    4
]

T_observation_short = len(observation_short) -2

observation_long = [
    0,
    2, 3, 3, 2,
    3, 2, 3, 2,
    2, 3, 1, 3,
    3, 1, 1, 1,
    2, 1, 1, 1,
    3, 1, 2, 1,
    1, 1, 2, 3,
    3, 2, 3, 2, 
    2,
    4
]

T_observation_long = len(observation_long)-2

In [None]:
hmms = HMM(transmission_prob, emission_prob)
hmms.set_observations(observation_short)
hmms.forward()
expected_alpha = np.array(
      [[1.        , 0.        , 0.        , 0.        ],
       [0.        , 0.1       , 0.15      , 0.        ],
       [0.        , 0.011     , 0.0805    , 0.        ],
       [0.        , 0.00249   , 0.040215  , 0.        ],
       [0.        , 0.002007  , 0.00851985, 0.        ],
       [0.        , 0.        , 0.        , 0.00105268]])

assert np.allclose(hmms.alpha_table, expected_alpha)

hmms.backward()
expected_beta = np.array([
    [ 0.00105268, -1.        , -1.        , -1.        ],
    [-1.        ,  0.0011457 ,  0.0062541 , -1.        ],
    [-1.        ,  0.00327   ,  0.01263   , -1.        ],
    [-1.        ,  0.019     ,  0.025     , -1.        ],
    [-1.        ,  0.1       ,  0.1       , -1.        ]])
assert np.allclose(hmms.beta_table, expected_beta)

expected_decode = ([0, 2, 2, 2, 2, 3], 0.0007563149999999998)
assert hmms.decode(hmms.O) == expected_decode

In [None]:
hmml = HMM(transmission_prob, emission_prob)
hmml.fit(observation_long, max_iter = 20)
expected_a = np.array([
    [0.00000000e+00, 5.68740363e-15, 1.00000000e+00, 0.00000000e+00],
    [0.00000000e+00, 9.33778559e-01, 6.62214406e-02, 1.21039999e-15],
    [0.00000000e+00, 7.18667227e-02, 8.64943927e-01, 6.31893502e-02],
    [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00]])
expected_b = np.array([
    [1.        , 0.        , 0.        , 0.        , 0.        ],
    [0.        , 0.64048263, 0.14806965, 0.21144771, 0.        ],
    [0.        , 0.        , 0.53439047, 0.46560953, 0.        ],
    [0.        , 0.        , 0.        , 0.        , 1.        ]])
assert np.allclose(hmml.a, expected_a)
assert np.allclose(hmml.b, expected_b)

expected_decode_result = ([0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 
                           1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 
                           2, 2, 2, 2, 2, 2, 3],
                          1.4779964597903278e-16)
assert np.allclose(hmml.decode(hmml.O)[0], expected_decode_result[0])
assert np.isclose(hmml.decode(hmml.O)[1], expected_decode_result[1])

In [None]:
class HMMMulti(HMM):
    '''
    Run the forward-backward algorithm for multiple independent observations
    
    Follow the description in Sec 4.1 of "Training Hidden Markov Models with 
    Multiple Observations – A Combinatorial Method" by Li, Parizeau, 
    and Plamondon, available at 
    http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.335.1457&rep=rep1&type=pdf
    '''    
    
    def update_new_a_num_deno(self):
        for i in range(self.N+2):
            for j in range(self.N+2):
                (num, deno) = self.a_from_xi(i, j, 'separate')
                self.new_a_num[i,j] += num
                self.new_a_deno[i,j] += deno
    
    def update_new_b_num_deno(self):
        for j in range(self.N+2):
            for v in range(self.M+2):
                (num, deno) = self.b_from_gamma(j, v, 'separate')
                self.new_b_num[j, v] += num
                self.new_b_deno[j, v] += deno
                
    def get_likelihoods(self):
        '''Get the likelihood at each iteration of
           the fit/learning step.
           
           Assume this is called only after a call to fit().'''
        
        ##################################################
        # WRITE YOUR CODE HERE ###########################
        ##################################################
        # You have to write code here, and make changes in 
        # other parts of this 
        # class as well to store the likelihoods of the 
        # entire set of observations Oset used in fit,
        # at each step of the fit process, and return them
        # when this function is called.
        
        prob = 0 # in log space
        self.forward()
        prob += np.log(self.alpha_table[
            self.T+1, self.FINISH])
        return np.exp(prob)



    def fit(self, Oset, max_iter=10, verbose=False):
        
        self.new_a_num = np.array([[0.0] * (self.N+2)] * (self.N+2))
        self.new_a_deno = np.array([[0.0] * (self.N+2)] * (self.N+2))
        self.new_b_num = np.array([[0.0] * (self.M+2)] * (self.N+2))
        self.new_b_deno = np.array([[0.0] * (self.M+2)] * (self.N+2))

        
        converged = False
        i = 0
        
        while not converged and i < max_iter:
            i += 1
                   
            for O in Oset:
                self.set_observations(O)
                
                self.forward()
                self.backward()
                
                
            
                self.update_new_a_num_deno()
                self.update_new_b_num_deno()
            
            new_a = self.new_a_num / self.new_a_deno
            new_b = self.new_b_num / self.new_b_deno
            
            if verbose:
                print(f'Iteration {i}')
                print(new_b.T)
                print(new_a.T)
            else:
                print(f'Iteration {i}')
        
            if (np.allclose(new_a, self.a) and
                    np.allclose(new_b, self.b)):
                converged = True
            else:
                self.set_a_b(new_a, new_b)
                self.new_a_num.fill(0)
                self.new_a_deno.fill(0)
                self.new_b_num.fill(0)
                self.new_b_deno.fill(0)
                
                
        return(i)
    

In [None]:
hmmml = HMMMulti(transmission_prob, emission_prob)
hmmml.fit([observation_long]*5, max_iter = 20)
expected_a = np.array([
    [0.00000000e+00, 5.68740363e-15, 1.00000000e+00, 0.00000000e+00],
    [0.00000000e+00, 9.33778559e-01, 6.62214406e-02, 1.21039999e-15],
    [0.00000000e+00, 7.18667227e-02, 8.64943927e-01, 6.31893502e-02],
    [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00]])
expected_b = np.array([
    [1.        , 0.        , 0.        , 0.        , 0.        ],
    [0.        , 0.64048263, 0.14806965, 0.21144771, 0.        ],
    [0.        , 0.        , 0.53439047, 0.46560953, 0.        ],
    [0.        , 0.        , 0.        , 0.        , 1.        ]])
assert np.allclose(hmml.a, expected_a)
assert np.allclose(hmml.b, expected_b)
expected_likelihoods = [8.463524947168683e-21, 6.45760751286262e-19,
                        2.604423720662633e-18, 3.772405535111533e-18,
                        3.934445572171057e-18, 3.945375545182965e-18,
                        3.9459945227193204e-18, 3.946030509476693e-18,
                        3.946032758826868e-18, 3.946032909436204e-18,
                        3.94603292005471e-18]
assert np.allclose(hmmml.get_likelihoods(), expected_likelihoods)

In [None]:
import nltk
from nltk.corpus import brown

In [None]:
class POS:
    '''
        Partition given sentences, or those obtained from the brown
        corpus, equally into fit sentences and test sentences.
        For each obtain tags using nltk tagger.  Initialize "a"
        with the tags from all sentences, but with uniform
        transition probabilities.  Initialize "b" with all words
        in all sentences.  But the emission probability 
        initialization should only use the words in fit sentences.
        For each word in the fit sentence, give its most common
        tag "high" probability of emitting it.  All other words
        get a "low" probability of being emitted by a tag.  (The
        high and low values are different for each tag because of
        normalization.)
        '''
        
    def __init__(self, sents=None, category=None, 
                 num_sents=40,
                 b_bound = 1*10**(-4), 
                 tagset="universal"):
        
        if sents is None:
            self.sents = brown.sents(categories=category)[:num_sents]
            self.tagged_sents = brown.tagged_sents(
                categories=category, tagset=tagset)[:num_sents]
        else:
            self.sents = [nltk.word_tokenize(s) for s in sents]
            self.tagged_sents = [nltk.pos_tag(s, tagset = tagset)
                                 for s in self.sents]
            
        self.fsize = int(len(self.sents) * 0.5)
        self.fit_tsents = self.tagged_sents[:self.fsize]
        self.test_tsents = self.tagged_sents[self.fsize:]
        tagged_words = [tup for s in self.tagged_sents[:self.fsize] 
                        for tup in s]
        
        self.vocabulary = (['_SWORD'] + # Special observation in STAG
                      sorted(list(set([w.lower() for 
                                    s in self.sents for w in s]))) +
                      ['_FWORD']) # Special observation in FTAG
        self.M = len(self.vocabulary) # Number of distinct observations
        
        print(f'Vocabulary size {self.M}')
    
        self.tags = (['STAG'] + # Special start state
                     sorted(list(set([p[1] for s in self.tagged_sents 
                                        for p in s]))) + 
                     ['FTAG']) # Special end state
        self.N = len(self.tags) # Number of states
    
        self.w2i = dict(zip(self.vocabulary, 
                            range(len(self.vocabulary))))
        self.t2i = dict(zip(self.tags, range(len(self.tags))))
        
        # Initialize a (transition probabilities)
        #self.a = np.random.rand(self.N, self.N)
        self.a = np.ones((self.N, self.N))
        self.a[:,0] = 0.0
        self.a = self.a/(self.a.sum(axis=1).reshape(
            self.a.shape[0],1))
        self.a[-1,:] = 0.0
        
        cfd = nltk.ConditionalFreqDist((word.lower(), tag) for 
                                       (word, tag)
                                       in tagged_words)
        
        # Initialize b (emission probabilities)
        #self.b = np.random.rand(self.N, self.M)*b_random_bound
        self.b = np.ones((self.N, self.M))*b_bound
        # STAG only produces _SWORD and FTAG only produces _FWORD
        self.b[0,:] = 0.0
        self.b[:,0] = 0.0
        self.b[0,0] = 1.0
        self.b[-1,:] = 0.0
        self.b[:,-1] = 0.0
        self.b[-1, -1] = 1.0
        # Set non-trivial probability of a word being produced
        # by its most common tag
        for word in cfd.conditions():
            tag = (cfd[word].most_common(1)[0][0])
            self.setb(tag, word, 0.5)
        self.b = self.b/(self.b.sum(axis=1).reshape(self.N,1))
        
        # Initialize an HMM on oset with a, b
        self.hmm = HMMMulti(self.a.copy(), self.b.copy())
        
    def create_oset(self):
        
        self.oset = []
        for ts in self.tagged_sents:
            s, _ = split_tagged_sent(ts)
            self.oset.append(self.s2o(s))
        
    def fit(self, max_iter=10):
        
        # Create oset
        self.create_oset()
            
        # Fit the hmm 
        self.hmm.fit(self.oset, max_iter=max_iter)
        
    def test(self):
        
        for descrip, ts_set in [("fit set", self.fit_tsents), 
                                ("test set", self.test_tsents)]:        
            correct = 0
            sum_accuracy = 0
            for ts in ts_set:
                s, tag_seq = split_tagged_sent(ts)
                predicted, _ = self.decode(s)
                a = accuracy(tag_seq, predicted)
                if a == 1.0:
                    correct += 1
                sum_accuracy += a
            avg_accuracy = sum_accuracy/len(ts_set)
            correct_ratio = correct/len(ts_set)
            print(f"For {descrip}: Average accuracy = "
                  f"{avg_accuracy:.3f}," 
                  f" Totally correct ratio = " 
                  f"{correct_ratio:.3f}")
              
    def decode(self, sentence):
        if sentence[0] != 0: # Weak check if converted
            O = self.s2o(sentence)
        else:
            O = sentence.copy()
        state_seq, prob = self.hmm.decode(O)
        tag_seq = [self.tags[s] for s in state_seq]
        return (tag_seq, prob)
        
    def setb(self, j, w, p):
        self.b[self.t2i[j], self.w2i[w]] = p
    def getb(self, j, w, b=None):
        if b is None:
            b = self.b
        return b[self.t2i[j], self.w2i[w]]
    def seta(self, i, j, p):
        self.a[self.t2i[i], self.t2i[j]] = p
    def geta(self, i, j, a=None):
        if a is None:
            a = self.a
        return a[self.t2i[i], self.t2i[j]]
    def s2o(self, sentence):
        return([0] + 
               [self.w2i[w.lower()] for w in sentence] + 
               [self.w2i['_FWORD']])
    def i2t(self, state_sequence):
        return [self.tags[i] for i in state_sequence][1:-1]
    
    def print_b_unsorted(self, b=None):
        if b is None:
            b = self.b
        for t in self.tags:
            print(f'\n\n{t}', end='')
            for w in self.vocabulary:
                p = self.getb(t, w, b)
                if p > 10**(-5):
                    print(f' {w}[{p:.5f}]', end='')
                    
    def print_a_unsorted(self, a=None):
        if a is None:
            a = self.a
        for t in self.tags:
            print(f'\n\n{t}', end='')
            for u in self.tags:
                p = self.geta(t, u, a)
                print(f' {u}[{p:.5f}]', end='')
                
    def print_b(self, b=None):
        if b is None:
            b = self.b
        for t in self.tags:
            ptups = zip(b[self.t2i[t],:],self.vocabulary)
            ptups = sorted(ptups, key = lambda tup: tup[0],
                          reverse = True) 
            print(f'\n\n{t}', end='')
            for tup in ptups:
                print(f' {tup[1]}[{tup[0]}]', end='')
                
    def print_a(self, a=None):
        if a is None:
            a = self.a
        for t in self.tags:
            ptups = zip(a[self.t2i[t],:],self.tags)
            ptups = sorted(ptups, key = lambda tup: tup[0],
                          reverse = True) 
            print(f'\n\n{t}', end='')
            for tup in ptups:
                print(f' {tup[1]}[{tup[0]}]', end='')
                
    def create_tagged_sent(self, sentence, tags):
        if sentence[0] != '_SWORD':
            s = (['_SWORD'] + 
                 [w.lower() for w in sentence] + 
                 ['_FWORD'])
        else:
            s = sentence
        return list(zip(s, tags))
    
    def check_sent(self, sentence_number):
        n = sentence_number
        s = self.sents[n]
        tag_seq = [tag for (word, tag) in 
                             self.tagged_sents[n]]
        predicted_tag_seq = self.decode(s)[0][1:-1]
        return list(zip(s, tag_seq, predicted_tag_seq))

def split_tagged_sent(tagged_sent):
    return list(zip(*tagged_sent))

def accuracy(tag_seq, predicted_tag_seq):
    if predicted_tag_seq[0] == 'STAG':
        p = predicted_tag_seq[1:-1]
    else:
        p = predicted_tag_seq
    return (sum(t == pt for (t, pt) in zip(tag_seq, p))/
            len(tag_seq))


In [None]:
sents_example_1 = [
    "Janet will back the bill.",
    "Richard will eat an apple.",
    "Mary will run a mile.",
    "Sam will write an essay.",
]

start = time.time()
pos = POS(sents_example_1)
pos.fit(100)
pos.test()
pos.print_a(pos.hmm.a)
pos.print_b(pos.hmm.b)
finish = time.time()
elapsed = finish - start
print("time elapsed: ", elapsed)

############################################################
# PLEASE ANSWER THE FOLLOWING:  ############################
############################################################
# Did the HMM based POS tagger learn a correct POS
# tagging model? If not, what might be the cause?

In [None]:
sents_example_2 = [
    "Janet will back the bill.",
    "Janet likes music.",
    "Richard will eat an apple.",
    "Peter needs rest.",
    "Mary will run a mile.",
    "Joe hates TV.",
    "Sam will write an essay.",
    "Ramona loves cooking.",
]

start = time.time()
pos = POS(sents_example_2)
pos.fit(100)
pos.test()
pos.print_a(pos.hmm.a)
pos.print_b(pos.hmm.b)
finish = time.time()
elapsed = finish - start
print("time elapsed: ", elapsed)

############################################################
# PLEASE ANSWER THE FOLLOWING:  ############################
############################################################
# Did the HMM based POS tagger learn a correct POS
# tagging model? If not, what might be the cause?

In [None]:
from nltk import sent_tokenize

with open("dev.txt", "r") as fin:
    text = fin.read()
    
sents_example_3 = sent_tokenize(text)

start = time.time()
pos = POS(sents_example_3)
pos.fit(100)
pos.test()
pos.print_a(pos.hmm.a)
pos.print_b(pos.hmm.b)
finish = time.time()
elapsed = finish - start
print("time elapsed: ", elapsed)

############################################################
# PLEASE ANSWER THE FOLLOWING:  ############################
############################################################
# Did the HMM based POS tagger learn a correct POS
# tagging model? If not, what might be the cause?