In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from collections import defaultdict
import json

In [3]:
Train_data = "/content/drive/MyDrive/data/train"  # Update this path


#TASK 1

In [4]:
def process_data(train_data_path, threshold=2):
    word_freq, tag_freq, sentence_count = defaultdict(int), defaultdict(int), 0
    file_data = ["<s>"]

    with open(train_data_path) as f:
        for line in f:
            if line.strip() == "":
                file_data.append("<s>")
                sentence_count += 1
            else:
                parts = line.strip().split("\t")
                if len(parts) != 3:
                    continue
                word, tag = parts[1], parts[2]
                word_freq[word] += 1
                tag_freq[tag] += 1
                file_data.append(line.strip())

    vocab = {word: freq for word, freq in word_freq.items() if freq >= threshold}
    vocab["<unk>"] = sum(freq for word, freq in word_freq.items() if freq < threshold)

    with open("vocab.txt", "w") as vf:
        vf.write("<unk>\t0\t{}\n".format(vocab["<unk>"]))
        for idx, (word, freq) in enumerate(sorted(vocab.items(), key=lambda x: x[1], reverse=True), start=1):
            vf.write("{}\t{}\t{}\n".format(word, idx, freq))

    print(f"Vocabulary size after replacement: {len(vocab)}")
    print(f"Total occurrences of ‘<unk>’: {vocab['<unk>']}")

    return file_data, vocab, tag_freq, sentence_count

file_data, vocab, tag_freq, sentence_count = process_data(Train_data)

Vocabulary size after replacement: 23183
Total occurrences of ‘<unk>’: 20011


#TASK 2

In [5]:
def compute_hmm_parameters(file_data, vocab, tag_freq, sentence_count):
    emission_probs, transition_probs = defaultdict(int), defaultdict(int)
    prev_tag = "<s>"

    for line in file_data:
        if line == "<s>":
            prev_tag = "<s>"
            continue

        parts = line.split("\t")
        word, cur_tag = parts[1], parts[2]
        word = word if word in vocab else "<unk>"
        emission_probs[(cur_tag, word)] += 1

        if prev_tag != "<s>":
            transition_probs[(prev_tag, cur_tag)] += 1
        else:
            transition_probs[("start", cur_tag)] += 1
        prev_tag = cur_tag

    for key in emission_probs:
        emission_probs[key] /= tag_freq[key[0]]

    for key in transition_probs:
        if key[0] == "start":
            transition_probs[key] /= sentence_count
        else:
            transition_probs[key] /= tag_freq[key[0]]


    # Filter and print transition probabilities excluding "start"
    filtered_transitions = {k: v for k, v in transition_probs.items() if k[0] != "start"}


    print(f"\nNumber of emission parameters: {len(emission_probs)}")
    print(f"Number of transition parameters: {len(transition_probs)}")
    print(f"Number of transition parameters (excluding start): {len(filtered_transitions)}")

    return emission_probs, transition_probs


In [6]:

def save_hmm_model(emission_probs, transition_probs):
    emission_str_keys = {f"({tag},{word})": prob for (tag, word), prob in emission_probs.items()}
    transition_str_keys = {f"({prev_tag},{next_tag})": prob for (prev_tag, next_tag), prob in transition_probs.items()}

    model = {"Transition": transition_str_keys, "Emission": emission_str_keys}
    with open('hmm.json', 'w') as f:
        json.dump(model, f, indent=4)

    print("HMM model saved to hmm.json")

# Main execution

emission_probs, transition_probs = compute_hmm_parameters(file_data, vocab, tag_freq, sentence_count)
save_hmm_model(emission_probs, transition_probs)



Number of emission parameters: 30303
Number of transition parameters: 1392
Number of transition parameters (excluding start): 1351
HMM model saved to hmm.json


#TASK 3

In [7]:
def tag_sentences_with_greedy(input_path, emissions, transitions, tag_set, vocab, data_type='dev'):
    """
    Performs POS tagging on sentences using a greedy decoding algorithm.

    Parameters:
    - input_path: Path to the input file containing sentences.
    - emissions: A dictionary with emission probabilities.
    - transitions: A dictionary with transition probabilities.
    - tag_set: A set containing all possible tags.
    - vocab: A set containing all known vocabulary words.
    - data_type: A string indicating the type of data ('dev' or 'test').
    """
    # Choose the output file name based on data_type
    output_file = "greedy_dev.out" if data_type == 'dev' else "greedy.out"

    tagged_output = []  # Store the output lines here
    current_tag = "start"  # Initialize the current tag as start

    with open(input_path, 'r') as input_data:
        for line in input_data:
            if line.strip() == "":  # Sentence boundary detected
                current_tag = "start"  # Reset for new sentence
                tagged_output.append("\n")  # Keep sentences separated in output
                continue

            index, token = line.strip().split("\t")[:2]
            processed_word = token if token in vocab else "<unk>"  # Handle unknown words

            max_probability, chosen_tag = 0, None
            for potential_tag in tag_set:
                emission_key = (potential_tag, processed_word)
                transition_key = (current_tag, potential_tag)

                emission_probability = emissions.get(emission_key, 0)
                transition_probability = transitions.get(transition_key, 0)
                total_probability = emission_probability * transition_probability

                if total_probability > max_probability:
                    max_probability, chosen_tag = total_probability, potential_tag

            current_tag = chosen_tag or "start"  # Update the current tag
            tagged_output.append(f"{index}\t{token}\t{chosen_tag}\n")  # Add the tagged line

    with open(output_file, 'w') as output_data:
        output_data.writelines(tagged_output)  # Write all tagged lines to the output file

# Assuming tag_freq is a dictionary where keys are tags and values are their frequencies
tags = set(tag_freq.keys())

# Example calls
tag_sentences_with_greedy("/content/drive/MyDrive/data/test", emission_probs, transition_probs, tags, vocab, data_type='test')
tag_sentences_with_greedy("/content/drive/MyDrive/data/dev", emission_probs, transition_probs, tags, vocab, data_type='dev')


In [9]:
print(f"The accuracy on validation data using the greedy method is")

!python eval.py -p /content/greedy_dev.out -g /content/drive/MyDrive/data/dev


The accuracy on validation data using the greedy method is
total: 131768, correct: 123203, accuracy: 93.50%


In [10]:
def perform_viterbi_decoding(observed_words, state_list, emission_probabilities, transition_probabilities):
    total_observations = len(observed_words)
    total_states = len(state_list)
    viterbi_matrix = [[0 for _ in range(total_states)] for _ in range(total_observations)]
    backpointers = [[0 for _ in range(total_states)] for _ in range(total_observations)]

    # Initialization step
    for state_index in range(total_states):
        transition_prob = transition_probabilities.get(('start', state_list[state_index]), 1e-10)
        emission_prob = emission_probabilities.get((state_list[state_index], observed_words[0]), 1e-10)
        viterbi_matrix[0][state_index] = transition_prob * emission_prob
        backpointers[0][state_index] = 0

    # Recursion step
    for time_step in range(1, total_observations):
        for state_index in range(total_states):
            max_probability, best_previous_state = max(
                (viterbi_matrix[time_step-1][prev_state] *
                 transition_probabilities.get((state_list[prev_state], state_list[state_index]), 1e-10) *
                 emission_probabilities.get((state_list[state_index], observed_words[time_step]), 1e-10), prev_state)
                for prev_state in range(total_states))

            viterbi_matrix[time_step][state_index] = max_probability
            backpointers[time_step][state_index] = best_previous_state

    # Termination step
    last_time_step = total_observations - 1
    best_final_state = max(range(total_states), key=lambda s: viterbi_matrix[last_time_step][s])

    # Path backtracking
    optimal_path = [best_final_state]
    for time_step in range(total_observations - 1, 0, -1):
        optimal_path.insert(0, backpointers[time_step][optimal_path[0]])

    return [state_list[state] for state in optimal_path]


# Assuming emission_probs is a dictionary where keys are tuples (tag, word)
# Extract unique tags from the keys of emission_probs
states = set(tag for tag, _ in emission_probs.keys())



In [11]:
def process_file_and_tag_viterbi(input_file_path, states, emission_probs, transition_probs, vocab, data_type='dev'):
      # Determine the output file based on the data type

    output_file = "viterbi_dev.out" if data_type == 'dev' else "viterbi.out"
    viterbi_output = []

    with open(input_file_path) as file:
        lines = file.readlines()
        word_indices, sentence_words, observed_words = [], [], []

        for line in lines:
            if len(line.strip()) == 0:  # Sentence boundary
                if observed_words:
                    tags = perform_viterbi_decoding(observed_words, states, emission_probs, transition_probs)
                    viterbi_output.extend(f"{index}\t{word}\t{tag}\n" for index, word, tag in zip(word_indices, sentence_words, tags))
                # Reset for next sentence
                observed_words, word_indices, sentence_words = [], [], []
                viterbi_output.append("\n")
                continue

            index, word = line.strip().split("\t")[:2]
            word_indices.append(index)
            sentence_words.append(word)
            observed_words.append(word if word in vocab else "<unk>")

    with open(output_file, "w") as viterbi_file:
        viterbi_file.writelines(viterbi_output)

# Example usage
dev_file_path = "/content/drive/MyDrive/data/dev"  # For development data
test_file_path = "/content/drive/MyDrive/data/test"  # For test data

# If 'states' was originally defined as a set, convert it to a list
states_list = list(states)

# Then, pass this list to the process_file_and_tag_viterbi function
process_file_and_tag_viterbi(dev_file_path, states_list, emission_probs, transition_probs, vocab, data_type='dev')
process_file_and_tag_viterbi(test_file_path, states_list, emission_probs, transition_probs, vocab, data_type='test')



In [12]:
print(f"The accuracy on validation data using the viterbi decoding method is")

!python eval.py -p /content/viterbi_dev.out -g /content/drive/MyDrive/data/dev


The accuracy on validation data using the viterbi decoding method is
'1\tThat\tDT' '38\t.\t.' 131751
total: 131751, correct: 124912, accuracy: 94.81%
