In [25]:
import json
import string
import numpy as np
import math
from collections import defaultdict
from utils import load_data, assign_unk_english, get_word_tag

In [26]:
def create_dictionaries(training_corpus, vocab, verbose=True):
    """
    Input: 
        training_corpus: a corpus where each line has a word followed by its tag.
        vocab: a dictionary where keys are words in vocabulary and value is an index
    Output: 
        emission_counts: a dictionary where the keys are (tag, word) and the values are the counts
        transition_counts: a dictionary where the keys are (prev_tag, tag) and the values are the counts
        tag_counts: a dictionary where the keys are the tags and the values are the counts
    """
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)
    prev_tag = '--s--' 
    i = 0 
    for word_tag in training_corpus:
        i += 1
        if i % 50000 == 0 and verbose:
            print(f"word count = {i}")
        word, tag = get_word_tag(word_tag, vocab, True)
        transition_counts[(prev_tag, tag)] += 1
        emission_counts[(tag, word)] += 1
        tag_counts[tag] += 1
        prev_tag = tag
    return emission_counts, transition_counts, tag_counts

def predict_pos(prep, y, emission_counts, vocab, states):
    '''
    Input: 
        prep: a preprocessed version of 'y'. A list with the 'word' component of the tuples.
        y: a corpus composed of a list of tuples where each tuple consists of (word, POS)
        emission_counts: a dictionary where the keys are (tag,word) tuples and the value is the count
        vocab: a dictionary where keys are words in vocabulary and value is an index
        states: a sorted list of all possible tags for this assignment
    Output: 
        accuracy: Number of times you classified a word correctly
    '''
    num_correct = 0
    all_words = set(emission_counts.keys())
    total = 0
    for word, y_tup in zip(prep, y): 
        y_tup_l = y_tup.split()
        if len(y_tup_l) == 2:
            true_label = y_tup_l[1]
        else:
            continue
        count_final = 0
        pos_final = ''
        if word in vocab:
            for pos in states:
                key = (pos, word)
                if key in all_words:
                    count = emission_counts[key]
                    if count > count_final:
                        count_final = count
                        pos_final = pos
            if pos_final == true_label:
                num_correct += 1
        total += 1
    accuracy = num_correct / total
    return accuracy

In [27]:
def create_transition_matrix(alpha, tag_counts, transition_counts):
    ''' 
    Input: 
        alpha: number used for smoothing
        tag_counts: a dictionary mapping each tag to its respective count
        transition_counts: a dictionary where the keys are (prev_tag, tag) and the values are the counts
    Output:
        A: matrix of dimension (num_tags,num_tags)
    '''
    all_tags = sorted(tag_counts.keys())
    num_tags = len(all_tags)
    A = np.zeros((num_tags,num_tags))
    trans_keys = set(transition_counts.keys())
    for i in range(num_tags):
        for j in range(num_tags):
            count = 0
            key = (all_tags[i], all_tags[j])
            if key in trans_keys:
                count = transition_counts[key]
            count_prev_tag = tag_counts[key[0]]
            A[i,j] = (count + alpha) / ( count_prev_tag + (alpha * num_tags))
    return A

def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    '''
    Input: 
        alpha: tuning parameter used in smoothing 
        tag_counts: a dictionary mapping each tag to its respective count
        emission_counts: a dictionary where the keys are (tag, word) and the values are the counts
        vocab: a dictionary where keys are words in vocabulary and value is an index.
               within the function it'll be treated as a list
    Output:
        B: a matrix of dimension (num_tags, len(vocab))
    '''
    num_tags = len(tag_counts)
    all_tags = sorted(tag_counts.keys())
    num_words = len(vocab)
    B = np.zeros((num_tags, num_words))
    emis_keys = set(list(emission_counts.keys()))
    for i in range(num_tags):
        for j in range(num_words):
            count = 0 
            key = (all_tags[i], vocab[j]) # tuple of form (tag,word)
            if key in emis_keys:
                count = emission_counts[key]
            count_tag = tag_counts[key[0]]
            B[i,j] = (count + alpha) / (count_tag + (num_words * alpha))
    return B

def initialize(states, tag_counts, A, B, corpus, vocab):
    '''
    Input: 
        states: a list of all possible parts-of-speech
        tag_counts: a dictionary mapping each tag to its respective count
        A: Transition Matrix of dimension (num_tags, num_tags)
        B: Emission Matrix of dimension (num_tags, len(vocab))
        corpus: a sequence of words whose POS is to be identified in a list 
        vocab: a dictionary where keys are words in vocabulary and value is an index
    Output:
        best_probs: matrix of dimension (num_tags, len(corpus)) of floats
        best_paths: matrix of dimension (num_tags, len(corpus)) of integers
    '''
    num_tags = len(tag_counts)
    best_probs = np.zeros((num_tags, len(corpus)))
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    s_idx = states.index("--s--")
    for i in range(num_tags):
        best_probs[i,0] = math.log(A[s_idx, i]) + math.log(B[i, vocab[corpus[0]]])
    return best_probs, best_paths

In [28]:
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab, verbose=True):
    '''
    Input: 
        A, B: The transition and emission matrices respectively
        test_corpus: a list containing a preprocessed corpus
        best_probs: an initilized matrix of dimension (num_tags, len(corpus))
        best_paths: an initilized matrix of dimension (num_tags, len(corpus))
        vocab: a dictionary where keys are words in vocabulary and value is an index 
    Output: 
        best_probs: a completed matrix of dimension (num_tags, len(corpus))
        best_paths: a completed matrix of dimension (num_tags, len(corpus))
    '''
    num_tags = best_probs.shape[0]
    for i in range(1, len(test_corpus)): 
        if i % 5000 == 0 and verbose:
            print("Words processed: {:>8}".format(i))
        for j in range(num_tags):
            best_prob_i = float("-inf")
            best_path_i = None
            for k in range(num_tags):
                prob = best_probs[k, i-1] + math.log(A[k,j]) + math.log(B[j,vocab[test_corpus[i]]])
                if prob > best_prob_i:
                    best_prob_i = prob
                    best_path_i = k
            best_probs[j,i] = best_prob_i
            best_paths[j,i] = best_path_i
    return best_probs, best_paths

def viterbi_backward(best_probs, best_paths, corpus, states):
    '''
    This function returns the best path.
    
    '''
    m = best_paths.shape[1] 
    z = [None] * m # DO NOT replace the "None"
    num_tags = best_probs.shape[0]
    best_prob_for_last_word = float('-inf')
    pred = [None] * m
    for k in range(num_tags):
        if best_probs[k, -1] > best_prob_for_last_word: 
            best_prob_for_last_word = best_probs[k, -1]
            z[m - 1] = k
    pred[m - 1] = states[z[m - 1]]
    for i in range(m-1, 0, -1):
        pos_tag_for_word_i = z[i]
        z[i - 1] = best_paths[pos_tag_for_word_i, i]
        pred[i - 1] = states[z[i - 1]]
    return pred

In [29]:
def compute_accuracy(pred, y):
    '''
    Input: 
        pred: a list of the predicted parts-of-speech 
        y: a list of lines where each word is separated by a '\t' (i.e. word \t tag)
    Output: 
        
    '''
    num_correct = 0
    total = 0
    for prediction, y in zip(pred, y):
        word_tag_tuple = y.split()
        if len(word_tag_tuple) != 2: 
            continue
        word, tag = word_tag_tuple[0], word_tag_tuple[1]
        if prediction == tag:
            num_correct += 1
        total += 1
    return num_correct/total

In [30]:
train_path = "../../data/WSJ_02-21.pos"
test_path = "../../data/WSJ_24.pos"
corpus_path = "../../data/hmm_vocab.txt"

training_corpus = load_data(train_path, False)
y = load_data(test_path, False)
vocab = load_data(corpus_path, True)

prep = []
for item in y:
    word = item.split('\t')[0]
    if not word in vocab.keys():
        prep.append(assign_unk_english(word))
    else:
        prep.append(word)

In [41]:
emission_counts, transition_counts, tag_counts = create_dictionaries(training_corpus, vocab)
states = sorted(tag_counts.keys())
accuracy_predict_pos = predict_pos(prep, y, emission_counts, vocab, states)

word count = 50000
word count = 100000
word count = 150000
word count = 200000
word count = 250000
word count = 300000
word count = 350000
word count = 400000
word count = 450000
word count = 500000
word count = 550000
word count = 600000
word count = 650000
word count = 700000
word count = 750000
word count = 800000
word count = 850000
word count = 900000
word count = 950000


In [9]:
#prep = ['economy', 'what', '--unk--', 'run']
best_probs, best_paths = initialize(states, tag_counts, A, B, prep, vocab)
best_probs, best_paths = viterbi_forward(A, B, prep, best_probs, best_paths, vocab)
pred = viterbi_backward(best_probs, best_paths, prep, states)

Words processed:     5000
Words processed:    10000
Words processed:    15000
Words processed:    20000
Words processed:    25000
Words processed:    30000


In [10]:
print(f"Accuracy of the Viterbi algorithm is {compute_accuracy(pred, y):.4f}")

Accuracy of the Viterbi algorithm is 0.9542


In [76]:
import pandas as pd
from conllu import parse_incr
from pathlib import Path

def conllu_to_dataframe(filepath):
    """
    Parses a CoNLL-U file and returns a pandas DataFrame with columns:
    'sentence_id', 'word', 'lemma', 'upos', 'xpos', 'feats'
    """
    data = []
    sentence_id = 0

    with open(filepath, "r", encoding="utf-8") as f:
        for tokenlist in parse_incr(f):
            for token in tokenlist:
                if isinstance(token['id'], int):  # skip multi-word tokens
                    data.append({
                        "sentence_id": sentence_id,
                        "word": token["form"],
                        "lemma": token["lemma"],
                        "upos": token["upos"],  # Universal POS
                        "xpos": token["xpos"],  # Language-specific POS
                        "feats": token["feats"]  # Morphological features
                    })
            data.append({
                "sentence_id": sentence_id,
                "word": '\n',
                "lemma": '\n',
                "upos": '\n',  # Universal POS
                "xpos": '\n',  # Language-specific POS
                "feats": '\n'  # Morphological features
            })
            sentence_id += 1

    return pd.DataFrame(data)

train_file = "../../data/es_ancora-ud-train.conllu"
test_file = "../../data/es_ancora-ud-test.conllu"

# --- Load as DataFrames ---
df_train = conllu_to_dataframe(train_file)
df_train['word'] = df_train['word'].apply(lambda x: x if len(x)<2 else x.replace('"','').replace("'", '').replace('(', '').replace(')', ''))
df_test = conllu_to_dataframe(test_file)
df_test['word'] = df_test['word'].apply(lambda x: x if len(x)<2 else x.replace('"','').replace("'", '').replace('(', '').replace(')', ''))

In [78]:
upos_to_ptb = {
    "NOUN": "NN",      # Common noun, singular or mass
    "PROPN": "NNP",    # Proper noun, singular
    "VERB": "VB",      # Base form of verb (e.g., "run")
    "AUX": "VB",       # Auxiliary verb (e.g., "have", "be") simplified to base form
    "ADJ": "JJ",       # Adjective (e.g., "blue")
    "ADV": "RB",       # Adverb (e.g., "quickly")
    "PRON": "PRP",     # Personal pronoun (e.g., "he", "they")
    "DET": "DT",       # Determiner (e.g., "the", "some")
    "ADP": "IN",       # Preposition or subordinating conjunction (e.g., "in", "of", "that")
    "CCONJ": "CC",     # Coordinating conjunction (e.g., "and", "but")
    "SCONJ": "IN",     # Subordinating conjunction, mapped to "IN" (same as ADP)
    "NUM": "CD",       # Cardinal number (e.g., "one", "42")
    "PART": "RP",      # Particle (e.g., "up" in "give up")
    "INTJ": "UH",      # Interjection (e.g., "uh", "wow")
    "PUNCT": ".",      # Punctuation (simplified to comma — adjust as needed)
    "SYM": "SYM",      # Symbol (e.g., "$", "%")
    "X": "FW",          # Foreign word or other uncategorized token
    "\n": ""          # Foreign word or other uncategorized token
}


In [79]:
df_train = df_train[['word', 'upos']]
df_train['label'] = df_train['upos'].apply(lambda x: upos_to_ptb[x])

df_test = df_test[['word', 'upos']]
df_test['label'] = df_test['upos'].apply(lambda x: upos_to_ptb[x])

In [80]:
training_corpus = (df_train['word']+'\t'+df_train['label']+'\n').tolist()
y = (df_test['word']+'\t'+df_test['label']+'\n').tolist()

In [81]:
unk_list = ["--unk_digit--", "--unk_punct--", "--unk_upper--", "--unk_noun--", "--unk_verb--", "--unk_adj--", "--unk_adv--", "--unk--"]
vocab_list = df_train['word'].unique().tolist()
vocab_list.extend(unk_list)
vocab = {}
for i, word in enumerate(sorted(vocab_list)):
    vocab[word] = i

In [82]:
prep = []
for item in y:
    word = item.split('\t')[0]
    if not word in vocab.keys():
        prep.append(assign_unk_english(word))
    else:
        prep.append(word)

In [83]:
emission_counts, transition_counts, tag_counts = create_dictionaries(training_corpus, vocab)
states = sorted(tag_counts.keys())
accuracy_predict_pos = predict_pos(prep, y, emission_counts, vocab, states)

word count = 50000
word count = 100000
word count = 150000
word count = 200000
word count = 250000
word count = 300000
word count = 350000
word count = 400000
word count = 450000


In [84]:
alpha = 0.001
A = create_transition_matrix(alpha, tag_counts, transition_counts)
B = create_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

In [86]:
#prep = ['economy', 'what', '--unk--', 'run']
best_probs, best_paths = initialize(states, tag_counts, A, B, prep, vocab)
best_probs, best_paths = viterbi_forward(A, B, prep, best_probs, best_paths, vocab)
pred = viterbi_backward(best_probs, best_paths, prep, states)

Words processed:     5000
Words processed:    10000
Words processed:    15000
Words processed:    20000
Words processed:    25000
Words processed:    30000
Words processed:    35000
Words processed:    40000
Words processed:    45000
Words processed:    50000
Words processed:    55000


In [87]:
print(f"Accuracy of the Viterbi algorithm is {compute_accuracy(pred, y):.4f}")

Accuracy of the Viterbi algorithm is 0.9361
