In [120]:
import opendatasets as od
import numpy as np
import re
import pandas as pd

# Simple example of the Viterbi algorithm on a two-state toy HMM
# (hidden states Healthy/Fever, observations normal/cold/dizzy).

import numpy as np

# Observation symbol -> column index into emit_p.
obs = {
    "normal": 0, 
    "cold": 1, 
    "dizzy": 2
}

# Hidden state -> row index into start_p, trans_p and emit_p.
states = {
    "Healthy": 0,
    "Fever": 1
}

# Probability of starting in each state (indexed like `states`).
start_p = [0.6, 0.4]

# trans_p[k][i]: weight of moving from state k to state i (each row sums to 1).
trans_p = np.array([[0.7, 0.3], [0.4, 0.6]])

# emit_p[i][o]: weight of seeing observation o while in state i (each row sums to 1).
emit_p = np.array([[0.5, 0.4, 0.1], [0.1, 0.3, 0.6]])

# Observation sequence to decode, given as a list of symbols.
inputObs = ['normal', 'normal', 'dizzy']

# NOTE(review): `viterbi` is not defined in this cell; its defining cell appears
# later in this file and must have been executed first. This call passes a list
# of tokens and a flat {symbol: index} mapping -- confirm that the definition in
# use accepts that form.
res = viterbi(states, obs, start_p, trans_p, emit_p, inputObs)

print(res)
# Expected state indices: [0, 0, 1]  (Healthy, Healthy, Fever)


In [3]:
# Fetch the Kaggle POS-tagging dataset via opendatasets. As the recorded output
# below shows, this prompts for Kaggle credentials and downloads into ./pos-tagging.
od.download('https://www.kaggle.com/datasets/yingxuhe/pos-tagging?select=sents.answer')

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: vasanth12
Your Kaggle Key: ········
Downloading pos-tagging.zip to .\pos-tagging


100%|███████████████████████████████████████████████████████████████████████████████████████████| 2.50M/2.50M [00:03<00:00, 724kB/s]





In [6]:
# import POS tagging dataset
# NOTE(review): this handle is never closed or read in the visible cells, and
# featureExtraction opens the file itself from a path -- this line looks
# redundant; confirm nothing else uses `data` before removing it.
data = open('pos-tagging/sents.train', encoding='utf-8')

In [121]:
def findStationaryDistrb(transitionProb, stationaryDist, iterations=100):
    """Repeatedly left-multiply ``stationaryDist`` by ``transitionProb``.

    Computes ``transitionProb @ ... @ transitionProb @ stationaryDist`` with
    ``iterations`` factors of ``transitionProb`` and returns the transpose of
    the result. The inputs are not modified.

    NOTE(review): this only approximates a stationary distribution when the
    iteration converges, and because the matrix is applied from the left it
    presumably expects each *column* of ``transitionProb`` to sum to one --
    confirm against how the matrix is built.

    Parameters
    ----------
    transitionProb : array-like, shape (n, n)
        Matrix applied at every step.
    stationaryDist : array-like, shape (n,) or (n, m)
        Starting vector or matrix.
    iterations : int, optional
        Number of multiplications to perform (default 100, the value that was
        previously hard-coded).

    Returns
    -------
    numpy.ndarray
        Transpose of the iterated product.

    Raises
    ------
    ValueError
        If ``iterations`` is negative.
    """
    if iterations < 0:
        raise ValueError("iterations must be non-negative")

    for _ in range(iterations):
        stationaryDist = np.dot(transitionProb, stationaryDist)
    return np.transpose(stationaryDist)
    

In [122]:
def _countsByIndex(table):
    """Map each entry's "index" to its "count" (first entry wins per index)."""
    counts = {}
    for info in table.values():
        counts.setdefault(info["index"], info["count"])
    return counts


def findEmissionAndTransitionProb(
    sentences, tags, states, observables, tagCount, wordCount, transmissionDist, transmissionProb, emissionDist, emissionProb
):
    """Fill the count and probability tables from an index-encoded corpus, in place.

    Counting
        For every token, ``emissionDist[tag][word]`` and
        ``transmissionDist[previousTag][tag]`` are incremented. At the start of
        each sentence the previous tag is taken to be index 0.

    Normalising
        ``transmissionProb[j][i] = transmissionDist[j][i] / count of tag i`` and
        ``emissionProb[j][i] = emissionDist[j][i] / count of word i``, where the
        counts come from the ``"count"`` fields of ``states`` / ``observables``.

    NOTE(review): both tables are divided by the count of their *column*
    entry (destination tag, word), not their row entry (source tag, tag).
    Confirm this is the intended normalisation for the decoder that consumes
    these tables.

    Parameters
    ----------
    sentences, tags : list of list of int
        Parallel sequences of word indices and tag indices.
    states, observables : dict
        ``key -> {"index": int, "count": int}`` tables for tags and words.
    tagCount, wordCount : int
        Number of tag / word indices to normalise.
    transmissionDist, transmissionProb : 2-D array, (tagCount, tagCount)
    emissionDist, emissionProb : 2-D array, (tagCount, wordCount)
        Output tables; all four are modified in place.

    Returns
    -------
    None

    Raises
    ------
    KeyError
        If ``states`` / ``observables`` has no entry for an index below
        ``tagCount`` / ``wordCount``.
    """
    # Count matrices
    for _tokens, _tagSeq in zip(sentences, tags):
        _ptag = 0  # sentence-initial transitions are booked against tag index 0
        for _token, _tag in zip(_tokens, _tagSeq):
            emissionDist[_tag][_token] += 1
            transmissionDist[_ptag][_tag] += 1
            _ptag = _tag

    # Build the index -> count lookups once instead of scanning every key for
    # every index (which was quadratic in the vocabulary size).
    _tagTotals = _countsByIndex(states)
    _wordTotals = _countsByIndex(observables)

    # Transition probability matrix
    for i in range(tagCount):
        _count = _tagTotals[i]
        for j in range(tagCount):
            transmissionProb[j][i] = transmissionDist[j][i] / _count

    # Emission probability matrix
    for i in range(wordCount):
        _count = _wordTotals[i]
        for j in range(tagCount):
            emissionProb[j][i] = emissionDist[j][i] / _count
            
            

In [123]:
def featureExtraction(filename, states, observables):
    """Read a ``word/TAG`` corpus and encode it as integer index sequences.

    Each line of ``filename`` is one sentence of whitespace-separated
    ``word/TAG`` tokens. The whole line is lower-cased (tags included), every
    new word / tag is given the next free index, and per-entry occurrence
    counts are maintained.

    Parameters
    ----------
    filename : str or path-like
        Corpus file, read as UTF-8.
    states : dict
        Updated in place: ``tag -> {"index": int, "count": int}``.
    observables : dict
        Updated in place: ``word -> {"index": int, "count": int}``.

    Returns
    -------
    (list of list of int, list of list of int)
        Word-index sequences and the parallel tag-index sequences, one pair
        of lists per non-blank line.
    """
    _sentences = []
    _tags = []

    # Continue numbering after entries that are already present so a
    # pre-populated table never gets duplicate indices (0 for empty dicts).
    tagCount, wordCount = len(states), len(observables)

    # Explicit encoding: the platform default is not UTF-8 everywhere.
    with open(filename, encoding='utf-8') as data:
        for sentence in data:
            s = []
            t = []

            # split() without arguments drops the trailing newline and the
            # empty tokens produced by repeated spaces or blank lines.
            for token in sentence.lower().split():
                # The tag follows the LAST '/', so words that themselves
                # contain '/' (e.g. "1/2/CD") are kept intact.
                word, separator, tag = token.rpartition('/')
                if not separator:
                    # No '/' at all: the token serves as both word and tag.
                    word = tag

                wordInfo = observables.get(word)
                if wordInfo is None:
                    wordInfo = observables[word] = {
                        "index": wordCount,
                        "count": 1
                    }
                    wordCount += 1
                else:
                    wordInfo["count"] += 1
                s.append(wordInfo["index"])

                tagInfo = states.get(tag)
                if tagInfo is None:
                    tagInfo = states[tag] = {
                        "index": tagCount,
                        "count": 1
                    }
                    tagCount += 1
                else:
                    tagInfo["count"] += 1
                t.append(tagInfo["index"])

            if s:  # ignore blank lines instead of recording an empty sentence
                _sentences.append(s)
                _tags.append(t)

    return _sentences, _tags

In [124]:
# Declaration of variables
# Both dicts are filled in place by featureExtraction
# (key -> {"index": ..., "count": ...}).
states = {}
observables = {}

# Extract features from data
sentences, tags = featureExtraction('./pos-tagging/sents.train', states, observables)
tagCount = len(states.keys())
wordCount = len(observables.keys())

# Probabilities Distribution
# The *Dist arrays receive raw counts and the *Prob arrays the normalised
# values; all four are filled in place by findEmissionAndTransitionProb.
transmissionDist = np.zeros((tagCount, tagCount))
transmissionProb = np.zeros((tagCount, tagCount))
emissionDist = np.zeros((tagCount, wordCount))
emissionProb = np.zeros((tagCount, wordCount))

findEmissionAndTransitionProb(
    sentences, tags, states, observables, tagCount, wordCount, transmissionDist, transmissionProb, emissionDist, emissionProb
)
# Starting value for the iteration below. This binds a second name to the same
# array as transmissionProb; it is not a copy.
stationaryDist = transmissionProb

# Stationary Probability distribution
stationaryDist = findStationaryDistrb(transmissionProb, stationaryDist)

# Prediction
# NOTE(review): `viterbi` is defined in a cell further down this file, so that
# cell must be executed before this one.
# NOTE(review): featureExtraction lower-cases the corpus, so tokens containing
# upper-case letters cannot match the vocabulary; the recorded output below has
# only four tags for this twenty-token sentence. Consider lower-casing the input.
res = viterbi(states, observables, stationaryDist[0], transmissionProb, emissionProb, 'Rolls-Royce Motor Cars Inc. said it expects its U.S. sales to remain steady at about 1,200 cars in 1990 .')
print(res)


[ 1.  4. 19. 24.]


In [119]:
def _encodeObservations(observables, inputSent):
    """Turn the input into a list of observation indices, dropping unknown tokens.

    ``inputSent`` may be a space-separated string or an iterable of tokens.
    ``observables`` values may be ``{"index": int, ...}`` dicts or plain ints.
    """
    tokens = inputSent.split(' ') if isinstance(inputSent, str) else list(inputSent)
    encoded = []
    for token in tokens:
        entry = observables.get(token)
        if entry is None:
            # Out-of-vocabulary token: skipped, so the result can be shorter
            # than the input.
            continue
        encoded.append(entry["index"] if isinstance(entry, dict) else entry)
    return encoded


def viterbi(states, observables, stationaryTrans, transitionProbability, emissionProbability, inputSent):
    """Decode the highest-scoring hidden-state sequence for an observation sequence.

    Parameters
    ----------
    states : dict
        One entry per hidden state; only its size is used.
    observables : dict
        ``token -> {"index": int, ...}`` or ``token -> int`` lookup giving the
        column of ``emissionProbability`` for each known token.
    stationaryTrans : array-like, shape (N,)
        Initial weight of each state.
    transitionProbability : array-like, shape (N, N)
        ``[k][i]`` is the weight of moving from state ``k`` to state ``i``.
    emissionProbability : array-like, shape (N, W)
        ``[i][o]`` is the weight of state ``i`` emitting observation ``o``.
    inputSent : str or iterable of str
        Space-separated sentence, or an already tokenised sequence.

    Returns
    -------
    numpy.ndarray of float, shape (T,)
        State index for each *known* token, where ``T`` is the number of
        tokens found in ``observables``. Empty when no token is known.

    Notes
    -----
    The trellis is sized by the number of known tokens, not by the character
    length of the input, and the final state is chosen explicitly as the
    argmax of the last column. Ties resolve to the lowest state index.
    Scores are plain products, so very long inputs can underflow to zero.
    """
    N = len(states)
    y = _encodeObservations(observables, inputSent)
    T = len(y)

    # Float dtype is kept so callers see the same kind of array as before.
    res = np.zeros(T)
    if T == 0:
        return res

    start = np.asarray(stationaryTrans, dtype=float)[:N]
    trans = np.asarray(transitionProbability, dtype=float)[:N, :N]
    emit = np.asarray(emissionProbability, dtype=float)[:N]

    t1 = np.zeros((N, T))             # best score of any path ending in state i at step j
    t2 = np.zeros((N, T), dtype=int)  # predecessor state on that best path

    t1[:, 0] = start * emit[:, y[0]]

    for j in range(1, T):
        # scores[k, i] = t1[k, j-1] * trans[k, i] * emit[i, y[j]]
        scores = (t1[:, j - 1][:, None] * trans) * emit[:, y[j]][None, :]
        t2[:, j] = np.argmax(scores, axis=0)
        t1[:, j] = np.max(scores, axis=0)

    # Backtrack from the best final state through the stored predecessors.
    x = int(np.argmax(t1[:, T - 1]))
    res[T - 1] = x
    for j in range(T - 1, 0, -1):
        x = int(t2[x, j])
        res[j - 1] = x

    return res
        

In [None]:
Rolls-Royce/NNP Motor/NNP Cars/NNPS Inc./NNP said/VBD it/PRP expects/VBZ its/PRP$ U.S./NNP sales/NNS to/TO remain/VB steady/JJ at/IN about/IN 1,200/CD cars/NNS in/IN 1990/CD ./.

In [126]:
# Reverse lookup table: tag index -> tag string, built from `states`.
idxToTag = dict()
for _tag, _entry in states.items():
    _idx = _entry['index']
    idxToTag[_idx] = _tag

In [128]:
def convertIndextoTAG(res):
    """Translate a sequence of tag indices into a tag string.

    Each index is looked up in the module-level ``idxToTag`` table and
    followed by one space, so a non-empty result ends with a trailing space.
    An index missing from ``idxToTag`` raises ``KeyError``.
    """
    return ''.join(idxToTag[tagIndex] + ' ' for tagIndex in res)

In [132]:
# Decode the same sentence typed in lower case (featureExtraction lower-cases
# the corpus), then show the raw state indices followed by the tag names.
res = viterbi(states, observables, stationaryDist[0], transmissionProb, emissionProb, 'rolls-royce motor cars inc. said it expects its u.s. sales to remain steady at about 1,200 cars in 1990 .')
print(res)
print(convertIndextoTAG(res))

[ 2.  2. 10.  2. 15. 21. 19. 22.  2. 10. 17. 23. 24.  0.  0.  3. 10.  0.
  3. 18.]
nnp nnp nns nnp vbd prp vbz prp$ nnp nns to vb jj in in cd nns in cd . 


In [133]:
'''
??? What's next ???
    * Need to handle OOV
    * Validation
'''

"\n??? What's next ???\n    * Need to handle OOV\n    * Validation\n"