In [1]:
import math
import random
from collections import defaultdict, Counter

In [2]:
# ---------------------------
# 1. Read word_TAG formatted data
# ---------------------------
def read_data(path):
    """Load a corpus of ``word_TAG`` tokens, one sentence per line.

    Every whitespace-separated token is split on its *last* underscore, so a
    word that itself contains underscores keeps them (``ice_cream_NN`` becomes
    ``("ice_cream", "NN")``). Tokens without any underscore carry no tag and
    are dropped.

    Lines that yield no tagged token (blank lines, for instance) are skipped
    instead of being stored as empty sentences: the decoder in this notebook
    reads the first word of every sentence, and an empty training sentence
    would only contribute a spurious ``<START>`` -> ``<END>`` transition.

    Args:
        path: Path of a UTF-8 encoded text file.

    Returns:
        A list of sentences, each a non-empty list of ``(word, tag)`` tuples.
    """
    sentences = []
    with open(path, "r", encoding="utf-8") as corpus:
        for line in corpus:
            tagged = [
                tuple(token.rsplit("_", 1))
                for token in line.split()
                if "_" in token
            ]
            # Keep only lines that actually produced tagged tokens.
            if tagged:
                sentences.append(tagged)
    return sentences

In [3]:
# ---------------------------
# 2. K-fold split
# ---------------------------
def k_folds(data, K=3, seed=None):
    """Split ``data`` into ``K`` shuffled (train, test) cross-validation folds.

    The input list is left untouched; shuffling is done on a copy. Every item
    lands in exactly one test fold: when ``len(data)`` is not a multiple of
    ``K`` the first ``len(data) % K`` folds receive one extra item instead of
    the remainder being left out of testing altogether.

    Args:
        data: Sequence of items (sentences) to split.
        K: Number of folds; must be at least 1.
        seed: Optional seed for a private random generator, giving a
            reproducible split. With ``None`` the module-level ``random``
            generator is used, so a prior ``random.seed(...)`` still applies.

    Returns:
        A list of ``K`` tuples ``(train, test)``, each a list of items.

    Raises:
        ValueError: If ``K`` is smaller than 1.
    """
    if K < 1:
        raise ValueError("K must be a positive integer")

    shuffled = list(data)  # copy so the caller's list keeps its order
    if seed is None:
        random.shuffle(shuffled)
    else:
        random.Random(seed).shuffle(shuffled)

    base_size, extra = divmod(len(shuffled), K)
    folds = []
    start = 0
    for fold_index in range(K):
        # The first `extra` folds absorb the remainder, one item each.
        stop = start + base_size + (1 if fold_index < extra else 0)
        test = shuffled[start:stop]
        train = shuffled[:start] + shuffled[stop:]
        folds.append((train, test))
        start = stop
    return folds

In [4]:
# ---------------------------
# 3. Train HMM: compute transition & emission probabilities
# ---------------------------
def train_hmm(train):
    """Estimate HMM parameters from tagged sentences by relative frequency.

    Args:
        train: Iterable of sentences, each a sequence of ``(word, tag)`` pairs.

    Returns:
        A tuple ``(trans_prob, emit_prob, tags)`` where

        * ``trans_prob[prev][tag]`` is P(tag | prev); ``prev`` may be the
          pseudo-tag ``"<START>"`` and ``tag`` may be ``"<END>"``,
        * ``emit_prob[tag][word]`` is P(word | tag),
        * ``tags`` lists the tags in order of first appearance.

        Both probability tables are ``defaultdict(dict)`` instances. No
        smoothing is applied, so unseen events are simply absent.
    """
    transition_counts = defaultdict(Counter)
    emission_counts = defaultdict(Counter)
    seen_tags = {}  # used as an insertion-ordered set

    for sentence in train:
        previous_tag = "<START>"
        for word, tag in sentence:
            seen_tags[tag] = None
            emission_counts[tag][word] += 1
            transition_counts[previous_tag][tag] += 1
            previous_tag = tag
        # Close the sentence so end-of-sentence transitions are counted too.
        transition_counts[previous_tag]["<END>"] += 1

    def to_probabilities(count_table):
        # convert counts → probabilities, one conditioning context at a time
        probabilities = defaultdict(dict)
        for context, counts in count_table.items():
            denominator = sum(counts.values())
            probabilities[context] = {
                outcome: count / denominator
                for outcome, count in counts.items()
            }
        return probabilities

    return (
        to_probabilities(transition_counts),
        to_probabilities(emission_counts),
        list(seen_tags),
    )

In [5]:
# ---------------------------
# 4. Viterbi Decoding
# ---------------------------
def viterbi(words, trans_prob, emit_prob, tags, floor=1e-12):
    """Return the most probable tag sequence for ``words`` under the HMM.

    Scores are accumulated in log space. Any transition or emission missing
    from the tables is given the probability ``floor``. The final tag is
    chosen including the transition into ``"<END>"``, so end-of-sentence
    statistics stored in ``trans_prob`` take part in decoding; if the table
    holds no ``"<END>"`` entries every tag gets the same floor and the choice
    depends on the path scores alone.

    Args:
        words: Sequence of word strings (indexable, with ``len``).
        trans_prob: Mapping ``prev_tag -> {tag: probability}``; the key
            ``"<START>"`` holds the initial distribution. Plain dicts and
            defaultdicts both work, and the mapping is never modified.
        emit_prob: Mapping ``tag -> {word: probability}``.
        tags: Sequence of candidate tags. Ties are broken in favour of the
            tag that comes first in this sequence.
        floor: Probability used for unseen events; must be greater than 0.

    Returns:
        A list with one tag per word; ``[]`` for an empty sentence.

    Raises:
        ValueError: If ``words`` is non-empty but ``tags`` is empty.
    """
    if not words:
        return []
    if not tags:
        raise ValueError("viterbi() needs at least one candidate tag")

    def log_prob(table, context, outcome):
        # .get() avoids inserting keys into defaultdict tables.
        return math.log(table.get(context, {}).get(outcome, floor))

    # Transition logs do not depend on the position, so compute them once.
    log_trans = {}
    if len(words) > 1:
        log_trans = {
            prev: {tag: log_prob(trans_prob, prev, tag) for tag in tags}
            for prev in tags
        }

    # initialization
    scores = {
        tag: log_prob(trans_prob, "<START>", tag)
        + log_prob(emit_prob, tag, words[0])
        for tag in tags
    }
    backpointers = []

    # recursion
    for position in range(1, len(words)):
        previous_scores = scores
        scores = {}
        pointers = {}
        for tag in tags:
            # The emission term is the same for every predecessor.
            log_emit = log_prob(emit_prob, tag, words[position])
            best_score = float("-inf")
            best_prev = None
            for prev in tags:
                candidate = previous_scores[prev] + log_trans[prev][tag] + log_emit
                if candidate > best_score:
                    best_score = candidate
                    best_prev = prev
            scores[tag] = best_score
            pointers[tag] = best_prev
        backpointers.append(pointers)

    # termination: account for the probability of ending after each tag
    final_tag = max(
        tags,
        key=lambda tag: scores[tag] + log_prob(trans_prob, tag, "<END>"),
    )

    # backtrack
    path = [final_tag]
    for pointers in reversed(backpointers):
        path.append(pointers[path[-1]])
    path.reverse()
    return path


In [6]:
# ---------------------------
# 5. Compute Precision / Recall / F1
# ---------------------------
def compute_f1(gold_tags, pred_tags):
    """Compute macro-averaged precision, recall and F1 over tags.

    Per-tag scores are computed from token-level counts and then averaged
    with equal weight over every tag that occurs in either sequence. A tag
    with no predictions gets precision 0, and a tag with no gold occurrences
    gets recall 0.

    Args:
        gold_tags: Reference tags, one per token.
        pred_tags: Predicted tags, aligned with ``gold_tags``.

    Returns:
        A tuple ``(precision, recall, f1)`` of floats. All three are ``0.0``
        when there are no tokens to score.

    Raises:
        ValueError: If the two sequences differ in length; scoring a silently
            truncated alignment would produce misleading numbers.
    """
    gold_tags = list(gold_tags)
    pred_tags = list(pred_tags)
    if len(gold_tags) != len(pred_tags):
        raise ValueError(
            "gold_tags and pred_tags must have the same length "
            f"({len(gold_tags)} != {len(pred_tags)})"
        )
    if not gold_tags:
        return 0.0, 0.0, 0.0

    true_pos = Counter()
    false_pos = Counter()
    false_neg = Counter()
    # Insertion-ordered record of tags, so averaging order (and therefore
    # floating-point rounding) does not depend on string hash randomisation.
    seen_tags = {}

    for gold, pred in zip(gold_tags, pred_tags):
        seen_tags[gold] = None
        seen_tags[pred] = None
        if gold == pred:
            true_pos[gold] += 1
        else:
            false_pos[pred] += 1
            false_neg[gold] += 1

    precisions = []
    recalls = []
    f1_scores = []
    for tag in seen_tags:
        hits = true_pos[tag]
        predicted = hits + false_pos[tag]
        actual = hits + false_neg[tag]
        p = hits / predicted if predicted else 0.0
        r = hits / actual if actual else 0.0
        f = (2 * p * r) / (p + r) if (p + r) else 0.0
        precisions.append(p)
        recalls.append(r)
        f1_scores.append(f)

    tag_total = len(seen_tags)
    return (
        sum(precisions) / tag_total,
        sum(recalls) / tag_total,
        sum(f1_scores) / tag_total,
    )

In [7]:
# ---------------------------
# 6. Run all folds
# ---------------------------
def run(path, K=3):
    """Cross-validate the HMM tagger on a corpus and print per-fold scores.

    For each fold the model is trained on the training part, every test
    sentence is decoded with Viterbi, and macro-averaged precision, recall
    and F1 over all test tokens are printed.

    Args:
        path: Path of the ``word_TAG`` corpus file passed to ``read_data``.
        K: Number of cross-validation folds (defaults to 3).

    Returns:
        None. Results are written to standard output.
    """
    data = read_data(path)
    folds = k_folds(data, K=K)

    for fold_number, (train, test) in enumerate(folds, start=1):
        print(f"\nFold {fold_number}")

        trans_prob, emit_prob, tags = train_hmm(train)

        all_gold = []
        all_pred = []

        for sent in test:
            # A sentence without tokens has nothing to decode or score.
            if not sent:
                continue
            words = [word for word, _ in sent]
            gold = [tag for _, tag in sent]
            pred = viterbi(words, trans_prob, emit_prob, tags)
            all_gold.extend(gold)
            all_pred.extend(pred)

        # Averaging over zero tags is undefined, so report and move on.
        if not all_gold:
            print("No tagged tokens in this fold; skipping evaluation.")
            continue

        P, R, F = compute_f1(all_gold, all_pred)
        print("Precision:", P)
        print("Recall:", R)
        print("F1:", F)

In [8]:
# ---------------------------
# Run program
# ---------------------------
# Runs the 3-fold evaluation and prints precision/recall/F1 for each fold.
# The corpus path is relative, so it is resolved against the kernel's current
# working directory. Fold membership comes from an unseeded shuffle, so the
# printed scores vary from run to run.
run("wsj_pos_tagged_en.txt")


Fold 1
Precision: 0.854734262613313
Recall: 0.827315890102941
F1: 0.8376786399785129

Fold 2
Precision: 0.8713300618135604
Recall: 0.8377415442348625
F1: 0.850830608844512

Fold 3
Precision: 0.8739998892537978
Recall: 0.8468235991231241
F1: 0.8561243398005073
