In [1]:
def read_bio_file(file_path):
    sentences = []
    labels = []
    with open(file_path, 'r', encoding='utf-8') as f:
        tokens, tags = [], []
        for line in f:
            line = line.strip()
            if not line:
                if tokens:
                    sentences.append(tokens)
                    labels.append(tags)
                    tokens, tags = [], []
            else:
                splits = line.split()
                if len(splits) >= 2:
                    tokens.append(splits[0])
                    tags.append(splits[1])
        if tokens:
            sentences.append(tokens)
            labels.append(tags)
    return sentences, labels

# 用法
sentences, labels = read_bio_file(r"C:\Users\Administrator\Desktop\Project\bio_dataset_cleaned.txt")




In [2]:
from transformers import BertTokenizerFast
import torch

# 加载 BERT 分词器
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

# 创建标签映射
label_list = sorted(set(label for label_seq in labels for label in label_seq))
label2id = {label: i for i, label in enumerate(label_list)}
id2label = {i: label for label, i in label2id.items()}
num_labels = len(label2id)

def encode_examples(sentences, labels, max_length=128):
    input_ids = []
    attention_masks = []
    label_ids = []

    for sent, label in zip(sentences, labels):
        # 分词（每个单词一个元素）
        encoding = tokenizer(
            sent,
            is_split_into_words=True,
            truncation=True,
            padding='max_length',
            max_length=max_length,
            return_tensors='pt'
        )

        word_ids = encoding.word_ids(batch_index=0)
        aligned_labels = []
        prev_word_idx = None
        for word_idx in word_ids:
            if word_idx is None:
                aligned_labels.append(label2id["O"])  # 用 'O' 替换原来的 -100
  # 忽略填充位
            elif word_idx != prev_word_idx:
                aligned_labels.append(label2id[label[word_idx]])  # 第一个子词：保留标签
            else:
                aligned_labels.append(label2id["O"])  # 用 'O' 替换原来的 -100
  # 后续子词：忽略
            prev_word_idx = word_idx

        input_ids.append(encoding['input_ids'][0])
        attention_masks.append(encoding['attention_mask'][0])
        label_ids.append(torch.tensor(aligned_labels))

    return input_ids, attention_masks, label_ids


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
input_ids, attention_masks, label_ids = encode_examples(sentences, labels)


In [4]:
from torch.utils.data import Dataset

class NERDataset(Dataset):
    def __init__(self, input_ids, attention_masks, label_ids):
        self.input_ids = input_ids
        self.attention_masks = attention_masks
        self.label_ids = label_ids

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return {
            "input_ids": self.input_ids[idx],
            "attention_mask": self.attention_masks[idx],
            "labels": self.label_ids[idx]
        }


In [5]:
from torch.utils.data import DataLoader

# 构建数据集对象
dataset = NERDataset(input_ids, attention_masks, label_ids)

# 构建 DataLoader（可设置 batch_size）
train_loader = DataLoader(dataset, batch_size=8, shuffle=True)


In [6]:
import torch.nn as nn
from transformers import BertModel
from torchcrf import CRF

class BERT_CRF(nn.Module):
    def __init__(self, bert_model_name, num_labels):
        super(BERT_CRF, self).__init__()
        self.bert = BertModel.from_pretrained(bert_model_name)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_labels)
        # 您使用的是默认的 batch_first=False
        self.crf = CRF(num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = self.dropout(outputs.last_hidden_state)
        emissions = self.classifier(sequence_output)  # Shape: [batch, seq_len, num_labels]

        # 统一将维度转换为 (seq_len, batch, ...) 以适配CRF层
        emissions = emissions.permute(1, 0, 2)  # Shape: [seq_len, batch, num_labels]
        mask = attention_mask.permute(1, 0).bool() # Shape: [seq_len, batch]

        if labels is not None:
            # 计算损失时，labels也需要转换为 (seq_len, batch)
            labels = labels.permute(1, 0)
            log_likelihood = self.crf(emissions, labels, mask=mask)
            loss = -log_likelihood.mean()
            return loss
        else:
            predictions = self.crf.decode(emissions, mask=mask)
            return predictions




In [7]:
import torch

# ✅ 这里改为使用 label2id
num_labels = len(label2id)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BERT_CRF('bert-base-uncased', num_labels)
model.to(device)



BERT_CRF(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affi

In [14]:
from torch.optim import AdamW

optimizer = AdamW(model.parameters(), lr=2e-5)

epochs = 5


In [15]:
from tqdm import tqdm

def train(model, dataloader, optimizer, device):
    model.train()
    total_loss = 0

    for batch in tqdm(dataloader, desc="Training"):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        loss = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()


    avg_loss = total_loss / len(dataloader)
    return avg_loss


In [16]:
for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    avg_loss = train(model, train_loader, optimizer, device)
    print(f"Average Loss: {avg_loss:.4f}")


Epoch 1/5


Training: 100%|██████████| 95/95 [04:18<00:00,  2.72s/it]


Average Loss: 128.2122
Epoch 2/5


Training: 100%|██████████| 95/95 [04:25<00:00,  2.80s/it]


Average Loss: 95.9285
Epoch 3/5


Training: 100%|██████████| 95/95 [04:26<00:00,  2.80s/it]


Average Loss: 75.2487
Epoch 4/5


Training: 100%|██████████| 95/95 [04:18<00:00,  2.72s/it]


Average Loss: 56.6552
Epoch 5/5


Training: 100%|██████████| 95/95 [04:18<00:00,  2.72s/it]

Average Loss: 44.6726





In [17]:
import torch

def predict(model, dataloader, device):
  
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        # 1. 移除 break，遍历整个 dataloader
        for batch in dataloader:
            # 将数据移动到指定设备
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            # 模型在前向传播时，如果不传入 labels，则会进入解码（decode）逻辑
            predictions = model(input_ids=input_ids, attention_mask=attention_mask)
            
            # 2. 收集模型的预测结果
            # model 的输出 (predictions) 已经是处理好的 list of lists
            all_preds.extend(predictions)

            # 3. 处理并收集真实标签
            # 需要根据 attention_mask 移除 padding 部分的标签
            for i in range(labels.shape[0]):
                # 计算每个样本的真实长度（非padding部分）
                true_length = attention_mask[i].sum().item()
                # 截取真实标签并转换为 list
                true_labels = labels[i][:true_length].tolist()
                all_labels.append(true_labels)

    return all_preds, all_labels






In [None]:
# 1. 确保从 seqeval.metrics 导入了这三个函数
from seqeval.metrics import precision_score, recall_score, f1_score
from seqeval.metrics import classification_report
from seqeval.scheme import IOB2

def evaluate(preds, trues, id2label):
    # 将 ID 序列转换成标签序列 (这部分不变)
    preds_label = [[id2label[idx] for idx in seq] for seq in preds]
    trues_label = [[id2label[idx] for idx in seq] for seq in trues]

    # 打印详细的分类报告 (这部分不变)
    print("分类报告:")
    print(classification_report(trues_label, preds_label, mode='strict', scheme=IOB2))

    p = precision_score(trues_label, preds_label)
    r = recall_score(trues_label, preds_label)
    f1 = f1_score(trues_label, preds_label)

    print("\n--- 总体性能指标 ---")
    print(f"Overall Precision: {p:.4f}")
    print(f"Overall Recall:    {r:.4f}")
    print(f"Overall F1-Score:  {f1:.4f}")


In [19]:
# 假设你用的是训练集
preds, trues = predict(model, train_loader, device)
evaluate(preds, trues, id2label)


📊 分类报告:
              precision    recall  f1-score   support

   AGE_DEATH       0.77      0.72      0.74        32
AGE_FOLLOWUP       0.74      0.65      0.69        74
   AGE_ONSET       0.82      0.74      0.78       111
        GENE       0.89      0.91      0.90       245
GENE_VARIANT       0.86      0.93      0.89       392
    HPO_TERM       0.96      0.97      0.97      2457
     PATIENT       0.83      0.90      0.86       299

   micro avg       0.92      0.94      0.93      3610
   macro avg       0.84      0.83      0.83      3610
weighted avg       0.92      0.94      0.93      3610


--- 总体性能指标 ---
Overall Precision: 0.9249
Overall Recall:    0.9516
Overall F1-Score:  0.9380
