## Loading Modules

In [None]:
# Importing libraries
import nltk
from IPython.display import display
import numpy as np
import pandas as pd
import pickle
import re
import string
import seaborn as sns
from sklearn.model_selection import train_test_split
import sklearn.metrics as metrics
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')


In [None]:
# Fetch the Brown corpus and the universal-tagset mapping; both are used when
# the corpus is read with tagset='universal'.
nltk.download('brown')
nltk.download('universal_tagset')

# Train and Test Data 
#### Note: If you use a custom train and test set, make sure their data type and format match the ones described below. You can then either skip these data-loading cells (but still run the cells that define functions), or assign your own data to `train_set` and `test_set`.

### train_set, test_set : List[ List [ tuple(word,tag) ] ]

#### Run HMM_Viterbi_Train() and HMM_Viterbi_Test() with your custom input

In [None]:
# Tagged Brown sentences (universal tagset), materialised as a list of
# sentences where each sentence is a sequence of (word, tag) pairs.
nltk_data = list(nltk.corpus.brown.tagged_sents(tagset='universal'))

In [None]:
# 80/20 split at sentence level; random_state is fixed so the split is the same on every run.
train_set,test_set =train_test_split(nltk_data,train_size=0.80,test_size=0.20,random_state = 101)

In [None]:
# Number of sentences in the test set, then in the training set.
print(len(test_set))
print(len(train_set))

## Unigram Probability and Tag Counts

In [None]:
allWords = {}
allTags = {}


def tag_and_words(train_set):
    """Rebuild the global vocabulary and tag inventory from ``train_set``.

    After the call the global ``allWords`` is the set of distinct words and the
    global ``allTags`` is the sorted list of distinct tags found in the data.

    Args:
        train_set: Sentences, each a sequence of (word, tag) pairs.

    Returns:
        None. The results are published through the two module globals.
    """
    global allWords, allTags

    # Drop whatever a previous run left behind before collecting again.
    allWords, allTags = {}, {}

    vocabulary = set()
    for sent in train_set:
        vocabulary.update(word for word, _ in sent)
    allWords = vocabulary

    tag_inventory = set()
    for sent in train_set:
        tag_inventory.update(tag for _, tag in sent)
    allTags = sorted(tag_inventory)



In [None]:
def generate_Unigram_Prob(train_set):
    """Count tag occurrences and turn them into unigram tag probabilities.

    Args:
        train_set: Sentences, each a sequence of (word, tag) pairs. Every tag
            must already be present in the global ``allTags``.

    Returns:
        A pair ``(tagUnigramProb, tagCounts)``. ``tagCounts`` maps each tag in
        ``allTags`` to its number of occurrences; ``tagUnigramProb`` maps each
        tag to its share of all tokens, or is an empty dict when no token was
        counted.
    """
    tagCounts = dict.fromkeys(allTags, 0)
    every_pair = (pair for sent in train_set for pair in sent)
    for _, tag in every_pair:
        tagCounts[tag] += 1

    grand_total = sum(tagCounts.values())
    if not grand_total:
        # Nothing was counted, so there is nothing to normalise.
        return {}, tagCounts

    shares = {tag: occurrences / grand_total for tag, occurrences in tagCounts.items()}
    return shares, tagCounts

## Generating Emission Probability


In [None]:
def generate_Emission_Prob_Table(train_set):
    """Build the emission table P(word | tag) from tagged training sentences.

    Args:
        train_set: Sentences, each a sequence of (word, tag) pairs.

    Returns:
        A pandas DataFrame with one row per word in the global ``allWords`` and
        one column per tag in the global ``allTags``. Each column holds the
        relative frequency of every word among the tokens carrying that tag;
        the column of a tag that never occurs stays all zeros.

    Raises:
        KeyError: If ``train_set`` contains a word or tag that is missing from
            ``allWords`` / ``allTags``.
    """
    from collections import Counter

    # Aggregate in plain Python first: touching the DataFrame once per token is
    # orders of magnitude slower than once per distinct (word, tag) pair.
    pairCounts = Counter(
        (word, tag) for sent in train_set for word, tag in sent
    )

    emissionProbTable = pd.DataFrame(0.0, columns=allTags, index=list(allWords))
    for (word, tag), count in pairCounts.items():
        # Read-modify-write keeps the KeyError for labels outside the table.
        emissionProbTable.at[word, tag] += count

    # Normalise every tag column; dividing an all-zero column by 1 leaves it at 0.
    columnTotals = emissionProbTable.sum(axis=0)
    safeTotals = columnTotals.where(columnTotals != 0, 1.0)
    return emissionProbTable.div(safeTotals, axis="columns")

## Generating BiGram Transition Probability



In [None]:
def myBigrams(sent):
    """Return all adjacent pairs of ``sent`` as a list of 2-tuples, in order."""
    # Pairing the sequence with itself shifted by one yields (item i, item i+1).
    return list(zip(sent, sent[1:]))

def generate_Bigram_Transition_Table(train_set):
    """Build bigram tag-transition counts and probabilities P(next | previous).

    Args:
        train_set: Sentences, each a sequence of (word, tag) pairs whose tags
            are all present in the global ``allTags``.

    Returns:
        A pair ``(tagBigramProb, biGramCount)`` of pandas DataFrames, both with
        the previous tag as row label and the next tag as column label.
        ``biGramCount`` holds raw counts (as floats); ``tagBigramProb`` holds
        each row divided by its total, with rows of unseen tags left at zero.

    Raises:
        KeyError: If a tag in ``train_set`` is missing from ``allTags``.
    """
    from collections import Counter

    # Count adjacent tag pairs in plain Python; updating the DataFrame once per
    # token pair is far slower than once per distinct pair.
    pairCounts = Counter()
    for sent in train_set:
        tags = [pair[1] for pair in sent]
        pairCounts.update(zip(tags, tags[1:]))

    biGramCount = pd.DataFrame(0.0, columns=allTags, index=allTags)
    for (previous, following), count in pairCounts.items():
        biGramCount.at[previous, following] += count

    # Row-normalise; an all-zero row is divided by 1 and therefore stays zero.
    rowTotals = biGramCount.sum(axis=1)
    safeTotals = rowTotals.where(rowTotals != 0, 1.0)
    tagBigramProb = biGramCount.div(safeTotals, axis="index")

    return tagBigramProb, biGramCount




## Loading Tag Data

In [None]:
# 'pronouns' is used by findTag_UnseenWords as a membership test on the
# lower-cased word when guessing a tag for out-of-vocabulary words.
# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# load a pronouns.pkl that comes from a trusted source.
with open('./pronouns.pkl','rb') as file:
  pronouns = pickle.load( file)

## Generating Trigram Transition Probability

In [None]:
def myTrigrams(sent):
    """Return all runs of three consecutive items of ``sent`` as 3-tuples, in order."""
    # Zipping the sequence with its one- and two-step shifts yields the windows.
    return list(zip(sent, sent[1:], sent[2:]))

def generate_Trigram_Transition_Table(train_set):
    """Build trigram tag-transition counts and probabilities P(t3 | t1, t2).

    Args:
        train_set: Sentences, each a sequence of (word, tag) pairs whose tags
            are all present in the global ``allTags``.

    Returns:
        A pair ``(tagTrigramProb, trigramCount)`` of pandas DataFrames. Rows
        are labelled ``"(t1,t2)"`` for every ordered pair of tags in
        ``allTags`` (sorted), columns are the tags of ``allTags``.
        ``trigramCount`` holds raw counts (as floats); ``tagTrigramProb`` holds
        each row divided by its total, with unseen contexts left at zero.

    Raises:
        KeyError: If a tag in ``train_set`` is missing from ``allTags``.
    """
    from collections import Counter

    # Count tag triples in plain Python; updating the DataFrame once per token
    # triple is far slower than once per distinct triple.
    tripleCounts = Counter()
    for sent in train_set:
        tags = [pair[1] for pair in sent]
        tripleCounts.update(zip(tags, tags[1:], tags[2:]))

    trigramGiven = sorted({"({},{})".format(t1, t2) for t1 in allTags for t2 in allTags})
    trigramCount = pd.DataFrame(0.0, columns=allTags, index=trigramGiven)
    for (t1, t2, t3), count in tripleCounts.items():
        trigramCount.at["({},{})".format(t1, t2), t3] += count

    # Row-normalise; an all-zero context is divided by 1 and therefore stays zero.
    rowTotals = trigramCount.sum(axis=1)
    safeTotals = rowTotals.where(rowTotals != 0, 1.0)
    tagTrigramProb = trigramCount.div(safeTotals, axis="index")

    return tagTrigramProb, trigramCount

In [None]:
'''for tag in sorted({ "({},{})".format(t1,t2) for t1 in allTags for t2 in allTags}):
  total = sum(x[0].loc[tag, :])
  print(total)'''

## Trigram Transition Probabilities with deleted interpolation

In [None]:
def deleted_interpolation(tagCounts, biGramCount, trigramCount):
    """Estimate interpolation weights for trigram/bigram/unigram tag models.

    For every tag triple ``(a, b, c)`` seen at least once, each model is scored
    with one occurrence of the triple removed from its counts:

        trigram: (C(a,b,c) - 1) / (C(a,b) - 1)
        bigram:  (C(b,c)   - 1) / (C(b)   - 1)
        unigram: (C(c)     - 1) / (N      - 1)

    and the triple's count is credited to the best-scoring model (the
    higher-order model wins ties). These are the same conditional estimates
    that the weights later multiply: P(c | a, b), P(c | b) and P(c).

    Args:
        tagCounts: Mapping from tag to its occurrence count.
        biGramCount: DataFrame of bigram counts, rows = previous tag,
            columns = next tag.
        trigramCount: DataFrame of trigram counts, rows labelled ``"(a,b)"``,
            columns = third tag.

    Returns:
        A list ``[trigram_weight, bigram_weight, unigram_weight]`` of floats
        summing to 1. If no trigram has a positive count the three weights are
        equal, since the data gives no reason to prefer any model.
    """
    def _held_out_ratio(numerator, denominator):
        # A ratio with an empty denominator is defined as 0. The check must be
        # explicit: dividing numpy floats by zero yields nan/inf rather than
        # raising, and a nan would otherwise be picked as the maximum.
        return numerator / denominator if denominator > 0 else 0.0

    totalTagCount = sum(tagCounts.values())
    weights = [0.0, 0.0, 0.0]

    for a in allTags:
        for b in allTags:
            context = "({},{})".format(a, b)
            for c in allTags:
                triCount = float(trigramCount.loc[context, c])
                if triCount <= 0:
                    continue
                candidates = (
                    _held_out_ratio(triCount - 1, float(biGramCount.loc[a, b]) - 1),
                    _held_out_ratio(float(biGramCount.loc[b, c]) - 1, tagCounts[b] - 1),
                    _held_out_ratio(tagCounts[c] - 1, totalTagCount - 1),
                )
                # max() returns the first maximal index, so ties go to the
                # higher-order model.
                winner = max(range(3), key=candidates.__getitem__)
                weights[winner] += triCount

    credited = sum(weights)
    if credited == 0:
        return [1.0 / 3, 1.0 / 3, 1.0 / 3]
    return [weight / credited for weight in weights]



In [None]:
def generate_Trigram_Prob_Table_D(tagUnigramProb, tagBigramProb, tagTrigramProb, lambdas):
    """Blend trigram, bigram and unigram tag probabilities into one table.

    Args:
        tagUnigramProb: Mapping from tag to its unigram probability.
        tagBigramProb: DataFrame of P(next | previous), rows = previous tag.
        tagTrigramProb: DataFrame of P(c | a, b), rows labelled ``"(a,b)"``.
        lambdas: Three weights applied to the trigram, bigram and unigram
            probabilities, in that order.

    Returns:
        A DataFrame with a row ``"(a,b)"`` for every ordered pair of tags in the
        global ``allTags`` and a column per tag, holding
        ``lambdas[0]*P(c|a,b) + lambdas[1]*P(c|b) + lambdas[2]*P(c)``.
    """
    contexts = sorted(
        {"({},{})".format(first, second) for first in allTags for second in allTags}
    )
    tagTrigramProb_D = pd.DataFrame(0.0, columns=allTags, index=contexts)

    for first in allTags:
        for second in allTags:
            context = "({},{})".format(first, second)
            for third in allTags:
                from_trigram = lambdas[0] * tagTrigramProb.loc[context, third]
                from_bigram = lambdas[1] * tagBigramProb.loc[second, third]
                from_unigram = lambdas[2] * tagUnigramProb[third]
                tagTrigramProb_D.loc[context, third] += (
                    from_trigram + from_bigram + from_unigram
                )

    return tagTrigramProb_D



## Viterbi Algorithm

In [None]:
def get_word(sent,k):
        """Return ``sent[k]``, or the boundary symbol '.' for a negative ``k``."""
        return '.' if k < 0 else sent[k]

def get_tags(k):
        """Return the candidate tags for position ``k``.

        Positions -1 and 0 are sentence boundaries and only admit the tag '.';
        every other position admits the global ``allTags``.
        """
        boundary_positions = (-1, 0)
        return {'.'} if k in boundary_positions else allTags

def get_transition(transitionProbTable, w, u, v):
    """Look up the transition value for tag ``v`` given the preceding tags ``w, u``.

    An empty-string tag is treated as the boundary tag '.'. The row label is
    built as ``"(w,u)"`` and the column is ``v``.
    """
    first, second, third = ('.' if tag == '' else tag for tag in (w, u, v))
    row_label = "({},{})".format(first, second)
    return transitionProbTable.loc[row_label, third]

def get_emission(emissionProbTable, word, tag, unseenWord):
    """Return the emission score of ``word`` under ``tag``.

    For a known word this is the entry of ``emissionProbTable`` at
    ``[word, tag]``. For an unseen word the score is 1 when the rule-based
    guess from ``findTag_UnseenWords`` equals ``tag`` and 0 otherwise.
    """
    if not unseenWord:
        return emissionProbTable.loc[word, tag]
    guessed_tag = findTag_UnseenWords(word)
    return 1 if guessed_tag == tag else 0

def get_V(V,n,u,v):
  """Return the Viterbi score stored under ``(n, u, v)``, or 0 if there is none.

  Args:
      V: Mapping keyed by ``(position, previous_tag, tag)`` tuples.
      n, u, v: Position and the two tags forming the key.

  Returns:
      The stored score, or 0 when the key is absent.
  """
  try:
    return V[n,u,v]
  except LookupError:
    # Only a missing entry means "score 0"; any other failure is a real error
    # and must not be silently turned into a probability.
    return 0

def findTag_UnseenWords(word):
    """Guess a universal POS tag for a word that was not seen in training.

    The rules are tried in order and the first match wins:

    1. no word character at all            -> '.'
    2. lower-cased word is in ``pronouns`` -> 'PRON'
    3. trace-like token such as ``*T*-1``  -> 'X'
    4. contains a digit                    -> 'NUM'
    5. typical noun suffix                 -> 'NOUN'
    6. title-cased                         -> 'NOUN'
    7. typical verb affix                  -> 'VERB'
    8. typical adjective affix             -> 'ADJ'
    9. anything else                       -> 'NOUN'

    Args:
        word: The token to classify.

    Returns:
        One of '.', 'PRON', 'X', 'NUM', 'NOUN', 'VERB', 'ADJ'.
    """
    if not re.search(r'\w', word):
        return '.'
    elif word.lower() in pronouns:
        return 'PRON'
    elif re.search(r'(\*T?\*?-[0-9]+$)', word):
        # Must precede the digit rule: this pattern always contains a digit, so
        # checking for digits first would make it unreachable.
        return 'X'
    elif re.search(r'\d', word):
        return 'NUM'
    elif re.search(r'(ion\b|ian\b|ty\b|ics\b|ment\b|ence\b|ance\b|ness\b|ist\b|ism\b)',word):
        return 'NOUN'
    elif word.istitle():
        return 'NOUN'
    elif re.search(r'(ate\b|fy\b|ize\b|\ben|\bem|ing\b|ed\b|es\b)', word):
        return 'VERB'
    elif re.search(r'(\bun|\bin|ble\b|ry\b|ish\b|ious\b|ical\b|\bnon|ful\b|less\b)',word):
        return 'ADJ'
    else:
        return 'NOUN'


In [None]:
def viterbi(sent, transitionProbTable, emissionProbTable):
        """Tag ``sent`` with a trigram HMM using Viterbi decoding.

        ``V[k, u, v]`` holds the best score of any tag sequence for the first
        ``k`` words that ends with tag ``u`` at position ``k-1`` and tag ``v``
        at position ``k``; ``path[u, v]`` holds the tags of that sequence.
        Positions -1 and 0 carry the boundary tag '.' (see ``get_tags``).

        Args:
            sent: Sequence of word strings (``.lower()`` is called on words
                that are not in the global ``allWords``).
            transitionProbTable: Table read through ``get_transition`` with
                ``"(w,u)"`` row labels and tag columns.
            emissionProbTable: Table read through ``get_emission`` for words
                that are in ``allWords``.

        Returns:
            A list with one tag per word of ``sent``.

        NOTE(review): scores are plain products of probabilities; presumably
        they can underflow to 0 on long sentences, in which case all
        candidates tie -- TODO confirm and consider log-space scores.
        """
        V = {}
        path = {}
        
        # Base case: before the first word both context tags are the boundary '.'.
        V[0,'.','.'] = 1
        path['.','.'] = []
        
        
        for k in range(1,len(sent)+1):
            temp_path = {}
            word = get_word(sent,k-1)

            # A word counts as unseen only if neither its exact form nor its
            # lower-cased form is in the vocabulary; otherwise the known form
            # is used for the emission lookup.
            unseenWord = False
            if word not in allWords:
              if word.lower() not in allWords:
                unseenWord = True
              else :
                word = word.lower()
           
            for u in get_tags(k-1):
                  
                  for v in get_tags(k):
                      # Best predecessor tag w for the pair (u, v). max() compares
                      # (score, w) tuples, so equal scores are broken in favour of
                      # the greater tag w.
                      V[k,u,v],prev_w = max([( get_V(V,k-1,w,u) * get_transition(transitionProbTable,w,u,v) * get_emission(emissionProbTable,word,v, unseenWord),w) for w in get_tags(k-2)])
                      temp_path[u,v] = path[prev_w,u] + [v]
                      
            # Only the paths ending at the current position are needed next.
            path = temp_path


        # Termination: weight every final tag pair by the transition into '.'
        # and keep the best one (ties again resolved by tuple comparison).
        prob,umax,vmax = max([(get_V(V,len(sent),u,v) * get_transition(transitionProbTable,u,v,'.'),u,v) for u in allTags for v in allTags])
        
        return path[umax,vmax]

## Training HMM POS Tagger

In [None]:
def HMM_Viterbi_Train(train_set):
    """Estimate every probability table of the HMM tagger from ``train_set``.

    Refreshes the global vocabulary and tag inventory, then builds the
    unigram, bigram and trigram tag statistics, the deleted-interpolation
    weights, the interpolated trigram table and the emission table, printing
    a progress line before each stage.

    Args:
        train_set: Sentences, each a sequence of (word, tag) pairs.

    Returns:
        A 4-tuple ``(emission, bigram_probabilities, trigram_probabilities,
        interpolated_trigram_probabilities)``.
    """
    print("# Generating Transition & Emission Probability Table...")
    tag_and_words(train_set)
    unigram_probs, unigram_counts = generate_Unigram_Prob(train_set)

    print("# Calculating Bigram Transition Prob...")
    bigram_probs, bigram_counts = generate_Bigram_Transition_Table(train_set)

    print("# Calculating Trigram Transition Prob...")
    trigram_probs, trigram_counts = generate_Trigram_Transition_Table(train_set)

    # The interpolation weights come from the raw counts, not the probabilities.
    weights = deleted_interpolation(unigram_counts, bigram_counts, trigram_counts)
    print("# Calculating Trigram Transition Prob Deleted Interpolation...")
    interpolated_probs = generate_Trigram_Prob_Table_D(
        unigram_probs, bigram_probs, trigram_probs, weights
    )

    print("# Calculating Emission Prob...")
    emission_probs = generate_Emission_Prob_Table(train_set)

    return emission_probs, bigram_probs, trigram_probs, interpolated_probs

## Testing HMM POS Tagger

In [None]:
def HMM_Viterbi_Test(test_set, transitionProbTable, emissionProbTable):
    """Tag every sentence of ``test_set`` and collect gold pairs and predictions.

    Prints a running sentence counter, with a line break after every 40th
    sentence, as a progress indicator.

    Args:
        test_set: Sentences, each a sequence of (word, tag) pairs.
        transitionProbTable: Transition table handed on to ``viterbi``.
        emissionProbTable: Emission table handed on to ``viterbi``.

    Returns:
        A pair ``(input_word_tags, output_tags)``: the flattened gold
        (word, tag) pairs and the flattened predicted tags, in corpus order.
    """
    gold_pairs = []
    predictions = []

    for sentence_no, sent in enumerate(test_set, start=1):
        print(sentence_no, end=" ")
        if sentence_no % 40 == 0:
            print("\n")

        words_only = [word for word, _ in sent]
        gold_pairs.extend(sent)
        predictions.extend(viterbi(words_only, transitionProbTable, emissionProbTable))

    return gold_pairs, predictions

## Calling Train & Test Function

In [None]:
# Train on the training split. The returned tables are, in order: emission,
# bigram transition, raw trigram transition, and interpolated trigram transition.
emission, bi_Transition, tri_transition, tri_d_transition = HMM_Viterbi_Train(train_set)

In [None]:
emission

In [None]:
bi_Transition

In [None]:
tri_transition

In [None]:
tri_d_transition

## Output for test data set :
# Caution: tagging a large number of sentences takes a long time

In [None]:
# Tag the test split with the interpolated trigram table; keeps the gold
# (word, tag) pairs and the predicted tags as two parallel flat lists.
input_word_tag, output_tags = HMM_Viterbi_Test(test_set, tri_d_transition, emission)

In [None]:
# Token-level accuracy: share of positions whose predicted tag equals the gold tag.
print("Accuracy: {}".format(
    sum(input_word_tag[position][1] == guess for position, guess in enumerate(output_tags))
    / len(output_tags)
))

# Custom Sentence Testing
### To tag your own sentence, run the two cells below

In [None]:
inp_text = input("Enter your sentence : ")

In [None]:
# Split on any run of whitespace: splitting on a single " " would turn repeated,
# leading or trailing spaces into empty-string "words" that then get tagged.
viterbi( inp_text.split() , tri_d_transition, emission)

# Analysis metrics and Confusion matrix

In [None]:
# Gold tags only, in the same order as the flattened (word, tag) pairs.
actual_tags = [t for _,t in input_word_tag]

In [None]:
# Alias (not a copy) of the predicted tag list.
predicted_tags = output_tags

In [None]:
# Per-tag report; labels=allTags restricts and orders the rows by the training tag inventory.
print(metrics.classification_report(actual_tags, predicted_tags,labels=allTags))

In [None]:
# Confusion matrix with rows = actual tags and columns = predicted tags, both
# ordered by allTags.
cm = metrics.confusion_matrix(actual_tags, predicted_tags,labels=allTags)

# Wrap the matrix in a DataFrame so the heatmap axes carry the tag names.
df_cm = pd.DataFrame(cm, index = allTags,
                  columns = allTags)
# Create the figure first so the heatmap is drawn at this size.
fig=plt.figure(figsize = (10,7))


cmap = sns.cm.rocket_r
# annot=True with fmt='d' prints the integer count inside every cell.
ax=sns.heatmap(df_cm, annot=True, yticklabels=allTags , cmap=cmap, fmt='d')
ax.set_xlabel('Predicted '+r'$\longrightarrow$')
ax.set_ylabel('Actual '+r'$\longrightarrow$')
# Keep the row labels horizontal.
plt.yticks(rotation=0) 
ax.set_title('Confusion Matrix of HMM-Viterbi Trigram Model')
plt.show()