# Hidden Markov Model for Part-of-Speech Tagging

- Libraries Used:  __Pandas, Numpy, itertools, json__
- Time to Run the code: __Approximately 7 minutes__


In [1]:
import pandas as pd
import numpy as np
import itertools
import json

### Task 1 - Creating vocabulary

In the following cell, I read the tab-separated training data file. Since the file has no header, I load it into a dataframe with three columns: Index, Word and Tag. Next, I calculate the total number of sentences (every sentence starts with Index 1) and store it in a variable, and I convert the Word column into a NumPy array that is later used to create the vocabulary.

In [2]:
trainData = pd.read_csv('train', sep='\t', header = None, names = ['Index','Word','Tag'])
no_of_sentences = len(trainData[trainData['Index'] == 1])
vocabularyCreation = np.array(trainData['Word']) 

The next step is to create the vocabulary. Here I have two dictionaries: one that counts the occurrences of every unique word in the training file and another that holds the unknown word count. Words that occur fewer than 2 times are removed from the first dictionary and their occurrences are added to `<UNK>`. Later I merge both dictionaries and write the result to the output file. The final dictionary is sorted in descending order of the number of occurrences, with ties broken lexicographically and the count of unknown words at the top, as required by the question.

In [3]:
#Create Vocabulary
wordFrequency = dict()
unknownDict = dict()
for ele in vocabularyCreation:
    if ele not in wordFrequency.keys():
        wordFrequency[ele] = 1
    else:
        wordFrequency[ele]+= 1

#Sorting the dictionary in descending order of occurrences; ties are broken lexicographically.
wordFrequency = dict(sorted(wordFrequency.items(), key=lambda item: (-item[1],item[0])))
unknownDict['<UNK>'] = 0
for keys in wordFrequency.keys():
    if wordFrequency[keys] < 2:
        unknownDict['<UNK>'] += wordFrequency[keys]
        wordFrequency[keys] = 0
wordFrequency = {key:val for key, val in wordFrequency.items() if val != 0} 
#Appending the dictionaries
wordFrequency = {**unknownDict, **wordFrequency}
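
As a cross-check, the same vocabulary construction can be sketched with `collections.Counter`. This is only a minimal sketch on a toy word list: the function name `build_vocab`, its `threshold` parameter and the toy data are illustrative assumptions and are not part of the cells above.

```python
from collections import Counter

def build_vocab(words, threshold=2):
    """Return {token: count} with rare words folded into '<UNK>'.

    Words seen fewer than `threshold` times are counted under '<UNK>',
    which is placed first; the remaining words are ordered by descending
    count, with ties broken alphabetically.
    """
    counts = Counter(words)
    unk_total = sum(c for c in counts.values() if c < threshold)
    kept = sorted(
        ((w, c) for w, c in counts.items() if c >= threshold),
        key=lambda item: (-item[1], item[0]),
    )
    vocab = {'<UNK>': unk_total}
    vocab.update(kept)  # dicts keep insertion order, so '<UNK>' stays first
    return vocab

# Hypothetical toy data: 'dog' and 'bird' occur once and become '<UNK>'
toy_words = ['the', 'cat', 'the', 'dog', 'cat', 'the', 'bird']
toy_vocab = build_vocab(toy_words)
print(toy_vocab)
```

On the toy list, the two singleton words are merged into a single `<UNK>` entry, which mirrors what the loop over `wordFrequency` does.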

In [4]:
#Write the result to a file - vocab.txt
i = 0
with open('vocab.txt', 'w') as f:
    for keys in wordFrequency.keys():
        line = str(keys)+'\t'+str(i)+'\t'+str(wordFrequency[keys])+'\n'
        f.write(line)
        i+=1

#### RESULTS
- Threshold for unknown words replacement: __2__
- Total size of vocabulary: __23182 words__
- Total occurrences of the special token `<UNK>` after replacement: __20011__

### Task 2 - Transition and Emission Probabilities

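In the following cell, I create a list of all the unique tags in the training data. I also store every pair of consecutively occurring tags as a list. Because the pairs are formed over the whole Tag column, they include pairs that span a sentence boundary (the last tag of one sentence followed by the first tag of the next). Both these values are used to calculate the transition probabilities.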

In [5]:
trainingDataTag =  list(trainData['Tag'])
tags = list(trainData['Tag'].unique())
trainingDataConsecutiveTags = list(zip(trainingDataTag, trainingDataTag[1:]))
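
If pairs that cross a sentence boundary should not be counted, one possible variant is to skip every pair whose second tag opens a sentence. The sketch below is an assumption about how that could look, not what the cell above does; `tag_bigrams_within_sentences` and the toy lists are hypothetical names, and switching to it would change the counts reported later.

```python
def tag_bigrams_within_sentences(indices, tags):
    """Return (previous_tag, tag) pairs that do not cross a sentence start.

    `indices` holds the position of each word inside its sentence
    (1 marks the first word of a sentence) and is aligned with `tags`.
    """
    pairs = []
    for pos in range(1, len(tags)):
        if indices[pos] != 1:  # skip pairs whose second tag opens a sentence
            pairs.append((tags[pos - 1], tags[pos]))
    return pairs

# Hypothetical toy data: two sentences of length 3 and 2
toy_indices = [1, 2, 3, 1, 2]
toy_tags = ['DT', 'NN', 'VBZ', 'PRP', 'VBD']
toy_pairs = tag_bigrams_within_sentences(toy_indices, toy_tags)
print(toy_pairs)
```

Here the pair `('VBZ', 'PRP')` is dropped because `PRP` is the first tag of the second sentence.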

#### Transition Probability calculation
In order to efficiently calculate the transition probabilities, I create three dictionaries:
- A dictionary to store the count for each tag: tagOccurence
- A dictionary to store the count for every consecutive tag: consecutiveTags
- A dictionary to store the count for every starting tag: startCounts


I iterate through the tags and the consecutive tag pairs, which were stored as lists in the previous cell, and through the starting tags, and store the counts in the respective dictionaries.

In [6]:
tagOccurence = dict()
consecutiveTags = dict()
startCounts = dict()

#list of all starting tags
startTags = trainData['Tag'][trainData['Index'] == 1]

for ele in trainingDataTag:
    if ele not in tagOccurence.keys():
        tagOccurence[ele] = 1
    else:
        tagOccurence[ele] += 1
        
for ele in trainingDataConsecutiveTags:
    if ele not in consecutiveTags.keys():
        consecutiveTags[ele] = 1
    else:
        consecutiveTags[ele] += 1

for ele in startTags:
    if ('',ele) not in startCounts.keys():
        startCounts[('',ele)] = 1
    else:
        startCounts[('',ele)] +=1
    

Now, to calculate the transition probabilities (stored in `transmissionProbability`), I use the counts previously stored in my dictionaries. In total, there will be __((Total number of unique tags) * (Total number of unique tags) + 45 starting tags)__ elements in the dictionary, which amounts to __45 * 45 + 45 = 2070__. For this I use `itertools.combinations_with_replacement` to get the possible combinations of tags. Since it yields unordered pairs, that is, it returns (a,b) but not (b,a), I invert every pair and run the same code again to account for every possible ordered tag combination. At the end, I compute the probabilities for the start tags by dividing each start count by the number of sentences.
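
As a reading aid, the two estimates can be written as relative frequencies. The notation is my own shorthand: $\mathrm{count}(t_i, t_j)$ corresponds to `consecutiveTags`, $\mathrm{count}(t_i)$ to `tagOccurence`, $\mathrm{count}(\text{start}, t_j)$ to `startCounts`, and $N_{\text{sentences}}$ to `no_of_sentences`.

$$
P(t_j \mid t_i) = \frac{\mathrm{count}(t_i, t_j)}{\mathrm{count}(t_i)},
\qquad
P(t_j \mid \text{start}) = \frac{\mathrm{count}(\text{start}, t_j)}{N_{\text{sentences}}}
$$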

In [7]:
tagsPossibleCombinations = list(itertools.combinations_with_replacement(tags,2))
transmissionProbability = dict()

#Transmission Prob for all unique tag combinations
for ele in tagsPossibleCombinations:
    if ele in consecutiveTags.keys():
        transmissionProbability[ele] = (consecutiveTags[ele])/ (tagOccurence[ele[0]])
    else:
        transmissionProbability[ele] = 0
invertTagsPossibleCombinations = []

#Transmission Prob for the inverted tag list
for ele in tagsPossibleCombinations:
    invertTagsPossibleCombinations.append((ele[1],ele[0]))    
for ele in invertTagsPossibleCombinations:
    if ele in consecutiveTags.keys():
        transmissionProbability[ele] = (consecutiveTags[ele])/ (tagOccurence[ele[0]])
    else:
        transmissionProbability[ele] = 0   
        
#Transmission Prob for start
for ele in tags:
    if ('',ele) in startCounts:
        transmissionProbability[('',ele)] = (startCounts[('',ele)])/(no_of_sentences)
    else:
        transmissionProbability[('',ele)] = 0
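
The difference between unordered and ordered tag pairs can be checked on a small example. This is a minimal sketch with a hypothetical three-tag list; it suggests that `itertools.product` with `repeat=2` would produce all ordered pairs in one step, which is an alternative to inverting the pairs and not what the cell above does.

```python
import itertools

# Hypothetical toy tag set
toy_tags = ['DT', 'NN', 'VB']

# combinations_with_replacement yields each unordered pair once: n*(n+1)/2 pairs
unordered = list(itertools.combinations_with_replacement(toy_tags, 2))

# product with repeat=2 yields every ordered pair, including (a, b) and (b, a)
ordered = list(itertools.product(toy_tags, repeat=2))

# Adding one start entry per tag gives n*n + n transition keys in total
start_pairs = [('', tag) for tag in toy_tags]
all_keys = ordered + start_pairs

print(len(unordered), len(ordered), len(all_keys))
```

With 45 tags the same arithmetic gives 45 * 45 + 45 = 2070 keys, matching the size stated above.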
        
    



#### Emission Probability calculation
I use the same approach for calculating the emission probabilities. Here I have just one dictionary, which stores the counts of all (word, tag) pairs in the training data. Words that are not in the vocabulary created earlier are first replaced with `<UNK>`.

In [8]:
#A dict to store the counts for all (word, tag) pairs
consecutiveEmissionTags = dict()
emissionDF = trainData.drop(columns=["Index"])
emissionDataConsecutiveTags = emissionDF.values.tolist()
index = -1
for ele in emissionDataConsecutiveTags:
    index+=1
    if ele[0] not in wordFrequency.keys():
        emissionDataConsecutiveTags[index][0] = '<UNK>'
        
for ele in emissionDataConsecutiveTags:
    if tuple(ele) not in consecutiveEmissionTags.keys():
        consecutiveEmissionTags[tuple(ele)] = 1
    else:
        consecutiveEmissionTags[tuple(ele)]+=1


Once all the required values are stored in the dictionary, I use them to calculate the emission probabilities. The size of the dictionary will be __(number of unique vocabulary words) * (number of unique tags)__. I use `itertools.product` from Python's built-in itertools library to enumerate all possible (word, tag) pairs. Each probability is the (word, tag) count divided by the tag count, and all of them are stored in the `emissionProbability` dictionary.

In [9]:
#Get all possible emission pairs: vocabulary size*tags
emissionProbability = dict()
uniqueVocabularyWords = list(trainData['Word'].unique())
index = -1
for word in uniqueVocabularyWords:
    index+=1
    if word not in wordFrequency.keys():
        uniqueVocabularyWords[index] = '<UNK>'
        
allPossibleEmissions = list(itertools.product(uniqueVocabularyWords, tags))
for ele in allPossibleEmissions:
    if ele in consecutiveEmissionTags.keys():
        emissionProbability[ele] = (consecutiveEmissionTags[ele]/tagOccurence[ele[1]])
    else:
        emissionProbability[ele] = 0     
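
The emission estimate, count(word, tag) / count(tag), can be illustrated on a few toy pairs. The sketch below is an assumption-laden variant: unlike the cell above, it stores only the pairs that were actually observed and relies on `dict.get(key, 0)` for unseen pairs. The function name `emission_probabilities` and the toy data are hypothetical.

```python
from collections import Counter

def emission_probabilities(word_tag_pairs):
    """Estimate P(word | tag) as count(word, tag) / count(tag).

    Only observed pairs are stored; unseen pairs can be treated as
    probability 0 with dict.get(key, 0) at lookup time.
    """
    pair_counts = Counter(word_tag_pairs)
    tag_counts = Counter(tag for _, tag in word_tag_pairs)
    return {
        (word, tag): count / tag_counts[tag]
        for (word, tag), count in pair_counts.items()
    }

# Hypothetical toy data, with one rare word already replaced by '<UNK>'
toy_pairs = [('the', 'DT'), ('cat', 'NN'), ('the', 'DT'),
             ('<UNK>', 'NN'), ('a', 'DT'), ('cat', 'NN')]
toy_emission = emission_probabilities(toy_pairs)
print(toy_emission)
```

For every tag, the stored probabilities sum to 1, which is a quick sanity check that also applies to the full `emissionProbability` dictionary.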
        

In [10]:
transmissionProbabilityOutput = {",".join(key): value for key, value in transmissionProbability.items() if value !=0}
emissionProbabilityOutput = {",".join(key): value for key, value in emissionProbability.items() if value != 0}

hmmJSON = dict()
hmmJSON['Transition'] = transmissionProbabilityOutput
hmmJSON['Emission'] = emissionProbabilityOutput


with open('hmm.json', 'w') as fp:
    json.dump(hmmJSON, fp)
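
Because the keys are joined with a comma, reading `hmm.json` back requires some care when the word or the tag is itself a comma. The sketch below shows one possible way to recover emission keys. It rests on two assumptions that are mine, not stated in the notebook: the only tag containing a comma is the punctuation tag `,`, and tags are never empty. `split_emission_key` is a hypothetical helper.

```python
def join_key(pair):
    """Serialize a pair of strings the same way as ",".join(pair)."""
    return ",".join(pair)

def split_emission_key(key):
    """Recover (word, tag) from a "word,tag" key.

    Assumption: the only tag containing a comma is the tag ",", and
    tags are never empty. Words may contain commas (e.g. "1,000").
    """
    if key.endswith(",,"):
        # The tag is "," and the comma before it is the separator
        return key[:-2], ","
    word, tag = key.rsplit(",", 1)  # split on the last comma only
    return word, tag

# Hypothetical toy pairs, including words and tags that contain commas
toy_pairs = [('cat', 'NN'), ('1,000', 'CD'), (',', ',')]
round_trip = [split_emission_key(join_key(pair)) for pair in toy_pairs]
print(round_trip)
```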

#### RESULTS
- Transition Probability Parameters: 2070
- Emission Probability Parameters: 1043235
- Transition Probability non-zero values: 1419
- Emission Probability non-zero values: 30303

NOTE: Whenever a tag starts a sentence, I use the empty string `''` as its previous tag.

### Task 3 : Greedy Algorithm

For the greedy algorithm, I first evaluate it on the development dataset. For this, I extract the (index, word) pairs from the dataset.

In [11]:
DevData = pd.read_csv('dev', sep='\t', header = None, names = ['Index','Word','Tag'])
trainingDevData = DevData.drop('Tag', axis = 1)
trainingDevData = trainingDevData.values.tolist()

Here, I have defined my main function as `getGreedyTags`. It takes the list of (index, word) pairs and returns the predicted tag for every word. First, the greedy algorithm checks whether the word is absent from our vocabulary and, if so, marks it as unknown. It then passes the word and the previous tag to a function called `getMaxProb`, which returns the tag that maximises the product of the transition and emission probabilities.

In [12]:
def getMaxProb(index,word,tags,prevTag):
    #maxProb (instead of max) avoids shadowing the built-in max()
    maxProb = 0
    tagname = ''
    for tag in tags:
        greedyProb = transmissionProbability[(prevTag,tag)] * emissionProbability[(word,tag)]
        if greedyProb > maxProb:
            maxProb = greedyProb
            tagname = tag

    return tagname

def getGreedyTags(trainingDevData):
    mySolution = []  
    index = 0
    for ele in trainingDevData:
        if ele[1] not in uniqueVocabularyWords:
            word = '<UNK>'
        else:
            word = ele[1]
        if ele[0] == 1:
            prevTag = ''
        else:
            prevTag = mySolution[-1]
        prob = getMaxProb(ele[0],word,tags,prevTag)
        mySolution.append(prob)
        index+=1
    return mySolution
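
To make the greedy rule concrete, here is a minimal self-contained sketch that tags a single toy sentence. The probabilities are invented for illustration, `greedy_decode` is a hypothetical name, and missing dictionary keys are treated as 0 through `dict.get`, which differs from the full dictionaries used above.

```python
def greedy_decode(words, tags, transition, emission):
    """Tag one sentence left to right, keeping only the best tag per word.

    `transition` maps (previous_tag, tag) and `emission` maps (word, tag)
    to probabilities; missing keys count as 0. The start state is ''.
    """
    decoded = []
    prev_tag = ''
    for word in words:
        best_tag, best_score = '', 0.0
        for tag in tags:
            score = (transition.get((prev_tag, tag), 0.0)
                     * emission.get((word, tag), 0.0))
            if score > best_score:
                best_tag, best_score = tag, score
        decoded.append(best_tag)
        prev_tag = best_tag
    return decoded

# Hypothetical toy model
toy_tags = ['DT', 'NN', 'VB']
toy_transition = {('', 'DT'): 0.8, ('', 'NN'): 0.2,
                  ('DT', 'NN'): 0.9, ('DT', 'VB'): 0.1,
                  ('NN', 'VB'): 0.7, ('NN', 'NN'): 0.3}
toy_emission = {('the', 'DT'): 0.6, ('dog', 'NN'): 0.1,
                ('barks', 'VB'): 0.05, ('barks', 'NN'): 0.01}
toy_result = greedy_decode(['the', 'dog', 'barks'], toy_tags,
                           toy_transition, toy_emission)
print(toy_result)
```

Each decision depends only on the previously chosen tag, so an early mistake cannot be revised later in the sentence.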

I then pass the development data into the function and check its accuracy.

In [13]:
#Check Accuracy
mySolution = getGreedyTags(trainingDevData)
devSolution = DevData['Tag'].values.tolist()
count = 0
right = 0
index = 0

for a,b in zip(mySolution,devSolution):
    count+=1
    if a==b:
        right+=1
    index+=1
print('Accuracy: ', (right/count)*100   )    

Accuracy:  93.50069819683078


#### RESULTS:
Accuracy on development Data : __93.5%__

Next, I predict the tags for the test file and write them to greedy.out.

In [14]:
TestData = pd.read_csv('test', sep='\t', header = None, names = ['Index','Word'])
testDevData = TestData.values.tolist()
testSolution = getGreedyTags(testDevData)
greedyResults = zip(testDevData,testSolution)
with open('greedy.out', 'w') as fileGreedy:
    for keys in greedyResults:
        line = str(keys[0][0])+'\t'+str(keys[0][1])+'\t'+str(keys[1])+'\n'
        if keys[0][0] == 1:
            fileGreedy.write('\n')
        fileGreedy.write(line)
        

### Task 4: Viterbi Algorithm

For the Viterbi algorithm, I again start by evaluating it on the development dataset. For this, I extract the (index, word) pairs from the dataset. I also keep track of all the unique tags, stored in `tagViterbi`.

In [15]:
DevDataViterbi = pd.read_csv('dev', sep='\t', header = None, names = ['Index','Word','Tag'])
trainingViterbi = DevDataViterbi.drop('Tag', axis = 1)
trainingViterbi = trainingViterbi.values.tolist()
tagViterbi = tags

First of all, I convert all the out-of-vocabulary words to `<UNK>`. Then, for every sentence, I have two main functions: `buildMatrix` and `getParent`.
- buildMatrix: The function builds and returns a dynamic programming table with all the unique tags as its rows and all the words in the sentence as its columns. Every value in the table is stored as (probability of the best path ending in that (word, tag) pair, parent tag).
- getParent: The function backtracks through the matrix and returns the most probable tag sequence for the sentence, in reverse order. One special case which I encountered here was when every entry in a column is 0. In that case it assigns the tag '' to the word and moves to the previous column to find the most probable tag in that column.

In [16]:
def buildMatrix(word,tagViterbi):
    OPTMatrix = [[(0,'') for i in range(len(word))] for j in range(len(tagViterbi))]
    for j in range(len(OPTMatrix[0])):
        for i in range(len(OPTMatrix)): 
            if j==0:  
                OPTMatrix[i][j] = (transmissionProbability[('',tagViterbi[i])] * emissionProbability[(word[j],tagViterbi[i])],'')
                
            else:         
                maximum = 0
                parentTag = ''
                tempMax = 0
                for allCol in range(len(OPTMatrix)):
                    tempMax = OPTMatrix[allCol][j-1][0]*transmissionProbability[(tagViterbi[allCol],tagViterbi[i])] * emissionProbability[(word[j],tagViterbi[i])]                                                          
                    if tempMax > maximum:
                        maximum = tempMax
                        parentTag = tagViterbi[allCol]
                OPTMatrix[i][j] = (maximum,parentTag)
                
    return OPTMatrix    
                    
def getParent(OPTMatrix,tagViterbi):
    viterbiTag = []
    row = len(OPTMatrix)
    col = len(OPTMatrix[0])
    maximum = (0,'')
    allZeros = True
    for j in reversed(range(col)):    
        if allZeros:
            index = 0
            for i in range(row):                  
                if OPTMatrix[i][j][0] > maximum[0]:                    
                    maximum = OPTMatrix[i][j]
                    index = i
            if maximum[0] > 0:
                viterbiTag.append(tagViterbi[index])
                allZeros = False
                
            else:
                viterbiTag.append('')
                
                
        else:
            if maximum[1] != '' :   
                indexOfTag = tagViterbi.index(maximum[1])
                viterbiTag.append(maximum[1])
                maximum = OPTMatrix[indexOfTag][j]
                
            else:
                break

    return viterbiTag       
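
The table-building and backtracking steps can also be sketched compactly with dictionaries. This is a simplified illustration under stated assumptions: probabilities are invented, missing keys count as 0, and the all-zero column case handled by `getParent` is not reproduced. `viterbi_decode` is a hypothetical name.

```python
def viterbi_decode(words, tags, transition, emission):
    """Return the most probable tag sequence for one sentence.

    Probabilities are multiplied directly; missing dictionary keys
    count as 0 and the start state is ''.
    """
    # best[j][tag] = (probability of the best path ending in tag at word j,
    #                 previous tag on that path)
    best = [{tag: (transition.get(('', tag), 0.0)
                   * emission.get((words[0], tag), 0.0), '')
             for tag in tags}]
    for j in range(1, len(words)):
        column = {}
        for tag in tags:
            emit = emission.get((words[j], tag), 0.0)
            column[tag] = max(
                (best[j - 1][prev][0] * transition.get((prev, tag), 0.0) * emit,
                 prev)
                for prev in tags
            )
        best.append(column)

    # Backtrack from the highest-scoring tag in the last column
    last_tag = max(tags, key=lambda tag: best[-1][tag][0])
    path = [last_tag]
    for j in range(len(words) - 1, 0, -1):
        path.append(best[j][path[-1]][1])
    return list(reversed(path))

# Hypothetical toy model
toy_tags = ['DT', 'NN', 'VB']
toy_transition = {('', 'DT'): 0.8, ('', 'NN'): 0.2,
                  ('DT', 'NN'): 0.9, ('DT', 'VB'): 0.1,
                  ('NN', 'VB'): 0.7, ('NN', 'NN'): 0.3}
toy_emission = {('the', 'DT'): 0.6, ('dog', 'NN'): 0.1,
                ('barks', 'VB'): 0.05, ('barks', 'NN'): 0.01}
toy_path = viterbi_decode(['the', 'dog', 'barks'], toy_tags,
                          toy_transition, toy_emission)
print(toy_path)
```

For long sentences, products of many small probabilities can underflow; summing log-probabilities is a common remedy, though it is not used in this notebook.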



The function below takes the (index, word) pairs, replaces unknown words with `<UNK>`, splits the data into sentences, and returns the tags for every sentence using the functions defined above.

In [17]:
def getViterbiTags(trainingViterbi,uniqueVocabularyWords,tagViterbi):
    viterbiTags = []
    index = 0
    if trainingViterbi[0][1] in uniqueVocabularyWords:
        wordList = [trainingViterbi[0][1]]
    else:
        wordList = ['<UNK>']
    for ele in trainingViterbi[1:]:
        index+=1
        if ele[1] not in uniqueVocabularyWords:
            word = '<UNK>'
        else:
            word = ele[1]
        if index == (len(trainingViterbi) - 1):
            wordList.append(word)
            
        if ele[0] == 1 or index == (len(trainingViterbi) - 1):
            OPTMatrix = buildMatrix(wordList,tagViterbi)
            TagList = getParent(OPTMatrix,tagViterbi)
            TagList = list(reversed(TagList))
            viterbiTags.extend(TagList)
            wordList = [word]       
        else:
            wordList.append(word)
    return viterbiTags
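
The sentence-splitting part of this function can be looked at in isolation. The sketch below groups (index, word) rows into sentences whenever the index is 1; `split_into_sentences` and the toy rows are hypothetical and only meant to illustrate the boundary rule, including a one-word sentence.

```python
def split_into_sentences(rows):
    """Group (index, word) rows into sentences; index 1 starts a new sentence."""
    sentences = []
    for index, word in rows:
        if index == 1 or not sentences:
            sentences.append([])
        sentences[-1].append(word)
    return sentences

# Hypothetical toy rows: three sentences, the second with a single word
toy_rows = [(1, 'The'), (2, 'dog'), (3, 'barks'),
            (1, 'Stop'),
            (1, 'It'), (2, 'rains')]
toy_sentences = split_into_sentences(toy_rows)
print(toy_sentences)
```

Separating the grouping from the decoding makes it possible to run the decoder once per sentence without tracking the last row of the data as a special case.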


Checking the development data accuracy

In [18]:
viterbiTags = getViterbiTags(trainingViterbi,uniqueVocabularyWords,tagViterbi)
devSolutionViterbi = DevDataViterbi['Tag'].values.tolist()
count = 0
right = 0

for a,b in zip(viterbiTags,devSolutionViterbi):
    count+=1
    if a==b:
        right+=1
print('Accuracy: ', right/count) 

Accuracy:  0.9477794305142372


#### RESULTS:
Accuracy on development Data : __94.78%__

Next, I predict the tags for the test file and write them to viterbi.out.

In [19]:
TestDataViterbi = pd.read_csv('test', sep='\t', header = None, names = ['Index','Word'])
testDevDataViterbi = TestDataViterbi.values.tolist()
tagViterbi = tags
viterbiTest = getViterbiTags(testDevDataViterbi,uniqueVocabularyWords,tagViterbi)
viterbiResults = zip(testDevDataViterbi,viterbiTest)
with open('viterbi.out', 'w') as fileViterbi:
    for keys in viterbiResults:
        line = str(keys[0][0])+'\t'+str(keys[0][1])+'\t'+str(keys[1])+'\n'
        if keys[0][0] == 1:
            fileViterbi.write('\n')
        fileViterbi.write(line)
