In [8]:
# --- Before your go ----
# 1. Rename Assignment-03-###.ipynb where ### is your student ID.
# 2. The deadline of Assignment-03 is 23:59pm, 06-05-2024


# --- Explore HMM POS Taggers using Brown corpus ---
# In this assignment, you will explore three taggers for a Brown corpus.
# import your packages here
import re
import numpy as np
import pickle
from tqdm import tqdm
from collections import defaultdict, Counter

from nltk import FreqDist
from nltk.probability import ConditionalFreqDist, ConditionalProbDist, MLEProbDist, LidstoneProbDist
from nltk.tag import HiddenMarkovModelTagger


from transformers import AutoTokenizer, BertForTokenClassification, DataCollatorForTokenClassification, Trainer, TrainingArguments
from transformers import BertTokenizer, BertForTokenClassification, Trainer, TrainingArguments
from datasets import Dataset

from sklearn.metrics import classification_report
import torch

In [3]:
# Task 1 --- Load and explore your data ---
# 1). load train/test samples from Brown corpus files, brown-train.txt, brown-test.txt.
# 2). load all 12 tags from brown-tag.txt and print it out
# 3). counting how many sentences and words in both train and test datasets.
# 4). for each tag, counting how many words in train and test. e.g, tag1: [count_tr, count_te]

# Your code
train_file = 'brown-train.txt'
test_file = 'brown-test.txt'
tag_file = 'brown-tag.txt'


def load_data(file_path):
    with open(file_path, 'r') as file:
        data = file.read().strip().split('\n\n')  # read as a string and split by double newline
    sentences = []
    for sentence in data:
        lines = sentence.strip().split('\n')
        id = lines[0]  # first line is the sentence id
        word_tag_pairs = lines[1:]  # the rest of the lines are word/tag pairs
        sentences.append([tuple(word_tag.split('\t'))
                         for word_tag in word_tag_pairs])
    return sentences

# 1) load data
train_sentences = load_data(train_file)
test_sentences = load_data(test_file)

# 2) load tags
with open(tag_file, 'r') as file:
    tags = file.read().strip().split()

print("12 Tags:", tags)

12 Tags: ['.', 'ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN', 'NUM', 'PRON', 'PRT', 'VERB', 'X']


In [4]:
# 3) counting sentences and words
num_train_sentences = len(train_sentences)
num_test_sentences = len(test_sentences)

num_train_words = sum(len(sentence) for sentence in train_sentences)
num_test_words = sum(len(sentence) for sentence in test_sentences)
unique_train_words = set(word for sentence in train_sentences for word, _ in sentence)
unique_test_words = set(word for sentence in test_sentences for word, _ in sentence)
num_unique_train_words = len(unique_train_words)
num_unique_test_words = len(unique_test_words)

print(f"Number of sentences in train dataset: {num_train_sentences}")
print(f"Number of words in train dataset (punctuation included): {num_train_words}")
print(f"Number of unique words in train dataset (punctuation included): {num_unique_train_words}")
print(f"Number of sentences in test dataset: {num_test_sentences}")
print(f"Number of words in test dataset (punctuation included): {num_test_words}")
print(f"Number of unique words in test dataset (punctuation included): {num_unique_test_words}")

Number of sentences in train dataset: 45800
Number of words in train dataset (punctuation included): 928327
Number of unique words in train dataset (punctuation included): 50490
Number of sentences in test dataset: 11540
Number of words in test dataset (punctuation included): 232865
Number of unique words in test dataset (punctuation included): 25353


In [6]:
# 4) count words for each tag
def count_tags(sentences):
    tags = []
    for sentence in sentences:
        for pair in sentence:
            tags.append(pair[1])
    return FreqDist(tags)


train_tag_freq = count_tags(train_sentences)
test_tag_freq = count_tags(test_sentences)

tag_word_count = {tag: [train_tag_freq[tag],
                        test_tag_freq[tag]] for tag in tags}

print("Tag counts [train, test]:")
for tag, counts in tag_word_count.items():
    print(f"{tag}: {counts}")

Tag counts [train, test]:
.: [117723, 29842]
ADJ: [66985, 16736]
ADP: [115752, 29014]
ADV: [44765, 11474]
CONJ: [30455, 7696]
DET: [109418, 27601]
NOUN: [220451, 55107]
NUM: [11921, 2953]
PRON: [39657, 9677]
PRT: [23889, 5940]
VERB: [146199, 36551]
X: [1112, 274]


In [5]:
# Task 2 --- Method 1: Build a baseline method, namely, the most frequent tagger ---
#     If you can recall, we introduced a strong baseline method (See Dan's book in 
# https://web.stanford.edu/~jurafsky/slp3/ed3book_jan72023.pdf Page 164.),
#     where we label each word by using the most frequent-used tag associated with it.
# 1). find the most frequent class label for each word in the training data.
#     For example, {tr_word_1:tag_1,tr_word_2:tag_2,...}
# 2). use your built method to predict tags for both train and test datasets.
#     You should print out two values: the accuracies of train and test samples.
#     You would expect that the accuracy on train will be > 9.0 (but never = 1.0) and higher than on test.

# Notice: since there are unkown words in test samples. 
#  Following ways could handle this (choose one or create your own): 
#  1). mark all words that appear only once in the data with a "UNK-x" tag
#  2). tag every out-of-vocabulary word with the majority tag among all training samples.
#  3). find more methods in https://github.com/Adamouization/POS-Tagging-and-Unknown-Words

# Your code
# 1) find the most frequent class label for each word
def find_mostfreq_tag_word(dataset):
    word_tag_freq = defaultdict(Counter)
    for sentence in dataset:
        for word, tag in sentence:
            word_tag_freq[word][tag] += 1
    most_freq_tag = {word: tag_freq.most_common(1)[0][0]
                     for word, tag_freq in word_tag_freq.items()}
    return most_freq_tag


# 2) predict tags
def predict_tags(sentences, most_freq_tag, default_tag='NOUN'):
    correct = 0
    total = 0
    for sentence in sentences:
        for word, true_tag in sentence:
            pred_tag = most_freq_tag.get(word, default_tag)
            if pred_tag == true_tag:
                correct += 1
            total += 1
    accuracy = correct / total
    return accuracy

Handling UNK method 1: replace with "UNK" token

Part of the UNK handling functions below are taken from https://github.com/Adamouization/POS-Tagging-and-Unknown-Words.

In [6]:
# handle UNKs: mark all words that appear only once in the data with a "UNK-x" tag
def extract_words(dataset):
    # dataset: list of sentences, where each sentence is a list of (word, tag) tuples
    return [word for sentence in dataset for word, _ in sentence]


def get_hapax_words(words, threshold=1):
    # Return words that appear only once in the data
    words_freq = FreqDist(words)
    return [word for word, freq in words_freq.items() if freq <= threshold]


# def basic_UNK_rules(word):
#     if word.endswith('ing'):
#         return "UNK-ing"
#     elif word.istitle():
#         return "UNK-capitalised"
#     return "UNK"

def extra_UNK_rules(word):
    if word.startswith('$'):
        return "UNK-currency"
    elif word.isdigit():
        return "UNK-number"
    elif re.compile(r'\d+(?:[,.]\d*)?').match(word):
        return "UNK-decimal-number"
    elif word.istitle():
        if word.endswith('ing'):
            return "Unk-ing"
        elif word.endswith('ed'):
            return "Unk-ed"
        elif word.endswith("'s"):
            return "Unk-apostrophe-s"
        elif '-' in word:
            return "Unk-hyphen"
    elif not word.istitle():
        if word.endswith('ing'):
            return "unk-ing"
        elif word.endswith('ed'):
            return "unk-ed"
        elif word.endswith("'s"):
            return "unk-apostrophe-s"
        elif '-' in word:
            return "unk-hyphen"
    return "UNK"


def handle_UNK_train(dataset, hapax_words):
    # Replace hapax words with appropriate "UNK-x" strings
    for sentence in tqdm(dataset, desc="Replacing train words", total=len(dataset), leave=True):
        for i, (word, tag) in enumerate(sentence):
            if word in hapax_words:
                sentence[i] = (extra_UNK_rules(word), tag)
    return dataset


def handle_UNK_test(dataset, unique_train_words):
    # Replace test words not in the training set
    for sentence in tqdm(dataset, desc="Replacing test words", total=len(dataset), leave=True):
        for i, (word, tag) in enumerate(sentence):
            if word not in unique_train_words:
                sentence[i] = (extra_UNK_rules(word), tag)
    return dataset

In [74]:
train_hapax_words = get_hapax_words(extract_words(train_sentences))
train_sentences_UNK = handle_UNK_train(train_sentences, train_hapax_words)

# update unique words (remove hapax words)
unique_train_words_hapax_excluded = unique_test_words - set(train_hapax_words)
test_sentences_UNK = handle_UNK_test(
    test_sentences, unique_train_words_hapax_excluded)
print("Finish handling UNK words.")

Replacing test words: 100%|██████████| 11540/11540 [00:00<00:00, 256419.56it/s]

Finish handling UNK words.





In [66]:
# save to file
with open('train_sentences_UNK.pkl', 'wb') as file:
    pickle.dump(train_sentences_UNK, file)

with open('test_sentences_UNK.pkl', 'wb') as file:
    pickle.dump(test_sentences_UNK, file)

In [7]:
# read from file
with open('train_sentences_UNK.pkl', 'rb') as file:
    train_sentences_UNK = pickle.load(file)

with open('test_sentences_UNK.pkl', 'rb') as file:
    test_sentences_UNK = pickle.load(file)

In [75]:
# Perform baseline method with UNK replacement handling
most_freq_tag_UNK = find_mostfreq_tag_word(train_sentences_UNK)

baselineUNK_train_acc = predict_tags(train_sentences_UNK, most_freq_tag_UNK)
baselineUNK_test_acc = predict_tags(test_sentences_UNK, most_freq_tag_UNK)

print(f"Baseline method with UNK replacement handling:")
print(f"Train accuracy: {baselineUNK_train_acc:.4f}")
print(f"Test accuracy: {baselineUNK_test_acc:.4f}")

Baseline method with UNK replacement handling:
Train accuracy: 0.9495
Test accuracy: 0.9437


Handling UNK method 2: tagging with the most frequent tag

In [49]:
# handle UNKs: tag every out-of-vocabulary word with the majority tag among all training sample
default_tag = train_tag_freq.most_common(1)[0][0]
most_freq_tag = find_mostfreq_tag_word(train_sentences)

baselineMaj_train_acc = predict_tags(train_sentences, most_freq_tag, default_tag)
baselineMaj_test_acc = predict_tags(test_sentences, most_freq_tag, default_tag)

print(f"Baseline method with majority tag handling:")
print(f"Train Accuracy: {baselineMaj_train_acc:.4f}")
print(f"Test Accuracy: {baselineMaj_test_acc:.4f}")

Baseline method with UNK majority tag handling:
Train Accuracy: 0.9572
Test Accuracy: 0.9452


In [76]:
# Task 3 --- Method 2: Build an HMM tagger ---
# 1) You should use nltk.tag.HiddenMarkovModelTagger to build an HMM tagger.
#    It has parameters: symbols, states, transitions, outputs, priors, transform (ignore it).
#    Specify these parameters properly. For example, you can use MLE to estimate transitions, outputs and priors.
#    That is, MLE to estimate matrix A (transition matrix), and matrix B (output probabilites) (See. Page 8.4.3)
# 2) After build your model, report both the accuracy of HMM tagger for train samples and test samples.
# 
# 3) Compared with your baseline method, discuss that why your HMM tagger is better/worse than baseline method.

# Notice: You may also need to handle unknown words just like Task 2.

# Your code
# 1) build HMM tagger
# Calculate transition probabilities (A), output probabilities (B), and priors
def get_HMM_params(dataset):
    symbols = set(word for sentence in train_sentences for word, _ in sentence)
    states = set(tag for sentence in train_sentences for _, tag in sentence)
    transitions = []
    outputs = []
    priors = []

    for sentence in dataset:
        previous_tag = None
        for word, tag in sentence:
            outputs.append((tag, word))
            if previous_tag is None:
                priors.append(tag)
            else:
                transitions.append((previous_tag, tag))
            previous_tag = tag

    # Convert counts to probabilities
    A = ConditionalProbDist(ConditionalFreqDist(transitions), MLEProbDist)
    B = ConditionalProbDist(ConditionalFreqDist(outputs), MLEProbDist)
    priors = MLEProbDist(FreqDist(priors))

    return symbols, states, A, B, priors


symbols, states, A, B, priors = get_HMM_params(train_sentences_UNK)
hmm_tagger = HiddenMarkovModelTagger(
    symbols=symbols, states=states, transitions=A, outputs=B, priors=priors)

# 2) evaluate HMM tagger
hmm_tagger.test(train_sentences_UNK)
hmm_tagger.test(test_sentences_UNK)

# 3) compare with baseline
print(f"Baseline Train Accuracy: {baselineUNK_train_acc:.4f}")
print(f"Baseline Test Accuracy: {baselineUNK_test_acc:.4f}")

  O[i, k] = self._output_logprob(si, self._symbols[k])
  X[i, j] = self._transitions[si].logprob(self._states[j])


accuracy over 928327 tokens: 96.85
accuracy over 232865 tokens: 96.09
Baseline Train Accuracy: 0.9495
Baseline Test Accuracy: 0.9437


We observe that there is an issue of overflow. Now we try to perform smoothing to avoid the problem.

In [10]:
def get_HMM_params_smooth(dataset, gamma=0.1):
    symbols = set(word for sentence in train_sentences for word, _ in sentence)
    states = set(tag for sentence in train_sentences for _, tag in sentence)
    transitions = []
    outputs = []
    priors = []

    for sentence in dataset:
        previous_tag = None
        for word, tag in sentence:
            outputs.append((tag, word))
            if previous_tag is None:
                priors.append(tag)
            else:
                transitions.append((previous_tag, tag))
            previous_tag = tag

    # Convert counts to probabilities
    A = ConditionalProbDist(ConditionalFreqDist(transitions), lambda fd: LidstoneProbDist(fd, gamma))
    B = ConditionalProbDist(ConditionalFreqDist(outputs), lambda fd: LidstoneProbDist(fd, gamma))
    priors = LidstoneProbDist(FreqDist(priors), gamma)

    return symbols, states, A, B, priors


symbols, states, A, B, priors = get_HMM_params_smooth(train_sentences_UNK)
hmm_tagger = HiddenMarkovModelTagger(symbols=symbols, states=states, transitions=A, outputs=B, priors=priors)

# 2) evaluate HMM tagger
hmm_tagger.test(train_sentences_UNK)
hmm_tagger.test(test_sentences_UNK)

accuracy over 928327 tokens: 95.81
accuracy over 232865 tokens: 95.13


Although Lidstone smoothing lowers the accuracy, it is still better than the baseline method. 

The HMM tagger performs better than the baseline method because it considers the context in which words appear. By using transition probabilities to model the likelihood of a tag given the previous tag, the HMM tagger captures the syntactic structure of the language. This allows it to disambiguate words based on their context, improving accuracy.

For Task 4 below, some codes are from https://huggingface.co/learn/nlp-course/chapter7/2?fw=pt

In [8]:
# Task 4 --- Method 3: Fine-tuning on BERT-base model for POS-tagging ---
# 
# 1) You may download a BERT model (say, you choose BERT-base cased) 
#    and use tools in https://github.com/huggingface/transformers
# 
# 2) After build your model, report both the accuracy of BERT tagger for train samples and test samples.
# 
# 3) Compared with Method 1,2, discuss that why your BERT tagger is better/worse than these two.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tag2id = {tag: i for i, tag in enumerate(tags)}
id2tag = {i: tag for tag, i in tag2id.items()}


# convert data into datasets.Dataset format
def convert_dataset(sentences):
    words = [[word for word, tag in sentence] for sentence in sentences]
    tags = [[tag2id[tag] for word, tag in sentence] for sentence in sentences]
    return {'words': words, 'tags': tags}


train_data = convert_dataset(train_sentences)
test_data = convert_dataset(test_sentences)

train_dataset = Dataset.from_dict(train_data)
test_dataset = Dataset.from_dict(test_data)

# Load BERT tokenizer
model_checkpoint = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

In [9]:
# Preprocess data
# BERT uses subword tokenization, so we need to tokenize the words and align the labels
def align_labels_with_tokens(labels, word_ids):
    new_labels = []
    current_word = None
    for word_id in word_ids:
        if word_id != current_word:
            # Start of a new word!
            current_word = word_id
            label = -100 if word_id is None else labels[word_id]
            new_labels.append(label)
        elif word_id is None:
            # Special token
            new_labels.append(-100)
        else:
            # Same word as previous token
            label = labels[word_id]
            new_labels.append(label)

    return new_labels

In [None]:
# tokenize lots of texts at the same time
def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(
        examples["words"], truncation=True, is_split_into_words=True
    )
    all_labels = examples["tags"]
    new_labels = []
    for i, labels in enumerate(all_labels):
        word_ids = tokenized_inputs.word_ids(i)
        new_labels.append(align_labels_with_tokens(labels, word_ids))

    tokenized_inputs["labels"] = new_labels
    return tokenized_inputs


tokenized_train_dataset = train_dataset.map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=train_dataset.column_names,
)

tokenized_test_dataset = test_dataset.map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=test_dataset.column_names,
)

In [35]:
# Define metrics
def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)

    # Flatten the arrays to match the format required by classification_report
    true_labels = np.concatenate(labels, axis=0)
    true_preds = np.concatenate(preds, axis=0)

    mask = true_labels != -100
    true_labels = true_labels[mask]
    true_preds = true_preds[mask]

    report_dict = classification_report(true_labels, true_preds, target_names=tags, output_dict=True)
    report_str = classification_report(true_labels, true_preds, target_names=tags)

    print(report_str) 
    # with open("classification_report.txt", "w") as f:
    #     f.write(report_str)
    accuracy = report_dict["accuracy"]
    macro_f1 = report_dict["macro avg"]["f1-score"]
    weighted_f1 = report_dict["weighted avg"]["f1-score"]

    return {
        "accuracy": accuracy,
        "macro_f1": macro_f1,
        "weighted_f1": weighted_f1
    }

In [41]:
# Load pre-trained BERT model
data_collator = DataCollatorForTokenClassification(tokenizer)
model = BertForTokenClassification.from_pretrained(
    model_checkpoint,
    id2label=id2tag,
    label2id=tag2id,
).to(device)

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [42]:
# Training arguments
args = TrainingArguments(
    output_dir='results',
    eval_strategy="no",
    save_strategy="epoch",
    learning_rate=2e-5,
    num_train_epochs=1,
    weight_decay=0.01,
    fp16=True,
    logging_dir='logs',
    logging_steps=500
)

# Train the model
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=tokenized_train_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
)
trainer.train()

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Step,Training Loss
500,0.309
1000,0.0779
1500,0.0663
2000,0.0552
2500,0.055
3000,0.0486
3500,0.0515
4000,0.0468
4500,0.0469
5000,0.0458


TrainOutput(global_step=5725, training_loss=0.07529560772092062, metrics={'train_runtime': 371.0463, 'train_samples_per_second': 123.435, 'train_steps_per_second': 15.429, 'total_flos': 1180980295456896.0, 'train_loss': 0.07529560772092062, 'epoch': 1.0})

In [44]:
# Evaluate the model
train_results_BERT = trainer.evaluate(eval_dataset=tokenized_train_dataset)
test_results_BERT = trainer.evaluate(eval_dataset=tokenized_test_dataset)
print("Training results of BERT:\n", train_results_BERT)
print("Test results of BERT:\n", test_results_BERT)

              precision    recall  f1-score   support

           .       1.00      1.00      1.00    134478
         ADJ       0.98      0.98      0.98     87581
         ADP       0.99      0.99      0.99    115959
         ADV       0.98      0.98      0.98     49180
        CONJ       1.00      1.00      1.00     30495
         DET       1.00      1.00      1.00    109463
        NOUN       0.99      0.99      0.99    296657
         NUM       0.99      0.99      0.99     14695
        PRON       1.00      0.99      0.99     39762
         PRT       0.98      0.99      0.99     28383
        VERB       0.99      1.00      1.00    165283
           X       0.92      0.78      0.84      2400

    accuracy                           0.99   1074336
   macro avg       0.98      0.97      0.98   1074336
weighted avg       0.99      0.99      0.99   1074336

              precision    recall  f1-score   support

           .       1.00      1.00      1.00     34144
         ADJ       0.96 

We can summarize the result of every method into a table:

| Tagger | Accuracy on trainset | Accuracy on testset |
| --- | --- | --- |
| Baseline+MostFreq | 95.72 | 94.52 |
| Baseline+UNK | 94.95 | 94.37 |
| HMM+UNK+MLE | 96.85 | 96.09 |
| HMM+UNK+MLE+Lidstone | 95.81 | 95.13 |
| **BERT** (1 epoch) | **99.28** | **98.86** |

Our fine-tuned BERT outperforms baseline and HMM in POS tagging, probably due to its advanced architecture and capabilities:

1. BERT captures the context of each word within a sentence, allowing it to understand word meanings based on surrounding words. This is crucial for POS tagging as the meaning of a word can change depending on its context.

2. BERT leverages deep neural networks with multiple layers, enabling it to learn complex patterns and representations in language data, leading to higher accuracy.

3. BERT is pre-trained on a large corpus of text using masked language modeling and next sentence prediction, which helps it to grasp general language structures and nuances effectively.

4. The self-attention mechanism in BERT's transformer architecture allows it to weigh the importance of different words in a sentence, which can enhance its ability to make accurate predictions.

In contrast, baseline methods like most frequent tagging do not consider context, and HMMs, while considering context, are limited by their simpler statistical models and inability to capture long-range dependencies effectively.