# 多标签文本分类流程
1. 数据预处理
   a. 中文分词
   b. 去停用词
2. 标签归一化
3. 特征提取
4. 模型训练

In [3]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, BertModel, BertConfig, get_linear_schedule_with_warmup
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import recall_score, f1_score
import matplotlib.pyplot as plt
import os
import time
import re


# Dataset definition
class RecordDataset(Dataset):
    '''
    Map-style dataset that tokenizes one record per access and pairs it with
    its row of the label matrix.

    @param text : indexable collection of raw record texts
    @param label_matrix : indexable collection of per-record label rows (same order as text)
    @param tokenizer : object exposing encode_plus(...) that returns 'input_ids' and 'attention_mask' tensors
    @param max_len : length every encoded sequence is padded / truncated to
    @param is_test : when True, diagnosis / treatment lines are blanked before tokenizing
    '''

    # Matches any whole line containing '辨证：' or '治法：' (full-width colon).
    # '.' does not cross newlines, so only the matching line's content is
    # removed; its line break stays. Compiled once instead of on every item.
    _DIAGNOSIS_LINE = re.compile(r'.*(辨证|治法)：.*')

    def __init__(self, text, label_matrix, tokenizer, max_len, is_test=False):
        self.text = text
        self.label_matrix = label_matrix
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.is_test = is_test

    def __len__(self):
        '''Number of records in the dataset.'''
        return len(self.text)

    def __getitem__(self, index):
        '''
        @param index : position of the record
        @return : dict with the (possibly cleaned) 'text', 1-D 'input_ids' and
                  'attention_mask' tensors, and the float 'labels' tensor
        '''
        text = str(self.text[index])
        label = torch.tensor(self.label_matrix[index], dtype=torch.float)

        if self.is_test:
            # Blank the lines that state the diagnosis / treatment
            text = self._DIAGNOSIS_LINE.sub('', text)

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True,
        )

        return {
            'text': text,
            # return_tensors='pt' adds a leading batch axis; flatten drops it
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': label
        }


# Data loader construction
def _parse_label_cell(raw):
    '''Split one 'label' cell into a list of clean, non-empty label names.'''
    # A missing CSV cell arrives as None / float NaN; str() would turn it into
    # the bogus label 'nan', so treat it as "no labels" instead.
    if raw is None or (isinstance(raw, float) and np.isnan(raw)):
        return []
    # Strip stray whitespace (e.g. '\r') and drop every empty fragment, not
    # just the first one.
    return [name.strip() for name in str(raw).split('\n') if name.strip()]


def create_data_loader(df, tokenizer, max_len, batch_size):
    '''
    Split df 90/10 into train / test parts, binarize the labels and wrap both
    parts in DataLoaders.

    @param df : DataFrame with a 'text' column and a 'label' column whose cells
                hold newline-separated label names
    @param tokenizer : tokenizer handed to RecordDataset
    @param max_len : maximum sequence length
    @param batch_size : batch size
    @return : (train DataLoader, test DataLoader, fitted MultiLabelBinarizer)
    '''
    # Random 90% for training (fixed seed), the remaining rows for testing
    df_train = df.sample(frac=0.9, random_state=42)
    df_test = df.drop(df_train.index)

    label_list_train = [_parse_label_cell(cell) for cell in df_train['label']]
    label_list_test = [_parse_label_cell(cell) for cell in df_test['label']]

    # Union of the labels seen in either split. Sorted so that the column
    # order of the label matrix is the same on every run (set iteration order
    # of strings is not stable across interpreter sessions).
    all_labels = sorted({name for labels in label_list_train + label_list_test for name in labels})

    mlb = MultiLabelBinarizer(classes=all_labels)
    # fit expects an iterable of label collections, hence the wrapping list
    mlb.fit([all_labels])
    train_label_matrix = mlb.transform(label_list_train)
    test_label_matrix = mlb.transform(label_list_test)

    train_dataset = RecordDataset(
        text=df_train['text'].to_numpy(),
        label_matrix=train_label_matrix,
        tokenizer=tokenizer,
        max_len=max_len,
        is_test=False
    )

    test_dataset = RecordDataset(
        text=df_test['text'].to_numpy(),
        label_matrix=test_label_matrix,
        tokenizer=tokenizer,
        max_len=max_len,
        is_test=True
    )

    # Print data set information
    print('Total data set size:', df.shape[0])
    print('Train data set size:', len(train_dataset))
    print('Test data set size:', len(test_dataset))
    # Column count of the matrix; does not depend on a particular row existing
    print('Label size:', train_label_matrix.shape[1])

    # NOTE(review): the training rows are in the random order produced by
    # df.sample, but the loader does not reshuffle them between epochs.
    train_loader = DataLoader(train_dataset, batch_size=batch_size, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=4)
    return train_loader, test_loader, mlb


# Training function
def train_epoch(model, data_loader, loss_fn, optimizer, device, scheduler, n_examples, threshold=0.95):
    '''
    Run one training pass over data_loader.

    @param model : model whose output exposes .logits
    @param data_loader : loader yielding dicts with 'input_ids', 'attention_mask' and 'labels'
    @param loss_fn : loss applied to (logits, labels)
    @param optimizer : optimizer
    @param device : device the batches are moved to
    @param scheduler : learning-rate scheduler, stepped once per batch
    @param n_examples : number of samples in the loader's dataset
    @param threshold : probability above which a label counts as predicted
    @return : (element-wise accuracy, micro recall, micro F1, mean loss)
    '''
    model = model.train()

    losses = []
    predictions = []
    labels = []
    correct_predictions = 0
    num_labels = 0

    for d in data_loader:
        input_ids = d['input_ids'].to(device)
        attention_mask = d['attention_mask'].to(device)
        batch_labels = d['labels'].to(device)

        # Clear gradients first so nothing left over from an interrupted
        # earlier run leaks into this step.
        optimizer.zero_grad()

        # The loss is computed below with loss_fn, so the labels are not
        # passed to the model (its internal loss would be discarded anyway).
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        logits = outputs.logits

        loss = loss_fn(logits, batch_labels)
        losses.append(loss.item())

        # Threshold probabilities, not raw logits
        result = (torch.sigmoid(logits.detach()) > threshold).int()
        predictions.extend(result.cpu().tolist())
        labels.extend(batch_labels.cpu().tolist())

        num_labels = batch_labels.shape[1]
        correct_predictions += (result == batch_labels).sum().item()

        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

    # Fraction of individual (sample, label) cells predicted correctly
    total_cells = n_examples * num_labels
    accuracy = correct_predictions / total_cells if total_cells else 0.0
    recall = recall_score(labels, predictions, average='micro')
    f1 = f1_score(labels, predictions, average='micro')
    return accuracy, recall, f1, np.mean(losses)


# Evaluate the model on the test set
def eval_model(model, data_loader, loss_fn, device, n_examples, mlb, threshold=0.95):
    '''
    Evaluate model on data_loader and dump the predictions under tmp/.

    Per batch, the binary predictions and labels are written to
    tmp/pred_label-<i>.txt and tmp/label-<i>.txt; after the last batch the
    decoded texts with true / predicted label names go to tmp/result.csv.

    @param model : model whose output exposes .logits
    @param data_loader : loader over a dataset exposing .tokenizer (used to decode input ids)
    @param loss_fn : loss applied to (logits, labels)
    @param device : device the batches are moved to
    @param n_examples : number of samples in the loader's dataset
    @param mlb : fitted MultiLabelBinarizer used to map indicator rows back to label names
    @param threshold : probability above which a label counts as predicted
    @return : (element-wise accuracy, micro recall, micro F1, mean loss)
    '''
    model = model.eval()
    losses = []
    predictions = []
    labels = []
    correct_predictions = 0
    num_labels = 0
    frames = []
    # Use the tokenizer that produced the ids instead of a module-level global
    decode = data_loader.dataset.tokenizer.decode
    # np.savetxt / to_csv do not create missing directories
    os.makedirs('tmp', exist_ok=True)

    with torch.no_grad():
        for count, d in enumerate(data_loader):
            input_ids = d['input_ids'].to(device)
            attention_mask = d['attention_mask'].to(device)
            batch_labels = d['labels'].to(device)
            # The loss is computed below with loss_fn, so labels are not
            # passed to the model.
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            logits = outputs.logits

            loss = loss_fn(logits, batch_labels)
            losses.append(loss.item())

            # Threshold probabilities, not raw logits
            result = (torch.sigmoid(logits) > threshold).int()
            result_np = result.cpu().numpy()
            labels_np = batch_labels.cpu().numpy()
            predictions.extend(result_np.tolist())
            labels.extend(labels_np.tolist())

            num_labels = batch_labels.shape[1]
            correct_predictions += (result == batch_labels).sum().item()

            # Map indicator rows back to label names
            real_label = mlb.inverse_transform(labels_np.astype(int))
            predict_label = mlb.inverse_transform(result_np)
            # Write the raw indicator rows of this batch to text files
            np.savetxt('tmp/pred_label-' + str(count) + '.txt', result_np, fmt='%.0f', encoding='utf-8', delimiter=',')
            np.savetxt('tmp/label-' + str(count) + '.txt', labels_np, fmt='%.0f', encoding='utf-8', delimiter=',')

            text_list = [decode(ids) for ids in input_ids.cpu().tolist()]
            frames.append(pd.DataFrame({'text': text_list, 'label': real_label, 'pred': predict_label}))

    # Build the result table once instead of re-concatenating every batch
    if frames:
        df = pd.concat(frames, ignore_index=True)
    else:
        df = pd.DataFrame(columns=['text', 'label', 'pred'])
    df.to_csv('tmp/result.csv', index=False, encoding='utf-8')

    # Fraction of individual (sample, label) cells predicted correctly
    total_cells = n_examples * num_labels
    accuracy = correct_predictions / total_cells if total_cells else 0.0
    recall = recall_score(labels, predictions, average='micro')
    f1 = f1_score(labels, predictions, average='micro')
    return accuracy, recall, f1, np.mean(losses)
            
            
def show_time(start_time, end_time):
    '''
    Format the span between two timestamps (in seconds) as HH:MM:SS.ss.

    @param start_time : start timestamp
    @param end_time : end timestamp
    @return : elapsed time as a string
    '''
    elapsed = end_time - start_time
    whole_hours, leftover = divmod(elapsed, 3600)
    whole_minutes, secs = divmod(leftover, 60)
    return f"{int(whole_hours):0>2}:{int(whole_minutes):0>2}:{secs:05.2f}"

# Release cached CUDA memory left over from earlier runs
torch.cuda.empty_cache()

# Hyper-parameters
EPOCHS = 10
BATCH_SIZE = 16
MAX_LEN = 128
LEARNING_RATE = 2e-5
# Directory holding the pretrained model
model_dir = 'BERT/chinese_bert_wwm_ext_L-12_H-768_A-12/'

# Read the data
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
df_total = pd.read_csv(os.path.join('Record Collections', 'medical_record.csv'))
df_total_mzy = pd.read_csv(os.path.join('Record Collections MZY', 'medical_record_mzy.csv'))
# Merge the two data sets; ignore_index gives every row a unique index
df_total = pd.concat([df_total, df_total_mzy], ignore_index=True)
tokenizer = BertTokenizer.from_pretrained(model_dir)
train_data_loader, test_data_loader, my_mlb = create_data_loader(df_total, tokenizer, MAX_LEN, BATCH_SIZE)
# Number of label columns; read from the matrix shape so it does not depend
# on a second training row existing
num_labels = train_data_loader.dataset.label_matrix.shape[1]

KeyboardInterrupt: 

In [None]:
# Load the model
# num_labels must be set explicitly: the stored config otherwise yields the
# default 2-output classification head, which does not match the label matrix.
config = BertConfig.from_pretrained(
    os.path.join(model_dir, 'pytorch_model.bin/config.json'),
    num_labels=num_labels
)
model = BertForSequenceClassification.from_pretrained(os.path.join(model_dir, 'pytorch_model.bin/pytorch_model.bin'), config=config)
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE, correct_bias=False, eps=1e-8)
model.to(device)

print('Model:', model)
print('Parameters for each layer:', [p.numel() for p in model.parameters()])
print('Paramter count:', sum(p.numel() for p in model.parameters()))



Data set size: 10
Label size: 28


In [None]:
# Loss function. The targets are multi-hot rows (several labels may be active
# at once), so each label is scored independently with a sigmoid-based loss;
# softmax cross-entropy would force the labels to compete with each other.
# A pos_weight tensor can be passed here if positives need re-weighting.
loss_fn = nn.BCEWithLogitsLoss().to(device)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=len(train_data_loader) * EPOCHS
) # linear learning-rate decay, one step per batch

# Per-epoch test metrics, consumed by the plotting cell
acc_list = []
recall_list = []
# Train the model
for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}/{EPOCHS}')
    print('-' * 10)
    start_time = time.time()
    train_acc, train_recall, train_f1, train_loss = train_epoch(
        model,
        train_data_loader,
        loss_fn,
        optimizer,
        device,
        scheduler,
        len(train_data_loader.dataset)
    )
    end_time = time.time()
    print('{} : Train loss {:.4f}  recall {:.4f} f1 {:.4f} accuracy {:.4f}'.format(show_time(start_time, end_time), train_loss, train_recall, train_f1, train_acc))

    # Evaluate the model, timed separately from the training pass
    eval_start_time = time.time()
    test_acc, test_recall, test_f1, test_loss = eval_model(
        model,
        test_data_loader,
        loss_fn,
        device,
        len(test_data_loader.dataset),
        my_mlb
    )
    eval_end_time = time.time()
    acc_list.append(test_acc)
    recall_list.append(test_recall)
    print('{} : Test loss {:.4f}  recall {:.4f} f1 {:.4f} accuracy {:.4f}'.format(show_time(eval_start_time, eval_end_time), test_loss, test_recall, test_f1, test_acc))

Epoch 1/2
----------


In [None]:
# Plot per-epoch test accuracy against test recall
# Sort the (recall, accuracy) pairs by recall so the line runs left to right.
# The sorted values go into new names: acc_list / recall_list stay lists, so
# the training cell can keep appending to them, and empty lists do not break
# the unpacking.
curve_points = sorted(zip(recall_list, acc_list), key=lambda point: point[0])
sorted_recall = [recall for recall, _ in curve_points]
sorted_acc = [acc for _, acc in curve_points]

plt.figure(figsize=(5, 5))
plt.scatter(sorted_recall, sorted_acc)
plt.plot(sorted_recall, sorted_acc, label='BERT')
plt.xlabel('Recall')
plt.ylabel('Accuracy')
plt.title('PR Curve')
plt.legend()
plt.show()

In [2]:
from sklearn.preprocessing import MultiLabelBinarizer
import numpy as np

# Sample multi-label data for a small MultiLabelBinarizer round-trip demo.
# All names carry a demo_ prefix so running this cell does not overwrite
# my_mlb, which the training / evaluation cells rely on.
demo_labels = [['a', 'b'], ['c', 'd'], ['a', 'c', 'd'], ['b']]
demo_test_labels = [['a', 'b']]

# Create a MultiLabelBinarizer and fit it on the sample labels
demo_mlb = MultiLabelBinarizer()
demo_mlb.fit(demo_labels)

# Encode the test labels as an indicator matrix
binary_labels = demo_mlb.transform(demo_test_labels)

# Decode the indicator matrix back into label tuples
decoded_labels = demo_mlb.inverse_transform(binary_labels)

# Print the results
print('Original Labels:', demo_labels)
print('Binary Labels:', binary_labels)
print('Decoded Labels:', decoded_labels)

4
(4,)
Original Labels: [['a', 'b'], ['c', 'd'], ['a', 'c', 'd'], ['b']]
Binary Labels: [[1 1 0 0]]
Decoded Labels: [('a', 'b')]
