# **HMM TRAINING**

# Importing Required Libraries

In [1]:
# importing required libraries
import numpy as np

# Creating a function for getting the sequence and observables

In [2]:
# defining the function that takes the experiment data and return the sequence and that observables
def data(experiment_data):
  # creating a ssequence containing the observations
  sequence = np.zeros(shape=(len(experiment_data)))
  # defining a dictionary of obervables
  observable = dict()
  for t in range(0, len(experiment_data)):
    data = experiment_data[t]
    if not data in observable:
      observable[data] = len(observable)
    sequence[t] = observable[data]
  sequence = sequence.astype(dtype='int')
  # returning the sequence and observables
  return sequence,observable

# Defining a Class for HMM Training using forward and backward
### Definitions:
### (1) HMM = (Q, pi, A, B), where
### - Q: A set of N hidden states
### - N: The number of hidden states
### - pi: An initial probability distribution (1xN)
### - A: A transaction probability matrix (NxN)
### - B: Observation likelihoods (NxV)
### (2) An observations sequence = o_0, o_1, ..., o_{T-1}; where o_i (0<=t<=T-1) is drawn from a observations.
### - T: the number of observations in a sequence.


In [3]:
class HMM_Forward_Backward:
    # the hmm class wil take aargument the number of hidden states
    def __init__(self, hidden_states):
      self.hidden_states = hidden_states 

    # defining the function for training the model
    def train_model(self, A, B, pi, sequence, observables, iterations=100,print_data=0):
    # for wach iterations we perform two steps E step and M step
      for iteration in range(iterations):
        print('For iteration: ' + str(iteration) + '\n')
        # E-step Expecting the values of alpha, beta, gamma and , zeta/zi
        alpha_value = self.alpha_value(sequence, A, B, pi)
        beta_value = self.beta_value(sequence, A, B)
        zeta_value = self.zeta_value(sequence, alpha_value, beta_value, A, B)
        gama_value = self.gama_value(alpha_value, beta_value)

        # M-step Maximization step
        self.compute_A(A, zeta_value)
        self.compute_B(gama_value, B, sequence, observables)
    
        # Prob_Obervations should be decreased after iterations
        Prob_Obervations = self.Probability_Obervations(alpha_value)
        print('Prob_Obervations = ' + str(Prob_Obervations) + '\n')

        if print_data:
          print('Alpha:\n'+str(alpha_value)+'\n')
          print('Beta:\n'+str(beta_value)+'\n')
          print('A: \n' + str(A) + '\n')
          print('B: \n' + str(B) + '\n')
          print('pi: \n' + str(pi) + '\n')
          print('Observables: ' + str(observables))
      return A,B,pi

    # Initialise the hmm parameters for two different kind of hmms defined in pdf
    # For general purpose the parameters can be randomly generated
    # A[i, j]: probability of transition from state i to state j
    # B[i, j]: probability of seeing symbol o_j from state i
    # pi = initial probability of states
    def initialize_parameters(self, states,observables_size):
      if states == 2:
        #A = np.random.rand(self.hidden_states, self.hidden_states)
        #A = A / A.sum(axis=1, keepdims=True)
        A = np.array([[0.96,1.0],[0.04,0.0]])
        #B = np.random.rand(self.hidden_states, observables_size)
        #B = B / B.sum(axis=1, keepdims=True)
        B = np.array([[0.52,0.48],[1.0,0.0]])
        #pi = np.random.rand(self.hidden_states)
        #pi = pi / pi.sum()
        pi = np.array([1.0,0.0])
      elif states == 3:
        #A = np.random.rand(self.hidden_states, self.hidden_states)
        #A = A / A.sum(axis=1, keepdims=True)
        A = np.array([[0,1.0,1.0],[0.5,0.0,0.0],[0.5,0.0,0.0]])
        #B = np.random.rand(self.hidden_states, observables_size)
        #B = B / B.sum(axis=1, keepdims=True)
        B = np.array([[0.5,0.5],[0.5,0.5],[0.5,0.5]])
        #pi = np.random.rand(self.hidden_states)
        #pi = pi / pi.sum()
        pi = np.array([1.0,0.0,0.0])
      return A, B, pi

    # computing the probablity of observations p(o|lamba)
    def Probability_Obervations(self, alpha_value):
      Prob = 0
      hidden_states, num_of_observations = alpha_value.shape
      for i in range(hidden_states):
        Prob += alpha_value[i, num_of_observations - 1]
      return Prob
    
    # calculating the alpha values
    # alpha[1] = pi * b_j(o_t)
    # alpha[i] = sum_j(alpha[i-1]_j * A_i,j * B_i(o_t)) 
    def alpha_value(self, sequence, A, B, pi):
      num_of_observations = len(sequence)
      hidden_states, _ = A.shape
      alpha_value = np.zeros(shape=(hidden_states, num_of_observations))
      # initializing 
      for j in range(hidden_states):
        alpha_value[j, 0] = pi[j] * B[j, sequence[0]]
      # recursive
      for t in range(1, num_of_observations):
        for j in range(hidden_states):
          for i in range(hidden_states):
            alpha_value[j, t] += alpha_value[i, t - 1] * A[i, j] * B[j, sequence[t]]
      return alpha_value

    # calculating the beta values
    def beta_value(self, sequence, A, B):
      num_of_observations = len(sequence)
      hidden_states, _ = A.shape
      beta_value = np.zeros(shape=(hidden_states, num_of_observations))
      # initializing
      for i in range(hidden_states):
        beta_value[i, num_of_observations - 1] = 1
      # recursive
      for t in range(num_of_observations - 2, -1, -1):
        for i in range(hidden_states):
          for j in range(hidden_states):
            beta_value[i, t] += A[i, j] * B[j, sequence[t + 1]] * beta_value[j, t + 1]
      return beta_value

    # Calculating the zeta or zi values
    def zeta_value(self, sequence, alpha_value, beta_value, A, B):
      num_of_observations = len(sequence)
      hidden_states, _ = A.shape
      zeta_value = np.zeros(shape=(num_of_observations, hidden_states, hidden_states))
      for t in range(num_of_observations - 1):
        # probability of abservations required for normalization
        Prob_Obervations = 0
        for j in range(hidden_states):
          Prob_Obervations += alpha_value[j, t] * beta_value[j, t]
        for i in range(hidden_states):
          for j in range(hidden_states):
            zeta_value[t, i, j] = alpha_value[i, t] * A[i, j] * B[j, sequence[t + 1]] * beta_value[j, t + 1]
            zeta_value[t, i, j] = zeta_value[t, i, j] / Prob_Obervations
      return zeta_value
    
    # Calculating the gamma values
    def gama_value(self, alpha_value, beta_value):
      hidden_states, num_of_observations = alpha_value.shape
      gama_value = np.zeros(shape=(hidden_states, num_of_observations))
      for t in range(num_of_observations):
        # probability of abservations required for mormalization
        Prob_Obervations = 0
        for j in range(hidden_states):
          Prob_Obervations += alpha_value[j, t] * beta_value[j, t]
        for j in range(hidden_states):
          gama_value[j, t] = alpha_value[j, t] * beta_value[j, t] / Prob_Obervations
      return gama_value

    # calculating the updated A values
    def compute_A(self, A, zeta_value):
      hidden_states, _ = A.shape
      num_of_observations = zeta_value.shape[0]
      for i in range(hidden_states):
        for j in range(hidden_states):
          numerator = 0
          for t in range(num_of_observations - 1):
            numerator += zeta_value[t, i, j]
            denominator = 0
            for t in range(num_of_observations - 1):
              for k in range(hidden_states):
                denominator += zeta_value[t, i, k]
            A[i, j] = numerator / denominator
      return A

    # calculating the updated B values
    def compute_B(self, gama_value, B, sequence, vocaburary):
      num_of_observations = len(sequence)
      hidden_states = gama_value.shape[0]
      for j in range(hidden_states):
        for k in range(len(observables)):
          numerator = 0
          for t in range(num_of_observations):
            if sequence[t] == list(vocaburary.values())[k]:
              numerator += gama_value[j, t]
              denominator = 0
              for t in range(num_of_observations):
                denominator += gama_value[j, t]
          B[j, list(vocaburary.values())[k]] = numerator / denominator
      return B

In [4]:
# states and input sequence 
s = int(input("Enter Number of states:\n"))
d = input("Enter the Observation sequence:\n")

# the size of vocobulary is 2
sequence, observables = data(d)

# train HMM hidden states defines the number of observables.
hmm = HMM_Forward_Backward(hidden_states=2)  
A, B, pi = hmm.initialize_parameters(s,observables_size=len(observables))

Enter Number of states:
2
Enter the Observation sequence:
01011


In [5]:
a,b,p = hmm.train_model(A, B, pi, sequence, observables,iterations=100,print_data=0)

For iteration: 0

Prob_Obervations = 0.0275188937738158

For iteration: 1

Prob_Obervations = 0.03356313095637386

For iteration: 2

Prob_Obervations = 0.033916391110135224

For iteration: 3

Prob_Obervations = 0.034149151528151185

For iteration: 4

Prob_Obervations = 0.03429971404311325

For iteration: 5

Prob_Obervations = 0.03439591338005851

For iteration: 6

Prob_Obervations = 0.03445688616067224

For iteration: 7

Prob_Obervations = 0.03449533278381869

For iteration: 8

Prob_Obervations = 0.034519496033181926

For iteration: 9

Prob_Obervations = 0.03453465090483858

For iteration: 10

Prob_Obervations = 0.03454414345255639

For iteration: 11

Prob_Obervations = 0.03455008442900956

For iteration: 12

Prob_Obervations = 0.03455380072343134

For iteration: 13

Prob_Obervations = 0.034556124652899645

For iteration: 14

Prob_Obervations = 0.03455757759572672

For iteration: 15

Prob_Obervations = 0.03455848587529123

For iteration: 16

Prob_Obervations = 0.03455905362437815

For 

In [8]:
print("After Training:")
print("Initial state probabilities pi:\n",p)
print("Final A matrix:\n",a)
print("Final B matrix:\n",b)

After Training:
Initial state probabilities pi:
 [1. 0.]
Final A matrix:
 [[1.00000000e+00 1.30423689e-22]
 [1.00000000e+00 0.00000000e+00]]
Final B matrix:
 [[0.4 0.6]
 [1.  0. ]]


# **POS Tagging**

In [34]:
def viterbi(observations, states, start_p, trans_p, emit_p):
  V = [{}]
  for st in states:
    V[0][st] = {"prob": start_p[st] * emit_p[st][observations[0]], "prev": None}
  for t in range(1, len(observations)):
    V.append({})
    for st in states:
      max_tr_prob = V[t - 1][states[0]]["prob"] * trans_p[states[0]][st]
      prev_st_selected = states[0]
      for prev_st in states[1:]:
        tr_prob = V[t - 1][prev_st]["prob"] * trans_p[prev_st][st]
        if tr_prob > max_tr_prob:
          max_tr_prob = tr_prob
          prev_st_selected = prev_st
      max_prob = max_tr_prob * emit_p[st][observations[t]]
      V[t][st] = {"prob": max_prob, "prev": prev_st_selected}
  for line in dptable(V):
      print(line)
  opt = []
  max_prob = 0.0
  best_st = None
  for st, data in V[-1].items():
    if data["prob"] > max_prob:
      max_prob = data["prob"]
      best_st = st
  opt.append(best_st)
  previous = best_st
  for t in range(len(V) - 2, -1, -1):
    opt.insert(0, V[t + 1][previous]["prev"])
    previous = V[t + 1][previous]["prev"]
  print ("The steps of states are " + " ".join(opt) + " with highest probability of %s" % max_prob)

def dptable(V):
    yield " ".join(("%12d" % i) for i in range(len(V)))
    for state in V[0]:
        yield "%.7s: " % state + " ".join("%.7s" % ("%f" % v[state]["prob"]) for v in V)

In [35]:
observations = ("jayesh", "will", "spot","will")
states = ("N", "M", "V")
pi = {"N": 0.75, "M": 0.25,"V": 0.0}
A = {
    "N": {"N": 0.11, "M": 0.33,"V": 0.11},
    "M": {"N": 0.25, "M": 0.0,"V": 0.75},
     "V": {"N": 1.0, "M": 0.0,"V": 0.0},
}
B = {
    "N": {"gaurav":0.44,"jayesh":0.22,"will":0.11,"spot":0.23,"can":0,"see":0,"pat":0},
    "M": {"gaurav":0,"jayesh":0,"will":0.75,"spot":0,"can":0.25,"see":0,"pat":0},
    "V": {"gaurav":0,"jayesh":0,"will":0,"spot":0.25,"can":0,"see":0.5,"pat":0.25},
}

In [36]:
print("Trainig Corpus:\ngaurav jayesh can see will\nspot will see gaurav\nwill jayesh spot gaurav?,gaurav will pat spot.\n")
print("Observations:\n",observations,"\n")
print("Expected Output:\nN M V N\n")
viterbi(observations, states, pi, A, B)

Trainig Corpus:
gaurav jayesh can see will
spot will see gaurav
will jayesh spot gaurav?,gaurav will pat spot.

Observations:
 ('jayesh', 'will', 'spot', 'will') 

Expected Output:
N M V N

           0            1            2            3
N: 0.16500 0.00199 0.00234 0.00084
M: 0.00000 0.04083 0.00000 0.00058
V: 0.00000 0.00000 0.00765 0.00000
The steps of states are N M V N with highest probability of 0.0008422734375000001
