In [78]:
import os
from pathlib import Path


def open_filepath(filePath):
    """Read a text file and return its lines.

    The given path is made absolute (relative paths are taken from the
    current working directory) before the file is opened in text mode.

    Args:
        filePath: Location of the file to read, absolute or relative.

    Returns:
        A list with one string per line, line endings included, or ``None``
        if the file does not exist. In the latter case the resolved path and
        the current working directory are printed to help diagnose the miss.
    """
    try:
        resolved = Path(os.path.abspath(filePath))
        with open(resolved, mode="r") as handle:
            lines = handle.readlines()
    except FileNotFoundError:
        print(f"Error: File not found at {resolved}")
        print(f"Current working directory: {os.getcwd()}")
        return None
    return lines


def _parse_tagged_lines(lines, separator_tag="START"):
    """Convert "word tag" lines into (word, tag) tuples; blank lines become ("\\n", separator_tag)."""
    pairs = []
    for line in lines:
        if len(line) != 0 and line != "\n":
            fields = line.rstrip().split(" ")
            pairs.append((fields[0], fields[1]))
        else:
            # A blank line separates sentences; it is kept as an explicit marker.
            pairs.append(("\n", separator_tag))
    return pairs


def process_all_data():
    """Load and parse the EN train/dev files into in-memory lists.

    Reads ``EN/train``, ``EN/dev.in`` and ``EN/dev.out`` (paths relative to
    the current working directory) through ``open_filepath``.

    Returns:
        A dict with three entries:

        * ``"ProcessedTrain"``: list of ``(word, tag)`` tuples parsed from
          ``EN/train``; each blank line becomes the sentence marker
          ``("\\n", "START")``.
        * ``"ProcessedDevIn"``: list of whitespace-stripped lines from
          ``EN/dev.in`` (a blank line becomes ``""``).
        * ``"ProcessedDevOut"``: list of ``(word, tag)`` tuples parsed from
          ``EN/dev.out``, in the same format as the training list.

    Raises:
        FileNotFoundError: If any of the three files could not be read.
    """
    sources = {
        "train": "EN/train",
        "dev_in": "EN/dev.in",
        "dev_out": "EN/dev.out",
    }

    raw = {}
    for key, path in sources.items():
        lines = open_filepath(path)
        if lines is None:
            # open_filepath signals a missing file with None; fail loudly here
            # rather than with a TypeError when the lines are iterated.
            raise FileNotFoundError(f"Could not read required data file: {path}")
        raw[key] = lines

    ProcessedTrain = _parse_tagged_lines(raw["train"])
    ProcessedDevIn = [line.strip() if line else "\n" for line in raw["dev_in"]]
    # The gold dev labels go into their own list; they must not be mixed into
    # the training pairs.
    ProcessedDevOut = _parse_tagged_lines(raw["dev_out"])

    return {
        "ProcessedTrain": ProcessedTrain,
        "ProcessedDevIn": ProcessedDevIn,
        "ProcessedDevOut": ProcessedDevOut,
    }

In [79]:
# Parse the corpus files once, then bind each entry of the returned dict to
# its own module-level name for use by later cells.
data = process_all_data()
train_data, dev_in_data, dev_out_data = (
    data[key] for key in ("ProcessedTrain", "ProcessedDevIn", "ProcessedDevOut")
)

In [80]:
def transition_mle(data):
    """Estimate HMM transition probabilities by maximum likelihood.

    ``data`` is a sequence of ``(word, tag)`` pairs in which a pair tagged
    ``"START"`` marks a sentence boundary. Walking the sequence, every
    boundary closes the running sentence with a transition into ``"STOP"``
    and the next real tag is reached from ``"START"``. Consecutive boundary
    markers are ignored.

    Args:
        data: Iterable of ``(word, tag)`` pairs; only the tag is used.

    Returns:
        A dict mapping ``(prev_tag, next_tag)`` to
        ``count(prev_tag -> next_tag) / count(prev_tag -> anything)``.
        An empty input yields an empty dict.
    """
    START_S = "START"
    STOP_S = "STOP"

    transition_counts = {}
    origin_counts = {}

    def record(origin, target):
        # Tally the denominator on the *origin* state so that the outgoing
        # probabilities of every state sum to 1, whatever the input shape.
        key = (origin, target)
        transition_counts[key] = transition_counts.get(key, 0) + 1
        origin_counts[origin] = origin_counts.get(origin, 0) + 1

    prev_state = START_S
    for _, curr_state in data:
        if curr_state != START_S:
            record(prev_state, curr_state)
        elif prev_state != START_S:
            # Boundary right after a real tag: the sentence ends here.
            record(prev_state, STOP_S)
        # Otherwise two boundaries in a row: nothing to count.
        prev_state = curr_state

    # Data that stops mid-sentence (no trailing boundary) still ends that
    # sentence, so close it with a STOP transition.
    if prev_state != START_S:
        record(prev_state, STOP_S)

    return {
        pair: occurrences / origin_counts[pair[0]]
        for pair, occurrences in transition_counts.items()
    }

In [81]:
def emission_mle(data):
    """Estimate emission probabilities by maximum likelihood.

    Args:
        data: Iterable of ``(word, tag)`` pairs.

    Returns:
        A nested dict ``{tag: {word: probability}}`` where the probability is
        the number of times ``word`` was seen with ``tag`` divided by the
        total number of pairs carrying ``tag``. Pairs tagged ``"START"``
        count towards that tag's total but contribute no words, so the
        ``"START"`` entry, when present, is an empty dict.
    """
    START_S = "START"
    tag_totals = {}
    word_counts = {}

    for word, tag in data:
        tag_totals[tag] = tag_totals.get(tag, 0) + 1
        seen_words = word_counts.setdefault(tag, {})
        if tag == START_S:
            # Boundary markers carry no real word to emit.
            continue
        seen_words[word] = seen_words.get(word, 0) + 1

    return {
        tag: {word: hits / tag_totals[tag] for word, hits in seen_words.items()}
        for tag, seen_words in word_counts.items()
    }

In [85]:
def smooth(data: list, k: int = 3) -> list:
    """Replace rare (word, tag) observations with the ``#UNK#`` token.

    A word is treated as rare *for a given tag* when that exact
    ``(word, tag)`` pair occurs fewer than ``k`` times in ``data``; each such
    occurrence is replaced by ``("#UNK#", tag)``.

    The input sequence is not modified: a new list is returned, so the
    caller's data can be reused (for example with a different ``k``).

    Args:
        data: Sequence of ``(word, tag)`` pairs.
        k: Minimum number of occurrences for a pair to be kept as is.

    Returns:
        A new list with the same length and order as ``data``.
    """
    UNK = "#UNK#"

    # Single counting pass; dict lookups keep the whole function linear.
    pair_counts = {}
    for word, tag in data:
        pair_counts[(word, tag)] = pair_counts.get((word, tag), 0) + 1

    smoothed = []
    for item in data:
        word, tag = item
        if pair_counts[(word, tag)] < k:
            smoothed.append((UNK, tag))
        else:
            smoothed.append(item)
    return smoothed

def smooth_MLE(data: dict, k: int = 3) -> dict:
    """Return ``emission_mle`` computed on the output of ``smooth(data, k)``.

    Args:
        data: Training observations, passed straight to ``smooth``.
        k: Rare-word threshold, passed straight to ``smooth``.

    Returns:
        Whatever ``emission_mle`` returns for the smoothed observations.
    """
    # NOTE(review): ``data`` is annotated as dict, but ``smooth`` unpacks its
    # items as (word, tag) pairs -- confirm the intended type.
    return emission_mle(smooth(data, k))

def predict(inp_data, out_file, emission):
    """Tag every input word with its highest-emission tag and write the result.

    Words that do not appear anywhere in ``emission`` are scored as
    ``#UNK#``, but the original word is what gets written.

    Args:
        inp_data: Iterable of word strings. An empty or whitespace-only
            entry is treated as a sentence boundary.
        out_file: Path of the file to write; it is overwritten.
        emission: Nested dict ``{tag: {word: probability}}``.

    The output file contains one ``"word tag"`` line per word and one blank
    line per sentence boundary.
    """
    UNK = "#UNK#"

    vocabulary = set()
    for word_probs in emission.values():
        vocabulary.update(word_probs)

    lines = []
    for word in inp_data:
        if not word.strip():
            # Sentence boundary: keep it as a blank line instead of tagging
            # an empty token and emitting a malformed " TAG" line.
            lines.append("\n")
            continue
        lookup = word if word in vocabulary else UNK
        best_tag = max(emission, key=lambda tag: emission[tag].get(lookup, 0))
        lines.append(f"{word} {best_tag}\n")

    # The with-block closes the file; no explicit close() is needed.
    with open(out_file, "w") as f:
        f.writelines(lines)

# Tag the dev input using emission estimates built from train_data and write
# the predictions to EN/dev.p2.out.
predict(
    inp_data=dev_in_data,
    out_file="EN/dev.p2.out",
    emission=smooth_MLE(train_data),
)

In [63]:
def _emission_prob(b, state, word):
    """Look up P(word | state) in a tuple-keyed or nested emission table; 0 if absent."""
    if (state, word) in b:
        return b[(state, word)]
    per_state = b.get(state)
    if isinstance(per_state, dict):
        return per_state.get(word, 0)
    return 0


def viterbi(seq, a, b, states):
    """Find the most probable tag sequence for ``seq`` with the Viterbi algorithm.

    Probabilities are multiplied directly (no log-space), and any transition
    or emission missing from the tables is treated as probability 0.

    Args:
        seq: Sequence of observed words.
        a: Transition table, a dict keyed by ``(prev_state, next_state)``;
            it is queried with ``"START"`` as the first origin and ``"STOP"``
            as the final target.
        b: Emission table, either a dict keyed by ``(state, word)`` or a
            nested dict ``{state: {word: probability}}``.
        states: Sequence of candidate states (indexable).

    Returns:
        A list of ``len(seq) + 1`` items: the chosen state for each word
        followed by ``"STOP"``. For an empty ``seq`` this is ``["STOP"]``.
        When every path has probability 0 the first state is used as the
        fallback choice.

    Raises:
        ValueError: If ``seq`` is non-empty and ``states`` is empty.
    """
    START_S = "START"
    STOP_S = "STOP"

    N = len(seq)
    # The table width is the number of states, not the number of entries in
    # the transition dict.
    T = len(states)

    if N == 0:
        return [STOP_S]
    if T == 0:
        raise ValueError("viterbi requires at least one state")

    # score_m[i][j]: best score of any path ending in states[j] at position i;
    # row N holds the scores after the final transition into STOP.
    score_m = [[0] * T for _ in range(N + 1)]
    backpointer = [[0] * T for _ in range(N + 1)]

    # Initialisation: START -> first state, emitting the first word.
    for j in range(T):
        score_m[0][j] = a.get((START_S, states[j]), 0) * _emission_prob(
            b, states[j], seq[0]
        )

    # Recursion.
    for i in range(1, N):
        for j in range(T):
            emit = _emission_prob(b, states[j], seq[i])
            parent = 0
            for k in range(T):
                score = score_m[i - 1][k] * a.get((states[k], states[j]), 0) * emit
                if score > score_m[i][j]:
                    score_m[i][j] = score
                    parent = k
            backpointer[i][j] = parent

    # Termination: last state -> STOP. Index 0 is the fallback so that the
    # backtrace still works when no path has a positive score.
    best_final_score = 0
    last_parent = 0
    for j in range(T):
        score_m[N][j] = score_m[N - 1][j] * a.get((states[j], STOP_S), 0)
        if score_m[N][j] > best_final_score:
            best_final_score = score_m[N][j]
            last_parent = j

    # Backtrace.
    res = [None] * (N + 1)
    res[N] = STOP_S
    res[N - 1] = states[last_parent]
    for i in range(N - 2, -1, -1):
        last_parent = backpointer[i + 1][last_parent]
        res[i] = states[last_parent]

    return res