<a href="https://colab.research.google.com/github/I51GMA/mlproject/blob/main/HMM_POS_Tagging.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [29]:
import nltk
from nltk.corpus import brown
import pandas as pd
import numpy as np
import os
from numba import njit
nltk.download('brown')

[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Package brown is already up-to-date!


True

In [30]:
# Load word and tag data from Brown corpus
word_and_tag = list(brown.tagged_words())

# Extract unique words and POS tags
words = list(set([word for word, _ in word_and_tag]))
pos_tags = list(set([tag for _, tag in word_and_tag]))

# Mapping words and POS tags to indices
word_to_index = {word: idx for idx, word in enumerate(words)}
tag_to_index = {tag: idx for idx, tag in enumerate(pos_tags)}

# Convert word-tag pairs into their corresponding indices before passing to JIT function
indexed_word_tag_pairs = [
    (word_to_index[word], tag_to_index[tag])
    for word, tag in word_and_tag
]

@njit  # Use Numba to optimize the computation
def make_sparse_matrix(indexed_word_tag_pairs, num_words, num_tags):
    sparse_matrix = np.zeros((num_words, num_tags))
    for word_idx, tag_idx in indexed_word_tag_pairs:
        sparse_matrix[word_idx, tag_idx] += 1
    return sparse_matrix

In [31]:
bnm = True

sparse_matrix_path = './sparse_matrix.csv'

# If the CSV exists or we need to reconstruct the matrix
if not os.path.exists(sparse_matrix_path) or bnm:
    sparse_matrix = make_sparse_matrix(indexed_word_tag_pairs, len(words), len(pos_tags))

    df = pd.DataFrame(sparse_matrix, columns=pos_tags, dtype=np.int32)
    df.insert(0, 'Words', words)
    df.to_csv(sparse_matrix_path, index=False)
else:
    df = pd.read_csv(sparse_matrix_path)

In [32]:
def make_transition_matrix_and_initial_probs(tagged_sentences, tag_to_index):
    num_tags = len(tag_to_index)

    # Initializing the transition matrix and initial probabilities array
    transition_matrix = np.zeros((num_tags, num_tags), dtype=np.float64)
    initial_probs = np.zeros(num_tags, dtype=np.float64)

    for sentence in tagged_sentences:
        # Get the index of the first tag in the sentence for initial probabilities
        initial_probs[tag_to_index[sentence[0][1]]] += 1

        for n in range(len(sentence) - 1):
            tag_i = sentence[n][1]
            tag_j = sentence[n + 1][1]

            transition_matrix[tag_to_index[tag_i], tag_to_index[tag_j]] += 1

    # Normalize the transition matrix row-wise, handling cases where the row sums to 0
    row_sums = np.sum(transition_matrix, axis=1, keepdims=True)
    row_sums[row_sums == 0] = 1  # Avoid division by zero for tags with no transitions
    transition_matrix /= row_sums

    # Normalize initial probabilities, handling zero-sum case
    if np.sum(initial_probs) > 0:
        initial_probs /= np.sum(initial_probs)

    return transition_matrix, initial_probs

In [33]:
# Generating transition matrix and initial probabilities array
A, Pi = make_transition_matrix_and_initial_probs(brown.tagged_sents(categories='news'), tag_to_index)

In [34]:
# Extracting the emission probabilities
B = df.iloc[:, 1:].values.astype(np.float64).T
B /= np.sum(B, axis=1, keepdims=True)

In [35]:
import numpy as np
from numba import jit

@jit(nopython=True)
def viterbi_algorithm_log(state_transition_matrix, initial_probs, emission_matrix, observations):
    """A unique Viterbi algorithm for finding the most likely sequence of hidden states using log probabilities.

    Args:
        state_transition_matrix (np.ndarray): State transition probability matrix (I x I)
        initial_probs (np.ndarray): Initial state distribution (I)
        emission_matrix (np.ndarray): Emission matrix (I x K)
        observations (np.ndarray): Observation sequence (length N)

    Returns:
        optimal_states (np.ndarray): Optimal state sequence (length N)
        log_probability_matrix (np.ndarray): Log probability matrix
        backtrack_matrix (np.ndarray): Backtrack matrix for finding the optimal path
    """
    num_states = state_transition_matrix.shape[0]  # I: Number of states
    sequence_length = len(observations)            # N: Length of observation sequence

    # Tiny value to prevent log(0) errors
    tiny_value = np.finfo(np.float64).tiny

    # Take logarithms of all probabilities (handling zero probabilities)
    log_transition_matrix = np.log(state_transition_matrix + tiny_value)
    log_initial_probs = np.log(initial_probs + tiny_value)
    log_emission_matrix = np.log(emission_matrix + tiny_value)

    # Initialize log probability and backtrack matrices
    log_probability_matrix = np.zeros((num_states, sequence_length), dtype=np.float64)
    backtrack_matrix = np.zeros((num_states, sequence_length - 1), dtype=np.int32)

    # Initialize first column of the log probability matrix
    log_probability_matrix[:, 0] = log_initial_probs + log_emission_matrix[:, observations[0]]

    # Dynamic programming: fill in the log probability and backtrack matrices
    for t in range(1, sequence_length):
        for current_state in range(num_states):
            log_transition_values = log_transition_matrix[:, current_state] + log_probability_matrix[:, t - 1]
            log_probability_matrix[current_state, t] = np.max(log_transition_values) + log_emission_matrix[current_state, observations[t]]
            backtrack_matrix[current_state, t - 1] = np.argmax(log_transition_values)

    # Backtracking to find the optimal state sequence
    optimal_states = np.zeros(sequence_length, dtype=np.int32)
    optimal_states[-1] = np.argmax(log_probability_matrix[:, -1])

    for t in range(sequence_length - 2, -1, -1):
        optimal_states[t] = backtrack_matrix[optimal_states[t + 1], t]

    return optimal_states, log_probability_matrix, backtrack_matrix

In [36]:
r = 203
test_sent = brown.sents(categories='news')[r]

O = []
correct_tag_seq = []
test_tagged = brown.tagged_sents(categories='news')[r]

for word, tag in test_tagged:
    if word in word_to_index:
        O.append(word_to_index[word])
        if tag in tag_to_index:
            correct_tag_seq.append(tag_to_index[tag])

O = np.array(O, dtype=np.int32)

# Performing Viterbi decoding using the Viterbi function
opt_state_seq, log_prob_trellis, backtrack_matrix = viterbi_algorithm_log(A, Pi, B, O)

print('Observation sequence:   O  = ', O)
print('Optimal state sequence: S  = ', opt_state_seq)
print('Correct state sequence: S* = ', correct_tag_seq)
print("Do they match: ", correct_tag_seq == list(opt_state_seq))

guessed_tags = [pos_tags[i] for i in opt_state_seq]
test_tagged_words = [word for word, _ in test_tagged if word in word_to_index]
test_result_df = pd.DataFrame(index=test_tagged_words, columns=['Correct', 'Guessed'], data=zip(test_tagged, guessed_tags)).T

print('The sentence: ', test_sent)
print(test_result_df.iloc[:, (test_result_df.nunique() != 1).values])

Observation sequence:   O  =  [17724 15056 34685 22511 10822   191  3607  3463 15922 42426  4217 16795
 14637 11131   636 52778 32302 53709 24256 21729  5357]
Optimal state sequence: S  =  [359 190 309 325 338 402 309  71 429 439 424 402 345  89 345 157 456  61
 345 300  13]
Correct state sequence: S* =  [359, 190, 309, 325, 338, 402, 309, 71, 429, 439, 424, 402, 345, 115, 345, 157, 456, 61, 345, 300, 13]
Do they match:  False
The sentence:  ['It', 'is', 'impossible', 'to', 'get', 'a', 'fair', 'trial', 'when', 'some', 'of', 'the', 'defendants', 'made', 'statements', 'involving', 'themselves', 'and', 'others', "''", '.']
                It         is        impossible        to        get        a  \
Correct  (It, PPS)  (is, BEZ)  (impossible, JJ)  (to, TO)  (get, VB)  (a, AT)   
Guessed        PPS        BEZ                JJ        TO         VB       AT   

               fair        trial         when         some  ...        the  \
Correct  (fair, JJ)  (trial, NN)  (when, WRB)  (so