# RNN NER Task

In [45]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from collections import Counter
import numpy as np

torch.manual_seed(42)
np.random.seed(42)

In [46]:
ds = load_dataset("Adapting/chinese_biomedical_NER_dataset")

In [22]:
def build_vocab(dataset, min_freq=2):
    char_counter = Counter()
    label_set = set()
    
    for example in dataset:
        tokens = example['sequences']
        labels = example['tags']
        
        for char in tokens:
            char_counter[char] += 1
        for label in labels:
            label_set.add(label)
    
    # 建立字符到索引的映射
    char2idx = {'<PAD>': 0, '<UNK>': 1}
    for char, freq in char_counter.items():
        if freq >= min_freq:
            char2idx[char] = len(char2idx)
    
    # 建立標籤到索引的映射
    label2idx = {label: idx for idx, label in enumerate(sorted(label_set))}
    idx2label = {idx: label for label, idx in label2idx.items()}
    
    return char2idx, label2idx, idx2label

In [23]:
ds

DatasetDict({
    train: Dataset({
        features: ['sequences', 'tags', 'tag_ids', 'id'],
        num_rows: 914
    })
    test: Dataset({
        features: ['sequences', 'tags', 'tag_ids', 'id'],
        num_rows: 41
    })
    dev: Dataset({
        features: ['sequences', 'tags', 'tag_ids', 'id'],
        num_rows: 41
    })
})

In [24]:
char2idx, label2idx, idx2label = build_vocab(ds['train'])

In [28]:
len(char2idx)

1518

In [29]:
class NERDataset(Dataset):
    def __init__(self, data, char2idx, label2idx):
        self.data = data
        self.char2idx = char2idx
        self.label2idx = label2idx
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        example = self.data[idx]
        tokens = example['sequences']
        labels = example['tags']
        
        # 將字符轉換為索引
        char_ids = [self.char2idx.get(char, self.char2idx['<UNK>']) for char in tokens]
        label_ids = [self.label2idx[label] for label in labels]
        
        return torch.tensor(char_ids), torch.tensor(label_ids)

In [30]:
def collate_fn(batch):
    char_ids, label_ids = zip(*batch)
    
    # 獲取批次中的最大長度
    max_len = max(len(seq) for seq in char_ids)
    
    # Padding
    padded_chars = []
    padded_labels = []
    masks = []
    
    for chars, labels in zip(char_ids, label_ids):
        pad_len = max_len - len(chars)
        padded_chars.append(torch.cat([chars, torch.zeros(pad_len, dtype=torch.long)]))
        padded_labels.append(torch.cat([labels, torch.zeros(pad_len, dtype=torch.long)]))
        masks.append(torch.cat([torch.ones(len(chars)), torch.zeros(pad_len)]))
    
    return (torch.stack(padded_chars), 
            torch.stack(padded_labels), 
            torch.stack(masks))



In [31]:
class LSTM_NER(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_labels, num_layers=2, dropout=0.3):
        super(LSTM_NER, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(
            embedding_dim, 
            hidden_dim, 
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim * 2, num_labels)  # *2 因為是雙向LSTM
    
    def forward(self, x):
        # x: (batch_size, seq_len)
        embedded = self.embedding(x)  # (batch_size, seq_len, embedding_dim)
        lstm_out, _ = self.lstm(embedded)  # (batch_size, seq_len, hidden_dim*2)
        lstm_out = self.dropout(lstm_out)
        logits = self.fc(lstm_out)  # (batch_size, seq_len, num_labels)
        return logits

In [32]:
def train_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for chars, labels, masks in dataloader:
        chars, labels, masks = chars.to(device), labels.to(device), masks.to(device)
        
        optimizer.zero_grad()
        logits = model(chars)
        
        # 計算損失（只計算非padding位置）
        loss = criterion(logits.view(-1, logits.size(-1)), labels.view(-1))
        loss = (loss * masks.view(-1)).sum() / masks.sum()
        
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
        # 計算準確率
        predictions = torch.argmax(logits, dim=-1)
        correct += ((predictions == labels) * masks).sum().item()
        total += masks.sum().item()
    
    return total_loss / len(dataloader), correct / total



In [33]:
def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for chars, labels, masks in dataloader:
            chars, labels, masks = chars.to(device), labels.to(device), masks.to(device)
            
            logits = model(chars)
            loss = criterion(logits.view(-1, logits.size(-1)), labels.view(-1))
            loss = (loss * masks.view(-1)).sum() / masks.sum()
            
            total_loss += loss.item()
            
            predictions = torch.argmax(logits, dim=-1)
            correct += ((predictions == labels) * masks).sum().item()
            total += masks.sum().item()
    
    return total_loss / len(dataloader), correct / total

In [None]:
def main():
    # 超參數
    EMBEDDING_DIM = 128
    HIDDEN_DIM = 256
    NUM_LAYERS = 2
    DROPOUT = 0.3
    BATCH_SIZE = 32
    EPOCHS = 2
    LEARNING_RATE = 0.001
    
    device = "mps"
    print(f"使用設備: {device}")
    
    # 建立資料集
    train_dataset = NERDataset(ds['train'], char2idx, label2idx)
    val_dataset = NERDataset(ds['test'], char2idx, label2idx)
    
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)
    
    # 建立模型
    model = LSTM_NER(
        vocab_size=len(char2idx),
        embedding_dim=EMBEDDING_DIM,
        hidden_dim=HIDDEN_DIM,
        num_labels=len(label2idx),
        num_layers=NUM_LAYERS,
        dropout=DROPOUT
    ).to(device)
    
    print(f"模型參數量: {sum(p.numel() for p in model.parameters())}")
    
    # 優化器和損失函數
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.CrossEntropyLoss(reduction='none')  # 我們會手動處理mask
    
    # 訓練循環
    print("\n開始訓練...")
    best_val_acc = 0
    
    for epoch in range(EPOCHS):
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)
        
        print(f"Epoch {epoch+1}/{EPOCHS}")
        print(f"  訓練 - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}")
        print(f"  驗證 - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}")
        
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_lstm_ner_model.pt')
            print(f"  ✓ 儲存最佳模型 (Acc: {val_acc:.4f})")
    
    print(f"\n訓練完成！最佳驗證準確率: {best_val_acc:.4f}")

In [41]:
main()

使用設備: mps
模型參數量: 2569487

開始訓練...
Epoch 1/2
  訓練 - Loss: 1.0340, Acc: 0.7819
  驗證 - Loss: 0.7235, Acc: 0.8534
  ✓ 儲存最佳模型 (Acc: 0.8534)
Epoch 2/2
  訓練 - Loss: 0.4787, Acc: 0.8674
  驗證 - Loss: 0.4039, Acc: 0.9093
  ✓ 儲存最佳模型 (Acc: 0.9093)

訓練完成！最佳驗證準確率: 0.9093

測試預測範例:
文本: ,2009年12月底出现黑便,,于当地行胃镜检查并行病理检查示:叒胃体中下部溃疡,叒病理示中分化腺癌,叒无腹胀、泛酸、嗳气、恶心、呕吐、叒无头晕、叒心悸、乏力等症,叒2010年1月13日于我院胃胰科行胃癌根治术,叒2010年1月18日,我院病理:切缘未见癌,叒胃体可见3x2x1cm3溃疡型肿物,叒镜上为中分化腺癌侵及胃壁全层至浆膜层,网膜未见癌,叒肝总动脉旁(0/1)、叒胃大弯(0/1)淋巴结未见癌,叒贲门左(3/3)、叒胃小弯(8/9)、幽门上(2/2)淋巴结可见腺癌转移,,免疫组化:cea(+)、叒p53(+)、叒pr(-)、叒er-b(+)、叒er(+++)、叒共计,ln:叒13/16转移,叒术后于2010年2月-2010年8月行术后化疗6程,叒具体用药为艾素100mg叒静点+叒希罗达1500mg叒bid叒po,2014年6月初出现右侧下上肢活动受限,叒7月份症状逐渐加重,叒7月10日就诊于*****,叒,行mri检查提示:胃癌术后多发脑转移,叒行甘露醇及地塞米松、叒洛赛克治疗后效果不佳。遂于我院就诊,2014-8-5行奥沙利铂150mg叒d1+叒替吉奥叒50mg叒bid叒d1-14化疗一程,2014-08-18开始行三维适形全脑放疗,剂量30gy/10f。2014-09-19始行替吉奥叒50mg叒bid叒d1-14单药化疗一程。本次为行上一程化疗收入我科,叒我科以“胃癌术后脑转移叒rtxnxm1叒iv期”收入,叒入科以来,叒精神饮食尚可,叒无恶心、叒呕吐,二便正常,体重无明显减低。
真實標籤: ['O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', '

In [87]:
index = 11
test_example = ds['test'][index]

tokens = test_example['sequences']
true_labels = test_example['tags']
EMBEDDING_DIM = 128
HIDDEN_DIM = 256
NUM_LAYERS = 2
DROPOUT = 0.3

model = LSTM_NER(vocab_size = len(char2idx), embedding_dim=EMBEDDING_DIM, hidden_dim=HIDDEN_DIM, num_labels=len(label2idx))
model.load_state_dict(torch.load("./best_lstm_ner_model.pt"), strict=True)
model.to("mps")
char_ids = torch.tensor([[char2idx.get(char, char2idx['<UNK>']) for char in tokens]]).to("mps")
with torch.no_grad():
    logits = model(char_ids)
    predictions = torch.argmax(logits, dim=-1)[0].cpu().numpy()


for text, la, pre in zip(tokens, true_labels, predictions):
    pre = idx2label[pre]
    if pre != "0" and la != "0":
        print(text + "\t\t|" + la + "\t\t|" + pre)

,		|O		|O
患		|O		|O
者		|O		|O
2		|O		|O
0		|O		|O
1		|O		|O
6		|O		|O
年		|O		|O
4		|O		|O
月		|O		|O
初		|O		|O
无		|O		|O
明		|O		|O
显		|O		|O
诱		|O		|O
因		|O		|O
出		|O		|O
现		|O		|O
腹		|B_解剖部位		|B_症状
部		|I_解剖部位		|I_症状
绞		|O		|O
痛		|O		|O
,		|O		|O
伴		|O		|O
恶		|O		|O
心		|O		|O
、		|O		|O
呕		|O		|O
吐		|O		|O
,		|O		|O
无		|O		|O
呕		|O		|O
血		|O		|O
、		|O		|O
黄		|O		|O
疸		|O		|O
等		|O		|O
不		|O		|O
适		|O		|O
,		|O		|O
在		|O		|O
院		|O		|O
外		|O		|O
考		|O		|O
虑		|O		|O
“		|O		|O
肠		|B_疾病和诊断		|B_疾病和诊断
套		|I_疾病和诊断		|I_疾病和诊断
叠		|I_疾病和诊断		|I_疾病和诊断
”		|O		|O
,		|O		|O
遂		|O		|O
转		|O		|O
至		|O		|O
*		|O		|O
*		|O		|O
一		|O		|O
院		|O		|O
就		|O		|O
诊		|O		|O
,		|O		|O
入		|O		|O
院		|O		|O
后		|O		|O
行		|O		|O
c		|B_影像检查		|I_影像检查
t		|I_影像检查		|I_影像检查
检		|O		|O
查		|O		|O
提		|O		|O
示		|O		|O
全		|O		|O
身		|O		|O
多		|O		|O
发		|O		|O
性		|O		|O
淋		|O		|I_解剖部位
巴		|O		|I_解剖部位
结		|O		|O
肿		|O		|O
大		|O		|O
,		|O		|O
考		|O		|O
虑		|O		|O
淋		|B_疾病和诊断		|I_解剖部位
巴		|I_疾病和诊断		|I_解剖部位
瘤		|I_疾病和诊断		|O
;		|O		|O
左		|B_疾病和诊