<a href="https://colab.research.google.com/github/SantanaC4/hmm_tagger_treebank/blob/main/hmm_tagger.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Part-of-speech tagging of the Penn Treebank corpus with a Hidden Markov Model

In [3]:
import pandas as pd
from collections import defaultdict
import pprint, time

### Load the Penn Treebank corpus

In [5]:
# This function extracts the (word, tag) pairs and removes nonsense pairs (like ''_'') from the dataset

def pair_filter(section):
  """Download one split of the tagged corpus and parse it into words and tags.

  Every line of the remote file holds whitespace-separated ``word_tag``
  tokens. The noise tokens ``''_''`` and ``'_''`` are dropped.

  Args:
    section: one of ``"training"``, ``"development"`` or ``"testing"``.

  Returns:
    A tuple ``(pair_wordTag, tags, words)`` of three parallel lists:
    the ``(word, tag)`` tuples, the tags alone and the words alone.

  Raises:
    ValueError: if ``section`` is not a known split, or if a token contains
      no ``_`` separator.
  """
  # Local import so the function does not depend on the notebook's import cell.
  from urllib.request import urlopen

  base_url = 'https://raw.githubusercontent.com/SantanaC4/hmm_tagger_treebank/main/treebank_dataset/'
  file_names = {
      "training": 'Sec0-18_training',
      "development": 'Sec-19-21_development',
      "testing": 'Sec-22-24_testing',
  }
  if section not in file_names:
    raise ValueError(
        "unknown section %r; expected one of %s" % (section, sorted(file_names)))

  # The file is plain text, not CSV: read it line by line instead of going
  # through pd.read_csv(sep='\n'), which recent pandas versions reject.
  with urlopen(base_url + file_names[section]) as response:
    lines = response.read().decode("utf-8").splitlines()

  noise = {"''_''", "'_''"}
  pair_wordTag = []
  tags = []
  words = []

  for line in lines:
    # split() without an argument ignores repeated/trailing blanks and empty lines.
    for token in line.split():
      if token in noise:
        continue
      # The tag follows the LAST underscore, so words containing '_' stay intact.
      word, tag = token.rsplit("_", 1)
      pair_wordTag.append((word, tag))
      tags.append(tag)
      words.append(word)
  return (pair_wordTag, tags, words)

In [6]:
# Fetch the training split, then preview the first ten words, tags and
# (word, tag) pairs, one list per output line.
pair_wordTag, tags, words = pair_filter("training")
print(words[:10], tags[:10], pair_wordTag[:10], sep="\n")


['Pierre', 'Vinken', ',', '61', 'years', 'old', ',', 'will', 'join', 'the']
['NNP', 'NNP', ',', 'CD', 'NNS', 'JJ', ',', 'MD', 'VB', 'DT']
[('Pierre', 'NNP'), ('Vinken', 'NNP'), (',', ','), ('61', 'CD'), ('years', 'NNS'), ('old', 'JJ'), (',', ','), ('will', 'MD'), ('join', 'VB'), ('the', 'DT')]


### HMM class

In [7]:
class HMM:
  """Bigram hidden Markov model for part-of-speech tagging.

  All probabilities are plain relative frequencies (no smoothing):

  * transition: P(tag | previous tag) = count(previous, tag) / count(previous)
  * emission:   P(word | tag)         = count(word, tag) / count(tag)

  Attributes (each a ``defaultdict(int)``):
    bigram_cnt: count of every (previous tag, tag) pair.
    unigram_cnt: count of every tag seen in the "previous tag" position.
    tag_word_count: count of every (word, tag) pair.
    transition_probabilities: (previous tag, tag) -> probability.
    emmission_probabilities: (word, tag) -> probability.

  The spelling ``emmission`` is kept because it is part of the public names.
  """

  def __init__(self, text, n):
    """Create the count/probability tables and count the n-grams of ``text``.

    Args:
      text: sequence of tags.
      n: n-gram size; counts are only collected when ``n == 2``.
    """
    self.bigram_cnt = defaultdict(int)
    self.unigram_cnt = defaultdict(int)
    self.tag_word_count = defaultdict(int)
    self.transition_probabilities = defaultdict(int)
    self.emmission_probabilities = defaultdict(int)
    self.ngrams(text, n)

  def ngrams(self, text, n):
    """Return the n-grams of ``["START"] + text`` and, for ``n == 2``, count them.

    One n-gram is produced per start position, so the trailing ones are
    shorter than ``n``; the very last one is replaced by
    ``(last element, "END")``.

    Args:
      text: sequence of tags.
      n: n-gram size.

    Returns:
      The list of n-gram tuples.
    """
    padded = ["START"] + list(text)
    n_grams = [tuple(padded[i: i + n]) for i in range(len(padded))]
    n_grams[-1] = (n_grams[-1][0], "END")

    if n == 2:
      for bigram in n_grams:
        self.bigram_cnt[bigram] += 1
        # Count each tag once, in the context position only. Counting both
        # positions would double every tag count and halve the probabilities.
        self.unigram_cnt[bigram[0]] += 1
    return n_grams

  def transition_probability(self, tags):
    """Estimate P(tag | previous tag) from the tag sequence ``tags``.

    The bigram/unigram counts and the transition table are rebuilt from
    scratch, so the result depends only on ``tags``: counts collected by the
    constructor or by an earlier call are not added a second time.

    Args:
      tags: sequence of tags.

    Returns:
      ``self.transition_probabilities``, mapping (previous tag, tag) to its
      relative frequency.
    """
    self.bigram_cnt.clear()
    self.unigram_cnt.clear()
    self.transition_probabilities.clear()
    self.ngrams(tags, 2)
    # Every counted bigram also incremented its context count, so the
    # denominator is never zero.
    for bigram, count in self.bigram_cnt.items():
      self.transition_probabilities[bigram] = count / self.unigram_cnt[bigram[0]]
    return self.transition_probabilities

  def emmission_probability(self, tagged_words):
    """Estimate P(word | tag) from ``(word, tag)`` pairs.

    The pair counts and the emission table are rebuilt from scratch on every
    call, and the tag totals are taken from ``tagged_words`` itself, so the
    probabilities of each tag sum to one.

    Args:
      tagged_words: iterable of ``(word, tag)`` pairs.

    Returns:
      ``self.emmission_probabilities``, mapping (word, tag) to its relative
      frequency.
    """
    self.tag_word_count.clear()
    self.emmission_probabilities.clear()
    tag_totals = defaultdict(int)
    for word, tag in tagged_words:
      self.tag_word_count[(word, tag)] += 1
      tag_totals[tag] += 1
    for pair, count in self.tag_word_count.items():
      self.emmission_probabilities[pair] = count / tag_totals[pair[1]]
    return self.emmission_probabilities

  def Viterbi(self, words, tags):
    """Tag ``words`` greedily from left to right.

    For each word the tag with the largest ``emission * transition`` score is
    kept, where the transition starts from the tag chosen for the previous
    word. NOTE: despite its name this method does not keep or backtrack over
    alternative paths; it commits to the locally best tag at every step.

    The first word is scored as if it followed the tag ``'.'``. When every
    score is zero (for example an unseen word) the first tag of ``tags`` wins.

    Args:
      words: sequence of words to tag.
      tags: candidate tags; duplicates are allowed and ignored.

    Returns:
      A list of ``(word, tag)`` tuples, one per input word.

    Raises:
      ValueError: if ``words`` is non-empty but ``tags`` is empty.
    """
    words = list(words)
    # Distinct tags in first-occurrence order: same winner and same
    # tie-breaking as scanning the full sequence, without the repeated work.
    candidates = list(dict.fromkeys(tags))
    if words and not candidates:
      raise ValueError("tags must contain at least one tag")

    state = []
    for word in words:
      previous = state[-1] if state else '.'
      # .get() keeps missed lookups from inserting zero entries into the tables.
      scores = [
          self.emmission_probabilities.get((word, tag), 0)
          * self.transition_probabilities.get((previous, tag), 0)
          for tag in candidates
      ]
      # index() returns the first maximum, i.e. the earliest tag on ties.
      state.append(candidates[scores.index(max(scores))])
    return list(zip(words, state))
  

In [8]:
# Download and parse the training split once; the first two elements of the
# result are the (word, tag) pairs and the tag sequence.
tagged_words, tags_space = pair_filter("training")[:2]

# Build the model from the tag sequence, then estimate transition and
# emission probabilities.
a = HMM(tags_space, 2)
a.transition_probability(tags_space)
a.emmission_probability(tagged_words)

# Tag a short example sentence; the last expression is shown as the cell output.
test = "Will can see Marry".split()
a.Viterbi(test, tags_space)

[('Will', 'NNP'), ('can', 'MD'), ('see', 'VB'), ('Marry', 'NNP')]

In [None]:
# Tag the whole test split in fixed-size batches and report the accuracy.
# The split is downloaded and parsed once; it provides both the gold
# (word, tag) pairs and the bare words.
testing_corpus = pair_filter("testing")
tagged_words = testing_corpus[0]
test_untagged_words = testing_corpus[2]

start = time.time()
tagged_seq = []
test_batch_size = 50
testdata_len = len(test_untagged_words)
# Iterate over batch START offsets so the last (possibly shorter) batch is
# decoded as well.
for aux in range(0, testdata_len, test_batch_size):
  batch_end = min(aux + test_batch_size, testdata_len)
  batch = a.Viterbi(test_untagged_words[aux:batch_end], tags_space)
  correct = sum(
      1 for predicted, gold in zip(batch, tagged_words[aux:batch_end])
      if predicted == gold)
  print("Accuracy per batch:", correct/len(batch)*100)
  print(batch_end,"/",testdata_len)
  tagged_seq += batch
end = time.time()
difference = end-start

print("Time taken in seconds: ", difference)

# accuracy
check = [predicted for predicted, gold in zip(tagged_seq, tagged_words) if predicted == gold]
# Guard against an empty test set instead of dividing by zero.
accuracy = len(check)/len(tagged_seq) if tagged_seq else 0.0
print('Viterbi Algorithm Accuracy for the whole test dataset: ', accuracy*100)

Accuracy per batch: 94.0
50 / 128609
Accuracy per batch: 94.0
100 / 128609
Accuracy per batch: 96.0
150 / 128609
Accuracy per batch: 98.0
200 / 128609




---

