<a href="https://colab.research.google.com/github/Nukaraju2003/Natural-Language-Processing/blob/main/HMM_POSassignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# HMM POS Classifier

In this notebook we implement a part-of-speech (POS) tagger using a hidden Markov model (HMM).

Check the [video description](https://youtu.be/tEebcmTxvWc)

The dataset can be downloaded from:
* [Training Data](https://drive.google.com/file/d/1Bsw4BxpW98gK3vOJNdwcAKQ-8WO38CtR/view?usp=sharing)
* [Testing Data](https://drive.google.com/file/d/1ZzkigI6TFGVCbAXuPmyx4lekf3WoZkq_/view?usp=sharing)

Read through the implementation carefully. We implement the Viterbi decoder twice: once with plain Python loops and once vectorised with NumPy. Compare the running times at the end of the notebook to see the performance boost of the vectorised version.



In [None]:
def read_data(fname):
    """Read a tagged corpus with one sentence per line.

    Each line is expected to hold whitespace-separated tokens that alternate
    ``word tag word tag ...``. All tokens are lower-cased.

    Args:
        fname: Path of the text file to read.

    Returns:
        A list with one ``(words, tags)`` pair per non-blank line, where
        ``words`` is the tuple of tokens at even positions and ``tags`` the
        tuple of tokens at odd positions.
    """
    exemplars = []
    # The context manager closes the file even if reading fails part-way.
    with open(fname, 'r') as file:
        for line in file:
            data = tuple(w.lower() for w in line.split())
            if not data:
                # A blank line is not a sentence; an empty example would
                # break decoding and per-sentence accuracy downstream.
                continue
            exemplars.append((data[0::2], data[1::2]))

    return exemplars

In [None]:
import math
_NINF_LOG = -300  # stand-in log2 value returned for a zero probability
_NINF = float('-1e300')  # not referenced by the code visible in this notebook
class Probdist:
  """Probability distribution estimated from a dict of sample counts.

  Probabilities are additively smoothed:
  ``(count + gamma) / (total + bins * gamma)``.
  """

  def __init__(self, freqdist, gamma=0.5, bins=None):
    """Build the distribution.

    Args:
      freqdist: Mapping from sample to its observed count.
      gamma: Amount added to every count (0 disables smoothing).
      bins: Number of possible samples; defaults to the number of
        distinct samples in ``freqdist``.
    """
    self._gamma = gamma
    self._freqdist = freqdist
    n = sum(freqdist.values())
    self._bins = len(freqdist) if bins is None else bins
    self._divisor = n + self._bins*self._gamma

  def prob(self, sample):
    """Return the smoothed probability of ``sample``.

    Returns 0.0 when the distribution holds no mass at all (no counts and
    no smoothing) instead of dividing by zero.
    """
    if self._divisor == 0:
      return 0.0
    c = self._freqdist.get(sample, 0) + self._gamma
    return c/self._divisor

  def logprob(self, sample):
    """Return log2 of ``prob(sample)``, or ``_NINF_LOG`` if it is zero."""
    p = self.prob(sample)
    return math.log(p, 2) if p != 0 else _NINF_LOG

  def samples(self):
    """Return the samples that have an entry in the count table."""
    return list(self._freqdist)

In [None]:
# Load the training corpus from 'bc.train' as a list of (words, tags) pairs.
train_set = read_data('bc.train')

In [None]:
# Load the held-out corpus from 'bc.test' in the same (words, tags) format.
test_set = read_data('bc.test')

In [None]:
# Take the first test sentence as a sample to inspect.
example = test_set[0]

In [None]:
# Pair each word of the sample sentence with its tag for display.
list(zip(example[0], example[1]))

[('at', 'adp'),
 ('the', 'det'),
 ('same', 'adj'),
 ('instant', 'noun'),
 (',', '.'),
 ('nick', 'noun'),
 ('hit', 'verb'),
 ('the', 'det'),
 ('barrel', 'noun'),
 ('and', 'conj'),
 ('threw', 'verb'),
 ('himself', 'pron'),
 ('upon', 'adp'),
 ('the', 'det'),
 ('smaller', 'adj'),
 ('man', 'noun'),
 ('.', '.')]

## POS Tagging

Given a corpus in which every word is annotated with its POS tag, the words themselves serve as the features and the POS tags as the labels. The model can then condition on unigrams, bigrams, trigrams, and so on. You also need to decide how to preprocess the text, for example whether to normalise (simplify) each word and how to tokenize it.

In [None]:
START = "START" # a fixed start state for a sentence
END = "END" # a fixed end state for a sentence
import numpy as np
class HMMClassifier:
  """Bigram HMM tagger: trained by counting, decoded with the Viterbi algorithm.

  States are the tags (plus the artificial START and END states) and
  observations are the words. ``fit`` decodes with plain Python loops,
  ``npfit`` decodes with NumPy arrays precomputed by ``train``.
  """

  def __init__(self):
    self.S = {START, END} # set of states
    self.O = set() # set of observations
    self.E = {} # emission counts, keyed by (state, observation)
    self.T = {} # transition counts, keyed by (previous state, state)

  def _add_training_example(self, example):
    """Add the emission and transition counts of one (words, tags) pair."""
    prev_state = START

    for o, s in zip(example[0], example[1]):
      self.S.add(s)
      self.O.add(o)

      self.E[s, o] = self.E.get((s, o), 0) + 1
      self.T[prev_state, s] = self.T.get((prev_state, s), 0) + 1
      prev_state = s

    # close the sentence with a transition into the END state
    self.T[prev_state, END] = self.T.get((prev_state, END), 0) + 1

  def train(self, dataset):
    """Count every example in ``dataset`` and build the log-prob tables.

    Args:
      dataset: Iterable of ``(words, tags)`` pairs.

    Sets ``pE``/``pT`` (emission/transition distributions), ``states``,
    ``state_idx``, ``obs_idx`` and the dense ``transitions`` (states x states)
    and ``emissions`` (states x observations) log-probability matrices.
    """
    for example in dataset:
      self._add_training_example(example)

    # NOTE(review): with gamma=0 Probdist divides each pair count by the
    # total over *all* pairs, so these are joint relative frequencies of
    # (state, observation) and (previous state, state), not the conditional
    # probabilities of a textbook HMM -- confirm this is intended.
    self.pE = Probdist(self.E, gamma=0)
    self.pT = Probdist(self.T, gamma=0)
    self.states = list(self.S)
    m = len(self.states)

    # transitions[i, j] = log-prob of moving from state i to state j
    self.transitions = np.empty((m, m))
    for i, src in enumerate(self.states):
      for j, dst in enumerate(self.states):
        self.transitions[i, j] = self.pT.logprob((src, dst))

    # emissions[i, j] = log-prob of state i emitting observation j;
    # pairs never seen in training keep the "log of zero" placeholder
    n = len(self.O)
    self.emissions = np.full((m, n), float(_NINF_LOG))
    self.state_idx = {s: i for i, s in enumerate(self.states)}
    self.obs_idx = {o: i for i, o in enumerate(self.O)}
    for s, o in self.pE.samples():
      self.emissions[self.state_idx[s], self.obs_idx[o]] = self.pE.logprob((s, o))

  def _backtrace(self, viterbi_path, last_state):
    """Follow the back-pointers from ``last_state`` and return the state names.

    ``viterbi_path[j, i]`` is the index of the best predecessor of state j
    at position i; ``last_state`` is the state index chosen for the final
    position.
    """
    n = viterbi_path.shape[1]
    path = np.empty(n, dtype=np.intp)
    path[n - 1] = last_state
    for i in range(n - 1, 0, -1):
      path[i - 1] = viterbi_path[path[i], i]
    return [self.states[k] for k in path]

  def _emission_column(self, observation):
    """Return the per-state emission log-probs of ``observation``.

    An observation that was never seen in training gets the "log of zero"
    placeholder for every state.
    """
    idx = self.obs_idx.get(observation)
    if idx is None:
      return np.full(len(self.states), float(_NINF_LOG))
    return self.emissions[:, idx]

  def fit(self, observations):
    """Return the most likely state sequence for ``observations`` (Viterbi).

    Log-probabilities are summed instead of multiplying probabilities:

      V[j][0] = emit(S_j, O_0) + trans(START, S_j)
      V[j][i] = max over k of V[k][i-1] + trans(S_k, S_j) + emit(S_j, O_i)

    The final state maximises ``V[j][n-1] + trans(S_j, END)``.

    Args:
      observations: Sequence of words.

    Returns:
      A list of state names, one per observation (empty for empty input).
    """
    n = len(observations)
    if n == 0:
      return []
    m = len(self.states)

    viterbi_prob = np.zeros((m, n)) # best score of a path ending in state j at position i
    viterbi_path = np.zeros((m, n), dtype=np.intp) # predecessor on that best path

    # initialize the first column from the START state
    for j, state in enumerate(self.states):
      viterbi_prob[j, 0] = self.pE.logprob((state, observations[0])) \
                           + self.pT.logprob((START, state))

    # fill in the rest of the viterbi matrix
    for i in range(1, n):
      for j, state in enumerate(self.states):
        emit = self.pE.logprob((state, observations[i]))
        c = [viterbi_prob[k, i - 1] + self.pT.logprob((prev, state)) + emit
             for k, prev in enumerate(self.states)]
        best = int(np.argmax(c))
        viterbi_prob[j, i] = c[best]
        viterbi_path[j, i] = best

    # the final state is the one with the best score after moving to END;
    # each candidate uses its own viterbi score
    final = [viterbi_prob[j, n - 1] + self.pT.logprob((state, END))
             for j, state in enumerate(self.states)]
    return self._backtrace(viterbi_path, int(np.argmax(final)))

  # Let us now vectorise the viterbi implementation to make it faster
  def npfit(self, observations):
    """Vectorised Viterbi decoding using the matrices built by ``train``.

    Follows the same recurrence as ``fit`` but evaluates all
    (previous state, state) pairs of a position in one array expression.

    Args:
      observations: Sequence of words.

    Returns:
      A list of state names, one per observation (empty for empty input).
    """
    n = len(observations)
    if n == 0:
      return []
    m = len(self.states)

    viterbi_prob = np.zeros((m, n))
    viterbi_path = np.zeros((m, n), dtype=np.intp)

    # first column: emission of the first observation from each state
    # plus the transition from START into that state
    viterbi_prob[:, 0] = self._emission_column(observations[0]) \
                         + self.transitions[self.state_idx[START], :]

    for i in range(1, n):
      emit = self._emission_column(observations[i])
      # scores[k, j] = V[k][i-1] + trans(k, j) + emit(j)
      scores = (viterbi_prob[:, i - 1][:, np.newaxis] + self.transitions) \
               + emit[np.newaxis, :]
      viterbi_path[:, i] = np.argmax(scores, axis=0)
      viterbi_prob[:, i] = np.max(scores, axis=0)

    # the final state is the one with the best score after moving to END
    last = np.argmax(viterbi_prob[:, n - 1] + self.transitions[:, self.state_idx[END]])
    return self._backtrace(viterbi_path, int(last))

In [None]:
# Create an untrained tagger.
hmm = HMMClassifier()

In [None]:
# Collect counts from the training sentences and build the log-prob tables.
hmm.train(train_set)

In [None]:
# Decode the words of the first test sentence with the loop-based Viterbi.
prediction = hmm.fit(test_set[0][0])

In [None]:
# Show the predicted tag sequence.
prediction

['adp',
 'det',
 'adj',
 'noun',
 '.',
 'noun',
 'verb',
 'det',
 'noun',
 'conj',
 'verb',
 'pron',
 'adp',
 'det',
 'adj',
 'noun',
 '.']

In [None]:
# Score the loop-based decoder (hmm.fit) on the test set and time the run.
from timeit import default_timer as timer
from tqdm.notebook import tqdm

st = timer()
accuracy = 0
for i in tqdm(range(len(test_set))):
  words, gold_tags = test_set[i]
  prediction = hmm.fit(words)
  # one boolean per token: did the predicted tag match the gold tag?
  correct = [guess == gold for guess, gold in zip(prediction, gold_tags)]
  accuracy += sum(correct) / len(gold_tags)
# mean of the per-sentence accuracies
print(accuracy / len(test_set))
en = timer()
print(en - st)

  0%|          | 0/2000 [00:00<?, ?it/s]

0.930611672501294
30.476854848000016


In [None]:
# Score the vectorised decoder (hmm.npfit) on the test set and time the run.
from timeit import default_timer as timer
from tqdm.notebook import tqdm

st = timer()
accuracy = 0
for i in tqdm(range(len(test_set))):
  words, gold_tags = test_set[i]
  prediction = hmm.npfit(words)
  # one boolean per token: did the predicted tag match the gold tag?
  correct = [guess == gold for guess, gold in zip(prediction, gold_tags)]
  accuracy += sum(correct) / len(gold_tags)
# mean of the per-sentence accuracies
print(accuracy / len(test_set))
en = timer()
print(en - st)

  0%|          | 0/2000 [00:00<?, ?it/s]

0.9316165983352638
6.78700901000002
