# <center> --- Explore HMM POS Taggers using Brown corpus --- </center>


<center> 罗金子 21300180123 数学与应用数学 </center>

In [17]:
# In this assignment, you will explore three taggers for a Brown corpus.
# import your packages here

from collections import defaultdict, Counter
from nltk.tag import hmm
import torch
from sklearn.model_selection import train_test_split
import numpy as np
from datasets import ClassLabel, Sequence, DatasetDict, Dataset
from transformers import AutoTokenizer
from transformers import Trainer
from transformers import DataCollatorForTokenClassification
import evaluate
from transformers import AutoModelForTokenClassification

## Task 1 --- Load and explore your data ---

### 1) load train/test samples from Brown corpus files, brown-train.txt, brown-test.txt.

In [18]:
def load_brown_corpus(file_path):
    """Read a tab-separated corpus file into a list of tagged sentences.

    Every non-blank line is expected to hold ``word<TAB>tag``. Blank lines
    end the current sentence, and lines starting with ``b100`` are skipped
    (presumably page/sentence-id markers of the data files -- verify against
    the corpus).

    Args:
        file_path: Path of the corpus file to read.

    Returns:
        A list of sentences, each a list of ``(word, tag)`` tuples.

    Raises:
        ValueError: If a token line does not split into exactly two
            tab-separated fields.
    """
    sentences = []
    sentence = []
    # Explicit encoding: the result must not depend on the platform's locale
    # default. The file is streamed line by line instead of being loaded whole.
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if not line:
                # Blank line: close the sentence collected so far, if any.
                if sentence:
                    sentences.append(sentence)
                    sentence = []
                continue
            if line.startswith('b100'):
                continue
            word, tag = line.split('\t')
            sentence.append((word, tag))

    # The file may end without a trailing blank line.
    if sentence:
        sentences.append(sentence)

    return sentences

# Load both corpus splits.
train_samples = load_brown_corpus('brown-train.txt')
test_samples = load_brown_corpus('brown-test.txt')

# Preview: the first two sentences of the training split, then the first two
# of the test split.
for sentence in train_samples[:2] + test_samples[:2]:
    print(sentence)


[('Mr.', 'NOUN'), ('Podger', 'NOUN'), ('had', 'VERB'), ('thanked', 'VERB'), ('him', 'PRON'), ('gravely', 'ADV'), (',', '.'), ('and', 'CONJ'), ('now', 'ADV'), ('he', 'PRON'), ('made', 'VERB'), ('use', 'NOUN'), ('of', 'ADP'), ('the', 'DET'), ('advice', 'NOUN'), ('.', '.')]
[('But', 'CONJ'), ('there', 'PRT'), ('seemed', 'VERB'), ('to', 'PRT'), ('be', 'VERB'), ('some', 'DET'), ('difference', 'NOUN'), ('of', 'ADP'), ('opinion', 'NOUN'), ('as', 'ADP'), ('to', 'ADP'), ('how', 'ADV'), ('far', 'ADV'), ('the', 'DET'), ('board', 'NOUN'), ('should', 'VERB'), ('go', 'VERB'), (',', '.'), ('and', 'CONJ'), ('whose', 'DET'), ('advice', 'NOUN'), ('it', 'PRON'), ('should', 'VERB'), ('follow', 'VERB'), ('.', '.')]
[('``', '.'), ("I've", 'PRT'), ('got', 'VERB'), ('cancer', 'NOUN'), (',', '.'), ("haven't", 'VERB'), ('I', 'PRON'), ("''", '.'), ('?', '.'), ('?', '.')]
[('the', 'DET'), ('need', 'NOUN'), ('to', 'PRT'), ('protect', 'VERB'), ('the', 'DET'), ('public', 'ADJ'), ('interest', 'NOUN'), ('in', 'ADP'), 

### 2). load all 12 tags from brown-tag.txt and print it out

In [19]:
# Read the tag inventory; each line of the file contributes one entry.
with open('brown-tag.txt', 'r') as tag_file:
    lines = tag_file.readlines()

tags = list(map(str.strip, lines))
print(tags)

['.', 'ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN', 'NUM', 'PRON', 'PRT', 'VERB', 'X']


### 3). counting how many sentences and words in both train and test datasets.

In [20]:
# Sentence totals per split.
count_sentence_train = len(train_samples)
count_sentence_test = len(test_samples)
print("num of sentences in train sample:", count_sentence_train)
print("num of sentences in test sample:", count_sentence_test)

# Word totals per split: the summed lengths of all sentences.
count_words_train = sum(map(len, train_samples))
print("num of words in train sample:", count_words_train)

count_words_test = sum(map(len, test_samples))
print("num of words in test sample:", count_words_test)

num of sentences in train sample: 45800
num of sentences in test sample: 11540
num of words in train sample: 928327
num of words in test sample: 232865


### 4). for each tag, counting how many words in train and test. e.g, tag1: [count_tr, count_te]

In [21]:
# Tally the tags of each split in a single pass, instead of rescanning both
# corpora once for every tag.
train_tag_counts = Counter(tag for sentence in train_samples for _, tag in sentence)
test_tag_counts = Counter(tag for sentence in test_samples for _, tag in sentence)

# One [tag, [count_tr, count_te]] entry per tag, in the order of `tags`
# (however many tags were loaded). A Counter yields 0 for a tag that never
# occurs in a split.
record = [[tag, [train_tag_counts[tag], test_tag_counts[tag]]] for tag in tags]

for i, (tag, counts) in enumerate(record, start=1):
    print("tag{} {}:".format(i, tag), counts)

tag1 .: [117723, 29842]
tag2 ADJ: [66985, 16736]
tag3 ADP: [115752, 29014]
tag4 ADV: [44765, 11474]
tag5 CONJ: [30455, 7696]
tag6 DET: [109418, 27601]
tag7 NOUN: [220451, 55107]
tag8 NUM: [11921, 2953]
tag9 PRON: [39657, 9677]
tag10 PRT: [23889, 5940]
tag11 VERB: [146199, 36551]
tag12 X: [1112, 274]


## Task 2 --- Method 1: Build a baseline method, namely, the most frequent tagger ---
If you can recall, we introduced a strong baseline method (See Dan's book in 
 https://web.stanford.edu/~jurafsky/slp3/ed3book_jan72023.pdf Page 164.),
     where we label each word by using the most frequent-used tag associated with it.


Notice: there are unknown words in the test samples.

Following ways could handle this (choose one or create your own): 

(1). mark all words that appear only once in the data with a "UNK-x" tag

(2). tag every out-of-vocabulary word with the majority tag among all training samples.

(3). find more methods in https://github.com/Adamouization/POS-Tagging-and-Unknown-Words

### 1). find the most frequent class label for each word in the training data.    
 For example, {tr_word_1:tag_1,tr_word_2:tag_2,...}

In [38]:
# Per-word tag histogram over the training split.
word_tag_freq = defaultdict(Counter)
for sentence in train_samples:
    for word, tag in sentence:
        word_tag_freq[word][tag] += 1

# Map every training word to the tag it carries most often.
word_most_frequent_tag = {
    word: tag_counter.most_common(1)[0][0]
    for word, tag_counter in word_tag_freq.items()
}

# Show the size and a short preview instead of dumping the whole vocabulary
# into the notebook output.
print("vocabulary size:", len(word_most_frequent_tag))
print(dict(list(word_most_frequent_tag.items())[:20]))



### 2). use your built method to predict tags for both train and test datasets.

You should print out two values: the accuracies of train and test samples.
You would expect that the accuracy on train will be > 0.9 (but never = 1.0) and higher than on test.

In [44]:
# Method 1: words missing from the lookup table receive no prediction and are
# therefore counted as errors.
def base_line_acc1(samples, word_most_tag_freq, sentence_count, word_count):
    """Score the most-frequent-tag baseline, treating unknown words as misses.

    Args:
        samples: Indexable collection of sentences; each sentence is an
            iterable of items whose [0] is the word and [1] the true tag.
        word_most_tag_freq: Mapping from word to its predicted tag.
        sentence_count: Number of leading sentences of ``samples`` to score.
        word_count: Denominator of the returned accuracy.

    Returns:
        The number of correctly tagged words divided by ``word_count``.
    """
    hits = 0
    for idx in range(sentence_count):
        for pair in samples[idx]:
            token, gold = pair[0], pair[1]
            if token in word_most_tag_freq and word_most_tag_freq[token] == gold:
                hits += 1

    return hits / word_count
            



# Report method 1 on both splits, passing each split's precomputed totals.
print("method 1")
for split_label, split_samples, n_sentences, n_words in (
    ("train acc:", train_samples, count_sentence_train, count_words_train),
    ("test acc:", test_samples, count_sentence_test, count_words_test),
):
    print(split_label, base_line_acc1(split_samples, word_most_frequent_tag, n_sentences, n_words))

# Method 2: tag every out-of-vocabulary word with a fixed fallback tag.
# The default is "NOUN", the most frequent tag in the training split
# (see the per-tag counts printed above).
def base_line_acc2(samples, word_most_tag_freq, sentence_count, word_count, default_tag="NOUN"):
    """Score the most-frequent-tag baseline with a fallback tag for unknown words.

    Args:
        samples: Indexable collection of sentences; each sentence is an
            iterable of items whose [0] is the word and [1] the true tag.
        word_most_tag_freq: Mapping from word to its predicted tag.
        sentence_count: Number of leading sentences of ``samples`` to score.
        word_count: Denominator of the returned accuracy.
        default_tag: Tag predicted for any word absent from
            ``word_most_tag_freq``. Defaults to "NOUN".

    Returns:
        The number of correctly tagged words divided by ``word_count``.
    """
    count = 0
    for i in range(sentence_count):
        for word_set in samples[i]:
            word, word_true_tag = word_set[0], word_set[1]
            # Known words use the lookup table; unknown words fall back.
            if word in word_most_tag_freq:
                predicted_tag = word_most_tag_freq[word]
            else:
                predicted_tag = default_tag
            if predicted_tag == word_true_tag:
                count += 1

    return count / word_count

# Report method 2 on both splits, passing each split's precomputed totals.
print("method 2")
for split_label, split_samples, n_sentences, n_words in (
    ("train acc:", train_samples, count_sentence_train, count_words_train),
    ("test acc:", test_samples, count_sentence_test, count_words_test),
):
    print(split_label, base_line_acc2(split_samples, word_most_frequent_tag, n_sentences, n_words))

method 1
train acc: 0.9571961173164197
test acc: 0.9303802632426513
method 2
train acc: 0.9571961173164197
test acc: 0.945187125587787


可以看到加most-freq效果更好，因为UNK直接选择抛弃语义信息，正确率会更低：方法1对未登录词不给出任何预测（全部记为错误），而方法2把未登录词统一标为 NOUN，其中一部分能够标对。

## <center> Task 3 --- Method 2: Build an HMM tagger --- </center>
Notice: You may also need to handle unknown words just like Task 2.

### 1) You should use nltk.tag.HiddenMarkovModelTagger to build an HMM tagger.
    It has parameters: symbols, states, transitions, outputs, priors, transform (ignore it).
    Specify these parameters properly. For example, you can use MLE to estimate transitions, outputs and priors.
    That is, use MLE to estimate matrix A (the transition matrix) and matrix B (the output probabilities) (see Section 8.4.3).

In [None]:
# Prepare the HMM parameters: the observed symbols (words) and the hidden
# states (tags) seen in the training split.
symbol_set = set()
state_set = set()
for sentence in train_samples:
    for word, tag in sentence:
        symbol_set.add(word)
        state_set.add(tag)

# Sort so the order is reproducible: list(set_of_strings) can come out in a
# different order on every interpreter start because string hashing is
# randomized per process.
symbols = sorted(symbol_set)
states = sorted(state_set)

### 2) After build your model, report both the accuracy of HMM tagger for train samples and test samples.

In [92]:
# Train a supervised HMM tagger on the training split.
# `hmm` is the module imported at the top (`from nltk.tag import hmm`); the
# bare name `nltk` is never imported in this notebook, so spelling this as
# `nltk.tag.HiddenMarkovModelTagger` raises NameError on a fresh kernel.
trainer = hmm.HiddenMarkovModelTagger.train(train_samples, symbols=symbols, states=states)

train_accuracy = trainer.accuracy(train_samples)
print("训练样本准确性：", train_accuracy)

训练样本准确性： 0.9698694533284069


In [103]:
# Flatten the test split into two parallel lists: every word and its tag.
test_words = []
test_tags = []
for sentence in test_samples:
    for word, tag in sentence:
        test_words.append(word)
        test_tags.append(tag)

In [112]:
# Build a second tagger that reuses the trained model's transition, output
# and prior distributions, with the test words as symbols and the 12 loaded
# tags as states.
# `hmm` is the module imported at the top (`from nltk.tag import hmm`); the
# bare name `nltk` is never imported, so `nltk.tag...` raises NameError on a
# fresh kernel.
tagger = hmm.HiddenMarkovModelTagger(test_words, tags, trainer._transitions, trainer._outputs, trainer._priors)
test_accuracy = tagger.accuracy(test_samples)
print("测试样本准确性：", test_accuracy)

测试样本准确性： 0.9510231249865803


In [129]:
# Copy of the test split in which the tag of every word that is absent from
# the training vocabulary (word_most_frequent_tag) is replaced by 'UNK'.
UNK_test_words = [
    [
        (pair[0], pair[1]) if pair[0] in word_most_frequent_tag else (pair[0], 'UNK')
        for pair in sentence
    ]
    for sentence in test_samples
]

In [130]:
# Score the HMM tagger using UNK_test_words as the reference tagging.
# NOTE(review): the reference tags of out-of-vocabulary words were overwritten
# with 'UNK' when UNK_test_words was built, so this compares the predictions
# against modified labels rather than against the original gold tags.
test_accuracy2 = tagger.accuracy(UNK_test_words)
print("测试样本准确性：", test_accuracy2)

测试样本准确性： 0.9435037468060894


In [132]:
# Copy of the test split in which the tag of every word that is absent from
# the training vocabulary (word_most_frequent_tag) is replaced by 'NOUN'.
NOUN_test_words = [
    [
        (pair[0], pair[1]) if pair[0] in word_most_frequent_tag else (pair[0], 'NOUN')
        for pair in sentence
    ]
    for sentence in test_samples
]

In [133]:
# Score the HMM tagger using NOUN_test_words as the reference tagging.
# NOTE(review): the reference tags of out-of-vocabulary words were overwritten
# with 'NOUN' when NOUN_test_words was built, so this compares the predictions
# against modified labels rather than against the original gold tags.
test_accuracy3 = tagger.accuracy(NOUN_test_words)
print("测试样本准确性：", test_accuracy3)

测试样本准确性： 0.9485281171494213


### 3) Compared with your baseline method, discuss that why your HMM tagger is better/worse than baseline method.

可以看出 HMM更好：

| method | train acc | test acc|
| -- | -- | -- |
| baseline UNK | 0.9571961173164197 | 0.9303802632426513|
| baseline most-freq | 0.9571961173164197 | 0.945187125587787|
| HMM |  <font color=red>0.9698694533284069</font> | <font color=red>0.9510231249865803</font>|
|HMM UNK | 0.9698694533284069 | 0.9435037468060894|
|HMM most-freq| 0.9698694533284069 | 0.9485281171494213|



分析：
1. 上下文信息：HMM 标注器考虑句子中单词的上下文，而基线方法仅单独考虑单个单词。 通过考虑单词的顺序及其关系，HMM 标注器可以做出更准确的标注决策。
2. 转换概率：HMM 标记器对单词序列中不同标签之间的转换概率进行建模。 这使得它能够捕获标签之间的依赖关系，并根据标签序列的可能性做出更明智的标记决策。
3. 歧义处理：HMM 标注器可以比基线方法更好地处理语言中的歧义。 它使用概率模型根据给定观察到的单词的最可能的标签序列来分配标签，从而在歧义的情况下实现更准确的标记。
4. 在HMM中， 加了UNK和most-freq后正确率会变低：这两种做法只是把测试集中未登录词的参考标签改成了 UNK / NOUN，模型的预测本身并没有改变，于是模型原本标对的未登录词反而被记为错误（UNK 不在模型的 12 个标签之中，永远不可能匹配）。
   
总的来说：与简单方法相比，HMM 标注器能够捕获上下文更多信息，这使其成为一种更有效、更准确的词性标注方法。

## <center> Task 4 --- Method 3: Fine-tuning on BERT-base model for POS-tagging --- </center>

### 1) You may download a BERT model (say, you choose BERT-base cased) 
   and use tools in https://github.com/huggingface/transformers

In [22]:
# Checkpoint name handed to AutoTokenizer.from_pretrained here; the same
# variable is reused further down when the classification model is loaded.
model_checkpoint = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

In [23]:
# Map each POS tag to its integer class id.
pos2label = {
    '.': 0,
    'ADJ': 1,
    'ADP': 2,
    'ADV': 3,
    'CONJ': 4,
    'DET': 5,
    'NOUN': 6,
    'NUM': 7,
    'PRON': 8,
    'PRT': 9,
    'VERB': 10,
    'X': 11
}
# Inverse mapping (class id -> POS tag). It is derived from pos2label so the
# two tables cannot disagree; the previous hand-written table used a different
# numbering (e.g. 1 -> 'NOUN') and had no entry for id 0.
label2pos = {label: pos for pos, label in pos2label.items()}

In [33]:
## Prepare the datasets
# Dataset and DatasetDict are already imported in the notebook's top import cell.

def _as_columns(samples):
    """Turn tagged sentences into the column layout passed to Dataset.from_dict.

    Args:
        samples: List of sentences, each a list of (word, tag) pairs.

    Returns:
        A dict with a running 'id', the words of each sentence under
        'tokens', and the tags of each sentence under 'pos_tags'.
    """
    return {
        'id': list(range(len(samples))),
        'tokens': [[pair[0] for pair in sentence] for sentence in samples],
        'pos_tags': [[pair[1] for pair in sentence] for sentence in samples],
    }

train_data = _as_columns(train_samples)
test_data = _as_columns(test_samples)

train_dataset = Dataset.from_dict(train_data)
test_dataset = Dataset.from_dict(test_data)

raw_datasets = DatasetDict({
    'train': train_dataset,
    'test': test_dataset
})

print(raw_datasets)

DatasetDict({
    train: Dataset({
        features: ['id', 'tokens', 'pos_tags'],
        num_rows: 45800
    })
    test: Dataset({
        features: ['id', 'tokens', 'pos_tags'],
        num_rows: 11540
    })
})


In [34]:

# Class names in the insertion order of pos2label.
pos_names = list(pos2label)
num_classes = len(pos_names)
pos_feature = ClassLabel(num_classes=num_classes, names=pos_names)

def map_pos_tags(examples):
    """Replace every tag string of a batch with its integer id from pos2label."""
    encoded = []
    for sentence_tags in examples['pos_tags']:
        encoded.append([pos2label[tag] for tag in sentence_tags])
    return {'pos_tags': encoded}

raw_datasets = raw_datasets.map(map_pos_tags, batched=True)

# Declare the integer column as a sequence of ClassLabel values in every split.
for split in raw_datasets:
    typed_split = raw_datasets[split].cast_column("pos_tags", Sequence(pos_feature))
    raw_datasets[split] = typed_split

print(raw_datasets["train"].features["pos_tags"])


Map:   0%|          | 0/45800 [00:00<?, ? examples/s]

Map:   0%|          | 0/11540 [00:00<?, ? examples/s]

Casting the dataset:   0%|          | 0/45800 [00:00<?, ? examples/s]

Casting the dataset:   0%|          | 0/11540 [00:00<?, ? examples/s]

Sequence(feature=ClassLabel(names=['.', 'ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN', 'NUM', 'PRON', 'PRT', 'VERB', 'X'], id=None), length=-1, id=None)


In [35]:
## Tokenization
def align_labels_with_tokens(labels, word_ids, label_all_tokens=True):
    """Expand word-level labels to one label per token position.

    Args:
        labels: Word-level label ids, indexable by word index.
        word_ids: For each token position, the index of the word it belongs
            to, or None for a position that belongs to no word.
        label_all_tokens: When True (the default, and the previous behaviour)
            every sub-token of a word receives that word's label. When False
            only the first sub-token of each word is labelled and the
            remaining ones get -100, so that each word is counted once.

    Returns:
        A list with one entry per element of ``word_ids``; -100 marks the
        positions that carry no label (the value the metric code in this
        notebook filters out).
    """
    new_labels = []
    previous_word = None
    for word_id in word_ids:
        if word_id is None:
            # Position without a word (e.g. a special token).
            new_labels.append(-100)
        elif word_id != previous_word or label_all_tokens:
            # First sub-token of a word, or every sub-token when requested.
            new_labels.append(labels[word_id])
        else:
            # Continuation sub-token that should not be scored.
            new_labels.append(-100)
        previous_word = word_id

    return new_labels

def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and attach aligned labels.

    Args:
        examples: Batch with a "tokens" column (lists of words) and a
            "pos_tags" column (lists of word-level label ids).

    Returns:
        The tokenizer output for the batch, with an added "labels" entry
        holding, per sentence, the result of align_labels_with_tokens.
    """
    encoding = tokenizer(
        examples["tokens"], truncation=True, is_split_into_words=True
    )
    encoding["labels"] = [
        align_labels_with_tokens(word_labels, encoding.word_ids(row))
        for row, word_labels in enumerate(examples["pos_tags"])
    ]
    return encoding

# Apply tokenize_and_align_labels to every split in batches. remove_columns
# lists the column names of the train split, so the original columns are
# dropped from the mapped result.
tokenized_datasets = raw_datasets.map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=raw_datasets["train"].column_names,
)

Map:   0%|          | 0/45800 [00:00<?, ? examples/s]

Map:   0%|          | 0/11540 [00:00<?, ? examples/s]

In [36]:

# Collator built from the tokenizer; it is passed to the DataLoaders and to
# the Trainer further down.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)
# Re-read the pos_tags feature from the dataset and take the names of its
# inner ClassLabel as the list that maps class id -> tag name.
pos_feature = raw_datasets["train"].features["pos_tags"]
label_names = pos_feature.feature.names

In [37]:
## Evaluation
metric = evaluate.load("accuracy")

def compute_metris(eval_preds):
    """Compute accuracy over every position whose label is not -100.

    Args:
        eval_preds: A ``(logits, labels)`` pair. ``logits`` carries the class
            scores on its last axis and ``labels`` the reference ids, with
            -100 at positions that must be skipped.

    Returns:
        The result of ``metric.compute`` on the kept positions.
    """
    logits, labels = eval_preds
    labels = np.asarray(labels)
    predictions = np.argmax(logits, axis=-1)

    # Boolean mask instead of per-token Python loops. The ids are used
    # directly: label_names is built from the keys of pos2label in order, so
    # the former pos2label[label_names[x]] round trip returned x unchanged.
    keep = labels != -100
    accuracy = metric.compute(
        predictions=predictions[keep].tolist(),
        references=labels[keep].tolist(),
    )
    return accuracy

# Lookup tables between class ids and tag names, in both directions.
id2label = dict(enumerate(label_names))
label2id = {name: idx for idx, name in id2label.items()}

In [47]:
## Model construction
from torch.utils.data import DataLoader
from transformers import TrainingArguments

# Load the pretrained model once. The cell used to run this identical
# from_pretrained call a second time at its end, which only repeated the
# loading work and replaced `model` with an equivalent fresh instance.
model = AutoModelForTokenClassification.from_pretrained(
    model_checkpoint,
    id2label=id2label,
    label2id=label2id,
)

args = TrainingArguments(
    # Previously written as two adjacent string literals
    # ("output" "bert-finetuned-pos"), which Python silently joins into
    # "outputbert-finetuned-pos". An explicit sub-directory is used instead.
    # NOTE(review): confirm this is the intended location.
    output_dir="output/bert-finetuned-pos",
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    num_train_epochs=3,
    weight_decay=0.01,
    push_to_hub=False,
)

# Batches of the tokenized training split; iterated later when the train-set
# accuracy of the fine-tuned checkpoint is measured.
train_dataloader = DataLoader(
    tokenized_datasets["train"],
    shuffle=True,
    collate_fn=data_collator,
    batch_size=8,
)

test_dataloader = DataLoader(
    tokenized_datasets["test"], collate_fn=data_collator, batch_size=1
)

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [40]:
# Fine-tune with the Hugging Face Trainer (imported in the notebook's top
# import cell). The object is bound to `bert_trainer` rather than `trainer`,
# because `trainer` already names the HMM tagger trained earlier, whose
# _transitions/_outputs/_priors are still read by the HMM test cell;
# rebinding it here would break re-running that cell.
bert_trainer = Trainer(
    model=model,
    args=args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["test"],
    data_collator=data_collator,
    compute_metrics=compute_metris,
    tokenizer=tokenizer,
)
bert_trainer.train()

Detected kernel version 5.4.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss,Accuracy
1,0.0156,0.048173,0.988922
2,0.0167,0.046061,0.989578
3,0.0072,0.052761,0.989863


TrainOutput(global_step=17175, training_loss=0.01483428579577539, metrics={'train_runtime': 1316.1505, 'train_samples_per_second': 104.395, 'train_steps_per_second': 13.049, 'total_flos': 3546840276422208.0, 'train_loss': 0.01483428579577539, 'epoch': 3.0})

### 2) After build your model, report both the accuracy of BERT tagger for train samples and test samples.

In [55]:
# torch is already imported in the notebook's top import cell.
device = torch.device("cpu")

# NOTE(review): relative path -- confirm it points at the checkpoint directory
# written during training (presumably located inside the Trainer's output_dir).
model_checkpoint_final = "checkpoint-17175"

model2 = AutoModelForTokenClassification.from_pretrained(
    model_checkpoint_final,
    id2label=id2label,
    label2id=label2id,
).to(device)
model2.eval()

# Accumulate accuracy over the training batches without tracking gradients.
with torch.no_grad():
    for batch in train_dataloader:
        # Keep the inputs on the same device as the model.
        batch = {key: value.to(device) for key, value in batch.items()}
        outputs = model2(**batch)
        predictions = outputs.logits.argmax(dim=-1)
        labels = batch["labels"]

        # Boolean mask instead of per-token Python loops over tensor scalars.
        # The ids are used directly: label_names is built from the keys of
        # pos2label in order, so pos2label[label_names[x]] returned x unchanged.
        keep = labels != -100
        metric.add_batch(
            predictions=predictions[keep].tolist(),
            references=labels[keep].tolist(),
        )

train_acc = metric.compute()
print("训练集正确率：", train_acc)


训练集正确率： {'accuracy': 0.9992376686623179}


### 3) Compared with Method 1,2, discuss that why your BERT tagger is better/worse than these two.

可以看出 bert更好：

| method | train acc | test acc|
| -- | -- | -- |
| baseline UNK | 0.9571961173164197 | 0.9303802632426513|
| baseline most-freq | 0.9571961173164197 | 0.945187125587787|
| HMM |  0.9698694533284069 | 0.9510231249865803|
|HMM UNK | 0.9698694533284069 | 0.9435037468060894|
|HMM most-freq| 0.9698694533284069 | 0.9485281171494213|
|bert |             <font color=red>0.9992376686623179  </font>        | <font color=red>0.989863</font>|

1. 上下文嵌入：BERT（来自 Transformers 的双向编码器表示）模型利用上下文词嵌入，它根据句子中周围的单词捕获单词的含义。 这种上下文理解使 BERT 能够比 HMM 或基线方法更有效地处理语义歧义和上下文相关的词性标记
2. 预训练的语言表示：BERT 在大量文本数据上进行了预训练，这使其能够学习丰富的语言表示和句法结构。 这种预训练有助于 BERT 很好地泛化到各种标记任务和领域，而不需要大量特定于任务的训练数据。
3. 双向编码：与 HMM 等传统模型仅限于一个方向的顺序处理不同，BERT 对句子中的单词进行双向编码。 这种双向性使 BERT 能够捕获两个方向上单词之间的依赖关系，从而提高上下文理解和更准确的词性标记。

总的来说：bert因为有更大的参数量，有了预训练而学习到更丰富的语言特征，所以效果比前两个好