# Dependencies


## Install


In [8]:
%pip install ipython-autotime
%pip install contractions


Note: you may need to restart the kernel to use updated packages.
time: 3.38 s (started: 2023-09-24 17:59:45 -07:00)


## Imports


In [309]:
import itertools
import json
import os

import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

from collections import Counter
from typing import List

%load_ext autotime

The autotime extension is already loaded. To reload it, use:
  %reload_ext autotime
time: 0 ns (started: 2023-09-26 05:12:58 -07:00)


# Config


In [310]:
class PathConfig:
    HW2_DIR = os.path.dirname(os.getcwd())
    OUTPUT_DIR = os.path.join(HW2_DIR, "solution", "output")

    DATA_PATH = os.path.join(HW2_DIR, "CSCI544_HW2", "data")
    VERIFICATION_DATA_PATH = os.path.join(HW2_DIR, "CSCI544_HW2", "verification")

    VOCAB_FILE_PATH = os.path.join(OUTPUT_DIR, "vocab.txt")
    HMM_MODEL_SAVE_PATH = os.path.join(OUTPUT_DIR, "hmm.json")


class WSJDatasetConfig:
    cols = ["index", "sentences", "labels"]
    
    train_file_path = os.path.join(PathConfig.DATA_PATH, "train.json")
    dev_file_path = os.path.join(PathConfig.DATA_PATH, "dev.json")
    test_file_path = os.path.join(PathConfig.DATA_PATH, "test.json")


class VocabConfig:
    UNKNOWN_TOKEN = "<unk>"
    THRESHOLD = 3
    FILE_HEADER = ["word", "index", "frequency"]

    VOCAB_FILE = PathConfig.VOCAB_FILE_PATH


class HMMConfig:
    HMM_MODEL_SAVED = PathConfig.HMM_MODEL_SAVE_PATH

time: 0 ns (started: 2023-09-26 05:13:00 -07:00)


# Dataset Preparation

In [311]:
class WSJDataset:
    def __init__(self, path):
        self.path = path

        self.data: pd.DataFrame = None
        self.cols = WSJDatasetConfig.cols

    def read_data(self):
        self.data = pd.read_json(self.path)

    def process_sentences(self):
        self.data["sentence"] = self.data["sentence"].apply(
            lambda sentence: ' '.join(sentence).lower().split()
        )

    def prepare_dataset(self):
        self.read_data()
        self.process_sentences()
        return self.data
    
    def get_sentences_with_pos_tags(self):
        sentences_with_pos_tags = self.data.loc[:, ["sentence", "labels"]].apply(
            lambda row: list(zip(row["sentence"], row["labels"])), axis=1
        )
        sentences_with_pos_tags = sentences_with_pos_tags.tolist()
        return sentences_with_pos_tags

time: 15 ms (started: 2023-09-26 05:13:01 -07:00)


In [312]:
train_dataset = WSJDataset(path=WSJDatasetConfig.train_file_path)
df_train = train_dataset.prepare_dataset()
df_train.head()

Unnamed: 0,index,sentence,labels
0,0,"[pierre, vinken, ,, 61, years, old, ,, will, j...","[NNP, NNP, ,, CD, NNS, JJ, ,, MD, VB, DT, NN, ..."
1,1,"[mr., vinken, is, chairman, of, elsevier, n.v....","[NNP, NNP, VBZ, NN, IN, NNP, NNP, ,, DT, NNP, ..."
2,2,"[rudolph, agnew, ,, 55, years, old, and, forme...","[NNP, NNP, ,, CD, NNS, JJ, CC, JJ, NN, IN, NNP..."
3,3,"[a, form, of, asbestos, once, used, to, make, ...","[DT, NN, IN, NN, RB, VBN, TO, VB, NNP, NN, NNS..."
4,4,"[the, asbestos, fiber, ,, crocidolite, ,, is, ...","[DT, NN, NN, ,, NN, ,, VBZ, RB, JJ, IN, PRP, V..."


time: 1.28 s (started: 2023-09-26 05:13:02 -07:00)


In [313]:
valid_dataset = WSJDataset(path=WSJDatasetConfig.dev_file_path)
df_valid = valid_dataset.prepare_dataset()
df_valid.head()

Unnamed: 0,index,sentence,labels
0,0,"[the, arizona, corporations, commission, autho...","[DT, NNP, NNP, NNP, VBD, DT, CD, NN, NN, NN, I..."
1,1,"[the, ruling, follows, a, host, of, problems, ...","[DT, NN, VBZ, DT, NN, IN, NNS, IN, NNP, NNP, ,..."
2,2,"[the, arizona, regulatory, ruling, calls, for,...","[DT, NNP, JJ, NN, VBZ, IN, $, CD, CD, IN, JJ, ..."
3,3,"[the, company, had, sought, increases, totalin...","[DT, NN, VBD, VBN, NNS, VBG, $, CD, CD, ,, CC,..."
4,4,"[the, decision, was, announced, after, trading...","[DT, NN, VBD, VBN, IN, NN, VBD, .]"


time: 234 ms (started: 2023-09-26 05:13:03 -07:00)


In [314]:
test_dataset = WSJDataset(path=WSJDatasetConfig.test_file_path)
df_test = test_dataset.prepare_dataset()
df_test.head()

Unnamed: 0,index,sentence
0,0,"[influential, members, of, the, house, ways, a..."
1,1,"[the, bill, ,, whose, backers, include, chairm..."
2,2,"[the, bill, intends, to, restrict, the, rtc, t..."
3,3,"[``, such, agency, `, self-help, ', borrowing,..."
4,4,"[the, complex, financing, plan, in, the, s&l, ..."


time: 188 ms (started: 2023-09-26 05:13:04 -07:00)


# Task 1: Vocabulary Creation


In [315]:
class VocabularyGenerator:
    def __init__(
        self, threshold: int, unknown_token: str = None, save: bool = False, path: str = None
    ):
        """Initialize a VocabularyGenerator

        Args:
            threshold (int): Frequency threshold for rare words.
            unknown_token (str, optional): Token to replace rare words. Defaults to None.
            save (bool, optional): Flag to save the vocabulary. Default is True.
            path (str, optional): Path to save the vocabulary. Defaults to None.

        Usage:
            vocab_generator = VocabularyGenerator(threshold=3, unknown_token="<unk>")
            vocab_df = vocab_generator.generate_vocabulary(data, "sentence")
        """
        self.threshold = threshold
        self.unknown_token = (
            unknown_token if unknown_token is not None else VocabConfig.UNKNOWN_TOKEN
        )
        self._save = save

        if self._save and path is None:
            self.path = VocabConfig.VOCAB_FILE
        else:
            self.path = path

    def _count_word_frequency(self, data, sentence_col_name):
        word_freq = (
            data[sentence_col_name]
            .explode()
            .value_counts()
            .rename_axis("word")
            .reset_index(name="frequency")
        )
        return word_freq

    def generate_vocabulary(self, data: pd.DataFrame, sentence_col_name: str):
        """Generate a vocabulary from the provided dataset.

        Args:
            data (pd.DataFrame): The DataFrame containing the dataset.
            sentence_col_name (str): The name of the column containing sentences.

        Returns:
            pd.DataFrame: A DataFrame with the generated vocabulary.

        This method takes a DataFrame with sentences and generates a vocabulary based on word frequencies.
        It replaces words with frequencies less than the specified threshold with the unknown token ("<unk>").
        The resulting DataFrame is sorted by frequency and indexed.

        If the 'save' flag is set, the vocabulary will be saved to the specified path.

        Usage:
            ```py
            vocab_generator = VocabularyGenerator(threshold=3, unknown_token="<unk>")
            vocab_df = vocab_generator.generate_vocabulary(data, sentence_col_name)
            ```
        """
        word_freq_df = self._count_word_frequency(data, sentence_col_name)

        # Replace words with frequency less than threshold with '<unk>'
        word_freq_df["word"] = word_freq_df.apply(
            lambda row: self.unknown_token if row["frequency"] <= self.threshold else row["word"],
            axis=1,
        )

        # # Group by 'Word' and aggregate by sum
        word_freq_df = word_freq_df.groupby("word", as_index=False)["frequency"].agg("sum")

        # Sort the DataFrame by frequency
        word_freq_df = word_freq_df.sort_values(by="frequency", ascending=False, ignore_index=True)

        # Add an index column
        word_freq_df["index"] = range(1, len(word_freq_df) + 1)

        if self._save:
            self.save_vocab(word_freq_df, self.path)

        return word_freq_df

    def save_vocab(self, word_freq_df, path):
        """Write your vocabulary to the file"""
        if not os.path.exists(os.path.dirname(path)):
            os.makedirs(os.path.dirname(path))

        with open(path, "w") as file:
            vocabulary = word_freq_df.to_records(index=False)
            for word, frequency, index in vocabulary:
                file.write(f"{word}\t{index}\t{frequency}\n")

time: 47 ms (started: 2023-09-26 05:13:06 -07:00)


In [316]:
vocab_generator = VocabularyGenerator(
    threshold=3, unknown_token=VocabConfig.UNKNOWN_TOKEN, save=True
)
vocab_df = vocab_generator.generate_vocabulary(df_train, "sentence")
vocab_df.head(10)

Unnamed: 0,word,frequency,index
0,",",46476,1
1,the,46144,2
2,.,37452,3
3,<unk>,37245,4
4,of,22176,5
5,to,21459,6
6,a,19338,7
7,in,16320,8
8,and,15875,9
9,'s,8886,10


time: 984 ms (started: 2023-09-26 05:13:09 -07:00)


In [317]:
print("Selected threshold for unknown words: ", VocabConfig.THRESHOLD)
print("Total size of the vocabulary: ", vocab_df.shape[0])
print(
    "Total occurrences of the special token <unk>: ",
    int(vocab_df[vocab_df["word"] == "<unk>"].frequency),
)

Selected threshold for unknown words:  3
Total size of the vocabulary:  12680
Total occurrences of the special token <unk>:  37245
time: 16 ms (started: 2023-09-26 05:13:12 -07:00)


## Preparing Dataset


In [318]:
df_pos = df_train.labels.explode().value_counts().reset_index(name="count")
print("Number of unique POS tags =", df_pos.shape[0])
df_pos = df_pos.labels.to_list()

Number of unique POS tags = 45
time: 250 ms (started: 2023-09-26 05:13:16 -07:00)


In [319]:
train_sentences_with_pos_tags = train_dataset.get_sentences_with_pos_tags()
valid_sentences_with_pos_tags = valid_dataset.get_sentences_with_pos_tags()

time: 1.38 s (started: 2023-09-26 05:13:17 -07:00)


In [320]:
train_sentences_with_pos_tags[0]

[('pierre', 'NNP'),
 ('vinken', 'NNP'),
 (',', ','),
 ('61', 'CD'),
 ('years', 'NNS'),
 ('old', 'JJ'),
 (',', ','),
 ('will', 'MD'),
 ('join', 'VB'),
 ('the', 'DT'),
 ('board', 'NN'),
 ('as', 'IN'),
 ('a', 'DT'),
 ('nonexecutive', 'JJ'),
 ('director', 'NN'),
 ('nov.', 'NNP'),
 ('29', 'CD'),
 ('.', '.')]

time: 0 ns (started: 2023-09-26 05:13:19 -07:00)


# Task 2: Model Learning


In [321]:
class HMM:
    def __init__(self, vocab_file: str, labels: List[str]):
        """_summary_

        Args:
            train_data (pd.DataFrame): _description_
            vocab_file (str): _description_
        """
        self.vocab = self._read_vocab(vocab_file)
        self.labels = labels

        # Hidden Markov Model Parameters
        self.states = list()
        self.priors = None
        self.transitions = None
        self.emissions = None

    def _read_vocab(self, vocab_file: str):
        return pd.read_csv(vocab_file, sep="\t", names=VocabConfig.FILE_HEADER)

    def _initialize_params(self):
        self.states = list(self.labels)

        # N = Number of states i.e. number of distinct tags
        num_states = len(self.labels)
        # M = Number of observable symbols i.e. number of distinct words
        num_observations = len(self.vocab)

        # State transition probability matrix of size N * N
        self.transitions = np.zeros((num_states, num_states))

        # Obseravtion Emission probability matrix of size N * M
        self.emissions = np.zeros((num_states, num_observations))

        # Prior probability matrix of size N * 1
        self.prior = np.ones(num_states)

    def _compute_prior_params(self, train_data):
        num_sentences = len(train_data)

        state_occurrence = Counter()

        for sentence in train_data:
            # Ensure the sentence is not empty
            if sentence:
                # Get the label of the first word in the sentence
                label = sentence[0][1]
                state_occurrence[label] += 1

        self.priors = np.array([state_occurrence[state] / num_sentences for state in self.labels])

    def _compute_transition_params(self, train_data):
        labels_list = [label for sentence in train_data for _, label in sentence]
        label_indices = [self.states.index(label) for label in labels_list]

        for i in range(len(label_indices) - 1):
            curr_state = label_indices[i]
            next_state = label_indices[i + 1]
            self.transitions[curr_state, next_state] += 1

        # Handle cases where the probabilities is 0
        self.transitions = np.where(self.transitions == 0, 1e-10, self.transitions)

        row_agg = self.transitions.sum(axis=1)
        self.transitions = self.transitions / row_agg[:, np.newaxis]

    def _compute_emission_params(self, train_data):
        word_to_index = dict(zip(self.vocab["word"], self.vocab["index"]))

        for sentence in train_data:
            for word, label in sentence:
                state_idx = self.states.index(label)
                word_idx = word_to_index.get(word, word_to_index[VocabConfig.UNKNOWN_TOKEN]) - 1
                self.emissions[state_idx, word_idx] += 1

        # Handle cases where the probabilities is 0
        self.emissions = np.where(self.emissions == 0, 1e-10, self.emissions)

        row_agg = self.emissions.sum(axis=1)
        self.emissions = self.emissions / row_agg[:, np.newaxis]

    def fit(self, train_data: pd.DataFrame):
        self._initialize_params()
        self._compute_prior_params(train_data)
        self._compute_transition_params(train_data)
        self._compute_emission_params(train_data)

    @property
    def get_all_probability_matrices(self):
        return self.priors, self.transitions, self.emissions

    def save_model(self, file_path=None):
        if file_path is None:
            file_path = HMMConfig.HMM_MODEL_SAVED

        if not os.path.exists(os.path.dirname(file_path)):
            os.makedirs(os.path.dirname(file_path))

        transition_prob = {
            f"({s1}, {s2})": self.transitions[self.states.index(s1), self.states.index(s2)]
            for s1, s2 in itertools.product(self.states, repeat=2)
        }

        emission_prob = {
            f"({s}, {w})": p
            for s in self.states
            for w, p in zip(self.vocab["word"], self.emissions[self.states.index(s), :])
        }

        model_params = {"transition": transition_prob, "emission": emission_prob}

        with open(file_path, "w") as json_file:
            json.dump(model_params, json_file, indent=4)

time: 47 ms (started: 2023-09-26 05:13:31 -07:00)


In [322]:
model = HMM(vocab_file=VocabConfig.VOCAB_FILE, labels=df_pos)
model.fit(train_sentences_with_pos_tags)

time: 2.5 s (started: 2023-09-26 05:13:36 -07:00)


In [323]:
p, t, e = model.get_all_probability_matrices
print("Number of Transition Parameters =", len(t.flatten()))
print("Number of Emission Parameters =", len(e.flatten()))

Number of Transition Parameters = 2025
Number of Emission Parameters = 570600
time: 15 ms (started: 2023-09-26 05:13:42 -07:00)


In [324]:
model.save_model()

time: 5.83 s (started: 2023-09-26 05:13:45 -07:00)


# Task 3: Greedy Decoding with HMM


In [342]:
class GreedyDecoding:
    def __init__(self, prior_probs, transition_probs, emission_probs, states, vocab):
        self.priors = prior_probs
        self.transitions = transition_probs
        self.emissions = emission_probs
        self.states = states
        self.vocab = vocab

        self.tag_to_idx = {tag: idx for idx, tag in enumerate(states)}
        self.word_to_index = dict(zip(self.vocab["word"], self.vocab["index"]))

        # Precompute scores for each word-tag pair
        num_words = len(vocab)
        num_tags = len(states)
        self.scores = np.zeros((num_words, num_tags))

        for i, word in enumerate(vocab["word"]):
            for j, tag in enumerate(states):
                self.scores[i, j] = prior_probs[j] * emission_probs[j, i]

    def _decode_single_sentence(self, sentence):
        predicted_tags = []

        # Process the first word
        word_idx = (
            self.word_to_index.get(sentence[0], self.word_to_index[VocabConfig.UNKNOWN_TOKEN]) - 1
        )
        predicted_tag_idx = np.argmax(self.scores[word_idx])
        predicted_tags.append(self.states[predicted_tag_idx])

        # Process the rest of the sentence
        for i in range(1, len(sentence)):
            word = sentence[i]
            word_idx = (
                self.word_to_index.get(word, self.word_to_index[VocabConfig.UNKNOWN_TOKEN]) - 1
            )

            # Calculate scores for all tags in one step
            scores = self.transitions[:, predicted_tag_idx] * self.emissions[:, word_idx]

            # Find the tag with the highest score
            predicted_tag_idx = np.argmax(scores)
            predicted_tags.append(self.states[predicted_tag_idx])

        return predicted_tags

    def decode(self, sentences):
        predicted_tags_list = []

        for sentence in sentences:
            predicted_tags = self._decode_single_sentence([word for word, tag in sentence])
            predicted_tags_list.append(predicted_tags)

        return predicted_tags_list

time: 16 ms (started: 2023-09-26 05:53:23 -07:00)


In [345]:
def calculate_accuracy(predicted_labels, true_labels):
    """
    Calculate the accuracy of predicted labels compared to true labels.

    Args:
        predicted_labels (list): List of predicted labels.
        true_labels (list): List of true labels.

    Returns:
        float: Accuracy as a percentage (0.0 to 100.0).
    """
    assert len(predicted_labels) == len(true_labels), "Lists must have the same length."

    correct_predictions = sum(1 for pred, true in zip(predicted_labels, true_labels) if pred == true)
    total_predictions = len(predicted_labels)

    accuracy = (correct_predictions / total_predictions) * 100.0

    return accuracy


time: 0 ns (started: 2023-09-26 05:54:41 -07:00)


In [343]:
# Assuming you have the probability matrices and other data
greedy_decoder = GreedyDecoding(p, t, e, model.states, model.vocab)

# Apply Greedy Decoding on development data
predicted_dev_tags = greedy_decoder.decode(valid_sentences_with_pos_tags)

time: 2.14 s (started: 2023-09-26 05:53:25 -07:00)


In [346]:
calculate_accuracy(predicted_dev_tags, df_valid.labels.tolist())

4.016645558168989

time: 16 ms (started: 2023-09-26 05:55:18 -07:00)


# Task 4: Viterbi Decoding with HMM


# THE END
