In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

In [2]:

import copy
import json
import operator
import os
import subprocess
import sys
from collections import defaultdict

import toolz
# os.chdir('/content/drive/Shared drives/USC_CSCI544-Applied NLP/HWs/HW3') # where the files for this project are


In [3]:
# Tally how often each word appears in the training file.
# Every non-blank line is split on whitespace and its second field is the word.
with open("./data/train", "r") as train_file:
    count_dict = defaultdict(int)
    for raw_line in train_file:
        fields = raw_line.split()
        if not fields:
            continue  # blank separator line between sentences
        count_dict[fields[1]] += 1

# Total occurrences of words seen fewer than 2 times (written later as the '<unk>' count)
unkw = sum(freq for freq in count_dict.values() if freq < 2)

# (word, count) pairs ordered from the most to the least frequent word
sorted_count_list = sorted(count_dict.items(), key=lambda pair: pair[1], reverse=True)

# **TASK 1**

In [4]:
# Words that occur at least twice form the vocabulary, kept in descending-frequency order.
frequent_entries = [entry for entry in sorted_count_list if entry[1] >= 2]
vocab_list = [entry[0] for entry in frequent_entries]
vocab_count = len(vocab_list)

# vocab.txt rows are "<word>\t<index>\t<count>".
with open("vocab.txt", "w") as vocab_file:
    # '<unk>' always takes index 0 and carries the combined rare-word count
    vocab_file.write('<unk>\t0\t' + str(unkw) + '\n')

    # Real words follow, numbered from 1
    for index, (word, count) in enumerate(frequent_entries, start=1):
        vocab_file.write(word + '\t' + str(index) + '\t' + str(count) + '\n')


In [5]:
# Report the vocabulary statistics gathered while writing vocab.txt
summary_lines = [
    "The total size of the vocabulary is "+str(vocab_count)+".",
    "This is excluding the '<unk>' token.",
    "The total occurences of the special token '<unk>' after replacement is "+str(unkw)+".",
]
print("\n".join(summary_lines))

The total size of the vocabulary is 23182.
This is excluding the '<unk>' token.
The total occurences of the special token '<unk>' after replacement is 20011.


# **TASK 2**

In [6]:
# Count tables for the HMM (missing keys read as 0):
#   s_counts[tag]          - occurrences of each tag, plus the pseudo-tag "start"
#   e_counts[(tag, word)]  - times `tag` was seen on `word` (out-of-vocabulary words -> '<unk>')
#   t_counts[(prev, tag)]  - times `tag` directly followed `prev` inside a sentence
s_counts = defaultdict(int)
e_counts = defaultdict(int)
t_counts = defaultdict(int)

# Register "start" first so it is always present and stays the first key.
# It is incremented only when a sentence really begins (see below), so trailing
# or repeated blank lines in the file cannot inflate the sentence count.
s_counts["start"] = 0
prev_s = "start"

# Set membership is O(1); testing `in vocab_list` scanned the whole list per token.
known_words = set(vocab_list)

# Open and read the training data
with open("./data/train", "r") as f:
    for line in f:
        get_indiv = line.split()
        if len(get_indiv) != 0:
            word, tag = get_indiv[1], get_indiv[2]

            # First token after a sentence boundary: one more sentence start
            if prev_s == "start":
                s_counts["start"] += 1

            # Update transition counts
            t_counts[(prev_s, tag)] += 1

            # Update emission counts, folding unknown words into '<unk>'
            e_counts[(tag, word if word in known_words else '<unk>')] += 1

            # Update state counts
            s_counts[tag] += 1
            prev_s = tag
        else:
            # Blank line: the next token opens a new sentence
            prev_s = "start"


In [7]:
# Transition probabilities: count(prev, tag) / count(prev).
# Kept as defaultdict(int) so unseen pairs read as 0.
transition = defaultdict(int)
transition.update(
    (pair, pair_total / s_counts[pair[0]]) for pair, pair_total in t_counts.items()
)

# Emission probabilities: count(tag, word) / count(tag), same default-0 behaviour.
emission = defaultdict(int)
emission.update(
    (pair, pair_total / s_counts[pair[0]]) for pair, pair_total in e_counts.items()
)


In [8]:
# Number of distinct (prev, tag) and (tag, word) entries currently in the tables
print("The number of transition parameters in HMM:", len(transition))
print("The number of emission parameters in HMM:", len(emission))

The number of transition parameters in HMM: 1392
The number of emission parameters in HMM: 30303


In [9]:
# All keys of s_counts are tags except "start", which is only a pseudo-tag used
# to compute the probability of a tag opening a sentence, so it is dropped here.
tags = list(s_counts)
tags.remove('start')


In [10]:
def tup_to_str(x):
    """Return the ``str()`` rendering of ``x`` (e.g. a tuple key becomes "('a', 'b')")."""
    return f"{x!s}"

# json.dump cannot serialise tuple keys, so each key is rendered as a string.
# toolz.keymap builds new plain dicts; `transition` and `emission` stay untouched.
transition_json = toolz.keymap(tup_to_str, transition)
emission_json = toolz.keymap(tup_to_str, emission)

total_dict = {'transition': transition_json, 'emission': emission_json}
with open("hmm.json", "w") as output_file:
    json.dump(total_dict, output_file, indent=4)

# **TASK 3**

In [11]:
def greedyDecoding(data):
    """Tag the sentences in ``data`` greedily and write the result to ``greedy.out``.

    For each word, the tag maximising ``emission * transition`` given the tag
    chosen for the previous word is selected (``"start"`` is the previous tag at
    the beginning of a sentence). Reads the module-level ``tags``,
    ``vocab_list``, ``emission`` and ``transition``.

    Args:
        data: Path of the input file. Non-blank lines are split on whitespace
            and the second field is taken as the word; a blank line ends the
            sentence. Nothing happens when ``data`` is falsy.

    Output:
        ``greedy.out`` in the current working directory, one
        ``<position>\\t<word>\\t<tag>`` row per word (position restarts at 1 in
        every sentence) and an empty line wherever the input had one.
    """
    if not data:
        return

    # Set membership is O(1); built once instead of scanning vocab_list per tag.
    known_words = set(vocab_list)

    # The first line of the file starts a sentence
    prev_tag = "start"

    # Input is opened before the output so a missing input never truncates greedy.out
    with open(data, "r") as f_test, open("greedy.out", "w") as model_out:
        i = 1  # position of the word inside the current sentence
        for line in f_test:
            get_indiv = line.split()
            if len(get_indiv) > 0:
                word = get_indiv[1]
                observed = word if word in known_words else '<unk>'

                max_pred_tag = [-1, None]  # [best probability so far, its tag]
                for state in tags:
                    # .get() instead of indexing: indexing a defaultdict inserts a
                    # zero entry for each unseen key, which would grow the model
                    # tables as a side effect of decoding.
                    em_prob = emission.get((state, observed), 0)
                    trans_prob = transition.get((prev_tag, state), 0)
                    prob = em_prob * trans_prob
                    # Strict '>' keeps the earliest tag on ties
                    if prob > max_pred_tag[0]:
                        max_pred_tag = [prob, state]

                prev_tag = max_pred_tag[1]
                model_out.write(str(i) + "\t" + word + "\t" + max_pred_tag[1] + "\n")
                i += 1
            else:
                # Sentence boundary: restart the tag history and the position counter
                prev_tag = "start"
                model_out.write("\n")
                i = 1

    # Message text kept unchanged; note the file is written to the working directory.
    print("greedy.out file created in data folder.")


In [12]:
# Decode the dev split; predictions are written to greedy.out
greedyDecoding("data/dev")


greedy.out file created in data folder.


In [13]:
# Score greedy.out against data/dev with eval.py (resolved relative to the
# current working directory).
# sys.executable runs the script with the kernel's own interpreter rather than
# whatever "python" resolves to on PATH; an argument list needs no shell.
command = [sys.executable, "eval.py", "-p", "greedy.out", "-g", "data/dev"]

# Run the command and capture the output
completed_process = subprocess.run(command, capture_output=True, text=True)

# Print the output
print("STDOUT:")
print(completed_process.stdout)

# Print the error if there is one
if completed_process.stderr:
    print("STDERR:")
    print(completed_process.stderr)

STDOUT:

STDERR:
python: can't open file 'c:\\Users\\navsa\\OneDrive\\Desktop\\CSCI544_AppliedNLP\\HW3\\eval.py': [Errno 2] No such file or directory



In [14]:
# Decode the test split; this overwrites the greedy.out produced for the dev split
greedyDecoding("data/test")

greedy.out file created in data folder.


# **TASK 4**

In [15]:
def _viterbi_best_path(words, known_words):
    """Return the highest-scoring tag sequence for one sentence (``[]`` if it is empty).

    Uses the module-level ``tags``, ``emission`` and ``transition``. Ties are
    resolved in favour of the tag that comes first in ``tags``.

    NOTE(review): scores are raw probability products, which can underflow to
    0.0 on long sentences; a log-space variant would avoid that.
    """
    if not words:
        return []

    # lattice[t][state] = (best score of a path ending in `state` at word t,
    #                      previous state on that path)
    lattice = []
    for position, word in enumerate(words):
        observed = word if word in known_words else '<unk>'
        column = {}
        for state in tags:
            # .get() instead of indexing: indexing a defaultdict would insert a
            # zero entry for every unseen key and grow the model tables.
            em_prob = emission.get((state, observed), 0)
            if position == 0:
                column[state] = (em_prob * transition.get(("start", state), 0), "start")
                continue
            best_prob, best_prev = -1, None
            for prev_state, (prev_prob, _back) in lattice[-1].items():
                candidate = em_prob * transition.get((prev_state, state), 0) * prev_prob
                if candidate > best_prob:
                    best_prob, best_prev = candidate, prev_state
            column[state] = (best_prob, best_prev)
        lattice.append(column)

    # Start from the best final state and follow the back-pointers to the front
    last_column = lattice[-1]
    state = max(last_column, key=lambda tag: last_column[tag][0])
    path = [state]
    for column in reversed(lattice[1:]):
        state = column[state][1]
        path.append(state)
    path.reverse()
    return path


def _tagged_rows(rows, known_words):
    """Format one sentence's ``(index, word)`` rows as ``index\\tword\\ttag\\n`` lines."""
    best_path = _viterbi_best_path([word for _index, word in rows], known_words)
    return [index + "\t" + word + "\t" + tag + "\n"
            for (index, word), tag in zip(rows, best_path)]


def viterbiDecoding(data):
    """Tag the sentences in ``data`` with Viterbi decoding and write ``viterbi.out``.

    Reads the module-level ``tags``, ``vocab_list``, ``emission`` and
    ``transition``.

    Args:
        data: Path of the input file. Non-blank lines are split on whitespace;
            the first field is the word index and the second the word. A blank
            line ends the sentence. Nothing happens when ``data`` is falsy
            (same as ``greedyDecoding``).

    Output:
        ``viterbi.out`` in the current working directory, one
        ``<index>\\t<word>\\t<tag>`` row per word and an empty line wherever the
        input had one. The whole input is decoded before the output file is
        opened, so a failure while decoding leaves an existing file untouched.
    """
    if not data:
        return

    # Set membership is O(1); built once instead of scanning vocab_list per tag.
    known_words = set(vocab_list)

    out_lines = []
    sentence = []  # (index, word) rows of the sentence currently being read
    with open(data, 'r') as f:
        for line in f:
            get_indiv = line.split()
            if len(get_indiv) > 0:
                sentence.append((str(get_indiv[0]), get_indiv[1]))
                continue
            # Blank line: decode what was collected. An empty buffer (leading or
            # repeated blank lines) contributes no tags, so later sentences stay
            # aligned with their own predictions.
            out_lines.extend(_tagged_rows(sentence, known_words))
            out_lines.append("\n")
            sentence = []
    # Last sentence when the file does not end with a blank line
    out_lines.extend(_tagged_rows(sentence, known_words))

    with open('viterbi.out', 'w') as model_out:
        model_out.writelines(out_lines)

    # Message text kept unchanged; note the file is written to the working directory.
    print("viterbi.out file created in data folder.")




In [16]:
# Decode the dev split; predictions are written to viterbi.out
viterbiDecoding("data/dev")

viterbi.out file created in data folder.


In [17]:
# Score viterbi.out against data/dev with eval.py (resolved relative to the
# current working directory).
# sys.executable runs the script with the kernel's own interpreter rather than
# whatever "python" resolves to on PATH; an argument list needs no shell.
command = [sys.executable, "eval.py", "-p", "viterbi.out", "-g", "data/dev"]

# Run the command and capture the output
completed_process = subprocess.run(command, capture_output=True, text=True)

# Print the output
print("STDOUT:")
print(completed_process.stdout)

# Print the error if there is one
if completed_process.stderr:
    print("STDERR:")
    print(completed_process.stderr)

STDOUT:

STDERR:
python: can't open file 'c:\\Users\\navsa\\OneDrive\\Desktop\\CSCI544_AppliedNLP\\HW3\\eval.py': [Errno 2] No such file or directory



In [18]:
# Decode the test split; this overwrites the viterbi.out produced for the dev split
viterbiDecoding("data/test")

viterbi.out file created in data folder.
