In [8]:
import json
import torch
import random
from ltp import LTP
from tqdm import tqdm
from settings import *
from transformers import AutoModel, AutoTokenizer, logging

In [None]:
# Entity-label name -> integer class id used as the token-classification target.
# Ids are assigned by position in this list (0..13); '分隔词' (separator) and
# '开/尾' (start/end) are the two structural labels at ids 12 and 13.
_LABEL_NAMES = [
    'None', '时间', '地点', '触发词', '人物', '物品', '动作',
    '事物', '机构', '数量', '单位', '原因', '分隔词', '开/尾',
]
NAME_TABLE = {label: idx for idx, label in enumerate(_LABEL_NAMES)}

# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

### 加载模型

In [None]:
logging.set_verbosity_error()   # silence the "unused weights" warning emitted on load

# Single source of truth for the checkpoint so model and tokenizer cannot drift apart.
PRETRAINED_NAME = 'IDEA-CCNL/Erlangshen-DeBERTa-v2-320M-Chinese'

ltp = LTP().to(device)
pretrain = AutoModel.from_pretrained(PRETRAINED_NAME)
tokenizer = AutoTokenizer.from_pretrained(PRETRAINED_NAME, use_fast=False)

# Register every POS marker string as a special token so it is kept as one token.
# sorted(): the iteration order of a set of strings changes between interpreter
# runs (string hash randomisation). Without a fixed order the new tokens would get
# different ids in a later session, and a model saved in one session would then
# look up the wrong embedding rows when reloaded next to a freshly built tokenizer.
tokenizer.add_special_tokens({'additional_special_tokens': sorted(set(POSNAME.values()))})
# Grow the embedding matrix to cover the newly added token ids.
pretrain.resize_token_embeddings(len(tokenizer.get_vocab()))

### 数据集

In [None]:
def read_from_jsonl(path):
    """Read a JSON Lines file and return its records as a list.

    Args:
        path: path of a UTF-8 encoded file holding one JSON document per line.

    Returns:
        list with one decoded object per non-blank line, in file order.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    records = []
    with open(path, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank / whitespace-only lines (e.g. a stray newline in the export)
            # carry no record; json.loads would raise on them.
            if not line:
                continue
            records.append(json.loads(line))
    return records

def process_doccano(data):
    """Turn span annotations into one class id per character of the text.

    Args:
        data: iterable of dicts with a 'text' string and a 'label' list whose
            entries are indexed as [start, end, label_name]; label_name must be
            a key of NAME_TABLE.

    Returns:
        list of new dicts {'text': ..., 'label_list': [int, ...]} where
        label_list has len(text) entries. The input dicts are not modified.
    """
    samples = []
    for record in data:
        # Every character starts as class 0 until a span overwrites it.
        char_labels = torch.zeros(len(record['text']), dtype=torch.long)
        for span in record['label']:
            # Fill the half-open character range [start, end) with the span's id;
            # when spans overlap, the one listed later wins.
            char_labels[span[0]:span[1]] = NAME_TABLE[span[2]]
        samples.append({'text': record['text'], 'label_list': char_labels.tolist()})
    return samples

def process_chatgpt(data):
    """Rename the 'label' field of each record to 'label_list'.

    Args:
        data: iterable of dicts carrying at least 'text' and 'label'.

    Returns:
        list of new dicts {'text': ..., 'label_list': ...}; any other keys of the
        input records are dropped, and the label object itself is not copied.
    """
    samples = []
    for record in data:
        samples.append({'text': record['text'], 'label_list': record['label']})
    return samples

In [None]:
class Dataset(torch.utils.data.Dataset):
    """In-memory train/test split of POS-tagged token-classification samples.

    The `train` attribute selects which split `__len__` / `__getitem__` expose:
    True -> `train_data`, False -> `test_data`.
    """

    def __init__(self, data, train_ratio=0.9, seed=None):
        """Shuffle, segment and split the samples.

        Args:
            data: list of records shaped like
                [{'text': xxx, 'label_list': [x, x, ..., x]},
                 {'text': xxx, 'label_list': [x, x, ..., x]},
                 ...]
                with one label id per character of 'text'.
            train_ratio: fraction of the shuffled samples placed in the training
                split (default 0.9, the remainder is the test split).
            seed: optional seed for a reproducible shuffle/split. When None the
                global `random` state is used.

        The caller's list and dicts are left untouched, so building a second
        Dataset from the same records does not re-tag already tagged text.
        """
        super().__init__()

        # Shuffle a copy so the caller's list keeps its order.
        samples = list(data)
        if seed is None:
            random.shuffle(samples)
        else:
            random.Random(seed).shuffle(samples)

        separator = NAME_TABLE['分隔词']  # label id given to each inserted POS marker

        # Segment each sentence into words and realign the per-character labels.
        processed = []
        for sample in samples:
            words, pos = ltp.pipeline(sample['text'], tasks=['cws', 'pos'], return_dict=False)
            pieces, labels = [], []
            begin = 0
            for word, tag in zip(words, pos):
                # Text becomes <POS marker><word>...; the marker gets the separator
                # label and the word keeps its original per-character labels.
                pieces.append(POSNAME[tag])
                pieces.append(word)
                end = begin + len(word)
                labels.append(separator)
                labels.extend(sample['label_list'][begin:end])
                begin = end
            # New dict: extra keys are carried over, text/labels are replaced.
            processed.append({**sample, 'text': ''.join(pieces), 'label_list': labels})

        train_len = int(train_ratio * len(processed))
        self.train_data = processed[:train_len]
        self.test_data = processed[train_len:]

        self.train = True

    def __len__(self):
        """Number of samples in the currently selected split."""
        return len(self.train_data if self.train else self.test_data)

    def __getitem__(self, idx):
        """Return the sample dict at `idx` from the currently selected split."""
        split = self.train_data if self.train else self.test_data
        return split[idx]

# Span-annotated records (JSON Lines) converted to per-character label lists.
data_doccano = process_doccano(read_from_jsonl('./dataset/name_recognition/all.jsonl'))

# Records that already carry a label list; a context manager closes the file
# handle instead of leaving it to the garbage collector.
with open('./dataset/name_recognition/ChatGPT/gpt_label.json', 'r', encoding='utf-8') as f:
    data_chatgpt = process_chatgpt(json.load(f))

# Both sources are pooled into one shuffled train/test split.
dataset = Dataset(data_doccano + data_chatgpt)

In [None]:
def collate_fn(data):
    """Tokenize a batch of samples and pad their labels to the batch length.

    Args:
        data: list of sample dicts with 'text' and 'label_list'.

    Returns:
        dict with 'input_ids', 'attention_mask' and 'labels' tensors, all moved
        to `device`; 'labels' has dtype long and the same second dimension as
        'input_ids'.
    """
    texts = [sample['text'] for sample in data]

    encoded = tokenizer.batch_encode_plus(
        batch_text_or_text_pairs=texts,
        return_tensors="pt",
        padding=True,
        add_special_tokens=True,
        return_token_type_ids=False,
    )
    padded_len = encoded['input_ids'].shape[1]

    # Align labels with the padded token row: one leading 13, the sample's own
    # labels, then 13 as right-fill, cut to exactly padded_len entries.
    padded_labels = [
        ([13] + sample['label_list'] + [13] * padded_len)[:padded_len]
        for sample in data
    ]

    return {
        'input_ids': encoded['input_ids'].to(device),
        'attention_mask': encoded['attention_mask'].to(device),
        'labels': torch.tensor(padded_labels, dtype=torch.long).to(device),
    }

# Batches consumed by the training loop; a trailing incomplete batch is dropped.
loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=16,
    collate_fn=collate_fn,
    drop_last=True,
)

### 模型

In [None]:
class DebertaModel(torch.nn.Module):
    """Token classifier: pretrained encoder -> transform MLP -> 14-way linear head."""

    HIDDEN_SIZE = 1024  # width of the encoder's last_hidden_state fed to the head
    NUM_LABELS = 14     # number of output classes

    def __init__(self):
        super().__init__()
        # Module-level encoder loaded earlier in the notebook.
        self.pretrain = pretrain
        self.transform = torch.nn.Sequential(
            torch.nn.Linear(self.HIDDEN_SIZE, self.HIDDEN_SIZE),
            torch.nn.GELU(),
            torch.nn.LayerNorm(self.HIDDEN_SIZE, eps=1e-07, elementwise_affine=True)
        )
        self.decoder = torch.nn.Linear(self.HIDDEN_SIZE, self.NUM_LABELS)
        self.criterion = torch.nn.CrossEntropyLoss()

    def forward(self, input_ids, attention_mask=None, labels=None):
        """Classify every non-padding token.

        Args:
            input_ids: [b, lens] token ids.
            attention_mask: optional [b, lens] mask, 1 for real tokens. When
                omitted every position is treated as a real token.
            labels: optional [b, lens] class ids; when given the loss is computed.

        Returns:
            dict with
              'cls':  [n_kept, n_classes] logits for the positions whose mask is 1,
                      flattened across the batch;
              'loss': cross-entropy over those positions, or None without labels.
        """
        rt = {'loss': None, 'cls': None}

        # The signature allows attention_mask=None, but the mask is needed below
        # to select positions; fall back to "all tokens are real".
        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)

        hidden = self.pretrain(input_ids=input_ids, attention_mask=attention_mask)['last_hidden_state']
        logits = self.decoder(self.transform(hidden))

        keep = attention_mask.reshape(-1) == 1
        # [b, lens, n_classes] -> [b*lens, n_classes], then drop padding rows
        logits = logits.reshape(-1, logits.shape[-1])[keep]
        rt['cls'] = logits

        if labels is not None:
            # [b, lens] -> [b*lens], filtered with the same mask as the logits
            rt['loss'] = self.criterion(logits, labels.reshape(-1)[keep])
        return rt

# Instantiate the classifier; it wraps the module-level `pretrain` encoder.
model = DebertaModel()

### 训练

In [None]:
def train(model, epoches, lr):
    """Fine-tune `model` on the training split served by the global `loader`.

    Args:
        model: module whose forward returns a dict containing 'loss'.
        epoches: number of passes over `loader`.
        lr: learning rate for AdamW.

    Returns:
        Tensor of shape (epoches, len(loader)) holding the loss of every step.

    Side effects:
        Sets `dataset.train = True`, moves the model to `device` for training
        and leaves it in eval mode on the CPU afterwards.
    """
    # Select the training split BEFORE measuring the loader: len(loader) follows
    # len(dataset), which depends on this flag. Measuring first would size the
    # loss buffer from the test split whenever the flag had been left at False.
    dataset.train = True
    steps_per_epoch = len(loader)

    model = model.train().to(device)
    optim = torch.optim.AdamW(model.parameters(), lr=lr)

    losses = torch.zeros((epoches, steps_per_epoch))
    for epoch in range(epoches):
        with tqdm(total=steps_per_epoch, ncols=80) as bar:
            bar.set_description('训练进度-epoch: {}/{}'.format(epoch + 1, epoches))
            for step, batch in enumerate(loader):
                loss = model(**batch)['loss']
                loss.backward()
                optim.step()
                optim.zero_grad()

                losses[epoch, step] += loss.item()
                bar.update(1)

            # Show the mean loss of the finished epoch on the progress bar.
            bar.set_postfix(loss='{:.4f}'.format(losses[epoch].mean().item()))

    model.eval().cpu()
    return losses
  
# Fine-tune for 5 epochs at lr 2e-5, then pickle the whole module object.
losses = train(model, epoches=5, lr=2e-5)
torch.save(model, './models/named_entity_recognition2.model')

### 使用

In [None]:
def recognize(model, text):
    """Predict a label name for every token of `text`.

    Args:
        model: trained classifier whose forward returns a dict with 'cls' logits.
        text: raw sentence.

    Returns:
        list of NAME_TABLE keys, one per token of the POS-tagged, tokenized
        input (this includes the tokenizer's special tokens and the inserted
        POS markers).
    """
    model = model.eval()
    # Word segmentation + POS tagging, then rebuild the text as <POS marker><word>...
    words, pos = ltp.pipeline(text, tasks=['cws', 'pos'], return_dict=False)
    tagged = ''.join(POSNAME[tag] + word for word, tag in zip(words, pos))

    # Feed the model
    inputs = tokenizer.encode_plus(text=tagged, return_tensors="pt", padding=True, add_special_tokens=True, return_token_type_ids=False)
    # The tokenizer returns CPU tensors; move them to wherever the model lives so
    # the call also works when the model is on the GPU.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        inputs = {key: value.to(first_param.device) for key, value in inputs.items()}

    # Inference only: no autograd graph needed.
    with torch.no_grad():
        out = model(**inputs)['cls']
    res = out.argmax(dim=1)

    # Class id -> label name, built once instead of once per token.
    id_to_name = list(NAME_TABLE.keys())
    return [id_to_name[i] for i in res.tolist()]

In [None]:
# Load a previously saved model.
# NOTE(review): the training cell saves to './models/named_entity_recognition2.model',
# but this cell loads './models/named_entity_recognition.model' — confirm which
# checkpoint is intended; on a fresh run the file loaded here may not exist.
# NOTE(review): torch.load unpickles the file, so only load checkpoints you trust.
# Recent PyTorch releases reportedly default to weights_only=True, which would
# reject a whole-module pickle like this one — verify against the installed version.
model = torch.load('./models/named_entity_recognition.model')

In [None]:
# Run the recognizer on one sample sentence and print the label name of each token.
text = '机场航站楼门口偶遇鞠婧祎鞠婧祎这腰比我腿都细本人真的好美！！'
predicted_names = recognize(model, text)
print(*predicted_names)

### 测试

In [None]:
def precision(model, dataset):
    """Token-level accuracy of `model` on the test split of `dataset`.

    Only tokens whose gold label is neither 12 nor 13 are scored. Those two ids
    are excluded from both the numerator and the denominator, so the score is
    not deflated by positions that can never count as correct.

    Args:
        model: classifier whose forward returns a dict with 'cls' logits.
        dataset: object with a boolean `train` attribute selecting the split.

    Returns:
        0-dim float tensor (on the CPU): correct / scored tokens, or NaN when
        the test split contains no scored token.

    Side effects:
        Runs the model on `device` and moves it back to the CPU afterwards;
        `dataset.train` is restored to its previous value on exit.
    """
    total = correct = 0
    model = model.eval().to(device)
    previous_split = getattr(dataset, 'train', True)
    dataset.train = False
    try:
        # drop_last=False: evaluation must cover every test sample, including a
        # final batch smaller than batch_size.
        loader = torch.utils.data.DataLoader(dataset=dataset, batch_size=4, collate_fn=collate_fn, drop_last=False)

        with torch.no_grad():
            for inputs in tqdm(loader):
                out = model(**inputs)['cls']
                res = out.argmax(dim=1)
                # Gold labels at the same (non-padding) positions the model kept
                select = inputs['attention_mask'].reshape(-1) == 1
                std = inputs['labels'].reshape(-1)[select]
                # Count hits and candidates over the same set of tokens
                scored = (std != 12) & (std != 13)
                correct += ((std == res) & scored).sum().item()
                total += scored.sum().item()
    finally:
        # Do not leave the shared dataset pointing at the test split.
        dataset.train = previous_split
        model = model.cpu()

    return torch.tensor(correct / total if total else float('nan'))

# Score the current model on the dataset's test split (precision() switches to it).
precision(model, dataset)