## CS310 Natural Language Processing
## Assignment 4. Long Short-Term Memory (LSTM) Network for Named Entity Recognition (NER)
## Beam Search

### 0. Import Necessary Libraries

In [1]:
from pprint import pprint
import torch.nn as nn
import torch.nn.functional as F
import torch
import numpy as np
from utils import Indexer, read_ner_data_from_connl, load_embedding_dict

In [2]:
# Read the (words, tags) pair of every CoNLL split.
train_words, train_tags = read_ner_data_from_connl('data/train.txt')
dev_words, dev_tags = read_ner_data_from_connl('data/dev.txt')
test_words, test_tags = read_ner_data_from_connl('data/test.txt')

# Lowercase the words of all three splits; the tags are left untouched.
train_words, dev_words, test_words = (
    [word.lower() for word in split_words]
    for split_words in (train_words, dev_words, test_words)
)

In [4]:
# Both indexers are built from the training split only.
word_indexer = Indexer(train_words)
tag_indexer = Indexer(train_tags)

In [5]:
class NERDataset:
    """Pair parallel word and tag sequences with the indexers that encode them.

    Indexing an instance returns ``(word_index, tag_index)`` for one position,
    each produced by the matching indexer's ``element_to_index`` method.
    """

    def __init__(self, words, tags, word_indexer, tag_indexer):
        self.words, self.tags = words, tags
        self.word_indexer, self.tag_indexer = word_indexer, tag_indexer

    def __len__(self):
        """Return the number of entries in ``words``."""
        return len(self.words)

    def __getitem__(self, idx):
        """Return the encoded ``(word, tag)`` pair stored at position ``idx``."""
        encoded_word = self.word_indexer.element_to_index(self.words[idx])
        encoded_tag = self.tag_indexer.element_to_index(self.tags[idx])
        return (encoded_word, encoded_tag)

In [6]:
# One dataset per split; all three share the same word and tag indexers.
train_dataset = NERDataset(train_words, train_tags, word_indexer, tag_indexer)
dev_dataset = NERDataset(dev_words, dev_tags, word_indexer, tag_indexer)
test_dataset = NERDataset(test_words, test_tags, word_indexer, tag_indexer)

In [None]:
def get_batch(dataset, batch_size):
    """Yield ``(word_indices, tag_indices)`` tensors for consecutive slices.

    The dataset's ``words`` and ``tags`` are walked in steps of ``batch_size``;
    every slice is encoded with the dataset's indexers and returned as a pair
    of ``torch.long`` tensors. The final slice may be shorter than
    ``batch_size``.
    """
    all_words, all_tags = dataset.words, dataset.tags
    for start in range(0, len(dataset), batch_size):
        stop = start + batch_size
        word_chunk = all_words[start:stop]
        tag_chunk = all_tags[start:stop]
        encoded_words = list(map(dataset.word_indexer.element_to_index, word_chunk))
        encoded_tags = list(map(dataset.tag_indexer.element_to_index, tag_chunk))
        word_tensor = torch.tensor(encoded_words, dtype=torch.long)
        tag_tensor = torch.tensor(encoded_tags, dtype=torch.long)
        yield (word_tensor, tag_tensor)

In [8]:
# Each generator is materialised into a list so the batches can be iterated
# more than once (e.g. once per epoch).
batch_size = 128
train_loader = list(get_batch(train_dataset, batch_size))
dev_loader = list(get_batch(dev_dataset, batch_size))
test_loader = list(get_batch(test_dataset, batch_size))

In [9]:
embedding_dim = 100
glove_path = "data/glove.6B.100d.txt"
embeddings_index = load_embedding_dict(glove_path)

# One row per indexed word; rows of words missing from the embedding
# dictionary stay all-zero.
embedding_matrix = np.zeros((len(word_indexer), embedding_dim))
for word, idx in word_indexer.get_element_to_index_dict().items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is None:
        continue
    embedding_matrix[idx] = embedding_vector

100%|██████████| 400000/400000 [00:02<00:00, 140465.51it/s]


### 1. Build the Model

In [None]:
class BiLSTMNER(nn.Module):
    """Two-layer bidirectional LSTM tagger on top of pretrained embeddings.

    ``vocab_size`` is accepted but not used: the embedding table takes its
    shape from ``embedding_matrix``.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, tagset_size, embedding_matrix):
        super().__init__()
        pretrained_weights = torch.FloatTensor(embedding_matrix)
        # freeze=False keeps the pretrained vectors trainable.
        self.embedding = nn.Embedding.from_pretrained(pretrained_weights, freeze=False)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            bidirectional=True,
            batch_first=True,
        )
        # Forward and backward hidden states are concatenated, hence the
        # doubled input width of the projection layer.
        self.fc = nn.Linear(in_features=2 * hidden_dim, out_features=tagset_size)

    def forward(self, x):
        """Return per-position tag scores for the word indices in ``x``.

        For a ``(batch_size, seq_len)`` input the result has shape
        ``(batch_size, seq_len, tagset_size)``.
        """
        hidden_states, _ = self.lstm(self.embedding(x))
        return self.fc(hidden_states)
    

# Model dimensions come from the two indexers; embedding_dim and
# embedding_matrix are the ones prepared for the pretrained vectors above.
vocab_size = len(word_indexer)
tagset_size = len(tag_indexer)
hidden_dim = 256  
model = BiLSTMNER(vocab_size, embedding_dim, hidden_dim, tagset_size, embedding_matrix)

### 2. Train and Evaluate

In [None]:
from utils import get_tag_indices_from_scores
from metrics import MetricsHandler
import torch.optim as optim

# Tag names and the integer labels 0..6 handed to the metric handlers.
# NOTE(review): presumably this order matches the indices assigned by
# tag_indexer — confirm, since the two are built independently.
labels_str = ['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC']
labels_int = list(range(len(labels_str)))
train_metrics = MetricsHandler(labels_int)
dev_metrics = MetricsHandler(labels_int)
test_metrics = MetricsHandler(labels_int)


# Cross-entropy loss; Adam over all model parameters.
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


def train_model(model, train_loader, dev_loader, optimizer, loss_func, train_metrics, dev_metrics, num_epochs=5, device=None, train_dataset=None, dev_dataset=None, **kwargs):
    """Train ``model`` for ``num_epochs`` epochs, evaluating on the dev set after each.

    Args:
        model: Network to optimise; called as ``model(inputs)``.
        train_loader: Iterable of ``(inputs, targets)`` tensor pairs.
        dev_loader: Iterable of ``(inputs, targets)`` pairs passed to
            ``evaluate_model``.
        optimizer: Optimizer stepping the model's parameters.
        loss_func: Callable ``loss_func(scores, targets)`` returning a scalar loss.
        train_metrics: Replaced by a fresh ``MetricsHandler`` every epoch; only
            returned unchanged when ``num_epochs`` is 0.
        dev_metrics: Forwarded to ``evaluate_model`` and replaced by the handler
            it returns.
        num_epochs: Number of passes over ``train_loader``.
        device: Device the batches are moved to (``None`` leaves them in place).
        train_dataset: Used only for ``len()`` when averaging the epoch loss.
            When ``None`` the un-normalised summed loss is reported, mirroring
            ``evaluate_model``.
        dev_dataset: Forwarded to ``evaluate_model`` for the same purpose.
        **kwargs: Accepted and ignored.

    Returns:
        ``(model, train_metrics, dev_metrics, losses, dev_f1_scores)`` where the
        two lists hold one entry per epoch.
    """
    losses = []
    dev_f1_scores = []

    for epoch in range(num_epochs):
        # evaluate_model() switches the model to eval mode at the end of every
        # epoch, so training mode has to be restored at the start of each one.
        model.train()
        running_loss = 0.0
        train_metrics = MetricsHandler(labels_int)

        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            output = model(inputs)

            # Flatten to (positions, tags); the tag count is read from the
            # model output rather than from a module-level global.
            output = output.view(-1, output.size(-1))
            targets_flat = targets.view(-1)

            loss = loss_func(output, targets_flat)
            loss.backward()
            optimizer.step()

            predictions = get_tag_indices_from_scores(output.detach().cpu().numpy())
            train_metrics.update(predictions, targets_flat.cpu().numpy())
            # Weight the batch loss by the size of the batch's leading axis.
            running_loss += loss.item() * inputs.size(0)

        train_metrics.collect()
        epoch_loss = running_loss / len(train_dataset) if train_dataset is not None else running_loss
        losses.append(epoch_loss)
        train_f1 = train_metrics.get_metrics()["F1-score"][-1]

        dev_loss, dev_metrics = evaluate_model(model, dev_loader, loss_func, dev_metrics, device=device, dataset=dev_dataset)
        dev_metrics.collect()
        dev_f1 = dev_metrics.get_metrics()["F1-score"][-1]
        dev_f1_scores.append(dev_f1)  # record the dev-set F1 score for this epoch

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Train F1: {train_f1:.4f}, Dev F1: {dev_f1:.4f}")

    return model, train_metrics, dev_metrics, losses, dev_f1_scores


In [None]:
import torch.nn.functional as F

def beam_search_decode(tag_scores, beam_width, tagset_size):
    """Decode the highest-scoring tag sequence with beam search.

    A hypothesis is scored by the sum of the per-position log-probabilities
    (``log_softmax`` of ``tag_scores``) of its tags. Because positions are
    scored independently of each other, the best hypothesis coincides with
    per-position argmax decoding (up to ties).

    Args:
        tag_scores: Tensor of raw tag scores, shape ``(seq_len, tagset_size)``.
        beam_width: Number of hypotheses kept after every step; must be >= 1.
        tagset_size: Number of tags expanded at every step.

    Returns:
        The best tag sequence as a list of ints (empty for an empty input).

    Raises:
        ValueError: If ``beam_width`` is smaller than 1.
    """
    if beam_width < 1:
        raise ValueError(f"beam_width must be a positive integer, got {beam_width}")

    # Convert the whole score table to Python floats once instead of calling
    # .item() for every (step, hypothesis, tag) triple.
    log_probs = F.log_softmax(tag_scores, dim=-1).tolist()

    beams = [(0.0, [])]
    for step_log_probs in log_probs:
        candidates = [
            (score + step_log_probs[tag], seq + [tag])
            for score, seq in beams
            for tag in range(tagset_size)
        ]
        # Stable descending sort: among equal scores the earlier candidate wins.
        candidates.sort(key=lambda candidate: candidate[0], reverse=True)
        beams = candidates[:beam_width]

    return beams[0][1]

In [None]:
def evaluate_model(model, data_loader, loss_func, eval_metrics, device=None, dataset=None, beam_width=None):
    """Evaluate ``model`` on ``data_loader`` with greedy or beam-search decoding.

    Args:
        model: Network to evaluate; switched to eval mode here and left in it.
        data_loader: Iterable of ``(inputs, targets)`` tensor pairs. 1-D inputs
            are treated as a single sequence and given a batch axis.
        loss_func: Callable ``loss_func(scores, targets)`` returning a scalar loss.
        eval_metrics: Ignored; a fresh ``MetricsHandler`` is always created.
            Kept so existing call sites continue to work.
        device: Device the batches are moved to (``None`` leaves them in place).
        dataset: Used only for ``len()`` when averaging the loss. When ``None``
            the un-normalised summed loss is returned.
        beam_width: ``None`` selects greedy decoding; otherwise every sequence
            is decoded with ``beam_search_decode`` using this width.

    Returns:
        ``(eval_loss, eval_metrics)``; ``collect()`` has not been called on the
        returned handler.

    Raises:
        TypeError: If a decoded prediction or its target is not a list.
        ValueError: If a decoded prediction and its target differ in length.
    """
    model.eval()
    eval_metrics = MetricsHandler(labels_int)
    total_loss = 0.0

    with torch.no_grad():
        for inputs, targets in data_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            # Size of the leading axis BEFORE a batch axis is added, so the
            # loss is weighted exactly as in train_model. Measuring it after
            # unsqueeze(0) would weight every 1-D batch by 1.
            loss_weight = inputs.size(0)
            if inputs.dim() == 1:
                inputs = inputs.unsqueeze(0)  # (seq_len,) -> (1, seq_len)
            batch_size = inputs.size(0)

            output = model(inputs)  # (batch_size, seq_len, num_tags)
            num_tags = output.size(-1)

            output_flat = output.view(-1, num_tags)
            targets_flat = targets.view(-1)
            loss = loss_func(output_flat, targets_flat)
            total_loss += loss.item() * loss_weight

            if beam_width is not None:
                # Beam search: decode and score one sequence at a time.
                for i in range(batch_size):
                    preds = beam_search_decode(output[i], beam_width, num_tags)
                    target_seq = targets[i].cpu().tolist() if targets.dim() > 1 else targets.cpu().tolist()
                    if not isinstance(preds, list) or not isinstance(target_seq, list):
                        raise TypeError(f"preds or targets is not a list: preds={preds}, targets={target_seq}")
                    if len(preds) != len(target_seq):
                        raise ValueError(f"Prediction length {len(preds)} does not match target length {len(target_seq)}")
                    eval_metrics.update(preds, target_seq)
            else:
                # Greedy search over the flattened positions.
                predictions = get_tag_indices_from_scores(output_flat.cpu().numpy())
                eval_metrics.update(predictions, targets_flat.cpu().numpy())

    eval_loss = total_loss / len(dataset) if dataset is not None else total_loss
    return eval_loss, eval_metrics

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Rebuild the architecture, then restore the trained weights from disk.
model = BiLSTMNER(vocab_size, embedding_dim, hidden_dim, tagset_size, embedding_matrix)
model = model.to(device)

model_path = "bilstm_ner_model.pth"
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU machine still loads on a CPU-only one.
model.load_state_dict(torch.load(model_path, map_location=device))
print(f"已从 {model_path} 加载训练好的模型参数")

# Test-set evaluation with greedy search (for comparison with step 3).
print("\n使用Greedy Search评估测试集（第3步结果）：")
test_loss_greedy, test_metrics_greedy = evaluate_model(model, test_loader, loss_func, test_metrics, 
                                                       device=device, dataset=test_dataset, beam_width=None)
test_metrics_greedy.collect()
test_f1_greedy = test_metrics_greedy.get_metrics()["F1-score"][-1]
print(f"Test F1 Score (Greedy Search): {test_f1_greedy:.4f}")

# Test-set evaluation with beam search.
# NOTE: beam_search_decode scores a hypothesis as a sum of independent
# per-position log-probabilities, so its best hypothesis coincides with the
# greedy one (up to ties); an F1 difference of 0 is the expected outcome.
beam_width = 100  
print(f"\n使用Beam Search (beam_width={beam_width})评估测试集：")
test_loss_beam, test_metrics_beam = evaluate_model(model, test_loader, loss_func, test_metrics, 
                                                   device=device, dataset=test_dataset, beam_width=beam_width)
test_metrics_beam.collect()
test_f1_beam = test_metrics_beam.get_metrics()["F1-score"][-1]
print(f"Test F1 Score (Beam Search): {test_f1_beam:.4f}")

print(f"\n性能差异 (Beam - Greedy): {test_f1_beam - test_f1_greedy:.4f}")

已从 bilstm_ner_model.pth 加载训练好的模型参数

使用Greedy Search评估测试集（第3步结果）：
Test F1 Score (Greedy Search): 0.8245

使用Beam Search (beam_width=100)评估测试集：
Test F1 Score (Beam Search): 0.8245

性能差异 (Beam - Greedy): 0.0000


In [None]:
# Evaluate on the test set. beam_width is left at its default (None), so
# evaluate_model takes the greedy-search branch. test_metrics is rebound to
# the handler that evaluate_model returns.
print("开始测试...")
test_loss, test_metrics = evaluate_model(
    model, test_loader, loss_func, test_metrics, 
    device=device, dataset=test_dataset
)
test_metrics.collect()
test_f1 = test_metrics.get_metrics()["F1-score"][-1]
print(f"Test F1 Score: {test_f1:.4f}")

### 3. Other Experiments