# BERT-CRF on sequence labelling

Tutorial for the KnULP lab meeting, National Chengchi University

*Chang-Yu Tsai, 2025.04.25*

- This week, we will:
  - obtain embeddings from BERT, rather than use BERT to perform the task directly
  - feed the embeddings to a CRF layer to perform sequence labelling.
- We will compare the performance of two approaches to obtaining the embeddings:

  1. Using the pre-trained BERT from Hugging Face to obtain embeddings.
  2. Fine-tuning the pre-trained BERT from Hugging Face first, then using the fine-tuned embeddings.
  


## Set-up

- installing `pytorch-crf`

In [None]:
pip install pytorch-crf

- importing required packages

```
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

from transformers import BertTokenizerFast, BertModel
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn

from tqdm import tqdm

import json

from torchcrf import CRF

import os

```


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

from transformers import BertTokenizerFast, BertModel
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn

from tqdm import tqdm

import json

from torchcrf import CRF

import os

import matplotlib.pyplot as plt

- downloading the dataset from GitHub

The dataset comes from the [Chinese HealthNER Corpus](https://github.com/NYCU-NLP/Chinese-HealthNER-Corpus). The Chinese Healthcare Named Entity Recognition (HealthNER) Corpus was collected and annotated by the [NYCU NLP Lab](https://ainlp.tw/).

```
!wget https://raw.githubusercontent.com/NYCU-NLP/Chinese-HealthNER-Corpus/a5eaca54376267cee7a015eb870f7f302517d813/train.zip

```

In [None]:
!wget https://raw.githubusercontent.com/NYCU-NLP/Chinese-HealthNER-Corpus/a5eaca54376267cee7a015eb870f7f302517d813/train.zip

- unzipping the file

```
!unzip train.zip
```

In [None]:
!unzip train.zip

- reading the file

```
data = []
with open("train.json", "r", encoding="utf-8") as f:
    for line in f:
        if line.strip():
            data.append(json.loads(line))

# You can choose to work on smaller or bigger data according to your device.
data = data[:2000]

print("Row number:", len(data))
print("The first row:\n", data[0])
```

In [None]:
data = []
with open("train.json", "r", encoding="utf-8") as f:
    for line in f:
        if line.strip():
            data.append(json.loads(line))

# You can choose to work on smaller or bigger data according to your device.
data = data[:2000]

print("Row number:", len(data))
print("The first row:\n", data[0])

- inspecting the key of the dataset

Each record in this JSON file is a dictionary with several keys. We will only work with some of them.
```
data[0].keys()
```

In [None]:
data[0].keys()

#### obtaining texts



- extracting the data for the NER task

We are working on a character-level sequence labelling task, so we first extract the characters and their character-level labels from the dataset. Extracting `sentences` is optional; it is useful if you would like to check the original sentence.

```
sentences=[]
characters=[]
character_labels=[]
for i in range(len(data)):
  sentences.append(data[i]['sentence'])
  characters.append(data[i]['character'])
  character_labels.append(data[i]['character_label'])
```


In [None]:
sentences=[]
characters=[]
character_labels=[]
for i in range(len(data)):
  sentences.append(data[i]['sentence'])
  characters.append(data[i]['character'])
  character_labels.append(data[i]['character_label'])


- calculating the maximum sentence length, which is used for padding later

```
length_list=[]
for char_list in characters:
  length=len(char_list)
  length_list.append(length)
max_len=max(length_list)
print('The maximum sentence length:', max_len)
```

In [None]:
length_list=[]
for char_list in characters:
  length=len(char_list)
  length_list.append(length)
max_len=max(length_list)
print('The maximum sentence length:', max_len)

- taking a look at the dataset
  - `sentence`: the string of the text
  - `character`: the list of characters of the text
  - `character_label`: the list of character-level labels of the text, following the BIO scheme

```
data[0]['sentence']
# data[0]['character']
# data[0]['character_label']
```

In [None]:
data[0]['sentence']
# data[0]['character']
# data[0]['character_label']
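
To make the BIO scheme concrete, the sketch below groups character-level labels back into entity spans. The record is a hypothetical example written for illustration (it is not taken from the corpus), and `bio_spans` is a helper name introduced here, not part of the notebook.

```python
def bio_spans(characters, labels):
    """Group BIO labels into (entity_type, text) spans."""
    spans = []
    current_type, current_chars = None, []

    for char, label in zip(characters, labels):
        if label.startswith("B-"):
            # A B- label always opens a new span (closing any open one first).
            if current_type is not None:
                spans.append((current_type, "".join(current_chars)))
            current_type, current_chars = label[2:], [char]
        elif label.startswith("I-") and current_type == label[2:]:
            # An I- label of the same type continues the open span.
            current_chars.append(char)
        else:
            # 'O' (or an I- label that continues nothing) closes the open span.
            if current_type is not None:
                spans.append((current_type, "".join(current_chars)))
            current_type, current_chars = None, []

    if current_type is not None:
        spans.append((current_type, "".join(current_chars)))
    return spans


# Hypothetical record in the same shape as `character` / `character_label`.
example_characters = ["心", "臟", "很", "重", "要"]
example_labels = ["B-BODY", "I-BODY", "O", "O", "O"]

print(bio_spans(example_characters, example_labels))
```

Here the first two characters form one `BODY` entity, and the remaining characters are outside any entity.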

## Preprocessing

### text encoding
It is important to conduct **tokenising** and **aligning** for text encoding before we work on BERT.

#### tokenising

`tokenizer` in Hugging Face Transformers is a built-in utility that handles not only tokenisation, but also:
- Automatic padding: With `padding='max_length'`, it pads every sequence to the length given by `max_length` (here derived from the longest sentence in the dataset).

- Tensor output: With `return_tensors="pt"`, it directly returns PyTorch tensors (no need to convert manually).

- defining the function to encode the texts

  **👓 Note that we also output `alignment_ids_list`, which is used to perform the following alignment between tokens and labels.**
  
  - We rely on the `.word_ids()` method to determine the alignment between subword tokens and their corresponding original characters.
    - `word_ids` assigns an integer index for each subword, indicating which original token (i.e., character) it comes from.
    - Special tokens such as `[CLS]`, `[SEP]`, and `[PAD]` will be assigned `None`, since they do not correspond to any original token. These should be ignored during label alignment.


```
def character_encode(characters, character_labels, label2id, tokenizer_path, local_files_only=None):
    """
    Encode character-level inputs and prepare alignment information for NER tasks.

    Args:
        characters (List[List[str]]): List of List of characters
        character_labels (List[List[str]]): List of corresponding labels
        label2id (Dict[str, int]): Mapping from label string to ID
        tokenizer_path (str): Path to local tokenizer or Hugging Face model ID
        local_files_only (bool, optional): Whether to only load from local files.
                                           If None, automatically detect.

    Returns:
        input_ids_tensor, attention_mask_tensor, alignment_ids_list
    """

    # --- Load tokenizer ---
    if local_files_only is None:
        # If user didn't specify, automatically check if path exists
        local_files_only = os.path.exists(tokenizer_path)

    tokenizer = BertTokenizerFast.from_pretrained(tokenizer_path, local_files_only=local_files_only)

    # --- Tokenize inputs ---
    encodings = tokenizer(
        characters,                    # List of List of characters
        is_split_into_words=True,       # The input is pre-split into characters; one item may still map to several subword tokens.
        padding='max_length',           # Padding every sequence to the length given by `max_length`
        max_length=max_len + 2,         # `max_len` characters plus the [CLS] and [SEP] tokens
        truncation=True,                # Truncating the input if it is longer than `max_length`
        return_tensors="pt"             # Return the torch tensors
    )

    input_ids_tensor = encodings['input_ids']
    attention_mask_tensor = encodings['attention_mask']

    # --- Preparation for alignment ---
    alignment_ids_list = []
    for i in range(len(characters)):
        alignment_ids = encodings.word_ids(batch_index=i)   # Obtaining the alignment IDs through `word_ids`
        alignment_ids_list.append(alignment_ids)

    return input_ids_tensor, attention_mask_tensor, alignment_ids_list
```

In [None]:
def character_encode(characters, character_labels, label2id, tokenizer_path, local_files_only=None):
    """
    Encode character-level inputs and prepare alignment information for NER tasks.

    Args:
        characters (List[List[str]]): List of List of characters
        character_labels (List[List[str]]): List of corresponding labels
        label2id (Dict[str, int]): Mapping from label string to ID
        tokenizer_path (str): Path to local tokenizer or Hugging Face model ID
        local_files_only (bool, optional): Whether to only load from local files.
                                           If None, automatically detect.

    Returns:
        input_ids_tensor, attention_mask_tensor, alignment_ids_list
    """

    # --- Load tokenizer ---
    if local_files_only is None:
        # If user didn't specify, automatically check if path exists
        local_files_only = os.path.exists(tokenizer_path)

    tokenizer = BertTokenizerFast.from_pretrained(tokenizer_path, local_files_only=local_files_only)

    # --- Tokenize inputs ---
    encodings = tokenizer(
        characters,                    # List of List of characters
        is_split_into_words=True,       # The input is pre-split into characters; one item may still map to several subword tokens.
        padding='max_length',           # Padding every sequence to the length given by `max_length`
        max_length=max_len + 2,         # `max_len` characters plus the [CLS] and [SEP] tokens
        truncation=True,                # Truncating the input if it is longer than `max_length`
        return_tensors="pt"             # Return the torch tensors
    )

    input_ids_tensor = encodings['input_ids']
    attention_mask_tensor = encodings['attention_mask']

    # --- Preparation for alignment ---
    alignment_ids_list = []
    for i in range(len(characters)):
        alignment_ids = encodings.word_ids(batch_index=i)   # Obtaining the alignment IDs through `word_ids`
        alignment_ids_list.append(alignment_ids)

    return input_ids_tensor, attention_mask_tensor, alignment_ids_list


#### aligning
It is very important to align tokens with labels after tokenising with BERT, since BERT performs tokenisation at the subword level.


- defining the function to perform the alignment

```
def align_labels(alignment_ids_list, character_labels, label2id):
    label_ids_list = []

    for alignment_ids, label_seq in zip(alignment_ids_list, character_labels):
        # aligning the labels of one sentence with its tokens
        label_ids = []
        previous_word_idx = None

        for word_idx in alignment_ids:
            if word_idx is None:                  # if this token is CLS/SEP/PAD
                label_ids.append(-100)            # assigning the label ID -100

            elif word_idx != previous_word_idx:   # if this token is not the same as the previous one
                label_str = label_seq[word_idx]   # obtaining the corresponding label
                label_id = label2id[label_str]    # obtaining the corresponding ID
                label_ids.append(label_id)        # saving the ID

            else:                                 # if it is a continuation subword of the previous one
                label_ids.append(-100)            # assigning the label ID -100

            previous_word_idx = word_idx          # saving the current ID for the next loop

        label_ids_list.append(label_ids)          # saving the label of the same sentence into one list

    label_tensor = torch.tensor(label_ids_list)   # converting the label into `torch.tensor`
    return label_tensor

```

In [None]:
def align_labels(alignment_ids_list, character_labels, label2id):
    label_ids_list = []

    for alignment_ids, label_seq in zip(alignment_ids_list, character_labels):
        # aligning the labels of one sentence with its tokens
        label_ids = []
        previous_word_idx = None

        for word_idx in alignment_ids:
            if word_idx is None:                  # if this token is CLS/SEP/PAD
                label_ids.append(-100)            # assigning the label ID -100

            elif word_idx != previous_word_idx:   # if this token is not the same as the previous one
                label_str = label_seq[word_idx]   # obtaining the corresponding label
                label_id = label2id[label_str]    # obtaining the corresponding ID
                label_ids.append(label_id)        # saving the ID

            else:                                 # if it is a continuation subword of the previous one
                label_ids.append(-100)            # assigning the label ID -100

            previous_word_idx = word_idx          # saving the current ID for the next loop

        label_ids_list.append(label_ids)          # saving the label of the same sentence into one list

    label_tensor = torch.tensor(label_ids_list)   # converting the label into `torch.tensor`
    return label_tensor
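
The alignment rule can be traced by hand on a single sentence. The sketch below re-implements the inner loop of `align_labels` in plain Python; `toy_word_ids` is a hand-written stand-in for what `encodings.word_ids(...)` might return, and `align_one`, `toy_labels`, and `toy_label2id` are illustrative names, not part of the notebook.

```python
IGNORE_ID = -100  # label ID for positions that should not be scored


def align_one(word_ids, labels, label2id):
    """Align the labels of one sentence with its tokens."""
    aligned = []
    previous = None
    for word_id in word_ids:
        if word_id is None:            # special token: [CLS], [SEP] or [PAD]
            aligned.append(IGNORE_ID)
        elif word_id != previous:      # first token of a character
            aligned.append(label2id[labels[word_id]])
        else:                          # continuation piece of the same character
            aligned.append(IGNORE_ID)
        previous = word_id
    return aligned


toy_label2id = {"B-BODY": 0, "I-BODY": 1, "O": 2}
toy_labels = ["B-BODY", "I-BODY", "O"]

# Assumed layout: [CLS], three characters (the third split into two pieces),
# [SEP], and one [PAD].
toy_word_ids = [None, 0, 1, 2, 2, None, None]

print(align_one(toy_word_ids, toy_labels, toy_label2id))
# [-100, 0, 1, 2, -100, -100, -100]
```

Only the first piece of each character keeps a real label ID, so the aligned list always has the same length as the token sequence.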


### obtaining the IDs of labels
We will later encode the labels as tensors, so we first need to create the mappings between labels and IDs.


- defining a function to create mapping lists

```
def label_and_id(label_seqs):
    all_labels = set()
    for seq in label_seqs:
        for label in seq:
            all_labels.add(label)

    # sorting
    label_list = sorted(all_labels)

    # building the mapping lists
    label2id = {}
    for i, label in enumerate(label_list):
        label2id[label] = i
    id2label = {}
    for label, i in label2id.items():
        id2label[i] = label

    return label2id, id2label
```

In [None]:
def label_and_id(label_seqs):
    all_labels = set()
    for seq in label_seqs:
        for label in seq:
            all_labels.add(label)

    # sorting
    label_list = sorted(all_labels)

    # building the mapping lists
    label2id = {}
    for i, label in enumerate(label_list):
        label2id[label] = i
    id2label = {}
    for label, i in label2id.items():
        id2label[i] = label

    return label2id, id2label

- creating the mapping lists

```
label2id, id2label=label_and_id(character_labels)
print(label2id)
print(id2label)
```

In [None]:
label2id, id2label=label_and_id(character_labels)
print(label2id)
print(id2label)
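
As a small illustration of what the two mappings look like, the sketch below builds them for a toy label set. The label sequences are made up for illustration; sorting the labels before numbering them keeps the IDs the same from run to run, since the iteration order of a set of strings is not guaranteed.

```python
# Hypothetical label sequences, in the same shape as `character_labels`.
toy_label_seqs = [
    ["O", "B-BODY", "I-BODY"],
    ["B-CHEM", "I-CHEM", "O"],
]

# Collect the distinct labels and sort them for a deterministic order.
toy_label_list = sorted({label for seq in toy_label_seqs for label in seq})

# Forward and reverse mappings.
toy_label2id = {label: i for i, label in enumerate(toy_label_list)}
toy_id2label = {i: label for label, i in toy_label2id.items()}

print(toy_label2id)
# {'B-BODY': 0, 'B-CHEM': 1, 'I-BODY': 2, 'I-CHEM': 3, 'O': 4}
```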

### data splitting
Before encoding the texts, we split the data into the training set (70\%), the dev set (10\%), and the test set (20\%).

```
# Step 1: 70% for the training set and 30% for the remaining data
train_chars, temp_chars, train_labels, temp_labels = train_test_split(
    characters, character_labels, test_size=0.3, random_state=42
)

# Step 2: splitting the remaining 30% into the dev set (10% of all data) and the test set (20% of all data)
dev_chars, test_chars, dev_labels, test_labels = train_test_split(
    temp_chars, temp_labels, test_size=2/3, random_state=42
)
```

In [None]:
# Step 1: 70% for the training set and 30% for the remaining data
train_chars, temp_chars, train_labels, temp_labels = train_test_split(
    characters, character_labels, test_size=0.3, random_state=42
)

# Step 2: splitting the remaining 30% into the dev set (10% of all data) and the test set (20% of all data)
dev_chars, test_chars, dev_labels, test_labels = train_test_split(
    temp_chars, temp_labels, test_size=2/3, random_state=42
)
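
A two-step split is easy to get wrong, so it can help to check the resulting sizes with a little arithmetic. The sketch below is plain Python and does not call scikit-learn; `split_sizes` is a hypothetical helper, and rounding the held-out share up is an assumption about how the sizes are rounded, which may differ by a row in practice.

```python
from fractions import Fraction
from math import ceil


def split_sizes(n_rows, first_test_size, second_test_size):
    """Return (train, dev, test) sizes for a two-step split."""
    temp = ceil(n_rows * first_test_size)   # rows held out in step 1
    train = n_rows - temp
    test = ceil(temp * second_test_size)    # share of the held-out rows used for testing
    dev = temp - test
    return train, dev, test


# Holding out 30% first, then using 2/3 of it for testing: 70% / 10% / 20%.
print(split_sizes(2000, Fraction(3, 10), Fraction(2, 3)))  # (1400, 200, 400)

# Holding out only 20% first gives roughly 80% / 6.7% / 13.3% instead.
print(split_sizes(2000, Fraction(1, 5), Fraction(2, 3)))   # (1600, 133, 267)
```

Exact fractions are used here only to keep the arithmetic free of floating-point rounding.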

## Model defining

- building the model of BERT-CRF

Although a CRF is designed to avoid the label bias problem by normalising scores over whole label sequences, it only learns which transitions are plausible from the training data, which assumes sufficient data and complete training. The CRF layer in `pytorch-crf` is lightweight and does not enforce hard transition constraints, so it can still output label sequences that are invalid under the BIO tagging scheme, such as an `I-` label that does not follow a label of the same entity type.

Therefore, in practical structured prediction tasks like NER, valid transitions have to be enforced separately to ensure label consistency. In our model, we additionally define `_fix_bio_labels` to repair invalid BIO sequences after decoding.

```
class BertCRFTagger(nn.Module):
    def __init__(self, bert_model_path: str, num_labels: int, label2id: dict):
        super(BertCRFTagger, self).__init__()
        self.bert = BertModel.from_pretrained(bert_model_path)
        self.hidden_size = self.bert.config.hidden_size
        self.num_labels = num_labels

        self.classifier = nn.Linear(self.hidden_size, self.num_labels)
        self.crf = CRF(self.num_labels, batch_first=True)

        # label mapping
        self.label2id = label2id
        self.id2label = {v: k for k, v in label2id.items()}

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = outputs.last_hidden_state
        emissions = self.classifier(sequence_output)

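        if labels is not None:
            labels = labels.clone()   # working on a copy so that the caller's tensor is not modified
            mask = labels != -100
            # pytorch-crf requires the first timestep of every sequence to be unmasked,
            # but position 0 is [CLS] (label -100), so it borrows the first valid label.
            for i in range(mask.size(0)):
                if not mask[i, 0]:
                    for j in range(1, mask.size(1)):
                        if mask[i, j]:
                            labels[i, 0] = labels[i, j]
                            mask[i, 0] = True
                            break
            labels[~mask] = 0         # placeholder ID for the masked positions
            loss = -self.crf(emissions, labels, mask=mask, reduction='mean')
            return loss
        else:
            prediction = self.crf.decode(emissions, mask=attention_mask.bool())
            # repairing invalid BIO transitions in the decoded sequences
            fixed_prediction = [self._fix_bio_labels(seq) for seq in prediction]
            return fixed_prediction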

    def _fix_bio_labels(self, label_seq):
        """
        Repair inconsistent BIO labels, such as ['B-BODY', 'I-CHEM'], by turning an I- label
        that does not continue the previous entity type into the matching B- label.
        """
        fixed = []
        prev_type = None

        for label_id in label_seq:
            label = self.id2label[label_id]
            if label == 'O':
                fixed.append(label_id)
                prev_type = None
            elif label.startswith("B-"):
                fixed.append(label_id)
                prev_type = label[2:]
            elif label.startswith("I-"):
                current_type = label[2:]
                if prev_type == current_type:
                    fixed.append(label_id)
                else:
                    # starting with the B label
                    new_label = "B-" + current_type
                    fixed.append(self.label2id.get(new_label, label_id))
                    prev_type = current_type
            else:
                fixed.append(label_id)
                prev_type = None

        return fixed
```

In [None]:
class BertCRFTagger(nn.Module):
    def __init__(self, bert_model_path: str, num_labels: int, label2id: dict):
        super(BertCRFTagger, self).__init__()
        self.bert = BertModel.from_pretrained(bert_model_path)
        self.hidden_size = self.bert.config.hidden_size
        self.num_labels = num_labels

        self.classifier = nn.Linear(self.hidden_size, self.num_labels)
        self.crf = CRF(self.num_labels, batch_first=True)

        # label mapping
        self.label2id = label2id
        self.id2label = {v: k for k, v in label2id.items()}

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = outputs.last_hidden_state
        emissions = self.classifier(sequence_output)

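        if labels is not None:
            labels = labels.clone()   # working on a copy so that the caller's tensor is not modified
            mask = labels != -100
            # pytorch-crf requires the first timestep of every sequence to be unmasked,
            # but position 0 is [CLS] (label -100), so it borrows the first valid label.
            for i in range(mask.size(0)):
                if not mask[i, 0]:
                    for j in range(1, mask.size(1)):
                        if mask[i, j]:
                            labels[i, 0] = labels[i, j]
                            mask[i, 0] = True
                            break
            labels[~mask] = 0         # placeholder ID for the masked positions
            loss = -self.crf(emissions, labels, mask=mask, reduction='mean')
            return loss
        else:
            prediction = self.crf.decode(emissions, mask=attention_mask.bool())
            # repairing invalid BIO transitions in the decoded sequences
            fixed_prediction = [self._fix_bio_labels(seq) for seq in prediction]
            return fixed_prediction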

    def _fix_bio_labels(self, label_seq):
        """
        Repair inconsistent BIO labels, such as ['B-BODY', 'I-CHEM'], by turning an I- label
        that does not continue the previous entity type into the matching B- label.
        """
        fixed = []
        prev_type = None

        for label_id in label_seq:
            label = self.id2label[label_id]
            if label == 'O':
                fixed.append(label_id)
                prev_type = None
            elif label.startswith("B-"):
                fixed.append(label_id)
                prev_type = label[2:]
            elif label.startswith("I-"):
                current_type = label[2:]
                if prev_type == current_type:
                    fixed.append(label_id)
                else:
                    # starting with the B label
                    new_label = "B-" + current_type
                    fixed.append(self.label2id.get(new_label, label_id))
                    prev_type = current_type
            else:
                fixed.append(label_id)
                prev_type = None

        return fixed
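
The repair rule in `_fix_bio_labels` can be tried out without loading any model. The sketch below applies the same idea directly to label strings instead of label IDs; `is_valid_bio` and `repair_bio` are illustrative helpers written for this example, not methods of the class.

```python
def is_valid_bio(labels):
    """Return True if every I- label continues an entity of the same type."""
    prev_type = None
    for label in labels:
        if label.startswith("I-") and label[2:] != prev_type:
            return False
        prev_type = None if label == "O" else label[2:]
    return True


def repair_bio(labels):
    """Turn each I- label that continues nothing into the matching B- label."""
    fixed, prev_type = [], None
    for label in labels:
        if label.startswith("I-") and label[2:] != prev_type:
            label = "B-" + label[2:]
        fixed.append(label)
        prev_type = None if label == "O" else label[2:]
    return fixed


# Hypothetical decoder output with two invalid transitions:
# 'O' -> 'I-BODY' and 'B-BODY' -> 'I-CHEM'.
noisy = ["O", "I-BODY", "I-BODY", "B-BODY", "I-CHEM", "O"]
repaired = repair_bio(noisy)

print(is_valid_bio(noisy))     # False
print(repaired)                # ['O', 'B-BODY', 'I-BODY', 'B-BODY', 'B-CHEM', 'O']
print(is_valid_bio(repaired))  # True
```

This post-hoc repair only guarantees a well-formed sequence; it does not make the prediction more likely to be correct.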


## Raw BERT + CRF

In this section, we use the pre-trained BERT that has **NOT** been fine-tuned on any data in advance. Note that the tokeniser must be the one that matches the BERT model.

### text encoding
It is important to conduct **tokenising** and **aligning** for text encoding before we work on BERT.

#### tokenising

`tokenizer` in Hugging Face Transformers is a built-in utility that handles not only tokenisation, but also:
- Automatic padding: With `padding='max_length'`, it pads every sequence to the length given by `max_length` (here derived from the longest sentence in the dataset).

- Tensor output: With `return_tensors="pt"`, it directly returns PyTorch tensors (no need to convert manually).

- encoding the texts

**💡 Grouping shared arguments into a dictionary improves code neatness and readability.**

```
# grouping the shared arguments into a dictionary (packing)
common_args = {
    "label2id": label2id,
    "tokenizer_path": "bert-base-chinese",
    "local_files_only": False
}

# passing the shared arguments using dictionary unpacking (**common_args)
train_input_ids_tensors, train_attention_mask_tensors, train_alignment_ids_list = character_encode(train_chars, train_labels, **common_args)
dev_input_ids_tensors, dev_attention_mask_tensors, dev_alignment_ids_list = character_encode(dev_chars, dev_labels, **common_args)
test_input_ids_tensors, test_attention_mask_tensors, test_alignment_ids_list = character_encode(test_chars, test_labels, **common_args)
```

In [None]:
# grouping the shared arguments into a dictionary (packing)
common_args = {
    "label2id": label2id,
    "tokenizer_path": "bert-base-chinese",
    "local_files_only": False
}

# passing the shared arguments using dictionary unpacking (**common_args)
train_input_ids_tensors, train_attention_mask_tensors, train_alignment_ids_list = character_encode(train_chars, train_labels, **common_args)
dev_input_ids_tensors, dev_attention_mask_tensors, dev_alignment_ids_list = character_encode(dev_chars, dev_labels, **common_args)
test_input_ids_tensors, test_attention_mask_tensors, test_alignment_ids_list = character_encode(test_chars, test_labels, **common_args)
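
One thing to watch with dictionary unpacking: an argument supplied through `**common_args` must not also be passed positionally, or Python raises a `TypeError`. The sketch below shows this with `encode`, a hypothetical stand-in that only copies the parameter order of `character_encode` and does no real encoding.

```python
def encode(chars, labels, label2id, tokenizer_path, local_files_only=None):
    """Hypothetical stand-in with the same parameter order as `character_encode`."""
    return len(chars), tokenizer_path, local_files_only


shared = {
    "label2id": {"O": 0},
    "tokenizer_path": "bert-base-chinese",
    "local_files_only": False,
}

# `label2id` comes from the dictionary only: this call works.
ok = encode(["a"], ["O"], **shared)
print(ok)

# `label2id` is passed positionally AND through the dictionary: this call fails.
try:
    encode(["a"], ["O"], {"O": 0}, **shared)
    duplicate_error = None
except TypeError as err:
    duplicate_error = str(err)
print(duplicate_error)
```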

- performing the alignment

```
train_label_tensors=align_labels(train_alignment_ids_list, train_labels, label2id)
dev_label_tensors=align_labels(dev_alignment_ids_list, dev_labels, label2id)
test_label_tensors=align_labels(test_alignment_ids_list, test_labels, label2id)
```

In [None]:
train_label_tensors=align_labels(train_alignment_ids_list, train_labels, label2id)
dev_label_tensors=align_labels(dev_alignment_ids_list, dev_labels, label2id)
test_label_tensors=align_labels(test_alignment_ids_list, test_labels, label2id)

### `TensorDataset`

We combine the processed input tensors and label tensors into `TensorDataset` objects, one for each data split:

- **training set** → `train_dataset`
- **dev set** → `dev_dataset`
- **test set** → `test_dataset`

```
train_dataset = TensorDataset(train_input_ids_tensors, train_attention_mask_tensors, train_label_tensors)
dev_dataset = TensorDataset(dev_input_ids_tensors, dev_attention_mask_tensors, dev_label_tensors)
test_dataset = TensorDataset(test_input_ids_tensors, test_attention_mask_tensors, test_label_tensors)
```

In [None]:
train_dataset = TensorDataset(train_input_ids_tensors, train_attention_mask_tensors, train_label_tensors)
dev_dataset = TensorDataset(dev_input_ids_tensors, dev_attention_mask_tensors, dev_label_tensors)
test_dataset = TensorDataset(test_input_ids_tensors, test_attention_mask_tensors, test_label_tensors)

### `DataLoader`

After creating `TensorDataset` objects for each data split, we wrap them with PyTorch `DataLoader` for efficient mini-batch loading during training and evaluation.

We specify the `batch_size` and whether to shuffle the data (shuffling is used only for training).

```
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader   = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
```

In [None]:
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader   = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


### model training

- initialising the model

```
torch.manual_seed(24)  # seeding before the model is built so that the new layers are initialised reproducibly
max_label_id = max(label2id.values())
num_labels = max_label_id + 1
raw_bert_model = BertCRFTagger(
    bert_model_path="bert-base-chinese",  # specifying the model: the model which is not fine-tuned in advance
    num_labels=num_labels,
    label2id=label2id
)
```

In [None]:
torch.manual_seed(24)  # seeding before the model is built so that the new layers are initialised reproducibly
max_label_id = max(label2id.values())
num_labels = max_label_id + 1
raw_bert_model = BertCRFTagger(
    bert_model_path="bert-base-chinese",  # specifying the model: the model which is not fine-tuned in advance
    num_labels=num_labels,
    label2id=label2id
)

- training

```
train_losses = []
dev_losses = []
num_epochs = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
optimizer = AdamW(raw_bert_model.parameters(), lr=5e-5)
raw_bert_model.to(device)

for epoch in range(num_epochs):
    print(f"\n===== Epoch {epoch+1}/{num_epochs} =====")

    # === Training ===
    raw_bert_model.train()
    train_loss = 0.0

    for batch in tqdm(train_loader, desc=f"[Epoch {epoch+1}] Training", leave=False):
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        optimizer.zero_grad()
        loss = raw_bert_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * input_ids.size(0)

    avg_train_loss = train_loss / len(train_loader.dataset)
    train_losses.append(avg_train_loss)

    # === Validation ===
    raw_bert_model.eval()
    dev_loss = 0.0

    with torch.no_grad():
        for batch in tqdm(dev_loader, desc=f"[Epoch {epoch+1}] Validation", leave=False):
            input_ids = batch[0].to(device)
            attention_mask = batch[1].to(device)
            labels = batch[2].to(device)

            loss = raw_bert_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            dev_loss += loss.item() * input_ids.size(0)

    avg_dev_loss = dev_loss / len(dev_loader.dataset)
    dev_losses.append(avg_dev_loss)

    # === Summary ===
    print(f"Epoch {epoch+1} - Train Loss: {avg_train_loss:.4f} - Dev Loss: {avg_dev_loss:.4f}")
```

In [None]:
train_losses = []
dev_losses = []
num_epochs = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
optimizer = AdamW(raw_bert_model.parameters(), lr=5e-5)
raw_bert_model.to(device)

for epoch in range(num_epochs):
    print(f"\n===== Epoch {epoch+1}/{num_epochs} =====")

    # === Training ===
    raw_bert_model.train()
    train_loss = 0.0

    for batch in tqdm(train_loader, desc=f"[Epoch {epoch+1}] Training", leave=False):
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        optimizer.zero_grad()
        loss = raw_bert_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * input_ids.size(0)

    avg_train_loss = train_loss / len(train_loader.dataset)
    train_losses.append(avg_train_loss)

    # === Validation ===
    raw_bert_model.eval()
    dev_loss = 0.0

    with torch.no_grad():
        for batch in tqdm(dev_loader, desc=f"[Epoch {epoch+1}] Validation", leave=False):
            input_ids = batch[0].to(device)
            attention_mask = batch[1].to(device)
            labels = batch[2].to(device)

            loss = raw_bert_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            dev_loss += loss.item() * input_ids.size(0)

    avg_dev_loss = dev_loss / len(dev_loader.dataset)
    dev_losses.append(avg_dev_loss)

    # === Summary ===
    print(f"Epoch {epoch+1} - Train Loss: {avg_train_loss:.4f} - Dev Loss: {avg_dev_loss:.4f}")
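
The loop multiplies each batch loss by the batch size before summing because the last batch is usually smaller than the others. A minimal sketch with made-up numbers (two batches of 32 and 8 samples, not values from a real run) shows how the weighted average differs from a plain mean of batch losses:

```python
# Hypothetical per-batch mean losses and batch sizes.
batch_mean_losses = [2.0, 4.0]
batch_sizes = [32, 8]

# Weighting each batch by its size gives the mean loss per sample.
weighted_total = sum(loss * size for loss, size in zip(batch_mean_losses, batch_sizes))
weighted_average = weighted_total / sum(batch_sizes)

# A plain mean over batches gives the small last batch too much influence.
naive_average = sum(batch_mean_losses) / len(batch_mean_losses)

print(weighted_average)  # 2.4
print(naive_average)     # 3.0
```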


- plotting the loss curve

```
import matplotlib.pyplot as plt
plt.figure(figsize=(8, 5))
plt.plot(train_losses, label='Train Loss')
plt.plot(dev_losses, label='Dev Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss Curve')
plt.legend()
plt.grid(True)
plt.show()
```

In [None]:
plt.figure(figsize=(8, 5))
plt.plot(train_losses, label='Train Loss')
plt.plot(dev_losses, label='Dev Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss Curve')
plt.legend()
plt.grid(True)
plt.show()

### predicting

```
raw_bert_model.eval()

pred_labels = []
true_labels = []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        # CRF prediction: list of predicted tag ids
        predictions = raw_bert_model(input_ids=input_ids, attention_mask=attention_mask)

        for i in range(len(predictions)):
            pred_seq = predictions[i]
            gold_seq = labels[i]

            cleaned_preds = []
            cleaned_labels = []

            for j, label_id in enumerate(gold_seq):
                if label_id != -100:
                    cleaned_preds.append(pred_seq[j])
                    cleaned_labels.append(label_id.item())

            pred_labels.append(cleaned_preds)
            true_labels.append(cleaned_labels)
```

In [None]:
raw_bert_model.eval()

pred_labels = []
true_labels = []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        # CRF prediction: list of predicted tag ids
        predictions = raw_bert_model(input_ids=input_ids, attention_mask=attention_mask)

        for i in range(len(predictions)):
            pred_seq = predictions[i]
            gold_seq = labels[i]

            cleaned_preds = []
            cleaned_labels = []

            for j, label_id in enumerate(gold_seq):
                if label_id != -100:
                    cleaned_preds.append(pred_seq[j])
                    cleaned_labels.append(label_id.item())

            pred_labels.append(cleaned_preds)
            true_labels.append(cleaned_labels)
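
Conceptually, CRF decoding looks for the label sequence with the highest total score, combining per-token emission scores with transition scores between neighbouring labels. The sketch below is a plain-Python Viterbi search over made-up scores; it is an illustration of the idea under simplified assumptions, not the `pytorch-crf` implementation, and all names and numbers are hypothetical.

```python
def viterbi(emissions, transitions, start, end):
    """Return the best tag path.

    emissions[t][k]: score of tag k at step t
    transitions[i][j]: score of moving from tag i to tag j
    start[k] / end[k]: score of starting / ending with tag k
    """
    num_tags = len(start)
    score = [start[k] + emissions[0][k] for k in range(num_tags)]
    backpointers = []

    for t in range(1, len(emissions)):
        new_score, pointers = [], []
        for j in range(num_tags):
            # Best previous tag for reaching tag j at step t.
            best_i = max(range(num_tags), key=lambda i: score[i] + transitions[i][j])
            pointers.append(best_i)
            new_score.append(score[best_i] + transitions[best_i][j] + emissions[t][j])
        score = new_score
        backpointers.append(pointers)

    final = [score[k] + end[k] for k in range(num_tags)]
    path = [max(range(num_tags), key=lambda k: final[k])]
    for pointers in reversed(backpointers):   # walk back from the last step
        path.append(pointers[path[-1]])
    path.reverse()
    return path


# Tags: 0 = 'B-BODY', 1 = 'I-BODY', 2 = 'O' (toy scores).
toy_emissions = [[1.0, 1.5, 0.2], [0.1, 2.0, 0.3], [0.1, 0.2, 1.0]]
toy_transitions = [[0.0, 1.0, 0.0], [0.0, 0.5, 0.0], [0.0, -10.0, 0.0]]
toy_start = [0.0, -10.0, 0.0]   # starting with 'I-BODY' is heavily penalised
toy_end = [0.0, 0.0, 0.0]

greedy = [max(range(3), key=lambda k: step[k]) for step in toy_emissions]
best_path = viterbi(toy_emissions, toy_transitions, toy_start, toy_end)

print(greedy)     # [1, 1, 2]: per-token argmax starts with 'I-BODY'
print(best_path)  # [0, 1, 2]: the sequence-level search prefers 'B-BODY' first
```

The per-token argmax picks an invalid opening label, while the sequence-level search avoids it because of the learned start and transition scores.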


### evaluating

```
# flattening
flattened_preds = [p for seq in pred_labels for p in seq]
flattened_trues = [t for seq in true_labels for t in seq]

# filtering out the O label
interesting_labels = [k for k in label2id if k != "O"]
interesting_ids = [label2id[k] for k in interesting_labels]

# evaluating
evaluation = classification_report(
    flattened_trues,
    flattened_preds,
    labels=interesting_ids,
    target_names=interesting_labels,
    digits=4
)
print(evaluation)

```

In [None]:

# flattening
flattened_preds = [p for seq in pred_labels for p in seq]
flattened_trues = [t for seq in true_labels for t in seq]

# filtering out the O label
interesting_labels = [k for k in label2id if k != "O"]
interesting_ids = [label2id[k] for k in interesting_labels]

# evaluating
evaluation = classification_report(
    flattened_trues,
    flattened_preds,
    labels=interesting_ids,
    target_names=interesting_labels,
    digits=4
)
print(evaluation)
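
To make the per-label rows of the report easier to read, here is a hand-computed sketch of token-level precision and recall on toy data. The helper `precision_recall`, the tag names `B-X` and `I-X`, and all label ids are hypothetical; in the notebook, `classification_report` performs this computation.

```python
def precision_recall(trues, preds, label):
    """Token-level precision and recall for a single label id."""
    tp = sum(1 for t, p in zip(trues, preds) if t == label and p == label)
    pred_count = sum(1 for p in preds if p == label)
    true_count = sum(1 for t in trues if t == label)
    precision = tp / pred_count if pred_count else 0.0
    recall = tp / true_count if true_count else 0.0
    return precision, recall


toy_label2id = {"O": 0, "B-X": 1, "I-X": 2}  # made-up tag set
toy_true_labels = [[0, 1, 2], [1, 2, 0]]
toy_pred_labels = [[0, 1, 0], [1, 2, 2]]

# flattening the per-sentence lists, as in the cell above
flat_trues = [t for seq in toy_true_labels for t in seq]
flat_preds = [p for seq in toy_pred_labels for p in seq]

# scoring every label except "O"
scores = {
    name: precision_recall(flat_trues, flat_preds, idx)
    for name, idx in toy_label2id.items()
    if name != "O"
}
print(scores)  # {'B-X': (1.0, 1.0), 'I-X': (0.5, 0.5)}
```

Leaving out `O` keeps the very frequent non-entity tokens from dominating the averages.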


- inspecting the input, true labels, and the predicted labels

In [None]:
# converting id lists into label lists
true_labels_str = [[id2label[idx] for idx in seq] for seq in true_labels]
pred_labels_str = [[id2label[idx] for idx in seq] for seq in pred_labels]


for i in range(5):
    print("Sentence", test_chars[i])
    print("True: ", true_labels_str[i])
    print("Pred: ", pred_labels_str[i])

## Fine-tuned BERT + CRF

In this section, we use a BERT model that has already been fine-tuned on some data. Note that we must use the tokeniser that was saved with that fine-tuned model.

### text encoding

Before feeding text to BERT, we need to encode it in two steps: **tokenising** and **aligning**.

#### tokenising

The `tokenizer` in Hugging Face Transformers is a built-in utility that handles not only tokenisation, but also:
- Automatic padding: With `padding='max_length'`, it pads every sequence to the fixed length given by `max_length` (or to the model's maximum input length if `max_length` is not set).

- Tensor output: With `return_tensors="pt"`, it directly returns PyTorch tensors (no need to convert manually).
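
To see what padding to a fixed length produces, here is a hand-rolled sketch in plain Python. It is not the Hugging Face API: `pad_to_max_length`, the pad id `0`, and the token ids are made up for illustration.

```python
def pad_to_max_length(token_ids, max_length, pad_id=0):
    """Pad one sequence to `max_length` and build the matching attention mask."""
    if len(token_ids) > max_length:
        raise ValueError("sequence is longer than max_length")
    n_pad = max_length - len(token_ids)
    input_ids = token_ids + [pad_id] * n_pad
    attention_mask = [1] * len(token_ids) + [0] * n_pad  # 1 = real token, 0 = padding
    return input_ids, attention_mask


# Two toy sequences of different lengths (ids are made up).
toy_batch = [[101, 2769, 102], [101, 2769, 4638, 1928, 102]]
encoded = [pad_to_max_length(ids, max_length=6) for ids in toy_batch]

for input_ids, attention_mask in encoded:
    print(input_ids, attention_mask)
# [101, 2769, 102, 0, 0, 0] [1, 1, 1, 0, 0, 0]
# [101, 2769, 4638, 1928, 102, 0] [1, 1, 1, 1, 1, 0]
```

Because every row now has the same length, the rows can be stacked into a single tensor, and the attention mask tells the model which positions are padding.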

- mounting Google Drive to access the fine-tuned model and tokeniser

```
from google.colab import drive
drive.mount('/content/drive')
```

In [None]:
from google.colab import drive
drive.mount('/content/drive')

- encoding the texts

**💡 Grouping shared arguments into a dictionary improves code neatness and readability.**

```
# grouping the shared arguments into a dictionary (packing)
common_args = {
    "label2id": label2id,
    "tokenizer_path": "/content/drive/MyDrive/YOUR_PATH",
    "local_files_only": True
}

# passing the shared arguments using dictionary unpacking (**common_args)
train_input_ids_tensors, train_attention_mask_tensors, train_alignment_ids_list = character_encode(train_chars, train_labels, **common_args)

dev_input_ids_tensors, dev_attention_mask_tensors, dev_alignment_ids_list = character_encode(dev_chars, dev_labels, **common_args)

test_input_ids_tensors, test_attention_mask_tensors, test_alignment_ids_list = character_encode(test_chars, test_labels, **common_args)

```

In [None]:
# grouping the shared arguments into a dictionary (packing)
common_args = {
    "label2id": label2id,
    "tokenizer_path": "/content/drive/MyDrive/YOUR_PATH",
    "local_files_only": True
}

# passing the shared arguments using dictionary unpacking (**common_args)
train_input_ids_tensors, train_attention_mask_tensors, train_alignment_ids_list = character_encode(train_chars, train_labels, **common_args)

dev_input_ids_tensors, dev_attention_mask_tensors, dev_alignment_ids_list = character_encode(dev_chars, dev_labels, **common_args)

test_input_ids_tensors, test_attention_mask_tensors, test_alignment_ids_list = character_encode(test_chars, test_labels, **common_args)
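
The packing and unpacking pattern used above can be tried in isolation. In this minimal sketch, `describe_split` and its arguments are hypothetical and unrelated to `character_encode`; the point is only that `**shared_args` expands the dictionary into keyword arguments.

```python
def describe_split(name, size, max_length, lowercase):
    """Toy function standing in for any call that shares keyword arguments."""
    return f"{name}: {size} sentences, max_length={max_length}, lowercase={lowercase}"


# packing the shared keyword arguments once
shared_args = {"max_length": 128, "lowercase": False}

# unpacking them at each call site
with_unpacking = describe_split("train", 100, **shared_args)
written_out = describe_split("train", 100, max_length=128, lowercase=False)

print(with_unpacking == written_out)  # True
```

If a shared setting changes later, it only has to be edited in one place.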


- performing the alignment

```
train_label_tensors = align_labels(train_alignment_ids_list, train_labels, label2id)
dev_label_tensors = align_labels(dev_alignment_ids_list, dev_labels, label2id)
test_label_tensors = align_labels(test_alignment_ids_list, test_labels, label2id)
```

In [None]:
train_label_tensors = align_labels(train_alignment_ids_list, train_labels, label2id)
dev_label_tensors = align_labels(dev_alignment_ids_list, dev_labels, label2id)
test_label_tensors = align_labels(test_alignment_ids_list, test_labels, label2id)

### `TensorDataset`

We combine the processed input tensors and label tensors into `TensorDataset` objects, one for each data split:

- **training set** → `train_dataset`
- **dev set** → `dev_dataset`
- **test set** → `test_dataset`

```
train_dataset = TensorDataset(train_input_ids_tensors, train_attention_mask_tensors, train_label_tensors)
dev_dataset = TensorDataset(dev_input_ids_tensors, dev_attention_mask_tensors, dev_label_tensors)
test_dataset = TensorDataset(test_input_ids_tensors, test_attention_mask_tensors, test_label_tensors)
```

In [None]:
train_dataset = TensorDataset(train_input_ids_tensors, train_attention_mask_tensors, train_label_tensors)
dev_dataset = TensorDataset(dev_input_ids_tensors, dev_attention_mask_tensors, dev_label_tensors)
test_dataset = TensorDataset(test_input_ids_tensors, test_attention_mask_tensors, test_label_tensors)

### `DataLoader`

After creating `TensorDataset` objects for each data split, we wrap them with PyTorch `DataLoader` for efficient mini-batch loading during training and evaluation.

We specify the `batch_size` and whether to shuffle the data (shuffling is used only for training).

```
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader   = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
```

In [None]:
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader   = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
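
To check what a loader yields, the sketch below builds a tiny `TensorDataset` from made-up tensors (10 sequences of length 4) and inspects the batch shapes. The tensor contents and sizes are assumptions for illustration only.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Toy tensors: 10 sequences, each 4 tokens long (values are made up).
toy_input_ids = torch.arange(10 * 4).reshape(10, 4)
toy_attention_mask = torch.ones(10, 4, dtype=torch.long)
toy_labels = torch.zeros(10, 4, dtype=torch.long)

toy_dataset = TensorDataset(toy_input_ids, toy_attention_mask, toy_labels)
toy_loader = DataLoader(toy_dataset, batch_size=4, shuffle=False)

# Each batch is a list [input_ids, attention_mask, labels].
batch_shapes = [tuple(batch[0].shape) for batch in toy_loader]
print(batch_shapes)  # [(4, 4), (4, 4), (2, 4)]
```

The last batch is smaller because 10 is not a multiple of 4, which is why the training loop tracks the actual batch size with `input_ids.size(0)`.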


### model training

- initialising the model

```
# seeding before the model is built, so that any randomly initialised parameters are reproducible
torch.manual_seed(24)

max_label_id = max(label2id.values())
num_labels = max_label_id + 1
fine_tuned_bert_model = BertCRFTagger(
    bert_model_path="/content/drive/MyDrive/NCCU/TA/1132_computational_linguistics/my_finetuned_bert/model",  # specifying the model: the BERT which has been fine-tuned in advance
    num_labels=num_labels,
    label2id=label2id
)
```

In [None]:
# seeding before the model is built, so that any randomly initialised parameters are reproducible
torch.manual_seed(24)

max_label_id = max(label2id.values())
num_labels = max_label_id + 1
fine_tuned_bert_model = BertCRFTagger(
    bert_model_path="/content/drive/MyDrive/NCCU/TA/1132_computational_linguistics/my_finetuned_bert/model",  # specifying the model: the BERT which has been fine-tuned in advance
    num_labels=num_labels,
    label2id=label2id
)
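
A note on seeding: `torch.manual_seed` only affects random numbers drawn after the call. The sketch below uses a small `nn.Linear` layer as a stand-in for any randomly initialised layer; `make_layer` is a hypothetical helper, not part of the notebook.

```python
import torch
import torch.nn as nn


def make_layer(seed):
    """Seed the generator, then build a layer whose weights are drawn randomly."""
    torch.manual_seed(seed)
    return nn.Linear(4, 3)


first = make_layer(24)
second = make_layer(24)

# Seeding immediately before construction gives identical initial weights.
same_init = torch.equal(first.weight, second.weight)
print(same_init)  # True
```

Layers created before the seed is set are not covered by it, so their initial values may differ from run to run.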

- training

```
train_losses = []
dev_losses = []
num_epochs = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
optimizer = AdamW(fine_tuned_bert_model.parameters(), lr=5e-5)
fine_tuned_bert_model.to(device)

for epoch in range(num_epochs):
    print(f"\n===== Epoch {epoch+1}/{num_epochs} =====")

    # === Training ===
    fine_tuned_bert_model.train()
    train_loss = 0.0

    for batch in tqdm(train_loader, desc=f"[Epoch {epoch+1}] Training", leave=False):
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        optimizer.zero_grad()
        loss = fine_tuned_bert_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * input_ids.size(0)

    avg_train_loss = train_loss / len(train_loader.dataset)
    train_losses.append(avg_train_loss)

    # === Validation ===
    fine_tuned_bert_model.eval()
    dev_loss = 0.0

    with torch.no_grad():
        for batch in tqdm(dev_loader, desc=f"[Epoch {epoch+1}] Validation", leave=False):
            input_ids = batch[0].to(device)
            attention_mask = batch[1].to(device)
            labels = batch[2].to(device)

            loss = fine_tuned_bert_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            dev_loss += loss.item() * input_ids.size(0)

    avg_dev_loss = dev_loss / len(dev_loader.dataset)
    dev_losses.append(avg_dev_loss)

    # === Summary ===
    print(f"Epoch {epoch+1} - Train Loss: {avg_train_loss:.4f} - Dev Loss: {avg_dev_loss:.4f}")
```

In [None]:
train_losses = []
dev_losses = []
num_epochs = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
optimizer = AdamW(fine_tuned_bert_model.parameters(), lr=5e-5)
fine_tuned_bert_model.to(device)

for epoch in range(num_epochs):
    print(f"\n===== Epoch {epoch+1}/{num_epochs} =====")

    # === Training ===
    fine_tuned_bert_model.train()
    train_loss = 0.0

    for batch in tqdm(train_loader, desc=f"[Epoch {epoch+1}] Training", leave=False):
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        optimizer.zero_grad()
        loss = fine_tuned_bert_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * input_ids.size(0)

    avg_train_loss = train_loss / len(train_loader.dataset)
    train_losses.append(avg_train_loss)

    # === Validation ===
    fine_tuned_bert_model.eval()
    dev_loss = 0.0

    with torch.no_grad():
        for batch in tqdm(dev_loader, desc=f"[Epoch {epoch+1}] Validation", leave=False):
            input_ids = batch[0].to(device)
            attention_mask = batch[1].to(device)
            labels = batch[2].to(device)

            loss = fine_tuned_bert_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            dev_loss += loss.item() * input_ids.size(0)

    avg_dev_loss = dev_loss / len(dev_loader.dataset)
    dev_losses.append(avg_dev_loss)

    # === Summary ===
    print(f"Epoch {epoch+1} - Train Loss: {avg_train_loss:.4f} - Dev Loss: {avg_dev_loss:.4f}")
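
The loop multiplies each batch loss by the batch size before dividing by the dataset size. The arithmetic sketch below shows why, under the assumption that the model returns a loss averaged over the sentences in the batch (if it returned a sum, the multiplication would not be needed). All numbers are made up.

```python
# Hypothetical per-batch mean losses and batch sizes (the last batch is smaller).
batch_mean_losses = [2.0, 2.0, 5.0]
batch_sizes = [4, 4, 2]

# Weighting each batch mean by its size recovers the per-sentence average.
weighted_total = sum(loss * n for loss, n in zip(batch_mean_losses, batch_sizes))
per_sentence_avg = weighted_total / sum(batch_sizes)

# A plain mean over batches gives the small last batch too much weight.
naive_avg = sum(batch_mean_losses) / len(batch_mean_losses)

print(per_sentence_avg)  # 2.6
print(naive_avg)         # 3.0
```

The two averages agree only when every batch has the same size.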


- plotting the loss curve

```
import matplotlib.pyplot as plt

plt.figure(figsize=(8, 5))
plt.plot(train_losses, label='Train Loss')
plt.plot(dev_losses, label='Dev Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss Curve')
plt.legend()
plt.grid(True)
plt.show()

```

In [None]:
plt.figure(figsize=(8, 5))
plt.plot(train_losses, label='Train Loss')
plt.plot(dev_losses, label='Dev Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss Curve')
plt.legend()
plt.grid(True)
plt.show()


### predicting

```
fine_tuned_bert_model.eval()

pred_labels = []
true_labels = []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        # CRF prediction: list of predicted tag ids
        predictions = fine_tuned_bert_model(input_ids=input_ids, attention_mask=attention_mask)

        for i in range(len(predictions)):
            pred_seq = predictions[i]
            gold_seq = labels[i]

            cleaned_preds = []
            cleaned_labels = []

            for j, label_id in enumerate(gold_seq):
                if label_id != -100:
                    cleaned_preds.append(pred_seq[j])
                    cleaned_labels.append(label_id.item())

            pred_labels.append(cleaned_preds)
            true_labels.append(cleaned_labels)
```

In [None]:
fine_tuned_bert_model.eval()

pred_labels = []
true_labels = []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        # CRF prediction: list of predicted tag ids
        predictions = fine_tuned_bert_model(input_ids=input_ids, attention_mask=attention_mask)

        for i in range(len(predictions)):
            pred_seq = predictions[i]
            gold_seq = labels[i]

            cleaned_preds = []
            cleaned_labels = []

            for j, label_id in enumerate(gold_seq):
                if label_id != -100:
                    cleaned_preds.append(pred_seq[j])
                    cleaned_labels.append(label_id.item())

            pred_labels.append(cleaned_preds)
            true_labels.append(cleaned_labels)


### evaluating

```
# flattening
flattened_preds = [p for seq in pred_labels for p in seq]
flattened_trues = [t for seq in true_labels for t in seq]

# filtering out the O label
interesting_labels = [k for k in label2id if k != "O"]
interesting_ids = [label2id[k] for k in interesting_labels]

# evaluating
evaluation = classification_report(
    flattened_trues,
    flattened_preds,
    labels=interesting_ids,
    target_names=interesting_labels,
    digits=4
)
print(evaluation)

```

In [None]:

# flattening
flattened_preds = [p for seq in pred_labels for p in seq]
flattened_trues = [t for seq in true_labels for t in seq]

# filtering out the O label
interesting_labels = [k for k in label2id if k != "O"]
interesting_ids = [label2id[k] for k in interesting_labels]

# evaluating
evaluation = classification_report(
    flattened_trues,
    flattened_preds,
    labels=interesting_ids,
    target_names=interesting_labels,
    digits=4
)
print(evaluation)

- inspecting the input, true labels, and the predicted labels

```
# converting id lists into label lists
true_labels_str = [[id2label[idx] for idx in seq] for seq in true_labels]
pred_labels_str = [[id2label[idx] for idx in seq] for seq in pred_labels]


for i in range(5):
    print("Sentence", test_chars[i])
    print("True: ", true_labels_str[i])
    print("Pred: ", pred_labels_str[i])

```

In [None]:
# converting id lists into label lists
true_labels_str = [[id2label[idx] for idx in seq] for seq in true_labels]
pred_labels_str = [[id2label[idx] for idx in seq] for seq in pred_labels]


for i in range(5):
    print("Sentence", test_chars[i])
    print("True: ", true_labels_str[i])
    print("Pred: ", pred_labels_str[i])
