# Task 1: Vocabulary Creation (20 points)

The first task is to create a vocabulary from the training data. In an HMM, an important problem when building the vocabulary is handling unknown words. One simple solution is to replace rare words whose number of occurrences is below a threshold (e.g. 3) with a special token '<unk>'.

## REMARKS

1) Threshold for replacing unknown words: 3
2) Total vocabulary size: 16920
3) Total occurrences of <unk> after replacement: 32537
    
    
### Description
    
- First, I read the training data and extract the word from each line.
- Then I count the occurrences of each word to get its frequency in the dataset.
- I set a threshold, and words whose count is below it are treated as rare.
- Rare words are replaced with the special token '<unk>', and their counts are added to its count. This keeps the vocabulary small and gives the model a way to handle words that are not in the lexicon.
- The vocabulary is then sorted by frequency in descending order, so the most common words come first, and saved to "vocab.txt" with '<unk>' on the first line.
- Each word is written with an index (0 for '<unk>', then 1, 2, ... in frequency order) for quick lookup.
- Lastly, I compute and print the values reported in the REMARKS section.
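
The steps above can be tried on a toy word list. This is only a minimal sketch: the helper name `build_vocab` and the toy data are illustrative and not part of the notebook, and it assumes the raw data does not already contain the literal token '<unk>'.

```python
from collections import Counter

def build_vocab(words, threshold=3, unk="<unk>"):
    """Fold words seen fewer than `threshold` times into a single `unk` entry."""
    counts = Counter(words)
    kept = {w: c for w, c in counts.items() if c >= threshold}
    unk_count = sum(c for c in counts.values() if c < threshold)
    # <unk> goes first, followed by the kept words in descending frequency.
    return [(unk, unk_count)] + sorted(kept.items(), key=lambda kv: kv[1], reverse=True)

# Toy data: "sat" (2 occurrences) and "mat" (1) fall below the threshold of 3.
toy_words = ["the"] * 5 + ["cat"] * 3 + ["sat"] * 2 + ["mat"]
vocab = build_vocab(toy_words)
print(vocab)  # [('<unk>', 3), ('the', 5), ('cat', 3)]
```

The two rare words contribute 2 + 1 = 3 occurrences to '<unk>', which is the same bookkeeping the notebook cell performs on the real training data.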

In [1]:
from collections import Counter

vocab_list = []
train_file_path = 'data/train'
with open(train_file_path, 'r') as f:
    train_data = []
    for line in f:
        line_parts = line.split()
        if len(line_parts) >= 2:
            train_data.append(line_parts[1])
    

word_counts = Counter(train_data)

threshold = 3
for word in list(word_counts.keys()):
    if word != '<unk>' and word_counts[word] < threshold:
        word_counts['<unk>'] += word_counts[word]
        del word_counts[word]

sorted_vocab = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)

with open('vocab.txt', 'w') as vocab_file:
    vocab_file.write('<unk>\t0\t' + str(word_counts['<unk>']) + '\n')
    
    index = 1
    for word, count in sorted_vocab:
        if word != '<unk>': 
            vocab_file.write(word + '\t' + str(index) + '\t' + str(count) + '\n')
            vocab_list.append(word) 
            index += 1

print("Threshold for replacing unknown words:", threshold)

total_vocab_size = len(sorted_vocab)
print("Total vocabulary size:", total_vocab_size)

total_unk_occurrences = word_counts['<unk>']
print("Total occurrences of <unk> after replacement:", total_unk_occurrences)

Threshold for replacing unknown words: 3
Total vocabulary size: 16920
Total occurrences of <unk> after replacement: 32537
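
To use the saved indices for lookup later, the file can be read back into a dictionary. The sketch below assumes the tab-separated "word, index, count" layout written above; `load_vocab` is a hypothetical helper, and an in-memory sample stands in for "vocab.txt".

```python
import io

def load_vocab(lines):
    """Parse tab-separated (word, index, count) lines into a word -> index dict."""
    word_to_index = {}
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        word, index, _count = line.split("\t")
        word_to_index[word] = int(index)
    return word_to_index

# In-memory stand-in for the first lines of vocab.txt (toy values).
sample = io.StringIO("<unk>\t0\t3\nthe\t1\t5\ncat\t2\t3\n")
word_to_index = load_vocab(sample)

# Words outside the vocabulary fall back to the index of <unk>.
dog_index = word_to_index.get("dog", word_to_index["<unk>"])
print(dog_index)  # 0
```

With the real file, `load_vocab(open("vocab.txt"))` would be the equivalent call.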


# Task 2: Model Learning (20 points)

The second task is to learn an HMM from the training data.

## REMARKS

(1) The number of transition parameters in HMM: 1392
(2) The number of emission parameters in HMM: 23373
    
    
### Description
    
- First, I initialize counters for the states, emissions and transitions of the HMM.
- A previous-state tracker starts at "start" to handle the beginning of each sentence.
- I iterate through each line of the training data, split it into tokens and check whether it is empty.
- If it is not empty, I update the transition count for the (previous state, current state) pair and the emission count for the (current state, word) pair.
- If the word is not in the vocabulary, I count it as '<unk>'. I also update the counter for the current state.
- If the line is empty, I reset the previous state to "start" and increment its count.
- Finally, I calculate the transition and emission probabilities as shown in the PPT:

## Transition Probability
Count(Transition) / Count(Previous State)

## Emission Probability
Count(Emission) / Count(Corresponding State)

- Next, I convert the calculated probabilities into JSON format and write the JSON data to a file named 'hmm.json' in the 'data' directory
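
The two ratios above can be checked by hand on a toy corpus. The sketch below is illustrative only: the two sentences are made up, and "start" is counted once per sentence, which is an assumption of this example.

```python
from collections import Counter

# Two toy sentences as (word, tag) pairs.
sentences = [
    [("the", "DT"), ("cat", "NN"), ("sat", "VBD")],
    [("the", "DT"), ("dog", "NN")],
]

state_counts, transition_counts, emission_counts = Counter(), Counter(), Counter()
for sentence in sentences:
    prev = "start"
    state_counts["start"] += 1
    for word, tag in sentence:
        transition_counts[(prev, tag)] += 1
        emission_counts[(tag, word)] += 1
        state_counts[tag] += 1
        prev = tag

# Count(Transition) / Count(Previous State)
transition = {k: c / state_counts[k[0]] for k, c in transition_counts.items()}
# Count(Emission) / Count(Corresponding State)
emission = {k: c / state_counts[k[0]] for k, c in emission_counts.items()}

print(transition[("NN", "VBD")])  # 0.5: one of the two NN tokens is followed by VBD
print(emission[("NN", "cat")])    # 0.5: one of the two NN tokens is "cat"
```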

In [2]:
import json
from collections import defaultdict

def count_parameters(filename, vocab_list):
    # Counts of states, (state, word) emissions and (prev_state, state) transitions
    state_counts = defaultdict(int)
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    vocab = set(vocab_list)  # set lookup avoids scanning the whole list for every token
    prev_state = "start"
    state_counts["start"] += 1

    try:
        with open(filename, "r") as f:
            for line in f:
                tokens = line.split()  # index, word, tag
                if len(tokens) != 0:
                    transition_counts[(prev_state, tokens[2])] += 1
                    if tokens[1] in vocab:
                        emission_counts[(tokens[2], tokens[1])] += 1
                    else:
                        emission_counts[(tokens[2], '<unk>')] += 1
                    state_counts[tokens[2]] += 1
                    prev_state = tokens[2]
                else:
                    prev_state = "start"
                    state_counts["start"] += 1
    except Exception as e:
        print("Error occurred while processing data:", e)

    return state_counts, emission_counts, transition_counts

def calculate_probabilities(state_counts, emission_counts, transition_counts):
    transition_probabilities = {}
    emission_probabilities = {}

    for (prev_state, state), count in transition_counts.items():
        transition_probabilities[(prev_state, state)] = count / state_counts[prev_state]

    for (state, observation), count in emission_counts.items():
        emission_probabilities[(state, observation)] = count / state_counts[state]

    return transition_probabilities, emission_probabilities

def convert_to_json(transition_probabilities, emission_probabilities):
    transition_json = {str(key): val for key, val in transition_probabilities.items()}
    emission_json = {str(key): val for key, val in emission_probabilities.items()}
    return {'transition': transition_json, 'emission': emission_json}


filename = "data/train"

state_counts, emission_counts, transition_counts = count_parameters(filename, vocab_list)

transition_probabilities, emission_probabilities = calculate_probabilities(state_counts, emission_counts, transition_counts)

total_dict = convert_to_json(transition_probabilities, emission_probabilities)

with open("./data/hmm.json", "w") as output_file:
    json.dump(total_dict, output_file, indent=4)

print("The number of transition parameters in HMM:", len(transition_probabilities))
print("The number of emission parameters in HMM:", len(emission_probabilities))

The number of transition parameters in HMM: 1392
The number of emission parameters in HMM: 23373
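
Because the JSON keys are the string form of Python tuples, they have to be parsed again when the model is loaded. A minimal sketch of one way to do this with `ast.literal_eval`; the helper `tuple_keys` and the tiny stand-in for the file contents are assumptions of this example, not part of the notebook.

```python
import ast
import json

def tuple_keys(table):
    """Turn keys like "('DT', 'NN')" back into ('DT', 'NN') tuples."""
    return {ast.literal_eval(key): value for key, value in table.items()}

# Round trip on a tiny stand-in for the 'transition' section of hmm.json.
original = {("DT", "NN"): 0.5, ("start", "DT"): 0.25}
serialized = json.dumps({"transition": {str(k): v for k, v in original.items()}})
restored = tuple_keys(json.loads(serialized)["transition"])
print(restored == original)  # True
```

`ast.literal_eval` only evaluates Python literals, so it is a safer choice than `eval` for this kind of key.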


# Task 3: Greedy Decoding with HMM (30 points)

The third task is to implement the greedy decoding algorithm with HMM.

## REMARKS

(1) The Accuracy on dev data: 92.23%
    
    
### Description
    
- I start by creating default dictionaries to store the emission and transition probabilities.
- These dictionaries return a default value for (tag, word) emissions and tag pairs that were not seen in training.
- When the input is the development set (data == "dev"), greedy_decoding iterates through each line of the dev file.
- I extract the word and the gold tag from each line, then predict the most likely tag for the word using the emission and transition probabilities.
- For prediction, I select the tag that maximizes the product of the emission probability of the word given the tag and the transition probability from the previous tag to that tag.
- When the input is the test set (data == "test"), I iterate through each line of the test file and predict the most likely tag for each word. I then write the index, the word and its predicted tag to an output file called "greedy.out". I also reset the previous tag to "start" at the end of each sentence.
- I ran eval.py locally and obtained the same accuracy.
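
The selection rule can be illustrated on a single toy sentence. This is a sketch under stated assumptions, not the notebook's function: `greedy_tag` is a hypothetical helper, the probabilities are made up, and unseen words are mapped to '<unk>' before the lookup (mirroring the '<unk>' lookup used in the Viterbi functions of Task 4).

```python
def greedy_tag(words, tags, emission, transition, vocab, unk="<unk>"):
    """Tag one sentence left to right, keeping only the best tag at each step."""
    prev_tag = "start"
    output = []
    for word in words:
        token = word if word in vocab else unk  # assumption: unseen words become <unk>
        best = max(
            tags,
            key=lambda tag: emission.get((tag, token), 0.0)
            * transition.get((prev_tag, tag), 0.0),
        )
        output.append(best)
        prev_tag = best
    return output

# Toy model with made-up probabilities.
tags = ["DT", "NN"]
vocab = {"the", "<unk>"}
emission = {("DT", "the"): 1.0, ("NN", "<unk>"): 1.0}
transition = {("start", "DT"): 1.0, ("DT", "NN"): 1.0}

result = greedy_tag(["the", "zebra"], tags, emission, transition, vocab)
print(result)  # ['DT', 'NN']
```

Here "zebra" is not in the toy vocabulary, so it is scored with the '<unk>' emission entries.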

In [3]:
import copy
from collections import defaultdict

tags = copy.deepcopy(list(state_counts.keys()))
tags.remove('start')

def greedy_decoding(data, vocab_list, emission, transition, tags):
    emission_probs = defaultdict(lambda: emission.get(('<unk>', '<unk>'), 0))
    transition_probs = defaultdict(lambda: 0)
    
    for state, word in emission:
        emission_probs[(state, word)] = emission[(state, word)]

    for prev_state, next_state in transition:
        transition_probs[(prev_state, next_state)] = transition[(prev_state, next_state)]

    if data == "dev":
        with open("./data/dev", "r") as f:
            actual_tags = []
            predicted_tags = []
            prev_tag = "start"
            for line in f:
                get_indiv = line.split('\t')
                if len(get_indiv) > 1:
                    actual_tags.append(get_indiv[2][:-1])
                    max_pred_tag = max(tags, key=lambda state: emission_probs[(state, get_indiv[1])] * transition_probs[(prev_tag, state)])
                    prev_tag = max_pred_tag
                    predicted_tags.append(max_pred_tag)
                else:
                    prev_tag = "start"
            return actual_tags, predicted_tags

    elif data == "test":
        with open("./data/test", "r") as f_test, open("./data/greedy.out", "w") as model_out:
            prev_tag = "start"
            i = 1
            for line in f_test:
                get_indiv = line.split()
                if get_indiv:
                    max_pred_tag = max(tags, key=lambda state: emission_probs[(state, get_indiv[1])] * transition_probs[(prev_tag, state)])
                    prev_tag = max_pred_tag
                    model_out.write(f"{i}\t{get_indiv[1]}\t{max_pred_tag}\n")
                    i += 1
                else:
                    prev_tag = "start"
                    model_out.write("\n")
                    i = 1

        print("greedy.out file created in data folder.")


In [4]:
greedy_decoding("dev", vocab_list, emission_probabilities, transition_probabilities, tags)


In [5]:
greedy_decoding("test", vocab_list, emission_probabilities, transition_probabilities, tags)

greedy.out file created in data folder.


In [6]:
def calculate_accuracy(actual_tags, predicted_tags):
    correct_count = sum(1 for actual, predicted in zip(actual_tags, predicted_tags) if actual == predicted)
    total_count = len(actual_tags)
    accuracy = correct_count / total_count * 100
    return accuracy

actual_tags, predicted_tags = greedy_decoding("dev", vocab_list, emission_probabilities, transition_probabilities, tags)
accuracy = calculate_accuracy(actual_tags, predicted_tags)
print(f"Accuracy on dev data: {accuracy:.2f}%")

Accuracy on dev data: 92.23%


# Task 4: Viterbi Decoding with HMM (30 points)

The fourth task is to implement the Viterbi decoding algorithm with HMM.

## REMARKS

(1) The Accuracy on dev data: 94.37%
    
    
### Description
    
- I created two functions to handle the dev and test files separately.
- For each line in the dev file:
a) Split the line into its individual components
b) Extract the gold POS tag from the line and append it to the actual_tags list
- Then I initialize the trellis, the data structure used to store and compute the probabilities of the possible hidden-state sequences.
- For each word position, it stores the best probability of each POS tag together with the previous tag that produced it.
- For the first word in each sentence, I calculate the initial probabilities from the emission probability and the transition probability out of "start".

Viterbi Algorithm Iteration:

- For subsequent words in the sentence, I extend the trellis using the Viterbi recurrence, combining the emission probability, the transition probability and the best probability at the previous position.
- For each current state (POS tag), I select the most probable previous state (POS tag) based on these probabilities.
- At the end of a sentence, I backtrack to find the most likely sequence of POS tags, starting from the most probable tag of the last word and following the stored previous tags back through the trellis.
- After processing all lines in the test file, I write the predicted POS tags to an output file named viterbi.out.
- It follows the same format as the input file, with each line containing the word index, the word and the predicted POS tag.
- I ran eval.py locally and obtained the same accuracy.
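
The recurrence and the backtracking step can be sketched compactly for a single sentence. Note the assumptions: this example works in log space (sums of log probabilities rather than products), which is a swapped-in variant and not what the notebook functions do; `viterbi` is a hypothetical helper, the toy probabilities are made up, and the sentence is assumed to be non-empty.

```python
import math

def viterbi(words, tags, emission, transition, vocab, unk="<unk>"):
    """Return the most probable tag sequence for one non-empty sentence."""
    def log(p):
        return math.log(p) if p > 0 else float("-inf")

    tokens = [w if w in vocab else unk for w in words]
    # trellis[i][tag] = (best log-probability of a path ending in tag, previous tag)
    first = {}
    for tag in tags:
        score = log(transition.get(("start", tag), 0)) + log(emission.get((tag, tokens[0]), 0))
        first[tag] = (score, None)
    trellis = [first]

    for token in tokens[1:]:
        column = {}
        for tag in tags:
            best_prev = max(
                tags,
                key=lambda p: trellis[-1][p][0] + log(transition.get((p, tag), 0)),
            )
            score = (
                trellis[-1][best_prev][0]
                + log(transition.get((best_prev, tag), 0))
                + log(emission.get((tag, token), 0))
            )
            column[tag] = (score, best_prev)
        trellis.append(column)

    # Backtrack from the best final tag, following the stored previous tags.
    tag = max(tags, key=lambda t: trellis[-1][t][0])
    path = [tag]
    for column in reversed(trellis[1:]):
        tag = column[tag][1]
        path.append(tag)
    return path[::-1]

# Toy model with made-up probabilities.
tags = ["DT", "NN", "VBZ"]
vocab = {"the", "dog", "barks", "<unk>"}
emission = {("DT", "the"): 1.0, ("NN", "dog"): 0.5, ("NN", "<unk>"): 0.5, ("VBZ", "barks"): 1.0}
transition = {("start", "DT"): 1.0, ("DT", "NN"): 1.0, ("NN", "VBZ"): 1.0}

best_path = viterbi(["the", "zebra", "barks"], tags, emission, transition, vocab)
print(best_path)  # ['DT', 'NN', 'VBZ']
```

Working with log probabilities avoids numerical underflow on long sentences, where a product of many small probabilities can round to zero.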

In [7]:
import copy

def viterbiDecoding_dev(vocab_list, emission, transition, tags):
    f = open('./data/dev', 'r')
    actual_tags = []
    predicted_tags = []
    prev_tag = "start" 
    for line in f:
        get_indiv = line.split('\t')
        if len(get_indiv) > 1:
            actual_tags.append(get_indiv[2][:-1])
            if prev_tag == "start":
                viterbi = []
                first_dict = {}
                for state in tags:
                    if get_indiv[1] in vocab_list:
                        em_prob = emission.get((state, get_indiv[1]), 0)
                    else:
                        em_prob = emission.get((state, '<unk>'), 0)
                    trans_prob = transition.get(('start', state), 0)
                    prob = em_prob * trans_prob
                    first_dict[state] = (prob, prev_tag)
                viterbi.append(copy.deepcopy(first_dict))
                prev_tag = "not start"
            else:
                curr_dict = {}
                for state in tags:
                    if get_indiv[1] in vocab_list:
                        em_prob = emission.get((state, get_indiv[1]), 0)
                    else:
                        em_prob = emission.get((state, '<unk>'), 0)
                    max_state_prob = [-1, None]
                    for prev_state_key, prev_state_val in viterbi[-1].items():
                        trans_prob = transition.get((prev_state_key, state), 0)
                        prev_state_prob_val = prev_state_val[0]
                        final_prob = em_prob * trans_prob * prev_state_prob_val
                        if final_prob > max_state_prob[0]:
                            max_state_prob = [final_prob, prev_state_key]
                    curr_dict[state] = (max_state_prob[0], max_state_prob[1])
                viterbi.append(copy.deepcopy(curr_dict))
        elif prev_tag != "start":  # blank line ends a sentence; ignore repeated blank lines
            preds = []
            max_val = max(viterbi[len(viterbi) - 1].values(), key=lambda x: x[0])
            preds.append(next(key for key, val in viterbi[len(viterbi) - 1].items() if val == max_val))
            prev_state = max_val[1]
            for i in range(len(viterbi) - 2, -1, -1):
                preds.append(prev_state)
                prev_state = viterbi[i][prev_state][1]
            preds.reverse()
            predicted_tags.extend(preds)
            prev_tag = "start"
    f.close()
    # Decode the final sentence only if the file did not end with a blank line;
    # otherwise it was already decoded inside the loop.
    if prev_tag != "start":
        preds = []
        max_val = max(viterbi[len(viterbi) - 1].values(), key=lambda x: x[0])
        preds.append(next(key for key, val in viterbi[len(viterbi) - 1].items() if val == max_val))
        prev_state = max_val[1]
        for i in range(len(viterbi) - 2, -1, -1):
            preds.append(prev_state)
            prev_state = viterbi[i][prev_state][1]
        preds.reverse()
        predicted_tags.extend(preds)

    return actual_tags, predicted_tags

def viterbiDecoding_test(vocab_list, emission, transition, tags):
    f = open('./data/test', 'r')
    predicted_tags = []
    prev_tag = "start"
    for line in f:
        get_indiv = line.split()
        if len(get_indiv) > 0:
            if prev_tag == "start":
                viterbi = []
                first_dict = {}
                for state in tags:
                    if get_indiv[1] in vocab_list:
                        em_prob = emission.get((state, get_indiv[1]), 0)
                    else:
                        em_prob = emission.get((state, '<unk>'), 0)
                    trans_prob = transition.get(('start', state), 0)
                    prob = em_prob * trans_prob
                    first_dict[state] = (prob, prev_tag)
                viterbi.append(copy.deepcopy(first_dict))
                prev_tag = "not start"
            else:
                curr_dict = {}
                for state in tags:
                    if get_indiv[1] in vocab_list:
                        em_prob = emission.get((state, get_indiv[1]), 0)
                    else:
                        em_prob = emission.get((state, '<unk>'), 0)
                    max_state_prob = [-1, None]
                    for prev_state_key, prev_state_val in viterbi[-1].items():
                        trans_prob = transition.get((prev_state_key, state), 0)
                        prev_state_prob_val = prev_state_val[0]
                        final_prob = em_prob * trans_prob * prev_state_prob_val
                        if final_prob > max_state_prob[0]:
                            max_state_prob = [final_prob, prev_state_key]
                    curr_dict[state] = (max_state_prob[0], max_state_prob[1])
                viterbi.append(copy.deepcopy(curr_dict))
        elif prev_tag != "start":  # blank line ends a sentence; ignore repeated blank lines
            preds = []
            max_val = max(viterbi[len(viterbi) - 1].values(), key=lambda x: x[0])
            preds.append(next(key for key, val in viterbi[len(viterbi) - 1].items() if val == max_val))
            prev_state = max_val[1]
            for i in range(len(viterbi) - 2, -1, -1):
                preds.append(prev_state)
                prev_state = viterbi[i][prev_state][1]
            preds.reverse()
            predicted_tags.extend(preds)
            prev_tag = "start"
    f.close()
    # Decode the final sentence only if the file did not end with a blank line;
    # otherwise it was already decoded inside the loop.
    if prev_tag != "start":
        preds = []
        max_val = max(viterbi[len(viterbi) - 1].values(), key=lambda x: x[0])
        preds.append(next(key for key, val in viterbi[len(viterbi) - 1].items() if val == max_val))
        prev_state = max_val[1]
        for i in range(len(viterbi) - 2, -1, -1):
            preds.append(prev_state)
            prev_state = viterbi[i][prev_state][1]
        preds.reverse()
        predicted_tags.extend(preds)

    model_out = open('./data/viterbi.out', 'w')
    f = open('./data/test', 'r')
    i = 0
    for line in f:
        get_indiv = line.split()
        if len(get_indiv) > 0:
            model_out.write(str(get_indiv[0]) + "\t" + get_indiv[1] + "\t" + predicted_tags[i] + "\n")
            i += 1
        else:
            model_out.write("\n")
    f.close()
    model_out.close()
    print("viterbi.out file created in data folder.")


In [8]:
actual_tags_new, predicted_tags_new = viterbiDecoding_dev(vocab_list, emission_probabilities, transition_probabilities, tags)
accuracy = calculate_accuracy(actual_tags_new, predicted_tags_new)
print(f"Accuracy on dev data: {accuracy:.2f}%")

Accuracy on dev data: 94.37%


In [9]:
viterbiDecoding_test(vocab_list, emission_probabilities, transition_probabilities, tags)

viterbi.out file created in data folder.
