In [None]:
!wget https://raw.githubusercontent.com/debajyotimaz/nlp_assignment/refs/heads/main/Viterbi_assignment/train_data.txt
!wget https://raw.githubusercontent.com/debajyotimaz/nlp_assignment/refs/heads/main/Viterbi_assignment/test_data.txt
!wget https://raw.githubusercontent.com/debajyotimaz/nlp_assignment/refs/heads/main/Viterbi_assignment/noisy_test_data.txt

--2024-11-15 14:01:09--  https://raw.githubusercontent.com/debajyotimaz/nlp_assignment/refs/heads/main/Viterbi_assignment/train_data.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.110.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 375849 (367K) [text/plain]
Saving to: ‘train_data.txt.1’


2024-11-15 14:01:10 (10.6 MB/s) - ‘train_data.txt.1’ saved [375849/375849]

--2024-11-15 14:01:10--  https://raw.githubusercontent.com/debajyotimaz/nlp_assignment/refs/heads/main/Viterbi_assignment/test_data.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 77062 (75K) [text/plain]
Saving 

In [None]:
def load_data(file_path):
    """Read a tagged corpus with one sentence per line.

    Every non-blank line is split on whitespace into ``word/TAG`` tokens. The
    tag is whatever follows the *last* ``/`` in the token, so words that
    themselves contain a slash (e.g. ``1/2/NUM``) are kept intact.

    Args:
        file_path: Path of the text file to read; it is decoded as UTF-8.

    Returns:
        A list of sentences, each a list of ``(word, tag)`` tuples. Blank or
        whitespace-only lines are skipped, so no empty sentence is returned.

    Raises:
        ValueError: If a token contains no ``/`` separator. The message names
            the offending token and its line number.
    """
    data = []
    # Explicit encoding: the platform default is not UTF-8 on every system.
    with open(file_path, 'r', encoding='utf-8') as file:
        for line_number, line in enumerate(file, start=1):
            tokens = line.split()
            if not tokens:
                # An empty sentence carries no information and cannot be decoded.
                continue
            sentence = []
            for token in tokens:
                word, separator, tag = token.rpartition('/')
                if not separator:
                    raise ValueError(
                        f"{file_path}:{line_number}: token {token!r} is not in 'word/TAG' form"
                    )
                sentence.append((word, tag))
            data.append(sentence)
    return data

# Load train and test data from files.
# The paths are relative to the working directory, which is where the download
# cell stores the files, so the notebook also runs outside Colab's /content.
train_data_file = 'train_data.txt'  # Path to your training data file
test_data_file = 'test_data.txt'    # Path to your test data file
noisy_test_data_file = 'noisy_test_data.txt'  # Path to your noisy test data file

train_data = load_data(train_data_file)
test_data = load_data(test_data_file)
noisy_test_data = load_data(noisy_test_data_file)

# Print a sample from the training data
print(train_data[0])

[('He', 'PRON'), ('let', 'VERB'), ('her', 'PRON'), ('tell', 'VERB'), ('him', 'PRON'), ('all', 'PRT'), ('about', 'ADP'), ('the', 'DET'), ('church', 'NOUN'), ('.', '.')]


In [None]:
import numpy as np
from collections import defaultdict, Counter


def build_hmm(training_data):
    """Collect the raw counts needed to estimate an HMM tagger.

    Args:
        training_data: Iterable of sentences, each an iterable of
            ``(word, tag)`` pairs.

    Returns:
        A 4-tuple ``(transition_counts, emission_counts, tag_frequencies,
        vocabulary)``:

        * ``transition_counts[prev_tag][tag]`` - how often ``tag`` follows
          ``prev_tag``. Each sentence starts from the pseudo-tag ``"<START>"``
          and ends with a transition into ``"<END>"``.
        * ``emission_counts[tag][word]`` - how often ``tag`` labels ``word``.
        * ``tag_frequencies[tag]`` - total occurrences of ``tag``.
        * ``vocabulary`` - set of every word seen.
    """
    vocabulary = set()
    tag_frequencies = Counter()
    emission_counts = defaultdict(Counter)
    transition_counts = defaultdict(Counter)

    for tagged_sentence in training_data:
        previous = "<START>"
        for token, label in tagged_sentence:
            transition_counts[previous][label] += 1
            emission_counts[label][token] += 1
            tag_frequencies[label] += 1
            vocabulary.add(token)
            previous = label
        # Close the sentence: the last tag (or <START> if empty) leads to <END>.
        transition_counts[previous]["<END>"] += 1

    return transition_counts, emission_counts, tag_frequencies, vocabulary


# Gather transition, emission and tag counts from the training corpus.
(
    transition_counts,
    emission_counts,
    tag_frequencies,
    vocabulary,
) = build_hmm(train_data)


def calculate_probabilities(transition_counts, emission_counts, tag_frequencies, vocabulary):
    """Convert raw HMM counts into maximum-likelihood probability tables.

    Each inner count table is divided by its own total; no smoothing is applied.

    Args:
        transition_counts: Mapping ``prev_tag -> {next_tag: count}``.
        emission_counts: Mapping ``tag -> {word: count}``.
        tag_frequencies: Mapping whose keys, in iteration order, define the
            tag list.
        vocabulary: Not used by this function; kept in the signature for
            callers that pass it.

    Returns:
        ``(transition_probs, emission_probs, tags, tag_to_idx_map)``. Both
        probability tables are ``defaultdict`` objects whose missing inner
        entries read as ``0.0``. ``tags`` is the list of tag names and
        ``tag_to_idx_map`` maps each tag to its position in that list.
    """
    def _row_normalise(count_table):
        # Divide every inner count by the sum of its row.
        probabilities = defaultdict(lambda: defaultdict(float))
        for context, outcome_counts in count_table.items():
            denominator = sum(outcome_counts.values())
            for outcome, occurrences in outcome_counts.items():
                probabilities[context][outcome] = occurrences / denominator
        return probabilities

    tags = list(tag_frequencies.keys())
    tag_to_idx_map = dict(zip(tags, range(len(tags))))

    transition_probs = _row_normalise(transition_counts)
    emission_probs = _row_normalise(emission_counts)
    return transition_probs, emission_probs, tags, tag_to_idx_map


# Normalise the counts into the probability tables used for decoding.
transition_probs, emission_probs, tags, tag_to_idx_map = calculate_probabilities(
    transition_counts=transition_counts,
    emission_counts=emission_counts,
    tag_frequencies=tag_frequencies,
    vocabulary=vocabulary,
)


def viterbi(input_sentence, transition_probs, emission_probs, tags, tag_to_idx_map, vocabulary,
            unknown_word_prob=1e-6):
    """Return the most probable tag sequence for a sentence under the HMM.

    The recursion runs in log space: scores are sums of log-probabilities, so
    long sentences no longer underflow to 0.0 (which made every later column
    zero and collapsed the decoded path onto ``tags[0]``). Impossible events
    are represented as ``-inf``. Ties are broken in favour of the tag with the
    lowest index.

    Args:
        input_sentence: Sequence of words to tag.
        transition_probs: Mapping ``prev_tag -> {next_tag: probability}``; the
            initial distribution is read from the ``"<START>"`` row. Missing
            entries count as probability 0. The mapping is only read via
            ``.get`` and is never modified.
        emission_probs: Mapping ``tag -> {word: probability}``.
        tags: List of tag names; its order defines the row order of the tables.
        tag_to_idx_map: Not used by the decoder; kept for interface
            compatibility.
        vocabulary: Container of known words. A word found here but never
            emitted by a tag gets probability 0 for that tag.
        unknown_word_prob: Emission probability given to a word that is not in
            ``vocabulary``, identical for every tag.

    Returns:
        A list of tags with the same length as ``input_sentence``; an empty
        list for an empty sentence.

    Raises:
        ValueError: If ``tags`` is empty while the sentence is not.
    """
    n_words = len(input_sentence)
    if n_words == 0:
        return []
    n_tags = len(tags)
    if n_tags == 0:
        raise ValueError("viterbi() needs at least one tag to decode a sentence")

    def _log(prob):
        # log(0) is represented as -inf without triggering a NumPy warning.
        return np.log(prob) if prob > 0 else -np.inf

    def _log_emissions(word):
        # One log-emission score per tag, computed once per word.
        fallback = 0 if word in vocabulary else unknown_word_prob
        return np.array([_log(emission_probs.get(tag, {}).get(word, fallback)) for tag in tags])

    start_row = transition_probs.get("<START>", {})
    log_start = np.array([_log(start_row.get(tag, 0)) for tag in tags])
    # log_transition[i, j] = log P(tags[j] | tags[i])
    log_transition = np.array([
        [_log(transition_probs.get(prev_tag, {}).get(tag, 0)) for tag in tags]
        for prev_tag in tags
    ])

    log_scores = np.full((n_tags, n_words), -np.inf)
    backpointer = np.zeros((n_tags, n_words), dtype=int)
    log_scores[:, 0] = log_start + _log_emissions(input_sentence[0])

    tag_indices = np.arange(n_tags)
    for t in range(1, n_words):
        # candidates[i, j]: best score of a path ending in tag i at t-1 and tag j at t.
        candidates = (
            log_scores[:, t - 1, np.newaxis]
            + log_transition
            + _log_emissions(input_sentence[t])
        )
        # argmax returns the first maximum, i.e. the lowest previous-tag index on ties.
        best_previous = np.argmax(candidates, axis=0)
        backpointer[:, t] = best_previous
        log_scores[:, t] = candidates[best_previous, tag_indices]

    # Follow the back-pointers from the best final tag to the sentence start.
    best_path = [int(np.argmax(log_scores[:, -1]))]
    for t in range(n_words - 1, 0, -1):
        best_path.append(int(backpointer[best_path[-1], t]))
    best_path.reverse()
    return [tags[idx] for idx in best_path]

# Viterbi decoding followed by random perturbation of the predicted tags
def viterbi_with_noise(sentence, transition_probs, emission_probs, tags, tag_to_idx_map, vocabulary, noise_level=1e-6):
    """Decode with ``viterbi`` and then randomly replace some predicted tags.

    Each predicted tag is, independently and with probability ``noise_level``,
    swapped for a tag drawn by ``np.random.choice(tags)``; otherwise it is kept.
    The input words and the decoding step itself are not altered.

    NOTE(review): despite the "noise handling" label used where this function
    is evaluated, nothing here compensates for noisy input; it only injects
    randomness into the output. Confirm that this is the intended behaviour.

    Args:
        sentence: Sequence of words to tag.
        transition_probs, emission_probs, tags, tag_to_idx_map, vocabulary:
            Passed through to ``viterbi`` unchanged.
        noise_level: Per-tag replacement probability.

    Returns:
        A list of tags, one per word.
    """
    decoded = viterbi(sentence, transition_probs, emission_probs, tags, tag_to_idx_map, vocabulary)

    # rand() is drawn once for every position; choice() is only drawn when the
    # replacement fires, so the global RNG is consumed in that order.
    return [
        np.random.choice(tags) if np.random.rand() < noise_level else tag
        for tag in decoded
    ]

# Evaluate the accuracy of a tagging method
def evaluate(data, tagging_func):
    """Compute token-level tagging accuracy of ``tagging_func`` on ``data``.

    The model is taken from the notebook-level variables ``transition_probs``,
    ``emission_probs``, ``tags``, ``tag_to_idx_map`` and ``vocabulary``, which
    must be defined before this function is called on non-empty data.

    Args:
        data: Iterable of sentences, each a sequence of ``(word, tag)`` pairs.
        tagging_func: Callable invoked as ``tagging_func(words,
            transition_probs, emission_probs, tags, tag_to_idx_map,
            vocabulary)`` that returns one predicted tag per word.

    Returns:
        The fraction of tokens whose predicted tag equals the gold tag, or
        ``0.0`` when ``data`` contains no tokens (instead of raising
        ``ZeroDivisionError``).
    """
    total_tags = 0
    correct_tags = 0
    for sentence in data:
        words = [word for word, tag in sentence]
        true_tags = [tag for word, tag in sentence]
        predicted_tags = tagging_func(words, transition_probs, emission_probs, tags, tag_to_idx_map, vocabulary)
        total_tags += len(true_tags)
        # zip stops at the shorter list, so a missing prediction counts as wrong.
        correct_tags += sum(predicted == true for predicted, true in zip(predicted_tags, true_tags))
    if total_tags == 0:
        return 0.0
    return correct_tags / total_tags

# Evaluate baseline and noise-handling methods
baseline_accuracy = evaluate(test_data, viterbi)

# viterbi_with_noise draws from NumPy's global random generator; seed it so
# the reported accuracy is the same on every run of the notebook.
np.random.seed(42)
noise_handling_accuracy = evaluate(noisy_test_data, viterbi_with_noise)

# Print evaluation results
print("Baseline Accuracy (Test Data):", baseline_accuracy)
print("Noise-Handled Accuracy (Noisy Data):", noise_handling_accuracy)


Baseline Accuracy (Test Data): 0.9021885316053307
Noise-Handled Accuracy (Noisy Data): 0.8180706687859152
