In [1]:
import re
import json
from collections import defaultdict
import numpy as np
import pandas as pd 
from tqdm import tqdm 

from __future__ import annotations
from collections import Counter
from dataclasses import dataclass
from itertools import chain
from typing import Any, List
import itertools


### Task 1: Vocabulary Creation

What is the selected threshold for unknown words replacement? What is the total size of your
vocabulary and what is the total occurrences of the special token ‘< unk >’
after replacement?

In [2]:
n_threshold = 3
train_vocab = defaultdict(int)

vocab_df  = pd.read_csv('../../data/vocab-data/train', sep='\t', skip_blank_lines = False, header = None)
vocab_df.columns = ['Index', 'Word', 'POS']

print(vocab_df.columns)

# File importing
with open('../../data/vocab-data/train', 'r') as tr_file:
    Lines = tr_file.readlines()

    # Create vocab
    for line in Lines:
        if line.strip():
            word = re.split(r'\t', line)[1]
            cleaned_word = re.sub(r'\W+', '', word)     

        if word not in train_vocab:
            train_vocab[cleaned_word] = 0
        train_vocab[cleaned_word] += 1


Index(['Index', 'Word', 'POS'], dtype='object')


If some word 'xyz' has frequency 3 and my threshold for categorizing as '<unk>' is 4. Then  we should add 3 to the frequency occurrence count of '<unk>'

In [3]:
# Handle <unk> tokens  
unk_count = sum(v for k, v in train_vocab.items() if v <= n_threshold)
new_vocab = {k: v for k, v in train_vocab.items() if v > n_threshold}
new_vocab['<unk>'] = unk_count
indexed_vocab = {word: (index, count) for index, (word, count) in enumerate(sorted(new_vocab.items(), key = lambda item: item[1], reverse=True), start = 1)}

In [4]:
# File Writing
with open("../../data/outputs/train_vocab.txt", "w") as f:
    for k,v in indexed_vocab.items():
        # word index count
        new_line = f"{k}\t{v[0]}\t{v[1]}\n"
        f.write(new_line)


In [19]:
vocab_size = 0
total_occurrences = 0

with open('../../data/outputs/train_vocab.txt', 'r') as file:
    for line in file:
        parts = line.strip().split('\t')  # Or split by space if that's the delimiter
        if len(parts) >= 2:
            vocab_size += 1
            total_occurrences += int(parts[1])

print(f"Total vocabulary size: {vocab_size}")
print(f"Total occurrences of words: {total_occurrences}")

Total vocabulary size: 12405
Total occurrences of words: 76948215


## HMM Model with Emission & Transition Probabilities

In [5]:
def load_word_frequencies(vocab_counts_file):
    word_frequencies = {}
    with open(vocab_counts_file, 'r') as file:
        for line in file:
            word, _, count = line.strip().split('\t')
            word_frequencies[word] = int(count)
    return word_frequencies

In [6]:
def extract_sequences_from_file(file_path: str):

    """
    Extracts word and tag sequences from a file.

    This function reads a file where each line represents a word followed by its tag,
    separated by tabs, and sentences are separated by double newlines. It processes
    the file to extract sequences of words and their corresponding tags, handling
    and reporting lines that do not conform to the expected format.

    Parameters:
    - file_path: str - The path to the file from which word and tag sequences will be extracted.

    Returns:
    - tuple(list[list[str]], list[list[str]]): A tuple containing two lists:
        - The first list contains sequences of words.
        - The second list contains sequences of corresponding tags for those words.

    Note:
    - The function prints an error message for each line that does not contain exactly three parts
      separated by tabs, but it continues processing the rest of the file.
    """

    with open(file_path, 'r') as file:
        content = file.read().rstrip('\n')  # Ensure no trailing newline

    sentences = content.split('\n\n')

    word_sequences, tag_sequences = [], []

    for sentence in sentences:
        words, tags = [], []
        lines = sentence.split('\n')
        for line in lines:
            try:
                _, word, tag = line.split('\t')
                words.append(word)
                tags.append(tag)
            except ValueError:
                print(f"Error processing line: {line}")
                continue 

        word_sequences.append(words)
        tag_sequences.append(tags)

    return word_sequences, tag_sequences


class TransformData:
    """
    A base class for data transformation operations.

    This class serves as a foundation for transformations that require a fitting step before
    applying a transformation. It defines a generic `fit_transform` method that chains the
    `fit` and `transform` methods, allowing subclasses to implement specific logic for these
    operations.

    Methods:
    - fit_transform: Applies both the fitting and transforming operations sequentially.
    """
    def fit_transform(self, *args, **kwargs):
        self.fit(*args, **kwargs)
        return self.transform(*args, **kwargs)

@dataclass
class UnkTransform(TransformData):

    """
    A transformation that replaces infrequent words with a special token.

    This class implements a transformation for sequences of words where words that occur
    less frequently than a specified threshold are replaced with a predefined special token.
    It first fits to the data to find the frequency of each word and then applies the transformation
    based on the threshold.

    Attributes:
    - threshold (int): The minimum frequency a word must have to not be replaced.
    - special_token (str): The token used to replace infrequent words.

    Methods:
    - fit: Calculates the frequency of each word in the provided sequences.
    - transform: Replaces infrequent words in the sequences with the special token.
    """

    def __init__(self, threshold: int, special_token: str):
        self.threshold = threshold
        self.special_token = special_token
        self.words = None

    def fit(self, sequences: list[list[str]], *_):
        frequency = Counter(chain(*sequences)) 
        self.words = {word: count for word, count in frequency.items() if count >= self.threshold}
        return self

    def transform(self, sequences: list[list[str]], *_):
        return [
            [
                (word if word in self.words else self.special_token)
                for word in sequence
            ]
            for sequence in sequences
        ]


In [7]:
# Utilization
vocab_counts_file = '../../data/outputs/train_vocab.txt'  
word_frequencies = load_word_frequencies(vocab_counts_file)


word_sequences, tag_sequences = extract_sequences_from_file('../../data/vocab-data/train')
transformer = UnkTransform(3, '<unk>')
word_sequences_t = transformer.fit_transform(word_sequences)
counts = Counter(itertools.chain(*tag_sequences))
emission_temp = [list(zip(y, x)) for (y, x) in zip(tag_sequences, word_sequences_t)]
emission_counts = Counter(itertools.chain(*emission_temp))
prior_counts = Counter([sequence[0] for sequence in tag_sequences])
bigrams = [list(zip(y, y[1:])) for y in tag_sequences]
transition_counts = Counter(chain(*bigrams))

states = set(counts.keys())
observations = set(Counter(itertools.chain(*word_sequences_t)).keys())
N = len(tag_sequences)


transition_probs = defaultdict(int)
emission_probs = defaultdict(int)
state_probs = defaultdict(int)

# Prior Probabilities
state_probs = {s: prior_counts[s] / N for s in states}

# Transition probabilities
for s in states:
    for s_ in states:
        transition_probs[s, s_] = transition_counts[s, s_] / counts[s]

# Emission probabilities
for o in observations:
    for s in states:
        emission_probs[s, o] = emission_counts[s, o] / counts[s]


In [8]:
# HMM Model for JSON
hmm_model = {
    "transition": {f"({k[0]},{k[1]})": v for k, v in transition_probs.items()},
    "emission": {f"({k[0]},{k[1]})": v for k, v in emission_probs.items()}
}

with open("../../data/outputs/hmm.json", "w") as f:
    json.dump(hmm_model, f, indent = 4)

In [18]:
print(len(hmm_model['transition']))
print(len(hmm_model['emission']))

2025
761400


## Greedy HMM Decoding

In [10]:
def decode_greedy(sentence: List[str], state_probs: defaultdict, transition_probs: defaultdict, emission_probs: defaultdict ):
    
    """
    Performs greedy decoding on a given sentence to find the most probable state path.

    This function applies a greedy decoding algorithm over a sequence of words (sentence) using
    given Hidden Markov Model (HMM) parameters: initial state probabilities, transition probabilities,
    and emission probabilities. It starts by selecting the state with the highest probability for the
    first word based on state and emission probabilities. For subsequent words, it selects the state
    that has the highest transition probability from the previous state, multiplied by the emission
    probability of the word for that state.

    Parameters:
    - sentence: List[str] - A list of words representing the sentence to decode.
    - state_probs: defaultdict - A dictionary containing the initial state probabilities.
    - transition_probs: defaultdict - A dictionary containing state-to-state transition probabilities.
    - emission_probs: defaultdict - A dictionary containing state-to-word emission probabilities.

    Returns:
    - List[str]: A list of states representing the most probable path through the states for the given sentence.

    Note: This function assumes that 'states' is a predefined list of all possible states in the HMM.
          It performs decoding based on a greedy strategy, potentially not always resulting in the globally optimal state sequence.
    """

    path = []
    word = sentence[0]
    max_state = None
    max_prob = 0

    for s in states:
        prob = state_probs[s] * emission_probs[s, word]
        # print(prob)
        if prob > max_prob:
            max_prob = prob
            max_state = s
    
    path.append(max_state)
    prev_state = max_state
        

    for i, word in enumerate(sentence[1:]):
        max_state = None
        max_prob = 0
        for s in states:
            prob = transition_probs[prev_state, s] * emission_probs[s, word]
            if prob > max_prob:
                max_prob = prob
                max_state = s
        path.append(max_state)
        prev_state = max_state
    
    return path

In [11]:
def prediction_to_file(filepath: str, word_sequences: list[list[str]], tag_sequences: list[list[str]]):
    
    """
    Writes word sequences and their corresponding tag sequences to a file.

    This function takes a filepath and two lists of lists: word_sequences and tag_sequences. 
    Each inner list in word_sequences corresponds to a sequence of words, and each inner list in 
    tag_sequences contains the tags associated with the words in the corresponding word sequence. 
    The function writes these sequences to the specified file, with each word/tag pair separated by a tab 
    and each sequence separated by a newline. The function ensures that each word sequence has a 
    corresponding tag sequence of the same length and raises a ValueError if this is not the case.

    Parameters:
    - filepath: str - The path to the file where the word and tag sequences will be written.
    - word_sequences: list[list[str]] - A list of lists, where each inner list contains a sequence of words.
    - tag_sequences: list[list[str]] - A list of lists, where each inner list contains the tags corresponding to the words in the same position of word_sequences.

    Raises:
    - ValueError: If any word sequence and its corresponding tag sequence do not have the same length.

    The output file will have the following format:
    1   word1   tag1
    2   word2   tag2
    ...


    Note: The function does not return any value.
    """

    result_str = ""
    
    for seq_index in range(len(word_sequences)):
        word_seq = word_sequences[seq_index]
        tag_seq = tag_sequences[seq_index]
        
        if len(word_seq) != len(tag_seq):
            raise ValueError("Word sequence and tag sequence lengths do not match.")
        
        for pair_index in range(len(word_seq)):
            word = word_seq[pair_index]
            tag = tag_seq[pair_index]
            result_str += f"{pair_index + 1}\t{word}\t{tag}\n"
        
        if seq_index < len(word_sequences) - 1:
            result_str += "\n"
    
    # Write the result string to the specified file
    with open(filepath, 'w') as file:
        file.write(result_str)


In [12]:
# dev predictions writing and greedy call
word_sequences_dev, tag_sequences_dev = extract_sequences_from_file('../../data/vocab-data/dev')
word_sequences_dev_t = transformer.transform(word_sequences_dev)
pred_tagsequence = [
    decode_greedy(word_sequences_dev_t[i], state_probs, transition_probs, emission_probs)
    for i in range(len(word_sequences_dev_t))
]
prediction_to_file('../../data/outputs/greedy_dev.out', word_sequences_dev_t, pred_tagsequence)


# test predictions writing and greedy call 
word_sequences_test, tag_sequences_test = extract_sequences_from_file('../../data/vocab-data/test')
word_sequences_test_t = transformer.transform(word_sequences_test)
pred_tagsequence = [
    decode_greedy(word_sequences_test_t[i], state_probs, transition_probs, emission_probs)
    for i in range(len(word_sequences_test_t))
]
prediction_to_file('../../data/outputs/greedy_test.out', word_sequences_test_t, pred_tagsequence)

In [13]:
!python ../eval.py -p ../../data/outputs/greedy_dev.out -g ../../data/vocab-data/dev

total: 131768, correct: 122119, accuracy: 92.68%


## VITERBI HMM DECODING

In [14]:
# Citation: https://en.wikipedia.org/wiki/Viterbi_algorithm
def viterbi(obs, states, start_p, trans_p, emit_p):
    # Initialize the dynamic programming table
    V = [{}]
    path = {}
    
    # Initialize base cases (t == 0)
    for st in states:
        V[0][st] = start_p[st] * emit_p[st, obs[0]]
        path[st] = [st]
    
    # Run Viterbi for t > 0
    for t in range(1, len(obs)):
        V.append({})
        newpath = {}
        
        for st in states:
            (max_prob, state) = max(
                (V[t-1][prev_st] * trans_p[prev_st, st] * emit_p[st, obs[t]], prev_st) 
                for prev_st in states)
            V[t][st] = max_prob
            newpath[st] = path[state] + [st]
        
        path = newpath
    
    # Build the output
    (max_prob, state) = max((V[len(obs) - 1][st], st) for st in states)
    return path[state], max_prob

def dptable(V):
    # Print a table of steps from dictionary
    yield "    " + " ".join(f"{i:5d}" for i in range(len(V)))
    for state in V[0]:
        yield f"{state}: " + " ".join(f"{v[state]:.5f}" for v in V)



word_sequences_dev, tag_sequences_dev = extract_sequences_from_file('../../data/vocab-data/dev')
word_sequences_dev_t = transformer.transform(word_sequences_dev)
pred_tagsequence = [
    viterbi(word_sequences_dev_t[i], states, state_probs, transition_probs, emission_probs)[0]
    for i in tqdm(range(len(word_sequences_dev_t)))
]
prediction_to_file('../../data/outputs/viterbi_dev.out', word_sequences_dev_t, pred_tagsequence)

word_sequences_test, tag_sequences_test = extract_sequences_from_file('../../data/vocab-data/test')
word_sequences_test_t = transformer.transform(word_sequences_test)
pred_tagsequence = [
    viterbi(word_sequences_test_t[i], S, state_probs, transition_probs, emission_probs)[0]
    for i in tqdm(range(len(word_sequences_test_t)))
]
prediction_to_file('../../data/outputs/viterbi_test.out', word_sequences_test_t, pred_tagsequence)

## ACC OF DEV FILE NOT TEST
!python ../eval.py -p ../../data/outputs/viterbi_dev.out -g ../../data/vocab-data/dev 

100%|██████████| 5527/5527 [01:30<00:00, 60.99it/s]


total: 131768, correct: 124333, accuracy: 94.36%
