### Libraries used in this program

In [None]:
import nltk
nltk.download('brown')
nltk.download('universal_tagset')
import numpy as np
import pandas as pd
from nltk.corpus import brown
from tqdm import tqdm
from sklearn.model_selection import train_test_split, KFold
import seaborn as sns
import matplotlib.pyplot as plt
from numpy.linalg import norm
from gensim.test.utils import datapath, get_tmpfile
from gensim.models import KeyedVectors
from gensim.scripts.glove2word2vec import glove2word2vec

[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Unzipping corpora/brown.zip.
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.


### DataSet

In [None]:
# Hold out 20% of the tagged Brown sentences (universal tagset) as a test set.
# No random_state is passed, so the split is presumably different on every run.
train_corpus, test_corpus = train_test_split(brown.tagged_sents(tagset='universal'), test_size=0.2)

In [None]:
class storage:
  """Two-way lookup table that hands out consecutive integer ids to values.

  The attributes `id`, `values` and `value2id` are read directly by other
  cells in this file, so they are kept as plain public attributes:
    id        -- number of values inserted so far (also the next free id)
    index2tag -- maps id -> value
    value2id  -- maps value -> id
    values    -- set of every inserted value
  """

  def __init__(self):
    self.id = 0
    self.index2tag = dict()
    self.value2id = dict()
    self.values = set()

  def get_length(self):
    """Return the number of values inserted so far."""
    return self.id

  def insert(self, value):
    """Store `value` under the next free id.

    No duplicate check is made here: inserting the same value twice consumes
    a second id and re-points `value2id[value]` at it, so callers check
    `retrive(value, 'temp')` first.
    """
    self.index2tag[self.id] = value
    self.value2id[value] = self.id
    self.values.add(value)
    self.id += 1

  def retrive(self, key, method ='id'):
    """Look up in either direction.

    method == 'id': `key` is an id; returns the stored value and raises
      KeyError for an id that was never assigned.
    any other method: `key` is a value; returns its id, or None when the
      value was never inserted.
    """
    if method == 'id':
      return self.index2tag[key]
    # `values` and `value2id` are only ever filled together in insert(),
    # so a single dict lookup covers the membership test as well.
    return self.value2id.get(key)

# Small smoothing constant used to keep probabilities away from zero
alpha = 0.000001 

### HMM Model

In [None]:
def get_word(train_corpus):
  """Collect the lower-cased vocabulary of a tagged corpus.

  train_corpus is iterated as sentences of (word, tag) pairs. Every distinct
  lower-cased word receives one id, assigned in order of first appearance.
  Returns the filled `storage` table.
  """
  vocabulary = storage()

  for sentence in train_corpus:
    for token, _tag in sentence:
      lowered = token.lower()
      # Only words not seen before get a new id.
      if vocabulary.retrive(lowered, 'temp') is None:
        vocabulary.insert(lowered)
  return vocabulary

def get_tag():
  """Build the tag <-> id table from every tag of the Brown corpus (universal tagset).

  Tags are inserted in sorted order so that a given tag receives the same id
  on every run; iterating the raw set would follow string-hash order, which
  can change between interpreter runs.
  Returns the filled `storage` table.
  """
  unique_tags = {tag for _word, tag in brown.tagged_words(tagset='universal')}
  tags = storage()

  # The set already guarantees uniqueness, so no membership check is needed.
  for tag in sorted(unique_tags):
    tags.insert(tag)
  return tags

In [None]:
# The vocabulary comes from the training split only; the tag table is built
# from the whole corpus (get_tag reads brown.tagged_words directly).
words = get_word(train_corpus)
tags = get_tag()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:

# Convert the GloVe text file into word2vec text format so that it can be
# loaded with KeyedVectors.load_word2vec_format in the next cell.
glove_file = datapath('/content/drive/MyDrive/Glove/glove.6B.100d.txt')
word2vec_glove_file = get_tmpfile("glove.6B.100d.word2vec.txt")
glove2word2vec(glove_file, word2vec_glove_file)

(400000, 100)

In [None]:
# Load the converted vectors; later cells use `word in model` and `model[word]`.
model = KeyedVectors.load_word2vec_format(word2vec_glove_file)

In [None]:
# Snapshot of the training vocabulary; `words.values` is a set, so the order
# of this list is set-iteration order, not word-id order.
temp = list(words.values)

In [None]:
# Embedding vectors of the training words that the embedding model knows,
# in `temp` order.
# NOTE(review): words missing from `model` are dropped, so a row index in
# this list is not the word's id in `words` -- confirm before using one as
# the other.
word_vec_list = [model[token] for token in temp if token in model]

In [None]:
print(len(word_vec_list))
print(len(temp))

37194
45025


In [None]:
def get_hmm_matrix(train_corpus, words, tags, alpha):
  """Estimate the HMM parameters from a tagged corpus with additive smoothing.

  Args:
    train_corpus: iterable of sentences, each a sequence of (word, tag) pairs.
    words: table giving the column id of a lower-cased word
      (`retrive(word, 'temp')`) and the vocabulary size (`id`).
    tags: table giving the row id of a tag and the number of tags (`id`).
    alpha: additive smoothing constant.

  Returns:
    (transmission_matrix, emission_matrix, tags_prob):
      transmission_matrix[i, j] -- smoothed count(tag i followed by tag j) / count(tag i),
        shape [tags.id, tags.id]
      emission_matrix[i, w] -- smoothed count(tag i emits word w) / count(tag i),
        shape [tags.id, words.id]
      tags_prob[i] -- relative frequency of tag i, shape [tags.id]
  """
  n_tags = tags.id
  n_words = words.id
  transmission_matrix = np.zeros([n_tags, n_tags])
  emission_matrix = np.zeros([n_tags, n_words])
  tags_prob = np.zeros([n_tags])

  for sent in tqdm(train_corpus):
    previous_tag_index = None
    for word, tag in sent:
      word_index = words.retrive(word.lower(), 'temp')
      tag_index = tags.retrive(tag, 'temp')

      tags_prob[tag_index] += 1
      # A word missing from `words` yields None; indexing with None would act
      # as np.newaxis and silently increment the whole row, so skip it.
      if word_index is not None:
        emission_matrix[tag_index, word_index] += 1
      if previous_tag_index is not None:
        transmission_matrix[previous_tag_index, tag_index] += 1
      previous_tag_index = tag_index

  # Additive smoothing: the denominator grows by alpha times the number of
  # possible outcomes of the row (tags for transitions, words for emissions).
  tag_counts = np.reshape(tags_prob, [-1, 1])
  transmission_matrix = np.divide(transmission_matrix + alpha, tag_counts + alpha * n_tags)
  emission_matrix = np.divide(emission_matrix + alpha, tag_counts + alpha * n_words)

  total_tags = np.sum(tags_prob)
  if total_tags > 0:  # an empty corpus would otherwise give 0/0
    tags_prob = np.divide(tags_prob, total_tags)

  # Floor any remaining exact zeros at alpha.
  transmission_matrix[transmission_matrix == 0] = alpha
  emission_matrix[emission_matrix == 0] = alpha
  tags_prob[tags_prob == 0] = alpha

  return transmission_matrix, emission_matrix, tags_prob



In [None]:
# Fit the HMM on the training split using the module-level smoothing constant.
transmission_matrix,emission_matrix,tags_prob = get_hmm_matrix(train_corpus,words,tags,alpha)

100%|██████████| 45872/45872 [00:03<00:00, 13723.98it/s]


In [None]:
# Viterbi algorithm

def get_pos(sent_list, transmission_matrix, emission_matrix, tags_prob, words,word_vec_list, tags,model, alpha = 0.000001):
  """Tag a tokenised sentence with the most likely tag sequence (Viterbi).

  Args:
    sent_list: list of token strings; tokens are lower-cased before lookup.
    transmission_matrix: [tags.id, tags.id] array, entry [i, j] scores tag i -> tag j.
    emission_matrix: [tags.id, n_words] array, entry [i, w] scores tag i emitting word w.
    tags_prob: [tags.id] array of start scores per tag.
    words: table whose `retrive(token, 'temp')` returns a word id or None.
    word_vec_list: embedding vectors of known words, one row per word.
    tags: table with `id` (number of tags) and `retrive(index, 'id')`.
    model: embedding lookup supporting `token in model` and `model[token]`.
    alpha: emission score used for a word that is unknown everywhere.

  Returns:
    One tag per token; [] for an empty sentence.

  Scores are accumulated as log-probabilities, because the raw product
  underflows to 0 on long sentences and every tag then collapses to index 0.

  NOTE(review): for an unknown word, the row index of its nearest neighbour
  in `word_vec_list` is used as a column of `emission_matrix`. That is only
  right if row k of `word_vec_list` belongs to word id k -- confirm with the
  code that builds `word_vec_list`.
  """
  n_tokens = len(sent_list)
  if n_tokens == 0:
    return []
  n_tags = tags.id

  emission = np.asarray(emission_matrix)
  log_score = np.zeros([n_tags, n_tokens])
  back_pointer = np.zeros([n_tags, n_tokens], dtype=int)
  tag_columns = np.arange(n_tags)

  # Built lazily: only needed when an unknown word has an embedding.
  train_vectors = None
  train_norms = None

  # log(0) = -inf is a valid "impossible" score here, so silence the warning.
  with np.errstate(divide='ignore'):
    log_transmission = np.log(np.asarray(transmission_matrix, dtype=float))[:n_tags, :n_tags]
    log_start = np.log(np.asarray(tags_prob, dtype=float))[:n_tags]
    log_alpha = np.log(float(alpha))

    for p, token in enumerate(sent_list):
      token = token.lower()
      word_id = words.retrive(token, 'temp')

      # Emission scores of this token for every tag, computed once per token.
      if word_id is not None:
        log_emit = np.log(emission[:n_tags, word_id])
      elif p != 0 and token in model:
        # Unknown word with an embedding: borrow the emission column of the
        # most cosine-similar known word. (The first token never uses this
        # fallback and gets alpha instead, as before.)
        if train_vectors is None:
          train_vectors = np.array(word_vec_list)
          train_norms = norm(train_vectors, axis=1)
        query = np.array(model[token])
        cosine = np.dot(train_vectors, query) / (train_norms * norm(query))
        log_emit = np.log(emission[:n_tags, np.argmax(cosine)])
      else:
        log_emit = np.full(n_tags, log_alpha)

      if p == 0:
        # Initialisation step.
        log_score[:, 0] = log_start + log_emit
        continue

      # Iteration step: candidates[prev, cur] = best score ending in `prev`
      # plus the transition prev -> cur.
      candidates = log_score[:, p - 1][:, np.newaxis] + log_transmission
      best_previous = np.argmax(candidates, axis=0)
      back_pointer[:, p] = best_previous
      log_score[:, p] = candidates[best_previous, tag_columns] + log_emit

  # Sequence identification: start from the best final tag and walk back.
  tag_index = np.zeros([n_tokens], dtype=int)
  tag_index[-1] = np.argmax(log_score[:, n_tokens - 1])
  for i in reversed(range(n_tokens - 1)):
    tag_index[i] = back_pointer[tag_index[i + 1], i + 1]
  return [tags.retrive(int(index), 'id') for index in tag_index]





In [None]:
import re
sente = 'jack came back from work and he reeks of fish.'
# Split into runs of word characters and runs of punctuation, dropping whitespace.
sente = re.findall( r'\w+|[^\s\w]+', sente)

In [None]:
# Print the gold tag of every occurrence of 'reeked' in the training split.
for sent in train_corpus:
    for word,tag in sent:
      if word == 'reeked':
        print(tag)

VERB


In [None]:
model.most_similar('reeks')

[('reeked', 0.7333507537841797),
 ('reeking', 0.656512975692749),
 ('dullness', 0.6038519144058228),
 ('smacks', 0.5738084316253662),
 ('phoniness', 0.5679745674133301),
 ('tastelessness', 0.5672473311424255),
 ('superficiality', 0.5661382675170898),
 ('weirdness', 0.5652622580528259),
 ('meanness', 0.5648483633995056),
 ('bespeaks', 0.563983678817749)]

In [None]:
words.value2id['reeked']

39385

In [None]:
sente[0]

'jack'

In [None]:
# Tag the demo sentence; alpha is left at get_pos's default.
print(get_pos(sente, transmission_matrix,emission_matrix,tags_prob, words,word_vec_list, tags,model))

['NOUN', 'VERB', 'ADV', 'ADP', 'NOUN', 'CONJ', 'PRON', 'NOUN', 'ADP', 'NOUN', '.']


In [None]:
def prediction(test_corpus, transmission_matrix, emission_matrix, tags_prob, words, tags, alpha):
  """Tag every sentence of test_corpus and tally a confusion matrix.

  test_corpus is iterated as sentences of (word, tag) pairs. The returned
  int32 array has shape [tags.id, tags.id]; entry [p, g] counts the tokens
  tagged as p whose gold tag is g (rows = predicted, columns = gold).

  The embedding inputs of get_pos (`word_vec_list`, `model`) are not
  parameters here; they are read from module level.
  """
  n_tags = tags.id
  confusion_matrix = np.zeros([n_tags, n_tags], dtype=np.int32)

  for sentence in tqdm(test_corpus):
    gold_tags = [pair[1] for pair in sentence]
    tokens = [pair[0] for pair in sentence]

    guessed_tags = get_pos(tokens, transmission_matrix, emission_matrix, tags_prob,
                           words, word_vec_list, tags, model, alpha)
    for guessed, gold in zip(guessed_tags, gold_tags):
      row = tags.retrive(guessed, 'temp')
      column = tags.retrive(gold, 'temp')
      confusion_matrix[row, column] += 1

  return confusion_matrix



### Cross Validation

In [None]:
def cross_validation():
  """Run shuffled 5-fold cross validation over the Brown corpus.

  For each fold the vocabulary and the HMM matrices are rebuilt from that
  fold's training sentences, and the fold's held-out sentences are tagged.
  Returns the confusion matrices of all folds summed together
  (int32, shape [tags.id, tags.id]).

  NOTE(review): `prediction` reads the module-level `word_vec_list` and
  `model`, which were built outside this function and are not rebuilt per
  fold -- confirm that this is intended.
  """
  tags = get_tag()
  confusion_matrix = np.zeros([tags.id,tags.id], dtype=np.int32)
  # Sentences have different lengths, so keep them in a plain list instead
  # of forcing them into a (ragged) NumPy array.
  dataset = list(brown.tagged_sents(tagset='universal'))
  kfold = KFold(n_splits=5,shuffle=True)

  for train, test in kfold.split(dataset):
    # `train` and `test` are index arrays into `dataset`.
    fold_train = [dataset[i] for i in train]
    fold_test = [dataset[i] for i in test]
    print("Train Data Size: ",len(train))
    print("Test Data Size: ",len(test))
    # Use this fold's own data, not the global train/test split.
    words = get_word(fold_train)
    transmission_matrix,emission_matrix,tags_prob = get_hmm_matrix(fold_train,words,tags,alpha)
    confusion_matrix += prediction(fold_test, transmission_matrix, emission_matrix, tags_prob, words, tags, alpha)

  return confusion_matrix

In [None]:
# Summed confusion matrix over all folds; the metric cells below read it.
confusion_matrix = cross_validation()

  after removing the cwd from sys.path.


Train Data Size:  45872
Test Data Size:  11468


100%|██████████| 45872/45872 [00:03<00:00, 14365.76it/s]
 27%|██▋       | 3150/11468 [03:58<10:46, 12.86it/s]

### overall accuracy

In [None]:
# Overall accuracy: tokens on the diagonal (predicted tag == gold tag)
# as a percentage of all tagged tokens.
correct_predictions = confusion_matrix.trace()
total_examples = confusion_matrix.sum()
print('The overall accuracy of the hmm model is:', correct_predictions * 100 / total_examples)

In [None]:
# plotting the heat map
# Rows (y axis) are predicted tags and columns (x axis) are gold tags,
# matching how `prediction` fills the matrix.
plt.figure(figsize = (20, 20))
tag_list = [tags.retrive(i, 'id') for i in range(tags.id)]
confusion_figure = sns.heatmap(confusion_matrix, annot=True, xticklabels=tag_list, yticklabels=tag_list)

In [None]:
# Per-tag precision, recall and F1 from the confusion matrix
# (rows = predicted tag, columns = gold tag).
true_positive = np.diag(confusion_matrix).astype(float)
predicted_total = np.sum(confusion_matrix, axis=1).astype(float)  # row sums
gold_total = np.sum(confusion_matrix, axis=0).astype(float)       # column sums

# A tag that was never predicted (or never occurs) has a zero denominator;
# report 0.0 for it instead of propagating a 0/0 NaN.
precision = np.divide(true_positive, predicted_total,
                      out=np.zeros_like(true_positive), where=predicted_total != 0)
recall = np.divide(true_positive, gold_total,
                   out=np.zeros_like(true_positive), where=gold_total != 0)
precision_plus_recall = precision + recall
f1 = np.divide(2 * precision * recall, precision_plus_recall,
               out=np.zeros_like(true_positive), where=precision_plus_recall != 0)

per_pos_dict = {
  'tag': [tags.retrive(tag_id, 'id') for tag_id in range(tags.id)],
  'precision': precision.tolist(),
  'recall': recall.tolist(),
  'f1-score': f1.tolist(),
}
per_pos_df = pd.DataFrame(per_pos_dict)
per_pos_df.to_csv('hmm_per_pos_accuracy.csv')

In [None]:
print(per_pos_df)

In [None]:
# Macro averages: unweighted mean over however many tags the table holds,
# instead of dividing by a hard-coded tag count.
Overall_precision = per_pos_df['precision'].mean()
Overall_recall = per_pos_df['recall'].mean()

In [None]:
def _f_beta(precision, recall, beta):
  """F-beta score: (1 + beta^2) * P * R / (beta^2 * P + R)."""
  beta_squared = beta ** 2
  return (1 + beta_squared) * precision * recall / ((beta_squared * precision) + recall)

# F-scores of the macro-averaged precision and recall.
f1_score = _f_beta(Overall_precision, Overall_recall, 1)
f_half_score = _f_beta(Overall_precision, Overall_recall, 0.5)
f2_score = _f_beta(Overall_precision, Overall_recall, 2)

### Fbeta = ((1 + beta^2) * Precision * Recall) / (beta^2 * Precision + Recall)

*   F0.5-Measure (beta=0.5): More weight on precision, less weight on recall.

*   F2-Measure (beta=2.0): Less weight on precision, more weight on recall

*   F1-Measure (beta=1.0): Balance the weight on precision and recall.



In [None]:
print("Precision: ",Overall_precision)
print("Recall: ",Overall_recall)
print("F1_score: ",f1_score)
print("F0.5_score: ",f_half_score)
print("F2_score: ",f2_score)
