<a href="https://colab.research.google.com/github/akshala/POS-tagging-using-HMM-and-viterbi-algorithm/blob/main/hmm_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Colab-only: upload files from the local machine into the runtime's working
# directory (the recorded output shows Brown_train.txt being saved).
from google.colab import files
uploaded = files.upload()

Saving Brown_train.txt to Brown_train.txt


In [1]:
from statistics import mean
from sklearn.model_selection import KFold
import numpy as np

In [2]:
# Read the whole tagged corpus into memory as one string; the context manager
# closes the file as soon as the read finishes.
with open('Brown_train.txt') as f:
  text = f.read()
# text

In [3]:
# One entry per non-empty line of the corpus. Every entry is prefixed with two
# '<s>_<s>' start-of-sentence tokens.
sentences = ['<s>_<s> <s>_<s> ' + line for line in text.split('\n') if line]
# sentences

In [4]:
# Parse every sentence into (word, tag) pairs.
#   final     -- one list of (word, tag) tuples per sentence
#   all_words -- every raw 'word_tag' token of the corpus, in order
final = []
all_words = []
for sentence in sentences:
  if not sentence:
    continue
  tokens = sentence.split()
  all_words.extend(tokens)
  word_tag_pair = []
  for token in tokens:
    # Split on the LAST underscore only: the tag is whatever follows it, so a
    # word that itself contains '_' keeps its full text instead of having its
    # second fragment mistaken for the tag.
    parts = token.rsplit('_', 1)
    if len(parts) != 2:
      # No tag separator at all: the token is skipped (it is still counted in
      # all_words above).
      continue
    word_tag_pair.append((parts[0], parts[1]))
  final.append(word_tag_pair)
# final
len(all_words)

1271491

In [5]:
# Length statistics over `sentences`. len() is applied to each entry as-is, so
# for string entries these are character counts (any prefix included), not
# word counts.
sentence_lengths = [len(sentence) for sentence in sentences]
print('max sentence length', max(sentence_lengths))
print('min sentence length', min(sentence_lengths))
print('avg sentence length', mean(sentence_lengths))
print('total number of sentences', len(sentence_lengths))

max sentence length 1976
min sentence length 19
avg sentence length 194.82925015867258
total number of sentences 55145


In [6]:
# Corpus-wide lookup tables, keyed on lower-cased words:
#   vocab_word_tag -- word -> set of tags seen with that word
#   tag_word_dict  -- tag  -> set of words seen with that tag
vocab_word_tag = {}
tag_word_dict = {}
for sentence in final:
  for pair in sentence:
    word, tag = pair[0].lower(), pair[1]
    vocab_word_tag.setdefault(word, set()).add(tag)
    tag_word_dict.setdefault(tag, set()).add(word)
print('word key tag value', len(vocab_word_tag))
print('tag key word value', len(tag_word_dict))

word key tag value 49810
tag key word value 473


In [7]:
# Sorted list of every distinct key (word) of vocab_word_tag.
vocab = sorted(vocab_word_tag)
len(vocab)
# vocab

49810

In [8]:
# Sorted list of every distinct key (tag) of tag_word_dict.
tags = sorted(tag_word_dict)
len(tags)
# tags

473

In [9]:
def get_tag_vocab(input_array):
  """Collect the vocabulary, the tag set and the word -> tags table.

  Args:
    input_array: iterable of sentences, each an iterable of pairs whose first
      item is the word and whose second item is the tag.

  Returns:
    A tuple (vocab, tags, vocab_word_tag):
      vocab: sorted list of lower-cased words plus the '<unknown>' placeholder.
      tags: sorted list of the distinct tags.
      vocab_word_tag: dict mapping each lower-cased word to the set of tags
        seen with it; '<unknown>' maps to the set of all tags.
  """
  vocab_word_tag = {}
  seen_tags = set()
  for sentence in input_array:
    for pair in sentence:
      word = pair[0].lower()
      tag = pair[1]
      vocab_word_tag.setdefault(word, set()).add(tag)
      seen_tags.add(tag)
  vocab = sorted(list(vocab_word_tag) + ['<unknown>'])
  tags = sorted(seen_tags)
  # An out-of-vocabulary word may carry any tag.
  vocab_word_tag['<unknown>'] = set(tags)
  return vocab, tags, vocab_word_tag

In [10]:
# Number of distinct words observed with each tag.
num_words_for_tag = {tag: len(tag_words) for tag, tag_words in tag_word_dict.items()}
# num_words_for_tag

In [11]:
# View over the keys (words) of vocab_word_tag; the cell displays how many there are.
total_words_vocab = vocab_word_tag.keys()
len(total_words_vocab)

49810

In [12]:
def get_counts(input_array):
  """Count tag trigrams, (word, tag) emissions and single tags.

  Args:
    input_array: iterable of sentences, each an iterable of pairs whose first
      item is the word and whose second item is the tag.

  Returns:
    A tuple (transition_counts, emission_counts, tag_counts):
      transition_counts: dict keyed by (tag, previous tag, tag before that).
      emission_counts: dict keyed by (lower-cased word, tag).
      tag_counts: dict keyed by tag, with an extra '<unknown>' entry set to 0.
  """
  transition_counts = {}
  emission_counts = {}
  tag_counts = {}
  for sentence in input_array:
    # Every sentence starts with a history of two '<s>' tags.
    prev1_tag = prev2_tag = '<s>'
    for pair in sentence:
      word = pair[0].lower()
      tag = pair[1]
      trigram = (tag, prev1_tag, prev2_tag)
      transition_counts[trigram] = transition_counts.get(trigram, 0) + 1
      emission = (word, tag)
      emission_counts[emission] = emission_counts.get(emission, 0) + 1
      tag_counts[tag] = tag_counts.get(tag, 0) + 1
      # Slide the two-tag history forward by one position.
      prev2_tag, prev1_tag = prev1_tag, tag
  tag_counts['<unknown>'] = 0
  return transition_counts, emission_counts, tag_counts

In [13]:
def get_2_tag_counts(input_array):
  """Count adjacent tag pairs inside each sentence.

  Args:
    input_array: iterable of sentences, each an iterable of pairs whose
      second item is the tag.

  Returns:
    Dict keyed by (tag, previous tag) with the number of times that pair
    occurs at consecutive positions of a sentence. The first tag of a
    sentence is never counted against an implicit predecessor.
  """
  two_tag_counts = {}
  for sentence in input_array:
    sentence_tags = [pair[1] for pair in sentence]
    for prev, tag in zip(sentence_tags, sentence_tags[1:]):
      bigram = (tag, prev)
      two_tag_counts[bigram] = two_tag_counts.get(bigram, 0) + 1
  return two_tag_counts

In [14]:
def get_transition_matrix(transition_counts, two_tag_counts, tags):
  """Turn trigram counts into ratios against their two-tag context.

  Args:
    transition_counts: dict keyed by (tag, previous tag, tag before that).
    two_tag_counts: dict keyed by (tag, previous tag); looked up with the last
      two items of each trigram key.
    tags: unused; kept for interface compatibility.

  Returns:
    Dict with the same keys as transition_counts, each value being the trigram
    count divided by the count of its context pair.

  Raises:
    KeyError: if a context pair is missing from two_tag_counts.
  """
  return {
      trigram: count / two_tag_counts[(trigram[1], trigram[2])]
      for trigram, count in transition_counts.items()
  }

In [15]:
def get_emission_matrix(emission_counts, tag_counts, tags, vocab):
  """Build the emission table, including entries for the '<unknown>' word.

  Args:
    emission_counts: dict keyed by (word, tag).
    tag_counts: dict keyed by tag.
    tags: unused; kept for interface compatibility.
    vocab: unused; kept for interface compatibility.

  Returns:
    Dict keyed by (word, tag). Observed pairs map to count / tag count. In
    addition every tag in tag_counts gets a ('<unknown>', tag) entry equal to
    that tag's share of all tag occurrences.
  """
  emission_prob = {
      pair: count / tag_counts[pair[1]]
      for pair, count in emission_counts.items()
  }
  total_tag_count = sum(tag_counts.values())
  # Unseen words fall back to the relative frequency of each tag.
  emission_prob.update(
      (('<unknown>', tag), count / total_tag_count)
      for tag, count in tag_counts.items()
  )
  return emission_prob

In [16]:
def viterbi(transition_probs, emission_probs, test, vocab_word_tag):
  """Tag a flat sequence of words, one word at a time.

  For every word, each combination of (candidate tag of the previous word,
  candidate tag of the word before that, candidate tag of the current word)
  is scored and the current tag of the best-scoring combination is emitted.
  No path probabilities are carried from one word to the next, so despite the
  name this is a greedy per-word decoder rather than a dynamic-programming
  Viterbi search.

  Args:
    transition_probs: dict keyed by (tag, previous tag, tag before that).
    emission_probs: dict keyed by (word, tag); must contain
      ('<unknown>', tag) entries for the fallback described below.
    test: iterable of words to tag.
    vocab_word_tag: dict mapping a word to its candidate tags; must contain
      '<s>' (initial history) and '<unknown>' (out-of-vocabulary words).

  Returns:
    List with one predicted tag per input word. Only the part of the tag
    before the first '-' is kept.
  """
  prev1_word = '<s>'
  prev2_word = '<s>'
  predicted = []
  for word in test:
    # Out-of-vocabulary words are scored (and remembered as history) under the
    # shared '<unknown>' entry.
    try:
      tag_list = vocab_word_tag[word]
    except KeyError:
      tag_list = vocab_word_tag['<unknown>']
      word = '<unknown>'

    max_prob = float('-inf')
    max_prob_tag = ''
    for prev1_tag in vocab_word_tag[prev1_word]:
      for prev2_tag in vocab_word_tag[prev2_word]:
        for cur_tag in tag_list:
          try:
            score = np.log(transition_probs[(cur_tag, prev1_tag, prev2_tag)])
            if word != '<unknown>':
              score = score + np.log(emission_probs[(word, cur_tag)])
          except KeyError:
            # Unseen trigram: only the emission term is used.
            # NOTE(review): this scores an unseen transition as if its
            # probability were 1; consider smoothing or back-off.
            score = np.log(emission_probs[(word, cur_tag)])
          # Strict '>' keeps the first best candidate on ties.
          if score > max_prob:
            max_prob = score
            max_prob_tag = cur_tag

    # Slide the two-word history exactly once per word. Doing this inside the
    # candidate loops would shift it once per candidate tag and corrupt both
    # the current scoring and the history seen by the next word.
    prev2_word = prev1_word
    prev1_word = word

    # NOTE(review): a tag that begins with '-' is reduced to ''; confirm the
    # gold tags used for scoring are normalised the same way.
    predicted.append(max_prob_tag.split('-')[0])
  return predicted

In [17]:
def word_tag_seperate(word_tag_pair_sentences):
  """Flatten sentences of (word, tag) pairs into two parallel lists.

  Args:
    word_tag_pair_sentences: iterable of sentences, each an iterable of pairs
      whose first item is the word and whose second item is the tag.

  Returns:
    A tuple (words, tags): all words lower-cased, and all tags, in corpus
    order with sentence boundaries removed.
  """
  flat_pairs = [pair for sentence in word_tag_pair_sentences for pair in sentence]
  words = [pair[0].lower() for pair in flat_pairs]
  tags = [pair[1] for pair in flat_pairs]
  return words, tags

In [None]:
# 3-fold cross-validation over the parsed sentences. For every fold the counts
# and probability tables are rebuilt from the training sentences only, the
# held-out sentences are tagged, and gold / predicted tags are stored per fold.
kFold = KFold(n_splits=3)
predicted_tags = []
all_test_tags = []
for train, test in kFold.split(final):
  training_set = [final[idx] for idx in train]
  testing_set = [final[idx] for idx in test]

  test_words, test_tags = word_tag_seperate(testing_set)
  all_test_tags.append(test_tags)

  # Model estimation on the training fold.
  vocab, tags, vocab_word_tag = get_tag_vocab(training_set)
  transition_counts, emission_counts, tag_counts = get_counts(training_set)
  two_tag_counts = get_2_tag_counts(training_set)
  transition_probs = get_transition_matrix(transition_counts, two_tag_counts, tags)
  emission_probs = get_emission_matrix(emission_counts, tag_counts, tags, vocab)

  # Tag the held-out fold.
  predicted_tags.append(viterbi(transition_probs, emission_probs, test_words, vocab_word_tag))

In [None]:
def accuracy(actual, pred):
  """Return the fraction of positions where pred matches actual.

  Args:
    actual: indexable sequence of gold labels; its length sets how many
      positions are compared.
    pred: indexable sequence of predicted labels, at least as long as actual.

  Returns:
    Number of matching positions divided by len(actual).

  Raises:
    ZeroDivisionError: if actual is empty.
    IndexError: if pred is shorter than actual.
  """
  total = len(actual)
  hits = sum(1 for idx in range(total) if actual[idx] == pred[idx])
  return hits/total

In [None]:
# Print the tagging accuracy of every cross-validation fold. Iterating over
# the stored results (instead of a hard-coded fold count) keeps this cell
# correct if the number of splits is changed.
for fold_actual, fold_pred in zip(all_test_tags, predicted_tags):
  print(accuracy(fold_actual, fold_pred))

In [23]:
# Colab-only: mount Google Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Wrap the per-fold gold and predicted tag lists in NumPy arrays for saving.
# The folds generally hold different numbers of tokens, and building an array
# from ragged nested lists requires an explicit dtype=object on current NumPy
# (it raises ValueError otherwise).
save_all_test_tags = np.array(all_test_tags, dtype=object)
save_predicted_tags = np.array(predicted_tags, dtype=object)
# save_transition_counts = np.array(transition_counts)
# save_emission_counts = np.array(emission_counts)
# save_tag_counts = np.array(tag_counts)
# save_two_tag_counts = np.array(two_tag_counts)

In [None]:
# Write both arrays to the working directory; np.save appends the '.npy'
# extension to the given file names.
np.save('save_all_test_tags', save_all_test_tags)
np.save('save_predicted_tags', save_predicted_tags)
# np.save('save_transition_counts', save_transition_counts)
# np.save('save_emission_counts', save_emission_counts)
# np.save('save_tag_counts', save_tag_counts)
# np.save('save_two_tag_counts', save_two_tag_counts)

In [None]:
# Copy the saved .npy files into the NLP folder under "drive/My Drive"
# (path is relative to the current working directory).
!cp save_all_test_tags.npy "drive/My Drive/NLP"
!cp save_predicted_tags.npy "drive/My Drive/NLP"
# !cp save_transition_counts.npy "drive/My Drive/NLP"
# !cp save_emission_counts.npy "drive/My Drive/NLP"
# !cp save_tag_counts.npy "drive/My Drive/NLP"
# !cp save_two_tag_counts.npy "drive/My Drive/NLP"

In [27]:
# Reload the gold and the predicted tags from Drive. PRED_TAG must come from
# save_predicted_tags.npy; loading the test-tags file for both would make any
# later comparison trivially perfect.
# allow_pickle=True is required when the arrays were stored with dtype=object
# (per-fold lists of different lengths). Unpickling can execute arbitrary
# code, so only use it on files you wrote yourself.
TEST_TAG = np.load('drive/My Drive/NLP/save_all_test_tags.npy', allow_pickle=True)
PRED_TAG = np.load('drive/My Drive/NLP/save_predicted_tags.npy', allow_pickle=True)