## NLP Homework 2
**Arash Nasr Esfahani**

**Student ID: 610303068**

### Libraries:

In [14]:
import numpy as np
import math
import random
from collections import defaultdict, Counter
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed

LOG_ZERO = -1e10

def log_prob(p):
    if p == 0:
        return LOG_ZERO
    return math.log(p)


### Section 1: Loading the Dataset

In [15]:
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

Mounted at /content/drive


In [16]:
def load_training_data(train_file):
    sentences = []
    current_sentence = []

    with open(train_file, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                if current_sentence:
                    sentences.append(current_sentence)
                    current_sentence = []
                continue
            parts = line.split()
            if len(parts) >= 2:
                word = parts[0]
                tag = parts[1]
                current_sentence.append((word, tag))
        if current_sentence:
            sentences.append(current_sentence)
    return sentences

# Loading training data
train_sentences = load_training_data('/content/drive/MyDrive/train.txt')
print("Total number of training sentences:", len(train_sentences))


Total number of training sentences: 9631


### Section 2: Preprocessing

In [18]:
def train_validation_split(sentences, validation_frac=0.2, seed=42):
    random.seed(seed)
    shuffled = sentences.copy()
    random.shuffle(shuffled)
    split_index = int(len(shuffled) * (1 - validation_frac))
    return shuffled[:split_index], shuffled[split_index:]

train_data, validation_data = train_validation_split(train_sentences, validation_frac=0.2)
print("Training sentences:", len(train_data))
print("Validation sentences:", len(validation_data))


Training sentences: 7704
Validation sentences: 1927


In [19]:
tag_counts = Counter()
emission_counts = defaultdict(Counter)
bigram_counts = defaultdict(Counter)
trigram_counts = defaultdict(Counter)

START = "<s>"
STOP = "</s>"

# Process training sentences
for sentence in train_data:
    tags = [START] + [tag for word, tag in sentence] + [STOP]

    # Unigram counts
    for word, tag in sentence:
        tag_counts[tag] += 1
        emission_counts[tag][word] += 1

    # Bigram counts
    for i in range(len(tags) - 1):
        bigram_counts[tags[i]][tags[i+1]] += 1

    # Trigram counts
    tags_tri = [START, START] + [tag for word, tag in sentence] + [STOP]
    for i in range(len(tags_tri) - 2):
        trigram_counts[(tags_tri[i], tags_tri[i+1])][tags_tri[i+2]] += 1

# Vocabulary
vocab = set()
for sentence in train_data:
    for word, tag in sentence:
        vocab.add(word)

print("Number of unique tags:", len(tag_counts))
print("Vocabulary size:", len(vocab))


Number of unique tags: 40
Vocabulary size: 66136


In [20]:
tags_list = list(tag_counts.keys())

# For emission probabilities, applying Laplace smoothing.
emission_prob = defaultdict(dict)
for tag in emission_counts:
    total = tag_counts[tag] + len(vocab)
    for word in vocab:
        emission_prob[tag][word] = (emission_counts[tag][word] + 1) / total
    emission_prob[tag]['<UNK>'] = 1 / total

# Unigram tag probabilities
total_tags = sum(tag_counts.values())
unigram_tag_prob = { tag: count/total_tags for tag, count in tag_counts.items() }

# Bigram transition probabilities
bigram_prob = defaultdict(dict)
for prev_tag in bigram_counts:
    total = sum(bigram_counts[prev_tag].values())
    for curr_tag in bigram_counts[prev_tag]:
        bigram_prob[prev_tag][curr_tag] = bigram_counts[prev_tag][curr_tag] / total

# Trigram transition probabilities
trigram_prob = defaultdict(dict)
for prev_tags in trigram_counts:
    total = sum(trigram_counts[prev_tags].values())
    for curr_tag in trigram_counts[prev_tags]:
        trigram_prob[prev_tags][curr_tag] = trigram_counts[prev_tags][curr_tag] / total


### Section 3: Implementation of N-Gram Models

In [21]:
def viterbi_unigram(sentence, unigram_tag_prob, emission_prob):
    best_tags = []
    for word in sentence:
        best_score = LOG_ZERO
        best_tag = None
        for tag in unigram_tag_prob:
            emit = emission_prob[tag][word] if word in emission_prob[tag] else emission_prob[tag]['<UNK>']
            score = log_prob(unigram_tag_prob[tag]) + log_prob(emit)
            if score > best_score:
                best_score = score
                best_tag = tag
        best_tags.append(best_tag)
    return best_tags


sample_sentence = ["اولین", "سیاره", "خارج"]
print("Unigram tags:", viterbi_unigram(sample_sentence, unigram_tag_prob, emission_prob))


Unigram tags: ['ADJ_SUP', 'N_SING', 'N_SING']


In [22]:
def viterbi_bigram(sentence, bigram_prob, emission_prob):
    n = len(sentence)
    states = list(unigram_tag_prob.keys())

    viterbi = [{} for _ in range(n)]
    backpointer = [{} for _ in range(n)]

    # Initialization
    for s in states:
        # emission probability
        emit = emission_prob[s][sentence[0]] if sentence[0] in emission_prob[s] else emission_prob[s]['<UNK>']
        # Transition probability
        trans = bigram_prob[START].get(s, 1e-6)  # small smoothing probability for unseen transitions
        viterbi[0][s] = log_prob(trans) + log_prob(emit)
        backpointer[0][s] = START

    # Recursion
    for t in range(1, n):
        for s in states:
            best_prob = LOG_ZERO
            best_state = None
            emit = emission_prob[s][sentence[t]] if sentence[t] in emission_prob[s] else emission_prob[s]['<UNK>']
            log_emit = log_prob(emit)
            for s_prev in states:
                trans = bigram_prob[s_prev].get(s, 1e-6)  # smoothing for unseen transitions
                prob = viterbi[t-1][s_prev] + log_prob(trans) + log_emit
                if prob > best_prob:
                    best_prob = prob
                    best_state = s_prev
            viterbi[t][s] = best_prob
            backpointer[t][s] = best_state

    # Termination
    best_final_prob = LOG_ZERO
    best_final_state = None
    for s in states:
        trans = bigram_prob[s].get(STOP, 1e-6)
        prob = viterbi[n-1][s] + log_prob(trans)
        if prob > best_final_prob:
            best_final_prob = prob
            best_final_state = s

    # Backtracking:
    tags = [None] * n
    tags[-1] = best_final_state
    for t in range(n-1, 0, -1):
        tags[t-1] = backpointer[t][tags[t]]
    return tags

print("Bigram tags:", viterbi_bigram(sample_sentence, bigram_prob, emission_prob))


Bigram tags: ['ADJ_SUP', 'N_SING', 'N_SING']


In [23]:
def viterbi_trigram(sentence, trigram_prob, emission_prob):
    n = len(sentence)
    states = list(unigram_tag_prob.keys())

    viterbi = [{} for _ in range(n)]
    backpointer = [{} for _ in range(n)]

    # Initialization
    for s in states:
        emit = emission_prob[s][sentence[0]] if sentence[0] in emission_prob[s] else emission_prob[s]['<UNK>']
        trans = trigram_prob[(START, START)].get(s, 1e-6)
        score = log_prob(trans) + log_prob(emit)
        viterbi[0][(START, s)] = score
        backpointer[0][(START, s)] = (START, START)

    # Recursion
    for t in range(1, n):
        for (prev1, prev2), score_prev in viterbi[t-1].items():
            for s in states:
                emit = emission_prob[s][sentence[t]] if sentence[t] in emission_prob[s] else emission_prob[s]['<UNK>']
                trans = trigram_prob[(prev1, prev2)].get(s, 1e-6)
                score = score_prev + log_prob(trans) + log_prob(emit)
                key = (prev2, s)
                if key not in viterbi[t] or score > viterbi[t][key]:
                    viterbi[t][key] = score
                    backpointer[t][key] = (prev1, prev2)

    # Termination
    best_prob = LOG_ZERO
    best_pair = None
    final_t = n - 1
    for (prev, curr), score in viterbi[final_t].items():
        trans = trigram_prob[(prev, curr)].get(STOP, 1e-6)
        total_score = score + log_prob(trans)
        if total_score > best_prob:
            best_prob = total_score
            best_pair = (prev, curr)

    # Backtracking:
    tags = [None] * n
    tags[-1] = best_pair[1]
    if n >= 2:
        tags[-2] = best_pair[0]

    for t in range(n-1, 0, -1):
        key = best_pair
        prev_pair = backpointer[t][key]
        best_pair = (prev_pair[0], key[0])
        tags[t-1] = best_pair[1]
    return tags

print("Trigram tags:", viterbi_trigram(sample_sentence, trigram_prob, emission_prob))


Trigram tags: ['ADJ_SUP', 'N_SING', 'N_SING']


### Section 4: Evaluation of N-Gram Models

In [24]:
def evaluate_model(data, tagging_func):
    total = 0
    correct = 0
    for sentence in tqdm(data, desc="Evaluating sentences"):
        words = [w for w, t in sentence]
        gold_tags = [t for w, t in sentence]
        predicted_tags = tagging_func(words)
        for gold, pred in zip(gold_tags, predicted_tags):
            total += 1
            if gold == pred:
                correct += 1
    return correct / total if total > 0 else 0

# Evaluating Unigram
acc_uni = evaluate_model(validation_data,
                           lambda sent: viterbi_unigram(sent, unigram_tag_prob, emission_prob))
print("Unigram accuracy on validation set:", acc_uni)

# Evaluating Bigram
acc_bi = evaluate_model(validation_data,
                        lambda sent: viterbi_bigram(sent, bigram_prob, emission_prob))
print("Bigram accuracy on validation set:", acc_bi)


Evaluating sentences: 100%|██████████| 1927/1927 [00:13<00:00, 141.92it/s]


Unigram accuracy on validation set: 0.9489186853800516


Evaluating sentences: 100%|██████████| 1927/1927 [04:30<00:00,  7.12it/s]

Bigram accuracy on validation set: 0.9388429125349479





In [25]:
def tagging_trigram_wrapper(words):
    return viterbi_trigram(words, trigram_prob, emission_prob)

def eval_sentence(sentence, tagging_func):
    words = [w for w, t in sentence]
    gold_tags = [t for w, t in sentence]
    predicted_tags = tagging_func(words)
    correct = sum(1 for gold, pred in zip(gold_tags, predicted_tags) if gold == pred)
    total = len(gold_tags)
    return correct, total

def evaluate_model_parallel(data, tagging_func, max_workers=None):
    total_correct = 0
    total_words = 0

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(eval_sentence, sentence, tagging_func) for sentence in data]

        for future in tqdm(as_completed(futures), total=len(futures), desc="Evaluating in parallel"):
            correct, total = future.result()
            total_correct += correct
            total_words += total

    return total_correct / total_words if total_words > 0 else 0

acc_tri = evaluate_model_parallel(validation_data, tagging_trigram_wrapper)
print("Parallel evaluation accuracy (trigram):", acc_tri)


Evaluating in parallel: 100%|██████████| 1927/1927 [3:14:21<00:00,  6.05s/it]


Parallel evaluation accuracy (trigram): 0.9392919220200239


In [26]:
accuracies = {'unigram': acc_uni, 'bigram': acc_bi, 'trigram': acc_tri}
best_model = max(accuracies, key=accuracies.get)
print("Best model based on validation accuracy:", best_model)

Best model based on validation accuracy: unigram


### Section 5: POS Tagging Using The Best Model

In [27]:
def load_test_data(test_file):
    words = []
    with open(test_file, encoding='utf-8') as f:
        for line in f:
            word = line.strip()
            if word:
                words.append(word)
    return words

# Reading the test data
test_words = load_test_data('/content/drive/MyDrive/test_notags.txt')
print("Number of test words:", len(test_words))


model_tagging_functions = {
    'unigram': lambda sent: viterbi_unigram(sent, unigram_tag_prob, emission_prob),
    'bigram':  lambda sent: viterbi_bigram(sent, bigram_prob, emission_prob),
    'trigram': lambda sent: viterbi_trigram(sent, trigram_prob, emission_prob)
}

# Tagging the test words using the selected best model.
selected_tagger = model_tagging_functions[best_model]
predicted_tags = selected_tagger(test_words)


output_file = '/content/drive/MyDrive/test_tags.txt'
with open(output_file, 'w', encoding='utf-8') as f:
    for word, tag in zip(test_words, predicted_tags):
        f.write(f"{word}\t{tag}\n")

print("Predicted tags have been written to", output_file)


Number of test words: 129906
Predicted tags have been written to /content/drive/MyDrive/test_tags.txt
