<a href="https://colab.research.google.com/github/avinash064/Avinash_kashyap_21064/blob/main/21064_AvinashKashyap_nlpassignment3_viterbi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!wget https://raw.githubusercontent.com/debajyotimaz/nlp_assignment/refs/heads/main/Viterbi_assignment/train_data.txt
!wget https://raw.githubusercontent.com/debajyotimaz/nlp_assignment/refs/heads/main/Viterbi_assignment/test_data.txt
!wget https://raw.githubusercontent.com/debajyotimaz/nlp_assignment/refs/heads/main/Viterbi_assignment/noisy_test_data.txt

--2024-11-15 16:22:17--  https://raw.githubusercontent.com/debajyotimaz/nlp_assignment/refs/heads/main/Viterbi_assignment/train_data.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 375849 (367K) [text/plain]
Saving to: ‘train_data.txt’


2024-11-15 16:22:18 (7.52 MB/s) - ‘train_data.txt’ saved [375849/375849]

--2024-11-15 16:22:18--  https://raw.githubusercontent.com/debajyotimaz/nlp_assignment/refs/heads/main/Viterbi_assignment/test_data.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 77062 (75K) [text/plain]
Saving to: 

In [2]:
def load_data(file_path):
    data = []
    with open(file_path, 'r') as file:
        for line in file:
            sentence = []
            for token in line.strip().split():
                word, tag = token.rsplit('/', 1)  # Split word and tag
                sentence.append((word, tag))
            data.append(sentence)
    return data

# Load train and test data from files
train_data_file = '/content/train_data.txt'  # Path to your training data file
test_data_file = '/content/test_data.txt'    # Path to your test data file
noisy_test_data_file = '/content/noisy_test_data.txt'  # Path to your noisy test data file

train_data = load_data(train_data_file)
test_data = load_data(test_data_file)
noisy_test_data = load_data(noisy_test_data_file)

# Print a sample from the training data
print(train_data[0])

[('He', 'PRON'), ('let', 'VERB'), ('her', 'PRON'), ('tell', 'VERB'), ('him', 'PRON'), ('all', 'PRT'), ('about', 'ADP'), ('the', 'DET'), ('church', 'NOUN'), ('.', '.')]


In [4]:
from collections import defaultdict

In [5]:
def build_vocab(data):
    word_vocab = set()
    tag_vocab = set()
    for sentence in data:
        for word, tag in sentence:
            word_vocab.add(word)
            tag_vocab.add(tag)
    return word_vocab, tag_vocab


In [6]:
train_word_vocab, train_tag_vocab = build_vocab(train_data) #Building vocabularies
print("Word Vocabulary:", train_word_vocab)
print("Tag Vocabulary:", train_tag_vocab)

Tag Vocabulary: {'CONJ', 'NOUN', 'ADV', 'ADJ', 'X', 'PRON', 'NUM', 'ADP', 'VERB', 'DET', 'PRT', '.'}


In [7]:
def calculate_probability(data, smoothing=1e-6):
    tag_count = defaultdict(int)
    start_count = defaultdict(int)
    trans_count = defaultdict(lambda: defaultdict(int))
    emit_count = defaultdict(lambda: defaultdict(int))

    for sentence in data:
        prev_tag = None
        for i, (word, tag) in enumerate(sentence):
            tag_count[tag] += 1
            emit_count[tag][word] += 1
            if i == 0:
                start_count[tag] += 1
            else:
                trans_count[prev_tag][tag] += 1
            prev_tag = tag

    total_tags = sum(tag_count.values())
    start_probs = {tag: (count / total_tags) for tag, count in start_count.items()}
    trans_probs = {
        prev_tag: {tag: (count / sum(next_tags.values()) + smoothing) for tag, count in next_tags.items()}
        for prev_tag, next_tags in trans_count.items()
    }
    emit_probs = {
        tag: {word: (count / tag_count[tag]) for word, count in words.items()}
        for tag, words in emit_count.items()
    }

    return start_probs, trans_probs, emit_probs

In [8]:
start_probs, trans_probs, emit_probs = calculate_probability(train_data) #getting probabilities


In [9]:
def viterbi(sentence, states, start_probs, trans_probs, emit_probs, smoothing=1e-6, top_k=4):
    n = len(sentence)
    m = len(states)
    dp = [[[-np.inf] * top_k for _ in range(m)] for _ in range(n)]
    backpointer = [[[-1] * top_k for _ in range(m)] for _ in range(n)]

    for j, state in enumerate(states):
        word = sentence[0].lower()
        emission_prob = emit_probs[state].get(word, smoothing)
        dp[0][j][0] = np.log(start_probs.get(state, smoothing)) + np.log(emission_prob)

    for i in range(1, n):
        word = sentence[i].lower()
        for j, state in enumerate(states):
            emission_prob = emit_probs[state].get(word, smoothing)
            candidates = []
            for k, prev_state in enumerate(states):
                for rank in range(top_k):
                    prob = dp[i - 1][k][rank] + np.log(trans_probs[prev_state].get(state, smoothing)) + np.log(emission_prob)
                    candidates.append((prob, k, rank))
            candidates = sorted(candidates, key=lambda x: x[0], reverse=True)[:top_k]
            for rank, (prob, back_k, back_rank) in enumerate(candidates):
                dp[i][j][rank] = prob
                backpointer[i][j][rank] = (back_k, back_rank)

    best_path = []
    best_last_state, best_rank = max(
        ((j, rank) for j in range(m) for rank in range(top_k)),
        key=lambda x: dp[n - 1][x[0]][x[1]]
    )
    best_path.append(states[best_last_state])

    for i in range(n - 1, 0, -1):
        best_last_state, best_rank = backpointer[i][best_last_state][best_rank]
        best_path.insert(0, states[best_last_state])

    return best_path

In [10]:
states = list(train_tag_vocab)

In [11]:
def evaluate(data, states, start_probs, trans_probs, emit_probs, use_noise_handling=False):
    corrects, totals = 0, 0
    for sentence in data:
        words = [word for word, tag in sentence]
        true_tags = [tag for word, tag in sentence]
        predicted_tags = viterbi(
            words, states, start_probs, trans_probs, emit_probs,
            top_k=4 if use_noise_handling else 1
        )
        corrects += sum(p == t for p, t in zip(predicted_tags, true_tags))
        totals += len(true_tags)
    return corrects / totals * 100

In [13]:
import numpy as np

In [14]:
baseline_accuracy = evaluate(test_data, states, start_probs, trans_probs, emit_probs)
noisy_accuracy = evaluate(noisy_test_data, states, start_probs, trans_probs, emit_probs, use_noise_handling=True)

# Results
print(f"Baseline Viterbi Accuracy: {baseline_accuracy:.2f}%")
print(f"Noise-Handled Viterbi Accuracy: {noisy_accuracy:.2f}%")

Baseline Viterbi Accuracy: 88.67%
Noise-Handled Viterbi Accuracy: 80.46%


###Evaluation
An example of evaluation:

In [None]:
sentence = [('Get', 'VERB'), ('copper', 'NOUN'), ('or', 'CONJ'), ('earthenware', 'NOUN'), ('mugs', 'NOUN'), ('that', 'PRON'), ('keep', 'VERB'), ('beer', 'NOUN'), ('chilled', 'VERB'), ('or', 'CONJ'), ('soup', 'NOUN'), ('hot', 'ADJ'), ('.', '.')]
predicted_tags = ['DET', 'NOUN', 'CONJ', 'NOUN', 'ADP', 'PRON', 'VERB', 'NOUN', '.', 'CONJ', 'NOUN', 'ADJ', '.']
true_tags = ('VERB', 'NOUN', 'CONJ', 'NOUN', 'NOUN', 'PRON', 'VERB', 'NOUN', 'VERB', 'CONJ', 'NOUN', 'ADJ', '.')
correct = 0
total = 0
correct += sum(p == t for p, t in zip(predicted_tags, true_tags))
total += len(true_tags)
accuracy = correct / total

print(f"Baseline Viterbi Accuracy: {accuracy * 100:.2f}%")
# print(f"Viterbi with Noise Handling Accuracy: {accuracy * 100:.2f}%")  # similarly calculate for Noise Handling


Baseline Viterbi Accuracy: 76.92%
