# Hidden Markov Models 
###  Greedy and Viterbi Decoding 

Submitted by: Anushka Deshpande

## Importing required libraries

In [1]:
import json

import numpy as np
import pandas as pd

## Task 1: Vocabulary Creation

In [3]:
# Read the tab-separated training corpus into three columns: index, word, tag.
# The `with` block closes the file handle on exit, so no explicit close() is needed.
with open("train", 'r') as train:
    dataframe = pd.read_csv(train, sep="\t", names=["index", "word", "tag"])
dataframe

Unnamed: 0,index,word,tag
0,1,Pierre,NNP
1,2,Vinken,NNP
2,3,",",","
3,4,61,CD
4,5,years,NNS
...,...,...,...
912090,22,to,TO
912091,23,San,NNP
912092,24,Francisco,NNP
912093,25,instead,RB


In [4]:
# Attach to every row the number of rows that share its word.
wordGroups = dataframe.groupby('word')['word']
dataframe['occurence'] = wordGroups.transform('size')
dataframe.head()

Unnamed: 0,index,word,tag,occurence
0,1,Pierre,NNP,6
1,2,Vinken,NNP,2
2,3,",",",",46476
3,4,61,CD,25
4,5,years,NNS,1130


In [5]:
# Words seen at most this many times are folded into the "< unk >" token.
threshold = 3


def UNKreplace(entry):
    """Return the vocabulary token for one row.

    `entry` must expose `occurence` and `word` attributes. Rows whose
    `occurence` is less than or equal to `threshold` map to "< unk >";
    every other row keeps its own word.
    """
    return "< unk >" if entry.occurence <= threshold else entry.word

# Replace rare words (occurence <= threshold) with "< unk >" in one vectorised pass.
# This applies the same rule as UNKreplace without calling Python once per row.
dataframe['word'] = dataframe['word'].mask(dataframe['occurence'] <= threshold, "< unk >")
dataframe.head()

Unnamed: 0,index,word,tag,occurence
0,1,Pierre,NNP,6
1,2,< unk >,NNP,2
2,3,",",",",46476
3,4,61,CD,25
4,5,years,NNS,1130


In [6]:
# Frequency table of the (unk-replaced) words: one row per word with its count.
wordFrequencies = dataframe['word'].value_counts()
dfVocab = wordFrequencies.rename_axis('word').reset_index(name='occurence')
dfVocab.head()

Unnamed: 0,word,occurence
0,",",46476
1,< unk >,42044
2,the,39533
3,.,37452
4,of,22104


In [7]:
# Pull out the single "< unk >" row so it can be placed first in the vocabulary.
dfUnk = dfVocab.loc[dfVocab.word.eq('< unk >')]
dfUnk.head()

Unnamed: 0,word,occurence
1,< unk >,42044


In [8]:
# Keep every vocabulary row except "< unk >" (it is re-added at the top later).
dfVocab = dfVocab[dfVocab['word'] != '< unk >']
dfVocab.head()

Unnamed: 0,word,occurence
0,",",46476
2,the,39533
3,.,37452
4,of,22104
5,to,21305


In [9]:
# Final vocabulary: "< unk >" first, then the remaining words, with 1-based ids.
finalVocab = pd.concat([dfUnk, dfVocab], ignore_index=True)
finalVocab['id'] = finalVocab.index + 1

# Reorder the three columns to (word, id, occurence).
wordColumn, countColumn, idColumn = finalVocab.columns.tolist()
columnList = [wordColumn, idColumn, countColumn]
finalVocab = finalVocab[columnList]
finalVocab.head()

Unnamed: 0,word,id,occurence
0,< unk >,1,42044
1,",",2,46476
2,the,3,39533
3,.,4,37452
4,of,5,22104


generate the vocab.txt file

In [10]:
# Tab-separated dump of the vocabulary, without header row or index column.
finalVocab.to_csv('vocab.txt', sep='\t', header=False, index=False)

In [11]:
# Report the threshold, the vocabulary size, and how many tokens became "< unk >".
vocabularySize = len(finalVocab)
unkOccurrences = dfUnk['occurence'].iloc[0]
print("Selected Threshold: " , threshold)
print("Size of Vocabulary: ", vocabularySize)
print("Total occurences of < unk> : ",  unkOccurrences)

Selected Threshold:  3
Size of Vocabulary:  13751
Total occurences of < unk> :  42044


## Task 2: Model Learning

**Counting the unique tags in the dataframe**

In [12]:
# Tag frequency table; tagList fixes the tag order used by all later matrices.
tagFrequencies = dataframe['tag'].value_counts()
dfPos = tagFrequencies.rename_axis('pos').reset_index(name='count')
POSCountDict = dict(dfPos.values)
tagList = dfPos['pos'].tolist()
print('Total Unique tags: ', len(tagList))

Total Unique tags:  45


**Grouping the corpus into sentences, where each sentence is a list of (word, tag) pairs**

In [13]:
# Split the flat token table into sentences of (word, tag) pairs.
# A row whose `index` column equals 1 starts a new sentence; the very first row
# has no previous sentence to close.
text = []
sentence = []
for rowNumber, entry in enumerate(dataframe.itertuples()):
    startsNewSentence = entry.index == 1 and rowNumber > 0
    if startsNewSentence:
        text.append(sentence)
        sentence = []
    sentence.append((entry.word, entry.tag))
# Flush the last sentence, which has no following index == 1 row to close it.
text.append(sentence)


In [14]:
# Plain Python list of vocabulary words, in finalVocab row order.
vocabulary = list(finalVocab['word'])

Now, we have the following information:

1. tagList = list of tags (45)
2. vocabulary = list of unique words in the vocabulary (`< unk >` first)
3. text = list of sentences, each a list of (word, tag) pairs
4. POSCountDict = count of each POS tag in the dataframe

So we generate the transition matrix and emission matrix. 

In [15]:
# calculate the transition matrix
# t_matrix[a][b] = count(tag a immediately followed by tag b) / count(tag a);
# pairs never observed get a small constant (1e-10) instead of zero.

# One dictionary lookup per token instead of a linear tagList.index() scan.
tag_to_row = {tag: row for row, tag in enumerate(tagList)}

t_matrix = np.zeros((len(tagList), len(tagList)))
tagCount = {row: 0 for row in range(len(tagList))}

for sent in text:
    previous_row = None
    for pair in sent:
        current_row = tag_to_row[pair[1]]
        tagCount[current_row] += 1
        # The first token of a sentence has no predecessor inside the sentence.
        if previous_row is not None:
            t_matrix[previous_row][current_row] += 1
        previous_row = current_row

# Normalise every row by its tag count in one vectorised step.
row_totals = np.array([tagCount[row] for row in range(len(tagList))], dtype=float)
# A tag with zero count has an all-zero row that is replaced by 1e-10 below,
# so the divisor only needs to be non-zero to avoid a division warning.
row_totals[row_totals == 0] = 1.0
t_matrix = np.where(t_matrix == 0, 1e-10, t_matrix / row_totals[:, None])

t_matrix.shape

(45, 45)


In [16]:
# calculate emission matrix
# e_matrix[t][w] = count(word w tagged t) / count(tag t);
# pairs never observed get a small constant (1e-10) instead of zero.

# Dictionary lookups replace list.index(), which scanned the whole vocabulary
# for every single training token.
tag_to_row = {tag: row for row, tag in enumerate(tagList)}
word_to_col = {}
for col, word in enumerate(vocabulary):
    word_to_col.setdefault(word, col)  # first position wins, like list.index()

e_matrix = np.zeros((len(tagList), len(vocabulary)))

for sent in text:
    for word, tag in sent:
        e_matrix[tag_to_row[tag], word_to_col[word]] += 1

# tagCount (tag row -> number of tokens with that tag) comes from the
# transition-matrix cell.
row_totals = np.array([tagCount[row] for row in range(len(tagList))], dtype=float)
# A tag with zero count has an all-zero row that is replaced by 1e-10 below,
# so the divisor only needs to be non-zero to avoid a division warning.
row_totals[row_totals == 0] = 1.0
e_matrix = np.where(e_matrix == 0, 1e-10, e_matrix / row_totals[:, None])

e_matrix.shape
               

(45, 13751)


Now we generate a dictionary for the transition and emission probabilities

In [17]:
# make dictionary of transition probabilities

# Row/column number -> tag name, in tagList order.
tagsDict = dict(enumerate(tagList))

# Flatten t_matrix into {"(previous tag,next tag)": probability}.
transitionProbability = {
    "(" + tagsDict[src] + "," + tagsDict[dst] + ")": t_matrix[src][dst]
    for src in range(len(t_matrix))
    for dst in range(len(t_matrix[0]))
}


In [18]:
# make dictionary of emission probabilities

# Flatten e_matrix into {"(tag,word)": probability}.
emissionProbability = {
    "(" + tagsDict[row] + "," + vocabulary[col] + ")": e_matrix[row][col]
    for row in range(len(e_matrix))
    for col in range(len(e_matrix[0]))
}


We compute the initial probabilities: for each tag, the probability that it is the first tag of a sentence (i.e. the transition from `< start >` to that tag).

In [19]:
# calculate the initial probabilities of tags

# Count how often each tag appears on a row whose first column equals 1.
tagsStartCount = {tag: 0 for tag in tagList}
startSum = 0

# Columns are addressed by position: 0 holds the index, 2 holds the tag.
for position, tag in zip(dataframe.iloc[:, 0], dataframe.iloc[:, 2]):
    if position != 1:
        continue
    tagsStartCount[tag] += 1
    startSum += 1

# Relative frequency of each tag among those rows.
initProbability = {
    tag: startCount / startSum for tag, startCount in tagsStartCount.items()
}

Append the initial probabilities to the transition probabilities to generate the final transition probabilities

In [20]:
# append initial probabilities to transition probabilities

# "(< start >,tag)" entries come first, followed by every tag-to-tag entry.
finalTransitionProb = {
    "(" + "< start >" + "," + tag + ")": startProbability
    for tag, startProbability in initProbability.items()
}
finalTransitionProb.update(transitionProbability)

In [21]:
# Number of stored transition (including start) and emission parameters.
transitionParameterCount = len(finalTransitionProb)
emissionParameterCount = len(emissionProbability)
print("Total transition parameters: ", transitionParameterCount)
print("Total emission paramenters:  ", emissionParameterCount)

Total transition parameters:  2070
Total emission paramenters:   618795


generate the hmm.json file

In [22]:
# store in hmm.json file
# Both dictionaries are keyed by "(a,b)" strings. The `with` block closes the
# file on exit, so no explicit close() is needed.
with open('hmm.json', 'w') as hmmFile:
    json.dump(
        {"transition": finalTransitionProb, "emission": emissionProbability},
        hmmFile,
        ensure_ascii=False,
        indent=4,
    )

## Task 3: Greedy Decoding with HMM

preprocess the validation dataset to generate the sentences and find the word, tag pairs. 

In [23]:
# preprocess dev data aka validation set

validationData = pd.read_csv("dev", sep="\t", names=['index', 'word', 'tag'])
validationWordGroups = validationData.groupby('word')['word']
validationData['occurence'] = validationWordGroups.transform('size')

# Split the flat token table into sentences of (word, tag) pairs; a row whose
# `index` column equals 1 (other than the very first row) opens a new sentence.
validationSentences = []
sentence = []
for rowNumber, entry in enumerate(validationData.itertuples()):
    if rowNumber > 0 and entry.index == 1:
        validationSentences.append(sentence)
        sentence = []
    sentence.append((entry.word, entry.tag))
# Flush the final sentence.
validationSentences.append(sentence)


use the greedy decoding algorithm to generate the predictions

In [24]:
# greedy decoding for validation data
# For each position pick the tag maximising transition * emission, conditioning
# only on the tag chosen for the previous position.

tagPredictions = []
tagScores = []

for sent in validationSentences:
    previousTag = "< start >"
    sentPrediction = []
    sentScore = []

    for position in range(len(sent)):
        word = sent[position][0]
        highestScore = -1

        for candidate in tagList:
            # Transition part: initial probability at the first position,
            # otherwise the transition from the previously chosen tag
            # (a factor of 1 when that key is absent).
            if position == 0:
                currScore = initProbability[candidate]
            else:
                transitionKey = "(" + previousTag + "," + candidate + ")"
                currScore = finalTransitionProb.get(transitionKey, 1)

            # Emission part: words without an entry fall back to "< unk >".
            emissionKey = "(" + candidate + "," + word + ")"
            if emissionKey not in emissionProbability:
                emissionKey = "(" + candidate + "," + "< unk >" + ")"
            currScore *= emissionProbability[emissionKey]

            if currScore > highestScore:
                highestScore = currScore
                highestProbabilityTag = candidate

        previousTag = highestProbabilityTag
        sentPrediction.append(previousTag)
        sentScore.append(highestScore)

    tagPredictions.append(sentPrediction)
    tagScores.append(sentScore)


now calculate the accuracy on the validation dataset

In [25]:
# to calculate accuracy of predictions on validation set

# Token-level accuracy: predicted tag versus the gold tag stored in each pair.
totalPredictions = 0
correctTagPrediction = 0
for sentIdx, goldSentence in enumerate(validationSentences):
    for tokIdx, goldPair in enumerate(goldSentence):
        if tagPredictions[sentIdx][tokIdx] == goldPair[1]:
            correctTagPrediction += 1
        totalPredictions += 1

validationAccuracy = correctTagPrediction / totalPredictions

print("Accuracy on Validation set: {:.2f} %".format(validationAccuracy * 100))


Accuracy on Validation set: 92.64 %


Now apply the same thing for test dataset

In [26]:
# preprocess test data set

testData = pd.read_csv("test", sep="\t", names=['index', 'word', 'tag'])
testWordGroups = testData.groupby('word')['word']
testData['occurence'] = testWordGroups.transform('size')

# Split the flat token table into sentences. Each sentence is a list of bare
# word strings (no tags are stored for the test set).
testSentences = []
sentence = []
for rowNumber, entry in enumerate(testData.itertuples()):
    if rowNumber > 0 and entry.index == 1:
        testSentences.append(sentence)
        sentence = []
    sentence.append(entry.word)
# Flush the final sentence.
testSentences.append(sentence)


In [27]:
# greedy decoding for test data
# For each position pick the tag maximising transition * emission, conditioning
# only on the tag chosen for the previous position.

testPredictions = []
testScores = []

for sent in testSentences:
    previousTag = "< start >"
    sentPrediction = []
    sentScore = []

    for i, token in enumerate(sent):
        # testSentences stores bare word strings, so indexing the token with [0]
        # would look up only the word's first character. Use the whole word;
        # (word, tag) pairs are still accepted for symmetry with the dev data.
        word = token if isinstance(token, str) else token[0]
        highestScore = -1

        for candidate in tagList:
            # Transition part: initial probability at the first position,
            # otherwise the transition from the previously chosen tag
            # (a factor of 1 when that key is absent).
            if i == 0:
                currScore = initProbability[candidate]
            else:
                transitionKey = "(" + previousTag + "," + candidate + ")"
                currScore = finalTransitionProb.get(transitionKey, 1)

            # Emission part: words without an entry fall back to "< unk >".
            emissionKey = "(" + candidate + "," + word + ")"
            if emissionKey not in emissionProbability:
                emissionKey = "(" + candidate + "," + "< unk >" + ")"
            currScore *= emissionProbability[emissionKey]

            if currScore > highestScore:
                highestScore = currScore
                highestProbabilityTag = candidate

        previousTag = highestProbabilityTag
        sentPrediction.append(previousTag)
        sentScore.append(highestScore)

    testPredictions.append(sentPrediction)
    testScores.append(sentScore)


In [28]:
# Pair every test word with its 1-based position and greedy-predicted tag.
result = [
    [
        (str(position + 1), word, testPredictions[sentIdx][position])
        for position, word in enumerate(sentenceWords)
    ]
    for sentIdx, sentenceWords in enumerate(testSentences)
]


Store the output on test data into a file (greedy.out)

In [29]:
# One "position<TAB>word<TAB>tag" line per token; sentences separated by a blank line.
# The `with` block closes the file on exit, so no explicit close() is needed.
with open('greedy.out', 'w') as greedyOutput:
    for elem in result:
        greedyOutput.write("\n".join([str(item[0]) + "\t" + item[1] + "\t" + item[2] for item in elem]))
        greedyOutput.write("\n\n")

## Task 4: Viterbi decoding with HMM

use viterbi decoding algorithm to generate the cache and viterbi_matrix

In [30]:
# use viterbi decoding algorithm to generate the cache and viterbi_matrix
# For every sentence this produces:
#   cache       : {"<position>, <tag>": [best previous tag, path score]} for positions >= 1
#   viterbiList : best path score per tag (tagList order) at the last position

c = []
v = []

for sent in validationSentences:
    n = len(tagList)
    cache = {}

    # Initialisation: initial probability times emission of the first word.
    viterbiList = []
    for tag in tagList:
        firstKey = '(' + tag + ',' + sent[0][0] + ')'
        if firstKey not in emissionProbability:
            firstKey = '(' + tag + ',' + '< unk >' + ')'
        viterbiList.append(initProbability[tag] * emissionProbability[firstKey])

    # Recursion over the remaining positions.
    for i in range(1, len(sent)):
        word = sent[i][0]
        tempList = []
        for tag in tagList:
            # Emission depends only on (tag, word); unknown pairs use "< unk >".
            emissionKey = "(" + tag + "," + word + ")"
            if emissionKey not in emissionProbability:
                emissionKey = "(" + tag + "," + "< unk >" + ")"
            emission = emissionProbability[emissionKey]

            score = -1
            for k, prob in enumerate(viterbiList):
                value = prob * transitionProbability["(" + tagList[k] + "," + tag + ")"] * emission
                if score < value:
                    score = value
                    cache[str(i) + ", " + tag] = [tagList[k], value]
            tempList.append(score)
        viterbiList = tempList

    c.append(cache)
    v.append(viterbiList)

Backtrack through the matrix to find the POS tag predictions

In [31]:
# backtrack through matrix for pos tags
# Start from the best-scoring tag at the last position and follow the stored
# back-pointers to the first position.

VitValidationPred = []
VitValidationScore = []

for cache, viterbiList in zip(c, v):
    numStates = len(tagList)
    # cache holds one entry per (position, tag), so this is the number of steps back.
    n = len(cache) // numStates

    x = tagList[np.argmax(np.asarray(viterbiList))]

    # Collect tags and scores last-to-first, then reverse once at the end.
    reversedTags = [x]
    reversedScores = []
    for step in range(n, 0, -1):
        previousBest, stepScore = cache[str(step) + ', ' + x]
        reversedScores.append(stepScore)
        reversedTags.append(previousBest)
        x = previousBest

    bestSequence = reversedTags[::-1]
    bestSequenceBreakdown = reversedScores[::-1]

    VitValidationPred.append(bestSequence)
    VitValidationScore.append(bestSequenceBreakdown)

Calculate and print the accuracy on the validation dataset

In [33]:
# calculate accuracy for validation set

# Token-level accuracy: Viterbi tag versus the gold tag stored in each pair.
totPredictions = 0
correctPredictions = 0
for sentIdx, goldSentence in enumerate(validationSentences):
    for tokIdx, goldPair in enumerate(goldSentence):
        if VitValidationPred[sentIdx][tokIdx] == goldPair[1]:
            correctPredictions += 1
        totPredictions += 1

valAccuracy = correctPredictions / totPredictions

print("Accuracy on Validation set: {:.2f}".format(valAccuracy * 100))


Accuracy on Validation set: 94.07


In [34]:
# follow the same process for test set
# For every sentence this produces:
#   cache       : {"<position>, <tag>": [best previous tag, path score]} for positions >= 1
#   viterbiList : best path score per tag (tagList order) at the last position
#
# NOTE(review): path scores are raw probability products, so they shrink with
# every token; very long sentences may underflow — consider log-space scores.

c = []
v = []

for sent in testSentences:
    n = len(tagList)
    cache = {}

    # testSentences stores bare word strings, so indexing a token with [0] would
    # look up only its first character. Normalise every token to the whole word;
    # (word, tag) pairs are still accepted for symmetry with the dev data.
    words = [token if isinstance(token, str) else token[0] for token in sent]

    # Initialisation: initial probability times emission of the first word.
    viterbiList = []
    for tag in tagList:
        firstKey = '(' + tag + ',' + words[0] + ')'
        if firstKey not in emissionProbability:
            firstKey = '(' + tag + ',' + '< unk >' + ')'
        viterbiList.append(initProbability[tag] * emissionProbability[firstKey])

    # Recursion over the remaining positions.
    for i in range(1, len(words)):
        word = words[i]
        tempList = []
        for tag in tagList:
            # Emission depends only on (tag, word); unknown pairs use "< unk >".
            emissionKey = "(" + tag + "," + word + ")"
            if emissionKey not in emissionProbability:
                emissionKey = "(" + tag + "," + "< unk >" + ")"
            emission = emissionProbability[emissionKey]

            score = -1
            for k, prob in enumerate(viterbiList):
                value = prob * transitionProbability["(" + tagList[k] + "," + tag + ")"] * emission
                if score < value:
                    score = value
                    cache[str(i) + ", " + tag] = [tagList[k], value]
            tempList.append(score)
        viterbiList = tempList

    c.append(cache)
    v.append(viterbiList)

In [35]:
# backtrack through matrix for pos tags
# Start from the best-scoring tag at the last position and follow the stored
# back-pointers to the first position.

VitTestPred = []
VitTestScore = []

for cache, viterbiList in zip(c, v):
    numStates = len(tagList)
    # cache holds one entry per (position, tag), so this is the number of steps back.
    n = len(cache) // numStates

    x = tagList[np.argmax(np.asarray(viterbiList))]

    # Collect tags and scores last-to-first, then reverse once at the end.
    reversedTags = [x]
    reversedScores = []
    for step in range(n, 0, -1):
        previousBest, stepScore = cache[str(step) + ', ' + x]
        reversedScores.append(stepScore)
        reversedTags.append(previousBest)
        x = previousBest

    bestSequence = reversedTags[::-1]
    bestSequenceBreakdown = reversedScores[::-1]

    VitTestPred.append(bestSequence)
    VitTestScore.append(bestSequenceBreakdown)

In [36]:
# Pair every test word with its 1-based position and Viterbi-predicted tag.
result = [
    [
        (str(position + 1), word, VitTestPred[sentIdx][position])
        for position, word in enumerate(sentenceWords)
    ]
    for sentIdx, sentenceWords in enumerate(testSentences)
]

Write the output of test data to a file (viterbi.out)

In [37]:
# One "position<TAB>word<TAB>tag" line per token; sentences separated by a blank line.
# The `with` block closes the file on exit, so no explicit close() is needed.
with open('viterbi.out', 'w') as viterbiOutput:
    for elem in result:
        viterbiOutput.write("\n".join([str(item[0]) + "\t" + item[1] + "\t" + item[2] for item in elem]))
        viterbiOutput.write("\n\n")