In [1]:
import re
import string
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import plotly.express as px
from collections import defaultdict, Counter
from unidecode import unidecode
from typing import List, Tuple, Dict, Union
import log
import mynlputils as nu

In [2]:
logger = log.get_logger(__name__)

In [3]:
def load_data(raw_txt_train_path: str, raw_txt_test_path: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    df_train = pd.read_excel(raw_txt_train_path)
    df_test = pd.read_excel(raw_txt_test_path)
    logger.info(f"df_train.shape: {df_train.shape}")
    logger.info(f"df_train unique tokens: {df_train['Token'].nunique()}")
    logger.info(f"df_train unique POS: {df_train['POS'].nunique()}")
    logger.info(f"df_test.shape: {df_test.shape}")
    logger.info(f"df_test unique tokens: {df_test['Token'].nunique()}")
    logger.info(f"df_test unique POS: {df_test['POS'].nunique()}")
    return df_train, df_test


def remove_punctuation(data: pd.DataFrame):
    """
    Removes rows with 'PUNCT' in the 'POS' column from the dataset.

    Args:
    data (pd.DataFrame): DataFrame containing the tokenized isiZulu data with 'Token' and 'POS' columns.

    Returns:
    pd.DataFrame: DataFrame with rows containing 'PUNCT' removed.
    """
    return data[data['POS'] != 'PUNCT']


def split_sentences(data: pd.DataFrame):
    """
    Splits the dataset into sentences based on rows with NaN values and add start and end tokens.

    Args:
    data (pd.DataFrame): DataFrame containing the tokenized isiZulu data with 'Token' and 'POS' columns.

    Returns:
    list: List of sentences, where each sentence is a list of tuples (Token, POS).
    """
    sentences = []
    sentence = []
    for _, row in data.iterrows():
        if pd.isnull(row['Token']) and pd.isnull(row['POS']):
            if sentence:
                sentence = [('<s>', 'START')] + sentence + [('</s>', 'STOP')]
                sentences.append(sentence)
                sentence = []
        else:
            sentence.append((row['Token'], row['POS']))
    if sentence:
        sentence = [('<s>', 'START')] + sentence + [('</s>', 'STOP')]
        sentences.append(sentence)
    return sentences


def compute_transition_probabilities(sentences: List[Tuple[str, str]], smoothing: float) -> dict:
    """
    Computes transition probabilities for the Hidden Markov Model (HMM) based on the given sentences.

    Args:
    sentences (List[Tuple[str, str]]): List of sentences where each sentence is a tuple of (token, POS).
    smoothing (float): Smoothing parameter for Laplace smoothing. Default is 0.0 for no smoothing.

    Returns:
    dict: Transition probabilities for the HMM.
    """
    transition_counts = {}
    transition_probabilities = {}

    for sentence in sentences:
        for i in range(1, len(sentence) - 1):  # we start from 1 and end at len(sentence) - 1 to exclude <s> and </s>
            current_token, current_pos = sentence[i]
            next_token, next_pos = sentence[i + 1]

            if current_pos not in transition_counts:
                transition_counts[current_pos] = {}

            if next_pos not in transition_counts[current_pos]:
                transition_counts[current_pos][next_pos] = smoothing

            transition_counts[current_pos][next_pos] += 1

    for current_pos, next_pos_counts in transition_counts.items():
        total_count = sum(next_pos_counts.values())
        transition_probabilities[current_pos] = {}

        for next_pos, count in next_pos_counts.items():
            if smoothing == 0.0:
                transition_probabilities[current_pos][next_pos] = count / total_count
            else:
                transition_probabilities[current_pos][next_pos] = (count + smoothing) / (total_count + smoothing * len(transition_counts))
                # Number of unique tags instead of transition_counts. 
    return transition_probabilities


# def compute_emission_probabilities(sentences: List[Tuple[str, str]], smoothing: float) -> dict:
#     """
#     Computes emission probabilities for the Hidden Markov Model (HMM) based on the given sentences.

#     Args:
#     sentences (List[Tuple[str, str]]): List of sentences where each sentence is a tuple of (token, POS).
#     smoothing (float): Smoothing parameter for Laplace smoothing. Default is 0.0 for no smoothing.

#     Returns:
#     dict: Emission probabilities for the HMM.
#     """
#     emission_counts = {}
#     emission_probabilities = {}

#     for sentence in sentences:
#         for token, pos in sentence[1:-1]:  # we exclude <s> and </s> tokens
#             if pos not in emission_counts:
#                 emission_counts[pos] = {}

#             if token not in emission_counts[pos]:
#                 emission_counts[pos][token] = smoothing

#             emission_counts[pos][token] += 1

#     for pos, token_counts in emission_counts.items():
#         total_count = sum(token_counts.values())
#         emission_probabilities[pos] = {}

#         for token, count in token_counts.items():
#             if smoothing == 0.0:
#                 emission_probabilities[pos][token] = count / total_count
#             else:
#                 emission_probabilities[pos][token] = (count + smoothing) / (total_count + smoothing * len(emission_counts))
#                 # Number of words instead of emission_counts.
#     return emission_probabilities


def compute_emission_probabilities(sentences: List[Tuple[str, str]], smoothing: float) -> dict:
    """
    Computes emission probabilities for the Hidden Markov Model (HMM) based on the given sentences.

    Args:
    sentences (List[Tuple[str, str]]): List of sentences where each sentence is a tuple of (token, POS).
    smoothing (float): Smoothing parameter for Laplace smoothing. Default is 0.0 for no smoothing.

    Returns:
    dict: Emission probabilities for the HMM.
    """
    emission_counts = {}
    emission_probabilities = {}

    for sentence in sentences:
        for token, pos in sentence[1:-1]:  # we exclude <s> and </s> tokens
            if pos not in emission_counts:
                emission_counts[pos] = {}

            if token not in emission_counts[pos]:
                emission_counts[pos][token] = smoothing

            emission_counts[pos][token] += 1

    # Count total number of words in vocabulary (including unknown word)
    vocab_size = sum(len(token_counts) for token_counts in emission_counts.values()) + 1

    for pos, token_counts in emission_counts.items():
        total_count = sum(token_counts.values())
        emission_probabilities[pos] = {}

        for token, count in token_counts.items():
            if smoothing == 0.0:
                emission_probabilities[pos][token] = count / total_count
            else:
                emission_probabilities[pos][token] = (count + smoothing) / (total_count + smoothing * vocab_size)

        # Add probability for unknown word
        emission_probabilities[pos]['<UNK>'] = smoothing / (total_count + smoothing * vocab_size)

    return emission_probabilities


def train(sentences_train, smoothing):
    transitions = compute_transition_probabilities(sentences_train, smoothing) # Compute transition probabilities
    emissions = compute_emission_probabilities(sentences_train, smoothing) # Compute emission probabilities
    return {'transitions': transitions, 'emissions': emissions} # Create and return the trained HMM model


def evaluate_hmm_model(test_sentences: List[Tuple[str, str]], transition_probabilities: dict, emission_probabilities: dict) -> float:
    """
    Evaluates the Hidden Markov Model (HMM) on the test sentences and returns the accuracy.

    Args:
    test_sentences (List[Tuple[str, str]]): List of test sentences where each sentence is a tuple of (token, POS).
    transition_probabilities (dict): Transition probabilities for the HMM.
    emission_probabilities (dict): Emission probabilities for the HMM.

    Returns:
    float: Accuracy of the HMM on the test sentences.
    """
    total_tokens = 0
    correct_predictions = 0

    for sentence in test_sentences:
        tokens = [token for token, _ in sentence]
        true_pos_tags = [pos for _, pos in sentence]
        predicted_pos_tags = tag_sentence(tokens, transition_probabilities, emission_probabilities)

        total_tokens += len(tokens)
        correct_predictions += sum(1 for true_pos, predicted_pos in zip(true_pos_tags, predicted_pos_tags) if true_pos == predicted_pos)

    accuracy = correct_predictions / total_tokens
    return round(accuracy * 100, 2)


# def tag_sentence(tokens: List[str], transition_probabilities: dict, emission_probabilities: dict) -> List[str]:
#     """
#     Tags a sentence with part-of-speech (POS) tags using the Hidden Markov Model (HMM).

#     Args:
#     tokens (List[str]): List of tokens in the sentence.
#     transition_probabilities (dict): Transition probabilities for the HMM.
#     emission_probabilities (dict): Emission probabilities for the HMM.

#     Returns:
#     List[str]: List of predicted POS tags for the sentence.
#     """
#     n = len(tokens)
#     viterbi = []

#     # Initialization
#     viterbi.append({})
#     for pos in transition_probabilities.keys():
#         if pos == 'START':
#             viterbi[0][pos] = 1
#         else:
#             viterbi[0][pos] = 0

#     # Recursion
#     for t in range(1, n):
#         viterbi.append({})
#         for pos in transition_probabilities.keys():
#             max_prob = max(
#                 viterbi[t - 1][prev_pos] * transition_probabilities[prev_pos].get(pos, 0) * emission_probabilities[pos].get(tokens[t], 0)
#                 for prev_pos in transition_probabilities)
#             viterbi[t][pos] = max_prob

#     # Termination
#     for pos in transition_probabilities.keys():
#         transition_prob = transition_probabilities[pos].get('STOP', 0)
#         viterbi[-1][pos] *= transition_prob

#     # Backtracking
#     optimal_path = []
#     max_prob_pos = max(viterbi[-1], key=viterbi[-1].get)
#     optimal_path.append(max_prob_pos)
#     for t in range(n - 2, -1, -1):
#         max_prob_pos = max(viterbi[t], key=viterbi[t].get)
#         optimal_path.insert(0, max_prob_pos)

#     return optimal_path

def tag_sentence(tokens: List[str], transition_probabilities: dict, emission_probabilities: dict) -> List[str]:
    """
    Tags a sentence with part-of-speech (POS) tags using the Hidden Markov Model (HMM).

    Args:
    tokens (List[str]): List of tokens in the sentence.
    transition_probabilities (dict): Transition probabilities for the HMM.
    emission_probabilities (dict): Emission probabilities for the HMM.

    Returns:
    List[str]: List of predicted POS tags for the sentence.
    """
    n = len(tokens)
    viterbi = []

    # Replace unknown words with '<UNK>'
    tokens = [token if token in emission_probabilities else '<UNK>' for token in tokens]

    # Initialization
    viterbi.append({})
    for pos in transition_probabilities.keys():
        if pos == 'START':
            viterbi[0][pos] = 1
        else:
            viterbi[0][pos] = 0

    # Recursion
    for t in range(1, n):
        viterbi.append({})
        for pos in transition_probabilities.keys():
            max_prob = max(
                viterbi[t - 1][prev_pos] * transition_probabilities[prev_pos].get(pos, 0) * emission_probabilities[pos].get(tokens[t], 0)
                for prev_pos in transition_probabilities)
            viterbi[t][pos] = max_prob

    # Termination
    for pos in transition_probabilities.keys():
        transition_prob = transition_probabilities[pos].get('STOP', 0)
        viterbi[-1][pos] *= transition_prob

    # Backtracking
    optimal_path = []
    max_prob_pos = max(viterbi[-1], key=viterbi[-1].get)
    optimal_path.append(max_prob_pos)
    for t in range(n - 2, -1, -1):
        max_prob_pos = max(viterbi[t], key=viterbi[t].get)
        optimal_path.insert(0, max_prob_pos)

    return optimal_path

In [4]:
conf = nu.load_config("a2")
df_train, df_test = load_data(conf.paths.raw_txt_train, conf.paths.raw_txt_test)
df_train = remove_punctuation(df_train)
df_test = remove_punctuation(df_test)
# Split sentences for training and test sets
sentences_train, sentences_valid = train_test_split(split_sentences(df_train), test_size=0.15, random_state=42)
sentences_test = split_sentences(df_test)
logger.info(f"Number of training sentences: {len(sentences_train)}")
logger.info(f"Number of validation sentences: {len(sentences_valid)}")
logger.info(f"Number of test sentences: {len(sentences_test)}")
model = train(sentences_train, conf.model.smoothing)
accuracy = evaluate_hmm_model(sentences_valid, model['transitions'], model['emissions'])
accuracy # = round(accuracy * 100, 2)

15-Jul-23 23:26:15 - INFO - Starting 'load_config'.
15-Jul-23 23:26:15 - INFO - Finished 'load_config' in 0.0081 secs.
15-Jul-23 23:26:15 - INFO - df_train.shape: (44324, 2)
15-Jul-23 23:26:15 - INFO - df_train unique tokens: 14125
15-Jul-23 23:26:15 - INFO - df_train unique POS: 99
15-Jul-23 23:26:15 - INFO - df_test.shape: (4676, 2)
15-Jul-23 23:26:15 - INFO - df_test unique tokens: 2415
15-Jul-23 23:26:15 - INFO - df_test unique POS: 77
15-Jul-23 23:26:16 - INFO - Number of training sentences: 2216
15-Jul-23 23:26:16 - INFO - Number of validation sentences: 392
15-Jul-23 23:26:16 - INFO - Number of test sentences: 333


28.12