In [26]:
#Viterbi Algorithm
import numpy as np

def viterbi(states,prob_dict,initial_probs,_obs):
  '''
  Forward pass of the Viterbi algorithm for a discrete hidden Markov model.

  Parameters:
  1. states - list of hidden states, e.g. ['Hot','Cold','Mild']
  2. prob_dict - tuple (transition_probs, emission_probs) of dictionaries
     transition_probs is keyed 'current state|previous state', e.g.
     transition_probs = {'Hot|Hot':0.6,'Hot|Mild':0.3,'Hot|Cold':0.1, ...}

     emission_probs is keyed 'observation|state', e.g.
     emission_probs =   {'CasualWear|Hot':0.8,'CasualWear|Mild':0.19,'CasualWear|Cold':0.01, ...}

  3. initial_probs - dictionary mapping every hidden state to the probability
     of starting in that state, e.g. {'Hot':0.3,'Mild':0.3,'Cold':0.4}

  4. _obs - non-empty list of a sequence of observable states
     example of _obs = ['ApparelWear','CasualWear', 'CasualWear', 'SemiCasualWear']

  Returns:
  1. cache - dictionary keyed '<step>-<state>' (steps start at 1, the second
     observation). Each value is [best previous state, score of the best path
     that ends in <state> at <step>]. Required for backtracking.
  2. viterbi_list - score of the best path ending in each state at the last
     step, in the same order as `states`. The index of its max value is
     required to initiate the backward pass of the Viterbi Algorithm.

  Raises:
  ValueError - if _obs is empty (there is nothing to decode)
  KeyError   - if a needed state/observation key is missing from
               initial_probs, transition_probs or emission_probs
  '''
  if len(_obs) == 0:
    raise ValueError("viterbi() needs at least one observation")

  transition_probs = prob_dict[0]
  emission_probs = prob_dict[1]
  cache = {}

  # Step 0: prior of each state times the chance of emitting the first observation
  viterbi_list = [initial_probs[state]*emission_probs[_obs[0]+"|"+state] for state in states]

  for i in range(1,len(_obs)):
    ob = _obs[i]
    next_scores = []
    for state in states:
      # The emission term does not depend on the previous state: look it up once
      emission = emission_probs[ob+"|"+state]
      best_score = -1
      for prev_state,prev_score in zip(states,viterbi_list):
        score = prev_score*transition_probs[state+"|"+prev_state]*emission
        # Strict comparison: on ties the earliest state in `states` is kept
        if best_score < score:
          best_score = score
          cache[str(i)+"-"+state] = [prev_state,score]
      next_scores.append(best_score)
    viterbi_list = next_scores

  return cache,viterbi_list

def viterbi_backward(states,cache,viterbi_list):
  '''
  Backward pass of the Viterbi algorithm: follow the back-pointers stored in
  `cache` from the best final state to the first step.

  Parameters:

  To be used by passing (states , return values of Viterbi Algorithm) as parameters

  1. states - list of hidden states, in the order used to build viterbi_list

  2. cache - dictionary keyed '<step>-<state>' whose values are
     [previous state, score]
     example of cache:
     {'1-Hot': ['Cold', 0.0158], '1-Mild': ['Cold', 0.02528], '1-Cold': ['Cold', 0.0158],
      '2-Hot': ['Hot', 0.007584], '2-Mild': ['Mild', 0.00144096], '2-Cold': ['Mild', 0.00010112]}

  3. viterbi_list - list of numeric values (one corresponding to each state)

  Returns:
  1. best_sequence - list of predicted hidden states, first step first
     example of best_sequence:
     best_sequence = ['Cold','Hot','Hot']

  2. best_sequence_breakdown - the cached score at every step that was walked
     back through (used for debugging). It has one entry per transition, i.e.
     one fewer than best_sequence.
  '''
  # cache holds one entry per state for every step after the first
  steps = len(cache)//len(states)

  current = states[np.argmax(np.asarray(viterbi_list))]
  # Collected last-step-first, then flipped once at the end
  path_backwards = [current]
  scores_backwards = []

  for step in range(steps,0,-1):
    entry = cache[str(step)+'-'+current]
    scores_backwards.append(entry[1])
    current = entry[0]
    path_backwards.append(current)

  return path_backwards[::-1],scores_backwards[::-1]


# POS TAGGING

In [27]:
#Example 1 (weather / clothing toy HMM)
def get_probs_ex1():
  '''
  Returns the tuple (transition_probs, emission_probs) for the toy weather HMM.

  Hidden states: H, M, C. Observations: CW - Casual Wear, SW - Semi Casual Wear,
  AW - Apparel.
  Keys have the form 'X|Y'. viterbi() reads transition keys as
  'current state|previous state' and emission keys as 'observation|state'.
  '''
  given = ('H','M','C')

  # transition probabilities: one row per left-hand state, columns follow `given`
  transition_table = {'H':(0.6,0.3,0.1),
                      'M':(0.4,0.3,0.2),
                      'C':(0.1,0.4,0.5)}

  # emission probabilities: one row per observation, columns follow `given`
  emission_table =   {'CW':(0.8,0.19,0.01),
                      'SW':(0.5,0.4,0.1),
                      'AW':(0.01,0.2,0.79)}

  # NOTE(review): the 'M|*' transitions sum to 0.9 and the emissions sum to 1 per
  # observation rather than per state - confirm these toy values are intended.
  transition_probs = {left+'|'+right: p
                      for left,row in transition_table.items()
                      for right,p in zip(given,row)}
  emission_probs = {left+'|'+right: p
                    for left,row in emission_table.items()
                    for right,p in zip(given,row)}

  return transition_probs,emission_probs

#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@

# Example 1 driver: score every hidden sequence by brute force, then decode the
# same observations with the Viterbi forward/backward pass for comparison.

# hidden states
states1 = ['H','M','C']

# set of observations
obs = ['AW','SW','CW']

# initial state probability distribution (our priors)
initial_probs1 = {'H':0.3,'M':0.3,'C':0.4}

# Generate list of sequences
# NOTE(review): initializeSequences is not defined in the cells shown here - presumably
# it enumerates and scores every possible hidden sequence; confirm against its definition.
sequence_length,sequences,best_sequence,sequence_scores = initializeSequences(states1,get_probs_ex1(),initial_probs1,obs)

print("Initial Distributions")
print(initial_probs1)

transition_probs, emission_probs = get_probs_ex1()

# pretty_print_probs is also defined outside the cells shown here
print("\nTransition Probabilities")
pretty_print_probs(transition_probs)

print("\nEmission Probabilities")
pretty_print_probs(emission_probs)

print("\nScores")
# Display sequence scores
for i in range(len(sequences)):
    print("Sequence:%10s,Score:%0.4f" % (sequences[i],sequence_scores[i]))

# The best sequence for the given observation (obs) and initial chances of the hidden states (initial_probs1)
best_seq_n = np.argmax(sequence_scores)
print("\nBest Sequence:",sequences[best_seq_n],best_sequence)
print()
# Forward pass: `cache` holds the back-pointers, `l` the final score of each state
cache,l = viterbi(states1,get_probs_ex1(),initial_probs1,obs)

print("Path generated by viterbi algorithm by reducing computation...",cache,sep="\n")
print()
print("Finding the best hidden sequence from the path generated by viterbi algorithm...")
# Prints (best sequence, per-step scores) followed by the score of the best path
print(viterbi_backward(states1,cache,l),np.max(np.asarray(l)))

Initial Distributions
{'H': 0.3, 'M': 0.3, 'C': 0.4}

Transition Probabilities
+--------+-----+-----+-----+
| rows   |   C |   M |   H |
|--------+-----+-----+-----|
| C      | 0.5 | 0.4 | 0.1 |
| M      | 0.2 | 0.3 | 0.4 |
| H      | 0.1 | 0.3 | 0.6 |
+--------+-----+-----+-----+

Emission Probabilities
+--------+------+------+------+
| rows   |    C |    M |    H |
|--------+------+------+------|
| CW     | 0.01 | 0.19 | 0.8  |
| SW     | 0.1  | 0.4  | 0.5  |
| AW     | 0.79 | 0.2  | 0.01 |
+--------+------+------+------+

Scores
Sequence:['H', 'H', 'H'],Score:0.0004
Sequence:['H', 'H', 'M'],Score:0.0001
Sequence:['H', 'H', 'C'],Score:0.0000
Sequence:['H', 'M', 'H'],Score:0.0001
Sequence:['H', 'M', 'M'],Score:0.0000
Sequence:['H', 'M', 'C'],Score:0.0000
Sequence:['H', 'C', 'H'],Score:0.0000
Sequence:['H', 'C', 'M'],Score:0.0000
Sequence:['H', 'C', 'C'],Score:0.0000
Sequence:['M', 'H', 'H'],Score:0.0043
Sequence:['M', 'H', 'M'],Score:0.0007
Sequence:['M', 'H', 'C'],Score:0.0000
Sequen

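In the above example, the initial probabilities slightly favoured a cold day (`C`: 0.4, versus 0.3 each for `H` and `M`). Since the observable sequence (`obs`) was `['AW','SW','CW']`, our HMM predicts `['C', 'H', 'H']` for the three days: apparel wear on the first day points strongly to a cold day, while the semi-casual and casual wear that follow are best explained by hot days.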

In [28]:
#Example 2 (Simple Part of speech tagging)
def get_probs_ex2():
  '''
  Returns the tuple (transition_probs, emission_probs) for the toy
  part-of-speech HMM with tags Noun, Verb and Determiner and the words
  'Bob', 'ate', 'the', 'fruit'.

  Keys have the form 'X|Y'. viterbi() reads transition keys as
  'current tag|previous tag' and emission keys as 'word|tag'.
  '''
  pos_tags = ('Noun','Verb','Determiner')
  words = ('Bob','ate','the','fruit')

  # one row per left-hand tag, columns follow `pos_tags`
  transition_table = {'Noun':(0.1,0.1,0.8),
                      'Verb':(0.8,0.1,0.1),
                      'Determiner':(0.1,0.8,0.1)}

  # one row per tag, columns follow `words`
  # NOTE(review): the Noun row sums to 1.9 ('Bob' and 'fruit' are both 0.9) - confirm intended.
  emission_table = {'Noun':(0.9,0.05,0.05,0.9),
                    'Verb':(0.05,0.9,0.05,0.05),
                    'Determiner':(0.05,0.05,0.9,0.05)}

  transition_probs = {left+'|'+right: p
                      for left,row in transition_table.items()
                      for right,p in zip(pos_tags,row)}
  emission_probs = {word+'|'+tag: p
                    for tag,row in emission_table.items()
                    for word,p in zip(words,row)}
  return transition_probs,emission_probs

#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@#@

# Example 2 setup: hidden states and priors for the toy POS tagger, then
# display the model tables.
# generate new sequences
states2 = ['Noun','Verb','Determiner']

# prior over the tag of the first word
initial_probs2 = {'Noun':0.9,'Verb':0.05,'Determiner':0.05}

# NOTE: rebinds the notebook-level transition_probs / emission_probs names
transition_probs, emission_probs = get_probs_ex2()

print("Initial Distributions")
print(initial_probs2)
print("\nTransition Probabilities")
# pretty_print_probs is defined outside the cells shown here
pretty_print_probs(transition_probs)
print("\nEmission Probabilities")
pretty_print_probs(emission_probs)

Initial Distributions
{'Noun': 0.9, 'Verb': 0.05, 'Determiner': 0.05}

Transition Probabilities
+------------+--------+--------------+--------+
| rows       |   Noun |   Determiner |   Verb |
|------------+--------+--------------+--------|
| Noun       |    0.1 |          0.8 |    0.1 |
| Determiner |    0.1 |          0.1 |    0.8 |
| Verb       |    0.8 |          0.1 |    0.1 |
+------------+--------+--------------+--------+

Emission Probabilities
+--------+--------+--------------+--------+
| rows   |   Noun |   Determiner |   Verb |
|--------+--------+--------------+--------|
| the    |   0.05 |         0.9  |   0.05 |
| Bob    |   0.9  |         0.05 |   0.05 |
| fruit  |   0.9  |         0.05 |   0.05 |
| ate    |   0.05 |         0.05 |   0.9  |
+--------+--------+--------------+--------+


In [29]:
# Sentence to tag; rebinds the notebook-level `obs` used by Example 1
obs = ['Bob','ate','the','fruit']

# print results
print("\nScores")

# Generate list of sequences
# NOTE(review): initializeSequences is defined outside the cells shown here - presumably
# a brute-force scorer over every tag sequence; confirm against its definition.
sequence_length,sequences,best_sequence,sequence_scores = initializeSequences(states2,get_probs_ex2(),initial_probs2,obs)

# Display sequence scores
# Total number of sequences are 3^4 = 81
for i in range(len(sequences)):
    print("Sequence:%-60s Score:%0.6f" % (sequences[i],sequence_scores[i]))

# Display the winning score
best_seq_n = np.argmax(sequence_scores)
print("\nBest Sequence:",sequences[best_seq_n],best_sequence)
print()
# Viterbi forward pass: `cache` holds the back-pointers, `l` the final score of each tag
cache,l = viterbi(states2,get_probs_ex2(),initial_probs2,obs)
print("Path generated by viterbi algorithm by reducing computation...",cache,sep="\n")
print()
print("Finding the best hidden sequence from the path generated by viterbi algorithm...")
# Prints (best sequence, per-step scores) followed by the score of the best path
print(viterbi_backward(states2,cache,l),np.max(np.asarray(l)))


Scores
Sequence:['Noun', 'Noun', 'Noun', 'Noun']                             Score:0.000002
Sequence:['Noun', 'Noun', 'Noun', 'Verb']                             Score:0.000001
Sequence:['Noun', 'Noun', 'Noun', 'Determiner']                       Score:0.000000
Sequence:['Noun', 'Noun', 'Verb', 'Noun']                             Score:0.000015
Sequence:['Noun', 'Noun', 'Verb', 'Verb']                             Score:0.000001
Sequence:['Noun', 'Noun', 'Verb', 'Determiner']                       Score:0.000006
Sequence:['Noun', 'Noun', 'Determiner', 'Noun']                       Score:0.000262
Sequence:['Noun', 'Noun', 'Determiner', 'Verb']                       Score:0.000002
Sequence:['Noun', 'Noun', 'Determiner', 'Determiner']                 Score:0.000002
Sequence:['Noun', 'Verb', 'Noun', 'Noun']                             Score:0.000262
Sequence:['Noun', 'Verb', 'Noun', 'Verb']                             Score:0.000117
Sequence:['Noun', 'Verb', 'Noun', 'Determiner']          

We have correctly tagged the part of speech of every word in the sentence:



1. Bob - Noun

2. ate - Verb

3. the - Determiner

4. fruit - Noun



In [30]:
#Data for part of speech tagging
import numpy as np
import nltk
# Fetch the treebank corpus and the universal tagset resource; the tagged
# sentences are loaded below with tagset='universal'.
nltk.download('treebank')
nltk.download('universal_tagset')

[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Package treebank is already up-to-date!
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


True

In [31]:
from nltk.corpus import treebank
# Tagged sentences from the treebank corpus, requested with the 'universal' tagset
sentences = treebank.tagged_sents(tagset='universal')

![alt text](https://drive.google.com/uc?id=1fJQAmAvb9-17J6gcdvUJHJGyy6Y-hctd)

In [32]:
# Number of tagged sentences in the corpus
len(sentences)

3914

In [33]:
# Corpus statistics. `tagged_sentence` is loaded with the same call as `sentences` above.
tagged_sentence = nltk.corpus.treebank.tagged_sents(tagset='universal')
print("Number of Tagged Sentences ",len(tagged_sentence))
# Flatten the sentences into one list of (word, tag) tuples
tagged_words=[tup for sent in tagged_sentence for tup in sent]
print("Total Number of Tagged words", len(tagged_words))
# Distinct words (case-sensitive, no normalisation is applied here)
vocab=set([word for word,tag in tagged_words])
print("Vocabulary of the Corpus",len(vocab))
# Distinct tags; these become the hidden states of the HMM
tags=set([tag for word,tag in tagged_words])
print("Number of Tags in the Corpus ",len(tags))

Number of Tagged Sentences  3914
Total Number of Tagged words 100676
Vocabulary of the Corpus 12408
Number of Tags in the Corpus  12


In [34]:
# Show the set of distinct tags found in the corpus
print(tags)

{'X', 'NUM', 'VERB', 'ADV', 'ADJ', 'NOUN', 'CONJ', 'PRT', 'PRON', 'DET', 'ADP', '.'}


`transition_probs` is a dictionary that gives the probability of a particular tag being the next POS tag, given the tag that occurred immediately before it.

Hence using `transition_probs` we get:
> `P['next state'|'previous state']`



In [35]:
#Generating Dictionary for our hidden states (Part of speech tags) - transition_probs
import numpy as np
# NOTE: `tags` is a set, so the order of `states` (and therefore of the rows and
# columns of every matrix indexed by it) is arbitrary and may differ between
# kernel sessions; the printed matrices below are only valid for this ordering.
states = list(tags)
print(states)

['X', 'NUM', 'VERB', 'ADV', 'ADJ', 'NOUN', 'CONJ', 'PRT', 'PRON', 'DET', 'ADP', '.']


In [36]:
# Generating the matrix for calculating transition_probs for the dataset

# tr_matrix[p][q] counts how often tag states[q] directly follows tag states[p]
tr_matrix = np.zeros((len(states),len(states)))

for sentence in tagged_sentence:
  for i in range(len(sentence)):
    # the first token of a sentence has no predecessor inside that sentence
    if i==0: continue
    tr_matrix[states.index(sentence[i-1][1])][states.index(sentence[i][1])]+=1

print(tr_matrix.astype(int))

[[ 494   18 1354  170  112  410   68 1221  367  361  955 1082]
 [ 746  656   64   10  118 1252   48   96    5   11  124  414]
 [2954  310 2292 1110  884 1497   73  426  482 1822 1239  475]
 [  73  100 1093  252  411  100   22   45   48  218  376  432]
 [ 134  133   77   30  425 4474  108   69    4   31  497  415]
 [ 837  273 4240  490  350 7623 1229 1267  135  377 5100 6926]
 [  19   94  355  124  266  792    1   11  133  270  120   80]
 [  43  183 1291   32  273  796    7    6   58  326   66  138]
 [ 254   20 1329   93  200  573   14   34   21   26   62  111]
 [ 398  193  346  110 1788 5569    4    2   31   48   81  154]
 [ 342  618   82  133 1050 3173    8   14  679 3194  167  392]
 [ 229  915 1003  412  359 1467  482   23  486 1114  565  776]]


In [37]:
# Divide every row by its total so that row p becomes the distribution of the
# next tag given previous tag states[p]. Done in place on the count matrix;
# a row whose total is 0 would turn into NaN.
tr_matrix /= np.sum(tr_matrix,keepdims=True,axis=1)
print(tr_matrix)

[[7.47126437e-02 2.72232305e-03 2.04779189e-01 2.57108288e-02
  1.69388990e-02 6.20084694e-02 1.02843315e-02 1.84664247e-01
  5.55051422e-02 5.45977011e-02 1.44434362e-01 1.63641863e-01]
 [2.10496614e-01 1.85101580e-01 1.80586907e-02 2.82167043e-03
  3.32957111e-02 3.53273138e-01 1.35440181e-02 2.70880361e-02
  1.41083521e-03 3.10383747e-03 3.49887133e-02 1.16817156e-01]
 [2.17782365e-01 2.28546152e-02 1.68976703e-01 8.18342672e-02
  6.51725155e-02 1.10365674e-01 5.38189325e-03 3.14066647e-02
  3.55352403e-02 1.34326157e-01 9.13447361e-02 3.50191684e-02]
 [2.30283912e-02 3.15457413e-02 3.44794953e-01 7.94952681e-02
  1.29652997e-01 3.15457413e-02 6.94006309e-03 1.41955836e-02
  1.51419558e-02 6.87697161e-02 1.18611987e-01 1.36277603e-01]
 [2.09473191e-02 2.07909958e-02 1.20368923e-02 4.68969830e-03
  6.64373925e-02 6.99390339e-01 1.68829139e-02 1.07863061e-02
  6.25293106e-04 4.84602157e-03 7.76926684e-02 6.48741598e-02]
 [2.90151489e-02 9.46372240e-03 1.46982355e-01 1.69861684e-02
  1

In [38]:
#Hence we get our transition matrix. Now we need to create the dictionary transition_probs

def get_transition_probs(states,tr_matrix):
  '''
  Flattens a transition matrix into the dictionary format used by viterbi().

  Parameters:
  1. states - list of hidden states; position p labels row p and column p
  2. tr_matrix - 2-D array where tr_matrix[p][q] is the entry for moving from
     states[p] to states[q]

  Returns:
  1. transition_probs - dictionary keyed 'next state|previous state', i.e.
     transition_probs[states[q]+'|'+states[p]] = tr_matrix[p][q]
  '''
  label_of = dict(enumerate(states))
  return {label_of[nxt]+'|'+label_of[prev]: tr_matrix[prev][nxt]
          for prev in range(tr_matrix.shape[0])
          for nxt in range(tr_matrix.shape[1])}

In [39]:
def get_tr_matrix(states,tagged_sentence):
  '''
  Estimates the tag-to-tag transition matrix from a tagged corpus.

  Parameters:
  1. states - list of hidden states (tags); position p labels row p and column p
  2. tagged_sentence - iterable of sentences, each a sequence of items whose
     element [1] is the tag, e.g. [('Bob','NOUN'),('ate','VERB')]

  Returns:
  1. tr_matrix - array of shape (len(states), len(states)) where
     tr_matrix[p][q] is the fraction of occurrences of states[p] (as a
     non-final token) that are directly followed by states[q].
     A state that is never followed by anything gets an all-zero row
     instead of NaN, matching how get_em_matrix treats empty columns.

  Raises:
  ValueError - if a tag in the corpus is not present in states
  '''
  counts = np.zeros((len(states),len(states)))
  for sentence in tagged_sentence:
    # start at 1: the first token has no predecessor inside the sentence
    for i in range(1,len(sentence)):
      counts[states.index(sentence[i-1][1])][states.index(sentence[i][1])]+=1
  row_totals = np.sum(counts,keepdims=True,axis=1)
  # Guarded division: rows with a zero total are left at 0 rather than 0/0
  return np.divide(counts,row_totals,out=np.zeros_like(counts),where=row_totals!=0)

In [40]:
#Generating Dictionary for our observable states (words) - emission_probs
# NOTE: rebinds `vocab` from a set to a sorted list; row r of em_matrix belongs to vocab[r]
vocab = sorted(list(vocab))
# em_matrix[r][c] counts how often word vocab[r] carries tag states[c]
em_matrix = np.zeros((len(vocab),len(states)))
print(em_matrix.shape)

for sentence in tagged_sentence:
  for word,tag in sentence:
    # vocab.index scans the list for every token, which makes this cell slow on the full corpus
    em_matrix[vocab.index(word)][states.index(tag)]+=1

print(em_matrix.astype(int))

(12408, 12)
[[  0   0   0 ...   0   0   6]
 [  0   0   0 ...   0   0  16]
 [  0   0   0 ...   0   0 718]
 ...
 [  0   0   1 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   1 ...   0   0   0]]


In [41]:
# Column totals: number of tokens observed for every tag
sum_matrix = np.sum(em_matrix,keepdims=True,axis=0)
# Divide every column by its total so that column c becomes the distribution of
# words given tag states[c]; columns with a zero total are left at 0.
em_matrix = np.divide(em_matrix, sum_matrix, out=np.zeros_like(em_matrix), where=sum_matrix!=0)
print(em_matrix)

[[0.00000000e+00 0.00000000e+00 0.00000000e+00 ... 0.00000000e+00
  0.00000000e+00 5.12163892e-04]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 ... 0.00000000e+00
  0.00000000e+00 1.36577038e-03]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 ... 0.00000000e+00
  0.00000000e+00 6.12889458e-02]
 ...
 [0.00000000e+00 0.00000000e+00 7.37245650e-05 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 0.00000000e+00 7.37245650e-05 ... 0.00000000e+00
  0.00000000e+00 0.00000000e+00]]


In [42]:
#Hence we get our emission matrix. Now we need to create the dictionary emission_probs
def get_emission_probs(states,vocab,em_matrix):
  '''
  Flattens an emission matrix into the dictionary format used by viterbi().

  Parameters:
  1. states - list of hidden states; position c labels column c
  2. vocab - indexable sequence of words; position r labels row r
  3. em_matrix - 2-D array where em_matrix[r][c] is the entry for word
     vocab[r] under state states[c]

  Returns:
  1. emission_probs - dictionary keyed 'word|state', i.e.
     emission_probs[vocab[r]+'|'+states[c]] = em_matrix[r][c]
  '''
  label_of = dict(enumerate(states))
  return {vocab[row]+'|'+label_of[col]: em_matrix[row][col]
          for row in range(em_matrix.shape[0])
          for col in range(em_matrix.shape[1])}

In [43]:
def get_em_matrix(states,vocab,tagged_sentence):
  '''
  Estimates the word-given-tag emission matrix from a tagged corpus.

  Parameters:
  1. states - list of hidden states (tags); position c labels column c
  2. vocab - collection of words; it is sorted here, and row r of the result
     belongs to the r-th word of that sorted order
  3. tagged_sentence - iterable of sentences, each an iterable of (word, tag) pairs

  Returns:
  1. em_matrix - array of shape (len(vocab), len(states)) where em_matrix[r][c]
     is the fraction of tokens tagged states[c] that are the r-th sorted word.
     A tag that never occurs keeps an all-zero column.

  Raises:
  ValueError - if a word of the corpus is not in vocab, or a tag is not in states
  '''
  vocab = sorted(list(vocab))
  # Word -> row lookup built once; scanning the list with vocab.index() for every
  # token is needlessly slow on a real corpus. setdefault keeps the first
  # position of a repeated word, exactly like list.index.
  row_of = {}
  for idx,word in enumerate(vocab):
    row_of.setdefault(word,idx)

  em_matrix = np.zeros((len(vocab),len(states)))
  for sentence in tagged_sentence:
    for word,tag in sentence:
      if word not in row_of:
        # same exception type list.index raised for an unknown word
        raise ValueError(repr(word)+" is not in vocab")
      em_matrix[row_of[word]][states.index(tag)]+=1

  sum_matrix = np.sum(em_matrix,keepdims=True,axis=0)
  # Guarded division: columns with a zero total are left at 0 rather than 0/0
  em_matrix = np.divide(em_matrix, sum_matrix, out=np.zeros_like(em_matrix), where=sum_matrix!=0)
  return em_matrix

In [44]:
def get_probs(states,vocab,tagged_sentence):
  '''
  Builds both probability dictionaries needed by viterbi() from a tagged corpus.

  Parameters:
  1. states - list of hidden states (tags)
  2. vocab - collection of the words of the corpus (set or list, any order)
  3. tagged_sentence - iterable of sentences, each a sequence of (word, tag) pairs

  Returns:
  1. transition_probs - dictionary keyed 'next state|previous state'
  2. emission_probs - dictionary keyed 'word|state'
  '''
  # get_em_matrix orders its rows by the sorted vocabulary, so the very same
  # sorted list must be used to label those rows in get_emission_probs;
  # passing the caller's unsorted vocab there would attach rows to the wrong words.
  vocab = sorted(vocab)
  tr_matrix = get_tr_matrix(states,tagged_sentence)
  em_matrix = get_em_matrix(states,vocab,tagged_sentence)
  transition_probs = get_transition_probs(states,tr_matrix)
  emission_probs = get_emission_probs(states,vocab,em_matrix)
  return transition_probs,emission_probs

In [45]:
def get_initial_probs(states,tagged_sentence):
  '''
  Estimates the prior over the tag of the first word of a sentence.

  Parameters:
  1. states - list of hidden states (tags)
  2. tagged_sentence - iterable of non-empty sentences, each a sequence of
     items whose element [1] is the tag

  Returns:
  1. initial_probs - dictionary mapping every state to the fraction of
     sentences that start with it
  '''
  start_counts = [0 for _ in states]
  for sent in tagged_sentence:
    # only the tag of the first token matters here
    start_counts[states.index(sent[0][1])] += 1
  start_counts = np.asarray(start_counts)
  fractions = start_counts/np.sum(start_counts,keepdims=True)
  return dict(zip(states,fractions))

In [46]:
def get_observation(sent):
  '''
  Splits a tagged sentence into its words and its tags.

  Parameters:
  1. sent - iterable of (word, tag) pairs

  Returns:
  1. ob - list of the words, in sentence order
  2. ob_tags - list of the tags, aligned with ob
  '''
  # materialise once so a single-pass iterable is only consumed one time
  pairs = [(word,tag) for word,tag in sent]
  words = [word for word,_ in pairs]
  word_tags = [tag for _,tag in pairs]
  return words,word_tags

In [47]:
# Fit the HMM on the whole tagged corpus: priors over the first tag ...
initial_probs = get_initial_probs(states,tagged_sentence)

# ... and the (transition_probs, emission_probs) tuple in the format viterbi() expects
probs = get_probs(states,vocab,tagged_sentence)

In [48]:
#Testing our Hidden Markov Model on the dataset
import random
# NOTE: the sentence is drawn from the same corpus the probabilities were
# estimated on, and no random seed is set, so the output changes on every run.
ob = random.choice(tagged_sentence)
# Rebinds the notebook-level `obs`; ob_tags holds the gold tags for comparison
obs,ob_tags = get_observation(ob)

# Forward pass: `cache` holds the back-pointers, `l` the final score of each tag
cache,l = viterbi(states,probs,initial_probs,obs)
print("Finding the best hidden sequence from the path generated by viterbi algorithm...")
print()
# Backward pass; the per-step scores are not needed here
best_seq,_ = viterbi_backward(states,cache,l)
print("TAGS ACHIEVED USING HIDDEN MARKOV MODEL-",best_seq,sep="\n")
print()
print("ACTUAL TAGS-",ob_tags,sep="\n")

Finding the best hidden sequence from the path generated by viterbi algorithm...

TAGS ACHIEVED USING HIDDEN MARKOV MODEL-
['PRON', 'VERB', 'VERB', 'X', 'PRT', 'NOUN', 'PRT', 'NOUN', 'NOUN', '.', 'NOUN', 'CONJ', 'NOUN', 'NOUN', 'NOUN', '.']

ACTUAL TAGS-
['PRON', 'VERB', 'VERB', 'X', 'PRT', 'VERB', 'PRT', 'NOUN', 'NOUN', '.', 'NOUN', 'CONJ', 'NOUN', 'NOUN', 'NOUN', '.']
