1. Install and import the necessary libraries

In [None]:
# Install necessary libraries (if not already installed)
!pip install nltk scikit-learn

# Import libraries
import nltk
from nltk.corpus import treebank
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import math
from collections import defaultdict




2. nltk data

In [None]:
# Download the NLTK resources this notebook relies on: the `treebank` corpus
# and the `universal_tagset` mapping (needed for the tagset='universal' option
# used when the tagged sentences are loaded).
# The return value of the last call is what the cell displays as its output.
nltk.download('treebank')
nltk.download('universal_tagset')


[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Unzipping corpora/treebank.zip.
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.


True

3. Split the dataset into training and test sets

In [None]:
# Split configuration, named so the values are easy to find and tune.
# NOTE(review): a test fraction of 0.7 holds out 70% of the sentences and
# leaves only 30% for training -- confirm this is intended.
TEST_FRACTION = 0.7
SPLIT_SEED = 1

# Tagged sentences from the treebank corpus, requested with the universal tagset.
sentences = treebank.tagged_sents(tagset='universal')

# Seeded split so the train/test partition is reproducible across runs.
trainData, testData = train_test_split(
    sentences,
    test_size=TEST_FRACTION,
    random_state=SPLIT_SEED,
)

4. extract words and tags

In [None]:
# Collect the lowercased vocabulary and the tag inventory in a single pass
# over the training sentences.
wordSet, tagSet = set(), set()
for sent in trainData:
    for word, tag in sent:
        wordSet.add(word.lower())
        tagSet.add(tag)

# Index <-> symbol lookup tables; indices follow the sorted order of the symbols.
idx_to_tag = dict(enumerate(sorted(tagSet)))
tag_to_idx = {tag: idx for idx, tag in idx_to_tag.items()}
idx_to_word = dict(enumerate(sorted(wordSet)))
word_to_idx = {word: idx for idx, word in idx_to_word.items()}

# Reserve the next free index for the unknown-word placeholder.
unk_idx = len(word_to_idx)
word_to_idx['UNK'] = unk_idx
idx_to_word[unk_idx] = 'UNK'


5. Special tokens and initialization of counts

In [None]:
# Special tokens marking the start and end of a sentence
START, END = '<s>', '</s>'

# Place the boundary markers directly after the real tags. The indices are
# derived from the tag inventory (not from the current size of tag_to_idx) so
# that re-running this cell always yields the same indices instead of appending
# fresh ones and leaving stale entries behind in idx_to_tag.
tag_to_idx[START] = len(tagSet)
tag_to_idx[END] = len(tagSet) + 1
idx_to_tag[tag_to_idx[START]] = START
idx_to_tag[tag_to_idx[END]] = END

# Fresh count tables:
#   tagCounts[tag]                      -- occurrences of each tag
#   transitionCounts[(prev_tag, tag)]   -- tag bigram occurrences
#   emissionCounts[(tag, word)]         -- word observed with tag
tagCounts = defaultdict(int)
transitionCounts = defaultdict(int)
emissionCounts = defaultdict(int)


6. Occurrence counts in the training data

In [None]:
# Count occurrences in the training data.
# The count tables are created in a separate cell, so empty them first:
# otherwise re-running this cell alone would add every count a second time
# and change the smoothed probabilities computed from them.
tagCounts.clear()
transitionCounts.clear()
emissionCounts.clear()

for sent in trainData:
    # Each sentence starts in the START state ...
    previous_tag = START
    tagCounts[START] += 1
    for word, tag in sent:
        word = word.lower()  # vocabulary is case-insensitive
        tagCounts[tag] += 1
        transitionCounts[(previous_tag, tag)] += 1
        emissionCounts[(tag, word)] += 1
        previous_tag = tag
    # ... and finishes with a transition from its last tag into END.
    transitionCounts[(previous_tag, END)] += 1
    tagCounts[END] += 1

# Determine the number of unique tags and words (including 'UNK')
N_tags = len(tagSet) + 2  # Plus START and END tags
Vocab_size = len(wordSet) + 1  # Plus 'UNK'


7. log probability calculation

In [None]:
def _add_one_log_prob(count, denominator):
    """Return log((count + 1) / denominator), the add-one smoothed log probability."""
    return math.log((count + 1) / denominator)


# Smoothing denominators per tag (boundary markers included): the tag's count
# plus the number of possible outcomes (tags for transitions, words for emissions).
transitionDeno = {}
emissionDeno = {}
for tag in tagSet | {START, END}:
    transitionDeno[tag] = tagCounts[tag] + N_tags
    emissionDeno[tag] = tagCounts[tag] + Vocab_size

# log P(curr_tag | prev_tag) for every ordered pair of tags, boundary markers included.
log_transition_probs = {
    prev_tag: {
        curr_tag: _add_one_log_prob(
            transitionCounts.get((prev_tag, curr_tag), 0),
            transitionDeno[prev_tag],
        )
        for curr_tag in tagSet | {START, END}
    }
    for prev_tag in tagSet | {START, END}
}

# log P(word | tag) for every real tag and every vocabulary word plus 'UNK'.
log_emission_probs = {
    tag: {
        word: _add_one_log_prob(
            emissionCounts.get((tag, word), 0),
            emissionDeno[tag],
        )
        for word in wordSet | {'UNK'}
    }
    for tag in tagSet
}


8. Viterbi algorithm

In [None]:
# Define the Viterbi algorithm
def viterbiAlgo(sequence, tag_set, transition_probs, emission_probs, START, END):
    """Return the highest-scoring tag sequence for ``sequence`` under a bigram HMM.

    Args:
        sequence: Words to tag, in order. An empty sequence yields ``[]``.
        tag_set: Set of tags; ``START`` and ``END`` are excluded from the
            candidate states if present.
        transition_probs: ``transition_probs[prev_tag][curr_tag]`` is the log
            probability of moving from ``prev_tag`` to ``curr_tag``. Must have
            a row for ``START`` and for every candidate tag.
        emission_probs: ``emission_probs[tag][word]`` is the log probability of
            ``tag`` emitting ``word``. Each row needs an ``'UNK'`` entry, which
            is used for words missing from that row.
        START: Sentence-start marker tag.
        END: Sentence-end marker tag.

    Returns:
        A list with one tag per word of ``sequence``.

    Raises:
        ValueError: If ``sequence`` is non-empty but there are no candidate tags.
    """
    if len(sequence) == 0:
        # Nothing to decode; indexing sequence[0] below would fail.
        return []

    # Candidate states never include the boundary markers; compute once.
    states = tag_set - {START, END}

    def transition(prev_tag, curr_tag):
        row = transition_probs[prev_tag]
        if curr_tag in row:
            return row[curr_tag]
        # Unseen pair: fall back to the add-one floor. Evaluated only when
        # needed, so the module-level `transitionDeno` is required solely for
        # transition tables that are missing entries.
        return math.log(1 / transitionDeno[prev_tag])

    def emission(tag, word):
        row = emission_probs[tag]
        return row[word] if word in row else row['UNK']

    # Scores for the first word: transition out of START plus emission.
    scores = {tag: transition(START, tag) + emission(tag, sequence[0]) for tag in states}

    # backpointers[i][tag] = best predecessor of `tag` at word position i + 1.
    backpointers = []
    for word in sequence[1:]:
        next_scores = {}
        pointers = {}
        for curr_tag in states:
            emit = emission(curr_tag, word)  # same for every predecessor
            # Ties on score are broken by the larger tag name (tuple comparison).
            best_score, best_prev = max(
                (scores[prev_tag] + transition(prev_tag, curr_tag) + emit, prev_tag)
                for prev_tag in states
            )
            next_scores[curr_tag] = best_score
            pointers[curr_tag] = best_prev
        backpointers.append(pointers)
        scores = next_scores

    # Pick the final state that best transitions into END.
    _, best_tag = max((scores[tag] + transition(tag, END), tag) for tag in states)

    # Walk the back-pointers from the last word to the first.
    path = [best_tag]
    for pointers in reversed(backpointers):
        path.append(pointers[path[-1]])
    path.reverse()
    return path

# Tag a sequence of words using Viterbi
def tag_sentence(sentence):
    """Lowercase each word of ``sentence`` and pair it with its Viterbi-decoded tag.

    Decoding uses the module-level tag set and log-probability tables.
    Returns a list of ``(lowercased_word, tag)`` tuples.
    """
    normalized = []
    for token in sentence:
        normalized.append(token.lower())

    decoded = viterbiAlgo(
        normalized,
        tagSet,
        log_transition_probs,
        log_emission_probs,
        START,
        END,
    )
    return [(token, tag) for token, tag in zip(normalized, decoded)]


9. test data and model evaluation

In [None]:
# Prepare test data
testSentences = [[word for word, tag in sent] for sent in testData]
groundTruth = [[tag for word, tag in sent] for sent in testData]

# Predict and evaluate tags for each test sentence
predicted = [tag_sentence(sentence) for sentence in testSentences]
predicted_tags = [tag for sent in predicted for word, tag in sent]
ground_truth_tags = [tag for sent in groundTruth for tag in sent]

# Evaluate the accuracy of the model
accuracy = accuracy_score(ground_truth_tags, predicted_tags)
print("Model Accuracy:", accuracy)


Model Accuracy: 0.8608137960521628
