### Hidden Markov Model for Part-of-Speech tagging

Use the first 10k tagged sentences from the Brown corpus to generate the components of a part-of-speech hidden Markov model (HMM):
1. Transition matrix
2. Observation matrix
3. Initial state distribution

Use the universal tagset:
`nltk.corpus.brown.tagged_sents(tagset='universal')[:10000]`.

Also keep the mappings between states/observations and their indices. Include an out-of-vocabulary (OOV) observation and apply smoothing everywhere.

In [1]:
import nltk
import numpy as np
import pandas as pd

In [2]:
from collections import Counter

In [3]:
nltk.download('brown')
nltk.download('universal_tagset')

[nltk_data] Downloading package brown to /Users/anabelen/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     /Users/anabelen/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


True

In [4]:
tagset = nltk.corpus.brown.tagged_sents(tagset='universal')[:10000]
type(tagset)

nltk.collections.LazySubsequence

**1. Transition Matrix**

Create the list of part-of-speech (POS) transitions: one (current tag, next tag) pair for every pair of adjacent words in a sentence

In [5]:
transitions = []
for i in range(len(tagset)):
    for j in range(len(tagset[i])-1):
        tup = (tagset[i][j][1], tagset[i][j+1][1])
        transitions.append(tup)

In [6]:
pos = set([transition[0] for transition in transitions])
dictPos = dict(zip(pos, [*range(len(pos))]))
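Iterating over a Python set of strings does not guarantee the same order from one interpreter session to the next, so the index assigned to each tag above can change between runs. A minimal sketch of a reproducible mapping, assuming a hypothetical helper `build_index` (not part of the notebook) that sorts the labels and also keeps the inverse mapping:

```python
from typing import Dict, Iterable, List, Tuple


def build_index(labels: Iterable[str]) -> Tuple[Dict[str, int], List[str]]:
    """Map each distinct label to a stable integer index.

    Returns (label_to_idx, idx_to_label). Sorting the labels makes the
    assignment reproducible across runs.
    """
    idx_to_label = sorted(set(labels))
    label_to_idx = {label: i for i, label in enumerate(idx_to_label)}
    return label_to_idx, idx_to_label


# Toy example with three universal tags (illustrative data only)
tag_to_idx, idx_to_tag = build_index(["NOUN", "VERB", "DET", "NOUN"])
print(tag_to_idx)     # {'DET': 0, 'NOUN': 1, 'VERB': 2}
print(idx_to_tag[1])  # NOUN
```

The list `idx_to_tag` gives index-to-label lookup in constant time, which is handy when decoded state indices have to be turned back into tag names.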

Build the transition count matrix (rows: current tag, columns: next tag)

In [7]:
transitionCount = np.zeros((len(pos), len(pos)), dtype=int)
for pair in transitions:
    transitionCount[dictPos[pair[0]],dictPos[pair[1]]] += 1 

Apply additive (Laplace-style) smoothing: add a pseudo-count alpha to every cell before normalizing each row

In [8]:
alpha = 0.1
N = len(pos)

transitionMatrix = (transitionCount+alpha)/((transitionCount.sum(axis=1).reshape(-1,1))+(alpha*N))
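A quick way to check a smoothed matrix is to confirm that every row is a proper probability distribution. The sketch below uses a hypothetical helper `additive_smoothing` and a made-up 3x3 count matrix (neither is from the notebook) to show the same formula on a small scale, including a row with no observed counts:

```python
import numpy as np


def additive_smoothing(counts: np.ndarray, alpha: float) -> np.ndarray:
    """Add alpha to every cell, then normalize each row to sum to 1."""
    n_cols = counts.shape[1]
    row_totals = counts.sum(axis=1, keepdims=True)
    return (counts + alpha) / (row_totals + alpha * n_cols)


# Illustrative counts; the middle row was never observed
toy_counts = np.array([[3, 0, 1],
                       [0, 0, 0],
                       [2, 2, 6]])
probs = additive_smoothing(toy_counts, alpha=0.1)

print(probs.sum(axis=1))  # every row sums to 1 (up to float rounding)
print(probs[1])           # the unseen row becomes uniform
```

The denominator uses the number of columns of the matrix being smoothed; if the two disagree, the rows no longer sum to 1.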

In [9]:
transitionMatrixDf = pd.DataFrame(transitionMatrix, columns=dictPos, 
                                    index=dictPos); transitionMatrixDf.iloc[:4,:]

| | ADV | ADJ | PRON | CONJ | DET | NUM | VERB | NOUN | PRT | . | X | ADP |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| ADV | 0.096352 | 0.150405 | 0.036258 | 0.013683 | 0.080666 | 0.015485 | 0.259147 | 0.040179 | 0.029369 | 0.139912 | 0.000223 | 0.138322 |
| ADJ | 0.007625 | 0.059252 | 0.002383 | 0.033286 | 0.00604 | 0.012806 | 0.015488 | 0.674812 | 0.018536 | 0.089424 | 0.000616 | 0.079732 |
| PRON | 0.059244 | 0.009116 | 0.00926 | 0.011716 | 0.014027 | 0.000737 | 0.733163 | 0.008971 | 0.022406 | 0.08178 | 1.4e-05 | 0.049565 |
| CONJ | 0.088092 | 0.117552 | 0.052589 | 0.000468 | 0.157134 | 0.019504 | 0.160306 | 0.289778 | 0.023583 | 0.022979 | 0.000317 | 0.067697 |


**2. Observation Matrix**

In [10]:
wordPos = [tup for sentence in tagset for tup in sentence]

In [11]:
words = set([tup[0] for tup in wordPos])
dictWords = dict(zip(words, [*range(len(words))]))

Build the observation count matrix (rows: tags, columns: words)

In [12]:
obsCount = np.zeros((len(pos), len(words)), dtype=int)

for pair in wordPos:
    obsCount[dictPos[pair[1]], dictWords[pair[0]]] += 1

Create out-of-vocabulary (OOV) observation 

In [13]:
# Append one OOV column with a count of 1 per tag (sized from the matrix, not hard-coded)
obsCount = np.append(obsCount, np.ones((obsCount.shape[0], 1)), axis=1)

dictWords['oov'] = (obsCount.shape[1]-1) # Update words dict with oov observation

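Apply additive (Laplace-style) smoothing, counting the OOV column among the observation types so that each row sums to 1

In [14]:
alpha = 0.01
N = obsCount.shape[1] # number of observation columns, including OOV

observationMatrix = (obsCount+alpha)/((obsCount.sum(axis=1).reshape(-1,1))+(alpha*N))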

In [15]:
observationMatrixDf = pd.DataFrame(observationMatrix, columns=dictWords, 
                                     index=dictPos); observationMatrixDf.iloc[:4,:10]

| | partially | links | precipitated | mid-September | Solly | wrangler | freckled | elephant | Conservatory | wealth |
|---|---|---|---|---|---|---|---|---|---|---|
| ADV | 0.0003108579 | 1.032751e-06 | 1.032751e-06 | 1.032751e-06 | 1.032751e-06 | 1.032751e-06 | 1.032751e-06 | 1.032751e-06 | 1.032751e-06 | 1.032751e-06 |
| ADJ | 6.002084e-07 | 6.002084e-07 | 6.002084e-07 | 6.002084e-07 | 6.002084e-07 | 6.002084e-07 | 6.002084e-07 | 6.002084e-07 | 6.002084e-07 | 6.002084e-07 |
| PRON | 1.396867e-06 | 1.396867e-06 | 1.396867e-06 | 1.396867e-06 | 1.396867e-06 | 1.396867e-06 | 1.396867e-06 | 1.396867e-06 | 1.396867e-06 | 1.396867e-06 |
| CONJ | 1.459028e-06 | 1.459028e-06 | 1.459028e-06 | 1.459028e-06 | 1.459028e-06 | 1.459028e-06 | 1.459028e-06 | 1.459028e-06 | 1.459028e-06 | 1.459028e-06 |


**3. Initial state distribution**

In [16]:
initStatesDist = []
for i in range(len(tagset)):
    initStatesDist.append(tagset[i][0][1])

In [17]:
initStatesDist = Counter(initStatesDist)
sumInitialStates = sum(initStatesDist.values())

for k, value in initStatesDist.items():
    initStatesDist[k] = value/sumInitialStates
    
initStatesProb = [initStatesDist[k] for k in list(transitionMatrixDf.columns)]
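The cell above normalizes raw counts, so any tag that never starts a training sentence gets probability 0, and its logarithm is negative infinity. Since the task asks for smoothing everywhere, one option is to smooth the initial distribution the same way as the two matrices. A minimal sketch, assuming a hypothetical helper `initial_distribution`, an illustrative `alpha` of 0.1, and toy data (none of these come from the notebook):

```python
from collections import Counter
from typing import Dict, Sequence

import numpy as np


def initial_distribution(first_tags: Sequence[str],
                         tag_to_idx: Dict[str, int],
                         alpha: float = 0.1) -> np.ndarray:
    """Smoothed probability of each tag appearing at the start of a sentence."""
    counts = np.zeros(len(tag_to_idx))
    for tag, count in Counter(first_tags).items():
        counts[tag_to_idx[tag]] = count
    # Same additive smoothing formula as for the matrices, on a single row
    return (counts + alpha) / (counts.sum() + alpha * len(tag_to_idx))


# Toy data: VERB never starts a sentence but still receives some mass
toy_index = {"DET": 0, "NOUN": 1, "VERB": 2}
toy_pi = initial_distribution(["DET", "DET", "NOUN"], toy_index)
print(toy_pi)
print(toy_pi.sum())
```

Indexing the result by `tag_to_idx` keeps it aligned with the rows of the transition and observation matrices, provided all three share the same mapping.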

Implement a function viterbi() that takes arguments:

1. obs - the observations [list of ints]
2. pi - the initial state probabilities [list of floats]
3. A - the state transition probability matrix [2D numpy array]
4. B - the observation probability matrix [2D numpy array]

and returns:
1. states - the inferred state sequence [list of ints]

Do everything in log space to avoid underflow.


In [18]:
import pdb
from typing import List, Tuple, Dict

In [19]:
def ViterbiAlgorithm(obs: List[str], A: np.ndarray, B: np.ndarray, 
                     phi: List[float], dictWords: Dict[str,int], 
                     dictPos: Dict[str,int]) -> Tuple[List[str], List[str]]:
    # NOTE: decoding here is greedy. Each step keeps only the single best tag
    # and conditions the next step on it; there are no backpointers.
    
    N = len(obs)       # number of observations (words)
    T = A.shape[0]     # number of states (tags)
    viterbi = np.zeros((T,N), dtype=float)
    posSelected = []
    
    # Work in log space: sums of logs instead of the log of a product
    logA, logB, logPhi = np.log(A), np.log(B), np.log(phi)
    
    # Column of B for each word; unseen words (including the first) map to 'oov'
    obsIdx = [dictWords.get(word, dictWords['oov']) for word in obs]
    
    # Initialization step
    viterbi[:,0] = logPhi + logB[:,obsIdx[0]]
    pos = int(np.argmax(viterbi[:,0]))
    posSelected.append(pos)
    
    # Recursion step: score every tag given the previously selected tag
    for n in range(1,N):
        viterbi[:,n] = logA[pos,:] + logB[:,obsIdx[n]]
        pos = int(np.argmax(viterbi[:,n]))
        posSelected.append(pos)    
    
    # Output two lists: words, predictedPos
    idxToPos = {idx: tag for tag, idx in dictPos.items()}
    predictedPos = [idxToPos[idx] for idx in posSelected]
    
    return obs, predictedPos
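The function above commits to the best tag at each position and never revisits that choice. The Viterbi recursion instead keeps, for every state, the score of the best path ending in that state, and recovers the sequence by following backpointers from the end. The sketch below follows the `viterbi(obs, pi, A, B)` signature from the task statement; it assumes a non-empty list of integer-coded observations and strictly positive (smoothed) probabilities, and the two-state toy model is illustrative data, not taken from the notebook.

```python
from typing import List, Sequence

import numpy as np


def viterbi(obs: Sequence[int], pi: Sequence[float],
            A: np.ndarray, B: np.ndarray) -> List[int]:
    """Most likely state sequence for integer-coded observations, in log space."""
    log_pi, log_A, log_B = np.log(pi), np.log(A), np.log(B)
    n_states, n_steps = A.shape[0], len(obs)

    # score[s, t]: log-probability of the best path that ends in state s at time t
    score = np.empty((n_states, n_steps))
    backptr = np.zeros((n_states, n_steps), dtype=int)

    score[:, 0] = log_pi + log_B[:, obs[0]]
    for t in range(1, n_steps):
        # cand[i, j]: best path ending in state i at t-1, followed by i -> j
        cand = score[:, t - 1, None] + log_A
        backptr[:, t] = cand.argmax(axis=0)
        score[:, t] = cand.max(axis=0) + log_B[:, obs[t]]

    # Follow the backpointers from the best final state
    states = [int(score[:, -1].argmax())]
    for t in range(n_steps - 1, 0, -1):
        states.append(int(backptr[states[-1], t]))
    return states[::-1]


# Toy two-state model (illustrative numbers only)
toy_pi = [0.6, 0.4]
toy_A = np.array([[0.7, 0.3],
                  [0.4, 0.6]])
toy_B = np.array([[0.5, 0.4, 0.1],
                  [0.1, 0.3, 0.6]])
print(viterbi([0, 1, 2], toy_pi, toy_A, toy_B))  # [0, 0, 1]
```

To use it with the matrices built earlier, each word would first be converted to its column index, with unseen words mapped to the OOV column, and the returned indices mapped back to tag names.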

**Test algorithm**

Use 3 sentences from the same Brown corpus, taken from outside the 10k training sentences, to test the tagger and compare its output against the true POS tags

In [20]:
test = nltk.corpus.brown.tagged_sents(tagset='universal')[10150:10153]
obsList = [[tup[0] for tup in sentence] for sentence in test]
truePos_ = [sublist[i][1] for sublist in test for i in range(len(sublist))]

In [21]:
predictedPos_ = []
A = transitionMatrix
B = observationMatrix
phi = initStatesProb

for sentence in obsList:
    obs, predictedPos  = ViterbiAlgorithm(sentence, A, B, phi, dictWords, dictPos)
    predictedPos_.append(predictedPos)

predictedPos_ = [pos for sentence in predictedPos_ for pos in sentence]

truePred = [*zip(truePos_, predictedPos_)]

In [22]:
truePositive = []
for i in range(len(truePos_)):
    truePositive.append(truePos_[i]==predictedPos_[i])
    
accuracy = sum(truePositive)/len(truePositive)
print('Accuracy: %s' %accuracy)

Accuracy: 0.9574468085106383
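Three sentences make a very small sample, and a single overall figure hides which tags are being confused. A minimal sketch of a per-tag breakdown, assuming a hypothetical helper `tag_accuracy` and toy tag lists (neither is from the notebook):

```python
from collections import Counter
from typing import Dict, Sequence, Tuple


def tag_accuracy(true_tags: Sequence[str],
                 pred_tags: Sequence[str]) -> Tuple[float, Dict[str, float]]:
    """Return overall token accuracy and accuracy per true tag."""
    if len(true_tags) != len(pred_tags):
        raise ValueError("true_tags and pred_tags must have the same length")
    total = Counter(true_tags)
    correct = Counter(t for t, p in zip(true_tags, pred_tags) if t == p)
    overall = sum(correct.values()) / len(true_tags)
    per_tag = {tag: correct[tag] / n for tag, n in total.items()}
    return overall, per_tag


# Toy example: one VERB is mistagged as NOUN
overall, per_tag = tag_accuracy(["DET", "NOUN", "VERB", "NOUN"],
                                ["DET", "NOUN", "NOUN", "NOUN"])
print(overall)  # 0.75
print(per_tag)  # {'DET': 1.0, 'NOUN': 1.0, 'VERB': 0.0}
```

Called with the flat lists `truePos_` and `predictedPos_` from the cells above, it should reproduce the overall accuracy and add the per-tag view.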
