In [51]:
import nltk
from nltk.tokenize import sent_tokenize
from sklearn.model_selection import train_test_split
import time
import numpy as np
import pandas as pd

In [80]:
class data_preprocessing:

  def __init__(self):
    self.train_labelled_data = []
    self.test_labelled_data = []
    self.tags = {}
    self.vocab = {}
    self.test_set = []
    self.train_set = []
    self.create_tags_vocab()

  def file_processing(self,file_name = "train.txt"):

    # Reading the dataset file
    with open(file_name, 'r') as file:
      data = file.read()
    print("Train Corpus Sample:")
    print(f"{data[0:50]} \n")

    # Extracting the sentences from data and creating a sentences list []
    sentences = data.strip().split('\n\n')
    print("Sentences Sample:")
    print(sentences[:2])

    processed_sentences = []

    for sentence in sentences:
      sent = []
      # Split the sentence into individual lines (tokens and tags)
      lines = sentence.strip().split('\n')
      # Extract the tokens and tags from each line
      tokens_tags = [line.split() for line in lines]
      # Extract the tokens and tags into separate lists
      for token_tag in tokens_tags:
        token, tag = token_tag[0], token_tag[1]
        sent.append((token, tag))
      processed_sentences.append(sent)
    
    return processed_sentences

  def split_train_test(self):
    processed_sentences = self.file_processing()

    # Showing sample data
    for token_tag in processed_sentences[:2]:
      for tuple in token_tag:
        print(tuple)
    print("\n")

    # split data into training and validation set in the ratio 80:20
    self.train_set,self.test_set = train_test_split(processed_sentences,train_size=0.80,test_size=0.20,random_state = 100)

    # create list of train and test tagged words
    self.train_labelled_data = [tuple for token_tag in self.train_set for tuple in token_tag]
    self.test_labelled_data = [tuple for token_tag in self.test_set for tuple in token_tag]
    print(f"Length of the training labels: \t{len(self.train_labelled_data)}")
    # check some of the train labelled data.
    print(f'Training labelled words \t {self.train_labelled_data[:5]}')

    print(f"Length of the testing labels: \t{len(self.test_labelled_data)}")
    # check some of the test labelled data.
    print(f'Testing labelled words \t {self.test_labelled_data[:5]}')

    print("\n")

  def create_tags_vocab(self):
    self.split_train_test()
    #use set datatype to check how many unique tags are present in training data
    self.tags = {tag for word,tag in self.train_labelled_data}
    print(f"Length of tags in training labelled data: \t{len(self.tags)}")
    print(self.tags)
    print("\n")

    # check total words in vocabulary
    self.vocab = {word for word,tag in self.train_labelled_data}
    print(f"Length of Vocab in training labelled data: \t{len(self.vocab)}")
    print(self.vocab)
    print("\n")


In [86]:
processed_data = data_preprocessing()
train_labelled_data = processed_data.train_labelled_data
test_set, train_set, vocab, tags = processed_data.test_set, processed_data.train_set, processed_data.vocab, processed_data.tags

Train Corpus Sample:
Confidence NN B-NP
in IN B-PP
the DT B-NP
pound NN 

Sentences Sample:
["Confidence NN B-NP\nin IN B-PP\nthe DT B-NP\npound NN I-NP\nis VBZ B-VP\nwidely RB I-VP\nexpected VBN I-VP\nto TO I-VP\ntake VB I-VP\nanother DT B-NP\nsharp JJ I-NP\ndive NN I-NP\nif IN B-SBAR\ntrade NN B-NP\nfigures NNS I-NP\nfor IN B-PP\nSeptember NNP B-NP\n, , O\ndue JJ B-ADJP\nfor IN B-PP\nrelease NN B-NP\ntomorrow NN B-NP\n, , O\nfail VB B-VP\nto TO I-VP\nshow VB I-VP\na DT B-NP\nsubstantial JJ I-NP\nimprovement NN I-NP\nfrom IN B-PP\nJuly NNP B-NP\nand CC I-NP\nAugust NNP I-NP\n's POS B-NP\nnear-record JJ I-NP\ndeficits NNS I-NP\n. . O", "Chancellor NNP O\nof IN B-PP\nthe DT B-NP\nExchequer NNP I-NP\nNigel NNP B-NP\nLawson NNP I-NP\n's POS B-NP\nrestated VBN I-NP\ncommitment NN I-NP\nto TO B-PP\na DT B-NP\nfirm NN I-NP\nmonetary JJ I-NP\npolicy NN I-NP\nhas VBZ B-VP\nhelped VBN I-VP\nto TO I-VP\nprevent VB I-VP\na DT B-NP\nfreefall NN I-NP\nin IN B-PP\nsterling NN B-NP\nover IN B-PP\nthe

In [97]:
class HMM:

  def __init__(self, train_bag):
    self.train_bag = train_bag
   
  # Emission Probability
  def emission_prob(self, word, tag):
      tag_list = [pair for pair in self.train_bag if pair[1]==tag]
      count_tag = len(tag_list)#total number of times the passed tag occurred in train_bag
      w_given_tag_list = [pair[0] for pair in tag_list if pair[0]==word]
      #now calculate the total number of times the passed word occurred as the passed tag.
      count_w_given_tag = len(w_given_tag_list)
  
      
      return (count_w_given_tag, count_tag)

  # Transition Probability
  def transition_prob(self, t2, t1):
      tags = [pair[1] for pair in self.train_bag]
      count_t1 = len([t for t in tags if t==t1])
      count_t2_t1 = 0
      for index in range(len(tags)-1):
          if tags[index]==t1 and tags[index+1] == t2:
              count_t2_t1 += 1
      return (count_t2_t1, count_t1)

  # Trainsition Matrix
  def create_transition_matrix(self, tags):
    # creating t x t transition matrix of tags, t= no of tags
    # Matrix(i, j) represents P(jth tag after the ith tag)
    
    tags_matrix = np.zeros((len(tags), len(tags)), dtype='float32')
    for i, t1 in enumerate(list(tags)):
        for j, t2 in enumerate(list(tags)): 
            tags_matrix[i, j] = self.transition_prob(t2, t1)[0]/self.transition_prob(t2, t1)[1]
    
    print(tags_matrix)
    # convert the matrix to a df for better readability
    #the table is same as the transition table shown in section 3 of article
    tags_df = pd.DataFrame(tags_matrix, columns = list(tags), index=list(tags))
    display(tags_df)
    return tags_df

  def Viterbi(self, words, tags_df):
    state = []
    T = list(set([pair[1] for pair in self.train_bag]))
     
    for key, word in enumerate(words):
        #initialise list of probability column for a given observation
        p = [] 
        for tag in T:
            if key == 0:
                transition_p = tags_df.loc['.', tag]
            else:
                transition_p = tags_df.loc[state[-1], tag]
                 
            # compute emission and state probabilities
            emission_p = self.emission_prob(words[key], tag)[0]/self.emission_prob(words[key], tag)[1]
            state_probability = emission_p * transition_p    
            p.append(state_probability)
             
        pmax = max(p)
        # getting state for which probability is maximum
        state_max = T[p.index(pmax)] 
        state.append(state_max)
    return list(zip(words, state))

  def model_accuracy(self, tagged_seq, input_tagged_words):
    check = [i for i, j in zip(tagged_seq, input_tagged_words) if i == j] 
    accuracy = len(check)/len(tagged_seq)
    print('Viterbi Algorithm Accuracy: ',accuracy*100)

In [94]:
import random
# Let's test our Viterbi algorithm on a few sample sentences of test dataset
random.seed(1234)      #define a random seed to get same sentences when run multiple times
 
# choose random 10 numbers
rndom = [random.randint(1,len(test_set)) for x in range(10)]
 
# list of 10 sents on which we test the model
test_run = [test_set[i] for i in rndom]
 
# list of tagged words
test_run_base = [tup for sent in test_run for tup in sent]
 
# list of untagged words
test_tagged_words = [tup[0] for sent in test_run for tup in sent]

In [95]:
#Here We will only test 10 sentences to check the accuracy
#as testing the whole training set takes huge amount of time
model = HMM(train_labelled_data)
tags_df = model.create_transition_matrix(tags)

start = time.time()
tagged_seq = model.Viterbi(test_tagged_words, tags_df=tags_df)
end = time.time()

difference = end-start
print("Time taken in seconds: ", difference)

[[0.00204271 0.05292479 0.0003714  ... 0.         0.0007428  0.02544104]
 [0.25849673 0.00196078 0.         ... 0.         0.00098039 0.        ]
 [0.24447334 0.03381014 0.         ... 0.         0.         0.00390117]
 ...
 [0.         0.         0.         ... 0.         0.         0.        ]
 [0.25829384 0.08530806 0.         ... 0.         0.         0.00236967]
 [0.00066667 0.00066667 0.         ... 0.         0.         0.        ]]


Unnamed: 0,VBD,PRP,WDT,UH,#,NNPS,JJS,NNP,:,NN,...,SYM,RBR,RB,WRB,EX,JJR,POS,WP$,WP,PRP$
VBD,0.002043,0.052925,0.000371,0.0,0.0,0.0,0.000186,0.044383,0.002786,0.045682,...,0.0,0.001114,0.076137,0.001114,0.0013,0.009842,0.0,0.0,0.000743,0.025441
PRP,0.258497,0.001961,0.0,0.0,0.0,0.0,0.0,0.000654,0.001961,0.002614,...,0.0,0.00098,0.056536,0.001634,0.000327,0.001307,0.0,0.0,0.00098,0.0
WDT,0.244473,0.03381,0.0,0.0,0.0,0.0,0.0,0.014304,0.0013,0.015605,...,0.0,0.0,0.029909,0.0,0.0013,0.0,0.0013,0.0,0.0,0.003901
UH,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.076923,0.0,0.0,0.0
#,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
NNPS,0.104348,0.0,0.0,0.0,0.0,0.005797,0.0,0.046377,0.017391,0.075362,...,0.0,0.0,0.008696,0.0,0.0,0.002899,0.037681,0.0,0.014493,0.0
JJS,0.003413,0.013652,0.0,0.0,0.0,0.003413,0.0,0.006826,0.006826,0.419795,...,0.0,0.0,0.023891,0.003413,0.0,0.0,0.0,0.0,0.0,0.0
NNP,0.062571,0.000881,0.001385,0.0,0.0,0.011457,0.0,0.390092,0.006673,0.063641,...,0.0,0.000378,0.007932,0.000692,0.0,0.000126,0.047904,0.0,0.000504,0.000189
:,0.018913,0.043735,0.022459,0.0,0.0,0.0,0.0,0.105201,0.001182,0.037825,...,0.0,0.001182,0.056738,0.008274,0.003546,0.004728,0.0,0.0,0.00591,0.010638
NN,0.048801,0.005154,0.007773,0.0,4.2e-05,0.000333,4.2e-05,0.009976,0.011556,0.116764,...,8.3e-05,0.004115,0.017043,0.001995,0.000249,0.001372,0.022197,0.000166,0.002369,0.000208


Time taken in seconds:  313.7441916465759
Viterbi Algorithm Accuracy:  89.95633187772926


In [99]:
model.model_accuracy(tagged_seq, test_run_base)

Viterbi Algorithm Accuracy:  89.95633187772926


In [None]:
#Code to test all the test sentences
test_tagged_words = [tup for sent in test_set for tup in sent]
test_untagged_words = [tup[0] for sent in test_set for tup in sent]
test_untagged_words
 
start = time.time()
tagged_seq = model.Viterbi(test_untagged_words, tags_df=tags_df)
end = time.time()
difference = end-start
 
print("Time taken in seconds: ", difference)

In [None]:
model.model_accuracy(tagged_seq, test_tagged_words)

Github reference code: 

**(Other resources: https://www.mygreatlearning.com/blog/pos-tagging/) 

In [43]:
# from collections import defaultdict
# import string

# # punctuation characters
# punct = set(string.punctuation)

# # morphology rules used to assign unknown word tokens
# noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
# verb_suffix = ["ate", "ify", "ise", "ize"]
# adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
# adv_suffix = ["ward", "wards", "wise"]


# def assign_unk(tok):
#     # Digits
#     if any(char.isdigit() for char in tok):
#         return "--unk_digit--"

#     # Punctuation
#     elif any(char in punct for char in tok):
#         return "--unk_punct--"

#     # Upper-case
#     elif any(char.isupper() for char in tok):
#         return "--unk_upper--"

#     # Nouns
#     elif any(tok.endswith(suffix) for suffix in noun_suffix):
#         return "--unk_noun--"

#     # Verbs
#     elif any(tok.endswith(suffix) for suffix in verb_suffix):
#         return "--unk_verb--"

#     # Adjectives
#     elif any(tok.endswith(suffix) for suffix in adj_suffix):
#         return "--unk_adj--"

#     # Adverbs
#     elif any(tok.endswith(suffix) for suffix in adv_suffix):
#         return "--unk_adv--"

#     return "--unk--"

# def get_word_tag(line, vocab):
#     # check if a line is empty (just contains \n or \t), if yes
#     if not line.split():
#         word = "--n--"
#         tag = "--s--"
#         return word, tag
#     else:
#         word, tag = line.split()
#         if word not in vocab:
#             word = assign_unk(word)
#         return word, tag
#     return None

# def create_dictionaries(training_corpus, vocab):
#     emission_counts = defaultdict(int)
#     transition_counts = defaultdict(int)
#     tag_counts = defaultdict(int)

#     prev_tag = '--s--' 
#     for word_tag in training_corpus:
#         word, tag = get_word_tag(word_tag,vocab) 
#         transition_counts[(prev_tag, tag)] += 1
#         emission_counts[(tag, word)] += 1
#         tag_counts[tag] += 1
#         prev_tag = tag
#     return emission_counts, transition_counts, tag_counts

In [None]:
#emission_counts, transition_counts, tag_counts = create_dictionaries(train_tagged_words, vocab)

In [None]:
# states = sorted(tag_counts.keys())
# print(f"Number of POS tags (number of 'states'): {len(states)}")
# print("View these POS tags (states)")
# print(states)