# Data preprocessing
here we obtain the data from the json file and then process it by adding start and end tokens to it and then define the vocabulary, sentence list and also the tags (or the state) corresponding to each observation (or words). These will be helpful for us in determining the parameters of **Hidden Markov Model later**

In [1]:
import pandas as pd
import json
from collections import defaultdict,Counter

with open('/kaggle/input/annotated-dataset-for-pos-tagging/dev.json', 'r') as f:
    json_data = f.read()
with open('/kaggle/input/annotated-dataset-for-pos-tagging/train.json','r') as f:
    json_data = f.read()
    
dataset_1 = json.loads(json_data)
for item in dataset_1:
    item['sentence'].insert(0,'<s>')
    item['sentence'].append('</s>')
    item['labels'].insert(0,'<s>')
    item['labels'].append('</s>')
dataset_2 = json.loads(json_data)
for item in dataset_2:
    item['sentence'].insert(0,'<s>')
    item['sentence'].append('</s>')
    item['labels'].insert(0,'<s>')
    item['labels'].append('</s>')
dataset = dataset_1+dataset_2
print(f'length of dataset is {len(dataset)}')

state_tags = defaultdict(set)
sentences = list()
state_seq = list()
vocab = set()
tags = set()
tag_count = defaultdict(int)
word_count = defaultdict(int)
tag_zipped_words = list()
from copy import deepcopy
for item in dataset:
    sentences.append(item['sentence'])
    state_seq.append(item['labels'])
    tag_zipped_words.append(deepcopy(list(zip(item['sentence'],item['labels']))))
    for word,count in list(Counter(item['sentence']).items()):
        word_count[word]+=count
    for tag,count in list(Counter(item['labels']).items()):
        tag_count[tag]+=count   
    vocab = vocab.union(set(item['sentence']))
    tags = tags.union(set(item['labels']))
    for i,word in enumerate(item['sentence']):
        state_tags[word].add(item['labels'][i])

length of dataset is 76436


# Determining Parameters of model
here we define the parameters of the model which are:
* $ \text{emission matrix B, where } B_i(o_t) = P(o_t|q_i)$
* $ \text{transition matrix A, where } A_{i-1,i} = P(q_{i}|q_{i-1})$
* $ \text{initial probabilities } \pi \text{ where } \pi_i = \text{initial probabilities of }i^{th}\text{label}$



In [2]:
A = defaultdict(lambda:defaultdict(float)) # transition matrix
B = defaultdict(lambda:defaultdict(float)) # emmision matrix
PI = defaultdict(float)

for item in state_seq:
    for i_1,i in zip(item,item[1:]):
        A[i_1][i]+=1

for item in A.keys():
    for key in A[item].keys():
        A[item][key] = A[item][key]/tag_count[item]
    
PI = A['<s>']
for item in tag_zipped_words:
    for obs,state in list(item):
        B[state][obs]+=1
for item in B.keys():
    for key in B[item].keys():
        B[item][key] /= tag_count[item]
    



# Viterbi algorithm

In [3]:
def viterbi(sentence):
    sentence = sentence.strip().split()
    memo = defaultdict(lambda: defaultdict(tuple))
    w = sentence[0]
    for tags in state_tags[w]:
        memo[0][tags] = (PI[tags],'<s>')
    for i in range(1,len(sentence)):
        w = sentence[i]
        tags = state_tags[w]
        for tag in tags:
            emission = B[tag][w]
            memo[i][tag] = (-1e9,'')
            for t,(prior,path) in memo[i-1].items():
                transition = A[t][tag]
                curr_prob = transition * emission * prior
                if curr_prob>memo[i][tag][0]:
                    memo[i][tag] = (curr_prob,f'{path},{tag}')
    n = len(sentence)
    res = ''
    check = -1e9
    for t,(prior,path) in memo[n-1].items():
        if prior>check:
            check = prior
            res = path
    return res
    

# demo test

In [4]:
viterbi("increase the pressure further more to get success")

'<s>,DT,NN,RB,JJR,TO,VB,NN'

# HMM model
Here we create an HMM model using pytorch and combine all the previously defined steps in the init method and running the virerbi in the forward method

In [5]:
import torch 
import torch.nn as nn
from collections import defaultdict,Counter
class HMM(nn.Module):
    def __init__(self,train_set):
        super(HMM,self).__init__()
        # data processing
        self.state_tags = defaultdict(set)
        self.sentences = list()
        self.state_seq = list()
        self.vocab = set()
        self.tags = set()
        self.tag_count = defaultdict(int)
        self.word_count = defaultdict(int)
        self.tag_zipped_words = list()
        from copy import deepcopy
        for item in dataset:
            self.sentences.append(item['sentence'])
            self.state_seq.append(item['labels'])
            self.tag_zipped_words.append(deepcopy(list(zip(item['sentence'],item['labels']))))
            for word,count in list(Counter(item['sentence']).items()):
                self.word_count[word]+=count
            for tag,count in list(Counter(item['labels']).items()):
                self.tag_count[tag]+=count   
            self.vocab = self.vocab.union(set(item['sentence']))
            self.tags = self.tags.union(set(item['labels']))
            for i,word in enumerate(item['sentence']):
                self.state_tags[word].add(item['labels'][i])
        
        # parameters
        self.A = defaultdict(lambda:defaultdict(float)) # transition matrix
        self.B = defaultdict(lambda:defaultdict(float)) # emmision matrix
        self.PI = defaultdict(float)

        for item in self.state_seq:
            for i_1,i in zip(item,item[1:]):
                self.A[i_1][i]+=1

        for item in self.A.keys():
            for key in self.A[item].keys():
                self.A[item][key] = self.A[item][key]/self.tag_count[item]

        self.PI = self.A['<s>']
        for item in self.tag_zipped_words:
            for obs,state in list(item):
                self.B[state][obs]+=1
        for item in self.B.keys():
            for key in self.B[item].keys():
                self.B[item][key] /= self.tag_count[item]
    def forward(self,sentence):
        sentence = sentence.strip().split()
        memo = defaultdict(lambda: defaultdict(tuple))
        w = sentence[0]
        for tags in self.state_tags[w]:
            memo[0][tags] = (self.PI[tags],'<s>')
        for i in range(1,len(sentence)):
            w = sentence[i]
            tags = self.state_tags[w]
            for tag in tags:
                emission = self.B[tag][w]
                memo[i][tag] = (-1e9,'')
                for t,(prior,path) in memo[i-1].items():
                    transition = self.A[t][tag]
                    curr_prob = transition * emission * prior
                    if curr_prob>memo[i][tag][0]:
                        memo[i][tag] = (curr_prob,f'{path},{tag}')
        n = len(sentence)
        res = ''
        check = -1e9
        for t,(prior,path) in memo[n-1].items():
            if prior>check:
                check = prior
                res = path
        return res
    

# Training and testing the HMM 
instantiating the HMM object by passing a training dataset will build the parameter matrices automatically 

In [6]:
model = HMM(dataset)

In [7]:
model('I will work hard to do well ')

'<s>,MD,VB,JJ,TO,VB,RB'