In [216]:
import numpy as np
from hmmlearn import hmm
from matplotlib import cm, pyplot as plt
%matplotlib inline
import matplotlib
import pandas as pd
from pandas import DataFrame, Series

USING HIDDEN MARKOV MODEL FOR PART-OF SPEECH TAGGING.

In [218]:
import nltk
import numpy as np

# Download necessary NLTK data (if you don't have it already)
nltk.download('treebank')
nltk.download('universal_tagset')

# 1. Data Preparation (using NLTK's Penn Treebank)
tagged_sentences = nltk.corpus.treebank.tagged_sents(tagset='universal')

# Split into training and testing sets
train_data = tagged_sentences[:int(len(tagged_sentences) * 0.8)]
test_data = tagged_sentences[int(len(tagged_sentences) * 0.8):]

# Create vocabulary and tag set
word_set = set()
tag_set = set()
for sent in train_data:
    for word, tag in sent:
        word_set.add(word.lower())
        tag_set.add(tag)

word_to_idx = {word: i for i, word in enumerate(word_set)}
tag_to_idx = {tag: i for i, tag in enumerate(tag_set)}
idx_to_tag = {i: tag for i, tag in enumerate(tag_set)}

# 2. Parameter Estimation (Training)

# Initialize transition, emission, and initial probability matrices
transition_probs = np.zeros((len(tag_set), len(tag_set)))
emission_probs = np.zeros((len(tag_set), len(word_set)))
initial_probs = np.zeros(len(tag_set))

# Count occurrences for transition probabilities
for sent in train_data:
    for i in range(len(sent) - 1):
        current_tag_idx = tag_to_idx[sent[i][1]]
        next_tag_idx = tag_to_idx[sent[i+1][1]]
        transition_probs[current_tag_idx, next_tag_idx] += 1

# Count occurrences for emission probabilities
for sent in train_data:
    for word, tag in sent:
        word_idx = word_to_idx[word.lower()]
        tag_idx = tag_to_idx[tag]
        emission_probs[tag_idx, word_idx] += 1

# Count occurrences for initial probabilities
for sent in train_data:
    initial_tag_idx = tag_to_idx[sent[0][1]]
    initial_probs[initial_tag_idx] += 1

# Add-one (Laplace) smoothing and normalization
transition_probs += 1
emission_probs += 1
initial_probs += 1

transition_probs /= transition_probs.sum(axis=1, keepdims=True)
emission_probs /= emission_probs.sum(axis=1, keepdims=True)
initial_probs /= initial_probs.sum()

# 3. Decoding (Inference) - Viterbi Algorithm

def viterbi(obs, transition_probs, emission_probs, initial_probs):
    num_states = transition_probs.shape[0]
    num_obs = len(obs)

    # Initialize the Viterbi matrix and backpointer matrix
    viterbi_matrix = np.zeros((num_states, num_obs))
    backpointer = np.zeros((num_states, num_obs), dtype=int)

    # Initialization step
    first_word_idx = word_to_idx.get(obs[0].lower(), 0)  # Handle unknown words
    viterbi_matrix[:, 0] = initial_probs * emission_probs[:, first_word_idx]

    # Recursion step
    for t in range(1, num_obs):
        word_idx = word_to_idx.get(obs[t].lower(), 0) # Handle unknown words
        for s in range(num_states):
            viterbi_scores = viterbi_matrix[:, t-1] * transition_probs[:, s] * emission_probs[s, word_idx]
            best_prev_state = np.argmax(viterbi_scores)
            viterbi_matrix[s, t] = viterbi_scores[best_prev_state]
            backpointer[s, t] = best_prev_state

    # Termination step
    best_final_state = np.argmax(viterbi_matrix[:, num_obs - 1])
    best_path = [best_final_state]

    # Backtrack to find the best path
    for t in range(num_obs - 1, 0, -1):
        best_final_state = backpointer[best_final_state, t]
        best_path.insert(0, best_final_state)

    return [idx_to_tag[state_idx] for state_idx in best_path]

# Example usage on a test sentence
test_sentence = "the quick brown fox jumps over the lazy dog.".split()
predicted_tags = viterbi(test_sentence, transition_probs, emission_probs, initial_probs)
# predicted_tags = viterbi(test_sentence,states, transition_probs, emission_probs, initial_probs)

# obs_seq, states, start_prob, trans_mat, emit_mat
print(f"Test Sentence: {test_sentence}")
print(f"Predicted Tags: {predicted_tags}")

# Evaluation (very basic - just accuracy on the test set)
correct = 0
total = 0
for sent in test_data:
    words = [word.lower() for word, _ in sent]
    actual_tags = [tag for _, tag in sent]
    predicted_tags = viterbi(words, transition_probs, emission_probs, initial_probs)
    # predicted_tags = viterbi(words,states, transition_probs, emission_probs, initial_probs)

    for p, a in zip(predicted_tags, actual_tags):
        if p == a:
            correct += 1
        total += 1

accuracy = correct / total
print(f"Accuracy: {accuracy * 100:.2f}%")

[nltk_data] Downloading package treebank to
[nltk_data]     /Users/hrantbaloyan/nltk_data...
[nltk_data]   Package treebank is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     /Users/hrantbaloyan/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


Test Sentence: ['the', 'quick', 'brown', 'fox', 'jumps', 'over', 'the', 'lazy', 'dog.']
Predicted Tags: ['DET', 'ADJ', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'ADJ', 'NOUN']
Accuracy: 88.73%


Code for Forward algorithm


In [220]:
def forward(observations, trans_prob, emit_prob, initial_probs):
    n_states = trans_prob.shape[0]
    print('n_state is ', n_states)
    n_obs = len(observations)
    forward = np.zeros((n_states, n_obs))  # Matrix to store forward probabilities


    # Initialization step
    first_word_idx = word_to_idx.get(observations[0].lower(), 0)
    print('first word idx is ', first_word_idx)
    for state in range(n_states):
        forward[state, 0] = (
          initial_probs[state] * emit_prob[state, first_word_idx]
        )

    # Recursion step
    for t in range(1, n_obs):
        word_idx = word_to_idx.get(observations[t].lower(), 0)
        for state in range(n_states):
            forward[state, t] = sum(
                forward[prev_state, t - 1]
                * trans_prob[prev_state, state]
                * emit_prob[state, word_idx]
                for prev_state in range(n_states)
            )

    probability_of_observation = sum(forward[i, n_obs-1] for i in range(n_states))

    return probability_of_observation
