## ML Project Part 2

### Write a function that estimates the transition parameters from the training set using MLE

In [31]:
"""
we need 2 things:
every tag pair for consecutive words
every tag
"""

from collections import defaultdict

def estimate_emission_parameters(data):
    """Estimate emission probabilities e(word | tag) by maximum likelihood.

    Args:
        data: iterable of ``(word, tag)`` pairs.

    Returns:
        A plain ``dict`` mapping each tag to a ``dict`` of
        ``word -> count(tag, word) / count(tag)``.
    """
    tag_totals = defaultdict(int)
    pair_counts = defaultdict(lambda: defaultdict(int))
    for word, tag in data:
        pair_counts[tag][word] += 1
        tag_totals[tag] += 1

    # Normalise every per-tag word count by the number of times the tag occurs.
    return {
        tag: {word: seen / tag_totals[tag] for word, seen in words.items()}
        for tag, words in pair_counts.items()
    }

def estimate_transition_parameters(sentences):
    """Estimate transition probabilities q(curr | prev) by maximum likelihood.

    Every sentence is padded with a leading ``"START"`` and a trailing
    ``"STOP"`` before consecutive tag pairs are counted.

    Args:
        sentences: iterable of tag lists, one list per sentence.

    Returns:
        A plain ``dict`` mapping each previous tag to a ``dict`` of
        ``curr_tag -> count(prev, curr) / count(prev as a previous tag)``.
    """
    pair_counts = defaultdict(lambda: defaultdict(int))
    outgoing_totals = defaultdict(int)
    for sentence in sentences:
        path = ["START"] + sentence + ["STOP"]
        # Walk over each adjacent (previous, current) pair of the padded path.
        for prev_tag, curr_tag in zip(path, path[1:]):
            pair_counts[prev_tag][curr_tag] += 1
            outgoing_totals[prev_tag] += 1

    return {
        prev_tag: {
            curr_tag: seen / outgoing_totals[prev_tag]
            for curr_tag, seen in followers.items()
        }
        for prev_tag, followers in pair_counts.items()
    }

if __name__ == "__main__":
    # Demo: estimate the transition probabilities from the training tags and
    # print two example values.
    # NOTE(review): read_sentences is defined further down in this file, so
    # that definition must already have been executed when this block runs.

    sentences = read_sentences("EN/train")
    transition_probs = estimate_transition_parameters(sentences)

    # e.g. q(B-NP | START): probability that B-NP is the first tag of a sentence
    print(transition_probs["START"].get("B-NP", 0))

    # e.g. q(STOP | I-NP): probability that a sentence ends right after I-NP
    print(transition_probs["I-NP"].get("STOP", 0))


0.6480490669450607
0.000787675624187137


### Implementing the Viterbi algorithm

THIS IS WRONG AND INCOMPLETE

In [32]:
"""
given a sequence of words x1,...,xn, find the most likely tag sequence y1*,....,yn* using the Viterbi algo
we need to maintain 2 things:
pi[i][tag]: max log-prob of best path to position i ending in tag
bp[i][tag]: best previous tag to reach tag at position i
"""

import math
from collections import defaultdict, Counter

# Floor probability substituted wherever a probability is zero or missing.
SMALL_PROB = 1e-10


def safe_log(p):
    """Return ``log(p)``, or ``log(SMALL_PROB)`` when ``p`` is not positive."""
    if p > 0:
        return math.log(p)
    return math.log(SMALL_PROB)

def is_valid_transition(prev_tag, curr_tag):
    """Return whether ``prev_tag -> curr_tag`` is allowed under the BIO scheme.

    Only ``I-<type>`` tags are constrained: they must directly follow a
    ``B-<type>`` or ``I-<type>`` tag with the same ``<type>``. Every other
    current tag (``O``, ``B-*``, or anything without an ``I-`` prefix) may
    follow any previous tag.

    Args:
        prev_tag: tag at the previous position (may be ``"START"``).
        curr_tag: tag at the current position.

    Returns:
        ``True`` if the transition is allowed, ``False`` otherwise.
    """
    if not curr_tag.startswith("I-"):
        return True
    if not prev_tag.startswith(("B-", "I-")):
        return False
    # Compare everything after the two-character prefix so that chunk types
    # which themselves contain a hyphen are compared in full, not just up to
    # their first hyphen.
    return prev_tag[2:] == curr_tag[2:]

def viterbi(sentences, transition_probs, emission_probs, tag_set):
    """Tag every sentence with its most likely tag sequence (log-space Viterbi).

    Relies on the module-level ``SMALL_PROB``, ``safe_log`` and
    ``is_valid_transition``.

    Args:
        sentences: iterable of word sequences.
        transition_probs: ``prev_tag -> {curr_tag: q(curr_tag | prev_tag)}``,
            including the pseudo-tags ``"START"`` and ``"STOP"``.
        emission_probs: ``tag -> {word: e(word | tag)}``; the ``"#UNK#"``
            entry of a tag is used for words that no tag has ever emitted.
        tag_set: candidate tags; ``"START"`` and ``"STOP"`` are ignored as
            states.

    Returns:
        A list with one entry per input sentence, each a list of
        ``(word, tag)`` tuples. An empty sentence yields an empty list.
        Positions for which no valid path exists are tagged ``"O"``.
    """
    # Sort the states so that ties are broken the same way on every run
    # instead of depending on set iteration order.
    states = sorted(tag for tag in tag_set if tag not in ("START", "STOP"))

    # Every word that some tag emits in the model (including "#UNK#").
    vocabulary = set()
    for word_probs in emission_probs.values():
        vocabulary.update(word_probs)

    def emission_log(tag, word):
        word_probs = emission_probs.get(tag, {})
        if word in word_probs:
            prob = word_probs[word]
        elif word in vocabulary:
            # A known word that this tag never emits: (near-)zero probability.
            # Borrowing the tag's "#UNK#" mass here would overrate the tag.
            prob = SMALL_PROB
        else:
            # A word unknown to the whole model is treated as "#UNK#".
            prob = word_probs.get("#UNK#", SMALL_PROB)
        return safe_log(prob)

    def transition_log(prev_tag, curr_tag):
        return safe_log(transition_probs.get(prev_tag, {}).get(curr_tag, SMALL_PROB))

    tagged_sentences = []

    for sentence in sentences:
        n = len(sentence)
        if n == 0:
            tagged_sentences.append([])
            continue

        # V[t][tag]: best log-score of a path ending in `tag` at position t.
        # backpointer[t][tag]: previous tag on that best path.
        V = [{} for _ in range(n)]
        backpointer = [{} for _ in range(n)]

        # Initialization: the same BIO constraint as in the recursion applies,
        # so a sentence cannot start inside a chunk (START -> I-*).
        for tag in states:
            if not is_valid_transition("START", tag):
                continue
            V[0][tag] = transition_log("START", tag) + emission_log(tag, sentence[0])
            backpointer[0][tag] = "START"

        # Recursion
        for t in range(1, n):
            for curr_tag in states:
                best_score = float('-inf')
                best_prev_tag = None
                for prev_tag, prev_score in V[t - 1].items():
                    if not is_valid_transition(prev_tag, curr_tag):
                        continue
                    score = prev_score + transition_log(prev_tag, curr_tag)
                    if score > best_score:
                        best_score = score
                        best_prev_tag = prev_tag
                if best_prev_tag is not None:
                    # The emission term does not depend on prev_tag, so it is
                    # added once after the maximisation.
                    V[t][curr_tag] = best_score + emission_log(curr_tag, sentence[t])
                    backpointer[t][curr_tag] = best_prev_tag

        # Termination: account for the transition into STOP.
        best_final_score = float('-inf')
        best_last_tag = None
        for tag, score_so_far in V[n - 1].items():
            score = score_so_far + transition_log(tag, "STOP")
            if score > best_final_score:
                best_final_score = score
                best_last_tag = tag

        if best_last_tag is None:
            # No path reached the last position; fall back to "O".
            best_last_tag = "O"

        # Follow the backpointers from the end, then reverse once.
        tags = [best_last_tag]
        for t in range(n - 1, 0, -1):
            tags.append(backpointer[t].get(tags[-1], "O"))
        tags.reverse()

        tagged_sentences.append(list(zip(sentence, tags)))

    return tagged_sentences

In [33]:
## THESE ARE THE FUNCTIONS FROM PART 1

def read_training_data(file_path):
    """Read a tagged training file into a flat list of ``(word, tag)`` pairs.

    Each non-blank line holds a word followed by its tag; blank lines are
    skipped. The tag is the last whitespace-separated token, so a word that
    itself contains whitespace is kept whole rather than breaking the parse.

    Args:
        file_path: path of the training file.

    Returns:
        List of ``(word, tag)`` tuples in file order.

    Raises:
        ValueError: if a non-blank line contains a single token only.
    """
    data = []
    with open(file_path, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            word, tag = line.rsplit(maxsplit=1)
            data.append((word, tag))
    return data

def count_word_frequencies(data):
    """Count how many times each word occurs in ``data``.

    Args:
        data: iterable of ``(word, tag)`` pairs; the tag is ignored.

    Returns:
        A ``defaultdict(int)`` mapping each word to its number of occurrences.
    """
    frequencies = defaultdict(int)
    for token, _tag in data:
        frequencies[token] = frequencies[token] + 1
    return frequencies

def replace_rare_words(data, word_counts, k=3):
    """Replace every word seen fewer than ``k`` times with ``'#UNK#'``.

    Args:
        data: iterable of ``(word, tag)`` pairs.
        word_counts: mapping ``word -> frequency``. A word missing from the
            mapping counts as frequency 0; the mapping is never modified.
        k: minimum frequency a word needs in order to be kept.

    Returns:
        A new list of ``(word, tag)`` pairs with rare words replaced.
    """
    # .get() works for plain dicts as well and, unlike indexing a defaultdict,
    # does not insert the looked-up word into the caller's mapping.
    return [
        (word if word_counts.get(word, 0) >= k else '#UNK#', tag)
        for word, tag in data
    ]

def read_sentences(file_path):
    """Read a tagged file and return the tag sequence of every sentence.

    Sentences are separated by one or more blank lines. On each non-blank
    line the tag is the last whitespace-separated token, so a word that
    itself contains whitespace does not break the parse.

    Args:
        file_path: path of the tagged file.

    Returns:
        List of sentences, each a list of tags in file order.

    Raises:
        ValueError: if a non-blank line contains a single token only.
    """
    sentences = []
    current = []
    with open(file_path, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            if line:
                _, tag = line.rsplit(maxsplit=1)
                current.append(tag)
            elif current:
                # A blank line closes the sentence collected so far.
                sentences.append(current)
                current = []
    # The file may end without a trailing blank line.
    if current:
        sentences.append(current)
    return sentences

def read_dev_sentences(dev_path, word_counts, k=3):
    """Read an untagged file into sentences, mapping rare words to ``"#UNK#"``.

    The file holds one word per line; sentences are separated by one or more
    blank lines. The whole stripped line is taken as the word.

    Args:
        dev_path: path of the untagged input file.
        word_counts: mapping ``word -> training frequency``. A word missing
            from the mapping counts as frequency 0; the mapping is never
            modified.
        k: minimum training frequency a word needs in order to be kept.

    Returns:
        List of sentences, each a list of words (or ``"#UNK#"``).
    """
    sentences = []
    current = []
    with open(dev_path, 'r') as f:
        for raw_line in f:
            word = raw_line.strip()
            if word:
                # .get() avoids inserting every unseen word into a defaultdict
                # and also accepts a plain dict.
                if word_counts.get(word, 0) < k:
                    current.append("#UNK#")
                else:
                    current.append(word)
            elif current:
                sentences.append(current)
                current = []
    # The file may end without a trailing blank line.
    if current:
        sentences.append(current)
    return sentences

In [34]:
if __name__ == "__main__":
    # Emission model: count words on the raw training data, then estimate
    # e(word | tag) on the copy in which rare words are replaced by #UNK#.
    train_data = read_training_data("EN/train")
    word_counts = count_word_frequencies(train_data)
    train_data_unk = replace_rare_words(train_data, word_counts, k=3)
    emission_probs = estimate_emission_parameters(train_data_unk)

    # Transition model, estimated from the per-sentence tag sequences.
    tag_only_sentences = read_sentences("EN/train")
    transition_probs = estimate_transition_parameters(tag_only_sentences)

    # Candidate tags: those occurring at least 20 times in training, plus the
    # START/STOP pseudo-tags.
    tag_freqs = Counter(tag for _, tag in train_data)
    tag_set = {tag for tag, count in tag_freqs.items() if count >= 20} | {"START", "STOP"}

    # Guarantee an #UNK# emission entry for every candidate tag.
    for tag in tag_set:
        emission_probs.setdefault(tag, {}).setdefault("#UNK#", SMALL_PROB)

    # Decode the development set.
    dev_sentences = read_dev_sentences("EN/dev.in", word_counts, k=3)
    tagged_sentences = viterbi(dev_sentences, transition_probs, emission_probs, tag_set)

    # One "word tag" line per token, with a blank line after every sentence.
    with open("EN/dev.p2.out", "w") as out_file:
        for tagged in tagged_sentences:
            out_file.writelines(f"{word} {tag}\n" for word, tag in tagged)
            out_file.write("\n")