# HMM POS Tagger
This notebook trains a Hidden Markov Model (HMM) POS tagger on the provided CONLLU data, evaluates on the dev and test files, and prints performance metrics.
Files used: `PosData/fa_perdt-ud-train.conllu`, `PosData/fa_perdt-ud-dev.conllu`, `PosData/fa_perdt-ud-test.conllu`.

## Dependencies (run this cell once to install)
pip install hmmlearn


In [9]:
import sys
# Install required packages if necessary. Uncomment to run within the notebook.
# Note: running installs inside notebooks may require a restart of the kernel in some setups.
# Uncomment the following line if pyconll or scikit-learn are missing:
# !{sys.executable} -m pip install pyconll scikit-learn numpy

## Load data and implement HMM (training, Viterbi, evaluation)

In [None]:
import pyconll
import math
from collections import defaultdict, Counter
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
import numpy as np

# Paths
TRAIN_PATH = 'PosData/fa_perdt-ud-train.conllu'
DEV_PATH = 'PosData/fa_perdt-ud-dev.conllu'
TEST_PATH = 'PosData/fa_perdt-ud-test.conllu'

def load_conllu(path):
    """apparently this is best practice to load conllu files using pyconll"""
    data = []
    try:
        conll = pyconll.load_from_file(path)
    except FileNotFoundError:
        print(f"Error: File not found at {path}. Returning empty data.")
        return []

    for sentence in conll:
        words = [token.form for token in sentence]
        tags = [token.upos for token in sentence]
        # Skip sentences with inconsistent tokenization/annotation
        if all(words) and all(tags) and len(words) == len(tags):
            data.append((words, tags))
    return data

# define train dev and test data
print("Loading data...")
train_data = load_conllu(TRAIN_PATH)
dev_data = load_conllu(DEV_PATH)
test_data = load_conllu(TEST_PATH)
print(f"Train sentences: {len(train_data)}, Dev sentences: {len(dev_data)}, Test sentences: {len(test_data)}")

# --- HMM Training Function ---
def train_hmm(sentences, alpha=1.0, unk_threshold=1):
    """Trains the HMM by calculating log probabilities with Laplace smoothing."""
    tag_counts = Counter()
    emission_counts = defaultdict(Counter)
    transition_counts = defaultdict(Counter)
    word_counts = Counter()

    for words, tags in sentences:
        # Check if the sentence is not empty
        if not tags:
            continue
            
        # Initial Transition (<s> -> Tag_1)
        prev = '<s>'
        transition_counts['<s>'][tags[0]] += 1
        
        # Internal Transitions and Emissions
        for w, t in zip(words, tags):
            word_counts[w] += 1
            tag_counts[t] += 1
            emission_counts[t][w] += 1
            transition_counts[prev][t] += 1
            prev = t
            
        # Final Transition (Tag_n -> </s>)
        transition_counts[prev]['</s>'] += 1

    # Build vocab with UNK handling for low-frequency words
    vocab = set(w for w,c in word_counts.items() if c > unk_threshold)
    vocab.add('<UNK>')
    tags = list(tag_counts.keys())

    # Precompute log-probabilities with Laplace smoothing
    V = len(vocab)
    tag_total = {t: tag_counts[t] for t in tags}

    # Emission probabilities (B)
    emission_logprob = defaultdict(dict)
    for t in tags:
        denom = tag_total[t] + alpha * V
        for w in vocab:
            # Check for word in the original counts
            count = emission_counts[t].get(w, 0)
            emission_logprob[t][w] = math.log((count + alpha) / denom)
    
    # Transition probabilities (A)
    transition_logprob = defaultdict(dict)
    all_possible_prev_tags = list(transition_counts.keys())
    
    for prev in all_possible_prev_tags:
        total = sum(transition_counts[prev].values())
        
        # N includes all regular tags + the end-of-sentence tag </s>
        N = len(tags) + (1 if prev != '<s>' else 0) # Only count </s> if not starting 
        
        # Use tags + </s> as all possible next states for smoothing
        possible_next_states = tags + (['</s>'] if prev != '<s>' else []) 

        # Correct calculation of N for smoothing denominator
        N_smoothing = len(tags) + 1 # Use |Tags| + 1 for start tag smoothing
        if prev != '<s>':
             N_smoothing = len(tags) + 1 # Use |Tags| + 1 for regular tag smoothing

        for t in possible_next_states:
            c = transition_counts[prev].get(t, 0)
            transition_logprob[prev][t] = math.log((c + alpha) / (total + alpha * N_smoothing))

    model = {
        'tags': tags,
        'vocab': vocab,
        'emission_logprob': emission_logprob,
        'transition_logprob': transition_logprob,
        'tag_counts': tag_total,
        'alpha': alpha
    }
    return model

# --- Viterbi Decoding Function ---
def viterbi(model, words):
    """Uses the Viterbi algorithm to find the most likely sequence of tags."""
    tags = model['tags']
    emission = model['emission_logprob']
    trans = model['transition_logprob']
    vocab = model['vocab']

    W = len(words)
    if W == 0:
        return []
        
    # Map rare/unseen words to <UNK>
    obs = [w.lower() if w in vocab else '<UNK>' for w in words]

    V = [{} for _ in range(W)] # Viterbi probabilities (log)
    backpointer = [{} for _ in range(W)]

    # Initialization (t=0)
    for t in tags:
        # Transition from <s>
        tp = trans.get('<s>', {}).get(t, math.log(1e-12))
        # Emission for obs[0] from tag t
        ep = emission.get(t, {}).get(obs[0], emission.get(t, {}).get('<UNK>', math.log(1e-12)))
        V[0][t] = tp + ep
        backpointer[0][t] = None

    # Recursion (t > 0)
    for i in range(1, W):
        for t in tags:
            max_prob = -np.inf
            arg_best = None
            # Emission for obs[i] from tag t
            ep = emission.get(t, {}).get(obs[i], emission.get(t, {}).get('<UNK>', math.log(1e-12)))
            
            for t_prev in tags:
                # Transition from t_prev to t
                tp = trans.get(t_prev, {}).get(t, math.log(1e-12))
                
                prob = V[i-1][t_prev] + tp + ep
                
                if prob > max_prob:
                    max_prob = prob
                    arg_best = t_prev
            
            V[i][t] = max_prob
            backpointer[i][t] = arg_best

    # Termination: Find best tag at the last position (W-1)
    best_prob = -np.inf
    best_tag = None
    for t in tags:
        # Include transition to </s>
        prob = V[W-1][t] + trans.get(t, {}).get('</s>', math.log(1e-12))
        if prob > best_prob:
            best_prob = prob
            best_tag = t

    # Backtrack
    out_tags = [best_tag] * W
    curr_tag = best_tag
    for i in range(W-1, 0, -1):
        curr_tag = backpointer[i][curr_tag]
        out_tags[i-1] = curr_tag
        
    return out_tags

# --- Evaluation Function ---
def evaluate_model(model, dataset, dataset_name='set'):
    """Performs tagging and prints metrics (Accuracy, Confusion Matrix, Classification Report)."""
    y_true = []
    y_pred = []
    
    # Get all unique tags from the model to ensure all tags are included in the report
    all_model_tags = sorted(model['tags']) 
    
    for words, tags in dataset:
        preds = viterbi(model, words)
        # Ensure predictions and true tags have the same length before extending
        if len(preds) == len(tags):
            y_true.extend(tags)
            y_pred.extend(preds)

    print(f'\n{"="*10} Evaluation on {dataset_name} Set {"="*10}')
    if not y_true:
        print("No valid data for evaluation.")
        return

    # Accuracy
    print('Overall Tagging Accuracy:', accuracy_score(y_true, y_pred))
    
    # Classification Report (Precision, Recall, F1-Score)
    print('\nDetailed Metrics (Precision, Recall, F1-Score) per UPOS Tag:')
    # Use only tags present in the true labels for the report
    target_names = sorted(list(set(y_true)))
    print(classification_report(y_true, y_pred, labels=target_names, target_names=target_names, digits=4, zero_division=0))
    
    # Confusion Matrix
    print('\nConfusion Matrix (Labels Order):')
    # Use all model tags for consistent matrix labels
    labels = all_model_tags
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    print('Labels:', labels)
    print(cm)
    print('='*50)




print("Training HMM model...")
model = train_hmm(train_data)
print("HMM Training complete.")

# 1. Evaluate on Development Set (Tuning)
evaluate_model(model, dev_data, 'Development (Dev)')

# 2. Evaluate on Test Set (Final Score)
evaluate_model(model, test_data, 'Final Test')

Loading data...
Train sentences: 20913, Dev sentences: 1162, Test sentences: 1212
Training HMM model...
Train sentences: 20913, Dev sentences: 1162, Test sentences: 1212
Training HMM model...
HMM Training complete.
HMM Training complete.

Overall Tagging Accuracy: 0.9084587441619097

Detailed Metrics (Precision, Recall, F1-Score) per UPOS Tag:
              precision    recall  f1-score   support

         ADJ     0.8472    0.7879    0.8165      1485
         ADP     0.9053    0.9873    0.9445      2749
         ADV     0.9064    0.8313    0.8672       326
         AUX     0.9207    0.9744    0.9468       703
       CCONJ     0.9565    0.9976    0.9766       838
         DET     0.7793    0.9333    0.8494       420
        INTJ     0.9500    0.5938    0.7308        32
        NOUN     0.9344    0.8871    0.9101      6530
         NUM     0.9251    0.8607    0.8917       244
        PART     1.0000    0.6875    0.8148        16
        PRON     0.8694    0.9450    0.9056       655
     