<a href="https://colab.research.google.com/github/cshmzin/nlp-code/blob/main/Bert-lstm-crf%E5%91%BD%E5%90%8D%E5%AE%9E%E4%BD%93%E8%AF%86%E5%88%AB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 获取数据
import json
import os

def _read_json_lines(path):
    """Parse a JSON-lines file into a list with one decoded object per line.

    Args:
        path: Location of a UTF-8 text file holding one JSON document per line.

    Returns:
        A list of the decoded objects, in file order.
    """
    # The context manager closes the handle as soon as reading finishes; the
    # previous bare open() calls never closed their files explicitly.
    with open(path, 'r', encoding='UTF-8') as handle:
        return [json.loads(line) for line in handle]


train_data = _read_json_lines('sample_data/train.json')
dev_data = _read_json_lines('sample_data/dev.json')


In [None]:
#上图为标签类别
#需要构建标签
import re

# Integer id for every tag name. Id 0 ('o') is the default given to characters
# that no annotated span covers.
label_type = {
    'o': 0,
    'address': 1,
    'company': 2,
    'name': 3,
    'organization': 4,
    'pad': 5,
}


def decode_label(d):
    """Turn one annotated sample into a per-character list of label ids.

    Args:
        d: Mapping with a 'text' string and a 'label' mapping of the form
            {entity_type: {mention: [[start, end], ...]}}. Span ends are
            treated as inclusive.

    Returns:
        A list as long as d['text'] holding label_type ids. Positions outside
        every span stay 0; entity types missing from label_type are skipped.
    """
    tags = [0] * len(d['text'])
    for entity_type, mentions in d['label'].items():
        if entity_type not in label_type:
            continue
        tag_id = label_type[entity_type]
        for spans in mentions.values():
            for span in spans:
                # range() end is exclusive, so +1 keeps the last character.
                for position in range(span[0], span[1] + 1):
                    tags[position] = tag_id
    return tags



def transfrom_data(data,mode):
    """Split raw samples into texts and, in 'train' mode, label sequences.

    Args:
        data: Iterable of sample mappings, each with a 'text' entry.
        mode: 'train' also decodes labels via decode_label; any other value
            returns only the texts.

    Returns:
        (texts, labels) when mode == 'train', otherwise the list of texts.
    """
    texts = [sample['text'] for sample in data]

    # Guard clause: every non-training mode only needs the raw texts.
    if mode != 'train':
        return texts

    labels = [decode_label(sample) for sample in data]
    return (texts, labels)

# Build parallel text / label-id lists for the training and dev splits; both
# use 'train' mode so that labels are decoded as well.
train_texts,train_labels = transfrom_data(train_data,'train')
dev_texts,dev_labels = transfrom_data(dev_data,'train')
# NOTE(review): the "test" texts are derived from train_data, not from a
# separate test split — confirm this is intended.
test_texts = transfrom_data(train_data,'test')
    

In [None]:
! pip install transformers
from transformers import BertTokenizer
from IPython.display import clear_output

# Use BERT's tokenizer to turn text into vocabulary ids.
PRETRAINED_MODEL_NAME = "bert-base-chinese"  # Chinese checkpoint
tokenizer = BertTokenizer.from_pretrained(PRETRAINED_MODEL_NAME)
clear_output()


def _chars_to_ids(texts):
    """Convert each text into one vocabulary id per character.

    Tokenizing character by character keeps the id sequence the same length
    as the per-character label sequence built for the same text.

    Args:
        texts: Iterable of strings.

    Returns:
        A list with one list of ids per input text.
    """
    id_lists = []
    for text in texts:
        char_tokens = []
        for ch in text:
            pieces = tokenizer.tokenize(ch)
            # tokenize() can return an empty list (e.g. for a whitespace
            # character). Indexing [0] would raise IndexError, and dropping
            # the character would misalign ids and labels, so substitute the
            # unknown token instead.
            char_tokens.append(pieces[0] if pieces else tokenizer.unk_token)
        id_lists.append(tokenizer.convert_tokens_to_ids(char_tokens))
    return id_lists


train_ids = _chars_to_ids(train_texts)
dev_ids = _chars_to_ids(dev_texts)

# Shallow copies of the label lists (same contents, new outer list).
dev_labels = list(dev_labels)
train_labels = list(train_labels)
    

In [None]:
import torch
from keras_preprocessing.sequence import pad_sequences
from torch.utils.data import TensorDataset,DataLoader
from transformers import BertTokenizer
from IPython.display import clear_output

class Dataset():
    """Pads id/label sequences to a fixed length and wraps them in a DataLoader.

    Args:
        max_len: Length every sequence is padded or truncated to (default 60).
        batch_size: Batch size of the DataLoader built by loader() (default 64).
        pad_token_id: Id used to pad input sequences; positions holding this
            id get attention mask 0 (default 0).
    """

    def __init__(self, max_len=60, batch_size=64, pad_token_id=0):
        self.label_type = {'o': 0, 'address': 1, 'company': 2, 'name': 3, 'organization': 4, 'pad': 5}
        # Previously hard-coded inside pad()/loader(); the defaults keep the
        # original behaviour for callers that pass no arguments.
        self.max_len = max_len
        self.batch_size = batch_size
        self.pad_token_id = pad_token_id
        clear_output()

    def pad(self,ids,labels):
        """Pad/truncate ids and labels at the end and derive attention masks.

        Args:
            ids: List of token-id lists.
            labels: List of label-id lists, parallel to ids.

        Returns:
            (input_ids, tags, attention_masks). The first two are the arrays
            returned by pad_sequences; attention_masks is a list of float
            lists holding 1.0 wherever input_ids differs from the pad id.
        """
        input_ids = pad_sequences(ids, maxlen=self.max_len, dtype='long', value=self.pad_token_id,
                                  truncating="post", padding="post")
        # Labels are padded with the dedicated 'pad' tag id.
        tags = pad_sequences(labels, maxlen=self.max_len, value=self.label_type["pad"],
                             padding="post", dtype="long", truncating="post")
        attention_masks = [[float(token_id != self.pad_token_id) for token_id in row] for row in input_ids]
        return (input_ids,tags,attention_masks)

    def loader(self,ids,labels):
        """Build a DataLoader yielding (input_ids, tags, attention_mask) batches.

        Args:
            ids: List of token-id lists.
            labels: List of label-id lists, parallel to ids.

        Returns:
            A DataLoader over a TensorDataset of the padded tensors.
        """
        input_ids,tags,attention_masks = self.pad(ids,labels)
        dataset = TensorDataset(torch.tensor(input_ids),torch.tensor(tags),torch.tensor(attention_masks))
        dataloader = DataLoader(dataset,batch_size=self.batch_size)
        print('dataloader load ok')
        return dataloader

# One Dataset helper builds both loaders, so train and dev share the same
# padding length and batch size.
dataloaders = Dataset()
trainloader = dataloaders.loader(train_ids,train_labels)
devloader = dataloaders.loader(dev_ids,dev_labels)


dataloader load ok
dataloader load ok


In [None]:
! pip install pytorch-crf
from transformers import BertPreTrainedModel,BertModel
from torchcrf import CRF
import torch.nn as nn
class BertLstmCrf(BertPreTrainedModel):
    """BERT encoder with an optional BiLSTM, a linear tag projection and a CRF.

    Args:
        config: BERT configuration; config.num_labels sets the tag count.
        need_bilstm: When True, a single-layer bidirectional LSTM is inserted
            between the encoder and the linear projection.
        rnn_dim: Hidden size of each LSTM direction (only used with the LSTM).
    """

    # The encoder below is built without a pooling layer, so pooler weights
    # found in a checkpoint are expected to be unused.
    _keys_to_ignore_on_load_unexpected = [r"pooler"]

    def __init__(self, config,need_bilstm = False,rnn_dim = 128):
        super().__init__(config)
        self.num_labels = config.num_labels

        self.bert = BertModel(config, add_pooling_layer=False)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.out_dim = config.hidden_size
        self.need_bilstm = need_bilstm
        if need_bilstm:
            self.bilstm = nn.LSTM(config.hidden_size, rnn_dim, num_layers=1, bidirectional=True, batch_first=True)
            # Forward and backward states are concatenated.
            self.out_dim = 2*rnn_dim
        # NOTE: the attribute name 'liner' is part of saved state_dict keys;
        # renaming it would break loading existing checkpoints.
        self.liner = nn.Linear(self.out_dim, config.num_labels)
        self.crf = CRF(config.num_labels,batch_first=True)


    def forward(self,input_ids=None,attention_mask=None,token_type_ids=None,labels=None,):
        """Score tags for a batch and decode the best tag sequences.

        Args:
            input_ids: Token ids passed to the BERT encoder.
            attention_mask: Mask with non-zero entries at real tokens. When
                omitted, every position of input_ids is treated as real.
            token_type_ids: Optional segment ids passed to the encoder.
            labels: Optional gold tag ids; when given, the loss is computed.

        Returns:
            [loss, output] when labels is given, otherwise output alone.
            loss is the negated value returned by the CRF layer and output is
            the result of self.crf.decode for the batch.
        """
        if attention_mask is None:
            # Previously a missing mask crashed on attention_mask.byte().
            attention_mask = input_ids.new_ones(input_ids.shape)

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )

        sequence_output = outputs[0]
        if self.need_bilstm:
            sequence_output,_ = self.bilstm(sequence_output)
        sequence_output = self.dropout(sequence_output)
        sequence_output = self.liner(sequence_output)

        # Boolean masks replace the uint8 (.byte()) masks, which recent
        # PyTorch releases deprecate as conditions for torch.where.
        crf_mask = attention_mask.bool()

        # 'is not None' is required here: comparing a tensor with '!=' is not
        # a reliable None check.
        loss = None
        if labels is not None:
            loss = -1 * self.crf(sequence_output, labels, mask=crf_mask)
        output = self.crf.decode(sequence_output, crf_mask)

        return [loss,output] if loss is not None else output



In [None]:
! pip install seqeval
from transformers import BertForTokenClassification
import numpy as np
from sklearn.metrics import f1_score
from seqeval.metrics import f1_score as f1
import os

# Model selection: the CRF model is active; the commented lines switch to the
# plain token-classification baseline.
model = BertLstmCrf.from_pretrained("bert-base-chinese", num_labels=6)
need_CRF = True
# model =  BertForTokenClassification.from_pretrained("bert-base-chinese", num_labels=6)
# need_CRF = False
model.cuda()
optimizer = torch.optim.Adam(model.parameters(),lr=1e-5)
Epochs = 10
# Inverse of the tag-name -> id mapping, used to turn ids back into names.
type_label = {0:'o',1:'address',2:'company',3:'name',4:'organization',5:'pad'}

# Resume from an earlier checkpoint when one exists.
if os.path.exists('sample_data/bert_lstm_crf.pth'):
    model.load_state_dict(torch.load('sample_data/bert_lstm_crf.pth'))

for epoch in range(Epochs):
    # ---- training pass ----
    model.train()
    losses = 0.0
    for data in trainloader:
        tokens_tensors,label_tensors,masks_tensors = [t.cuda() for t in data]
        optimizer.zero_grad()
        outputs = model(input_ids = tokens_tensors,attention_mask = masks_tensors,labels = label_tensors)
        loss = outputs[0]
        loss.backward()
        optimizer.step()
        losses += loss.item()
    avg_train_loss = losses / len(trainloader)
    print("Average train loss: {}".format(avg_train_loss))

    # ---- validation pass ----
    model.eval()
    predictions , true_labels = [], []

    if not need_CRF:
        for data in devloader:
            tokens_tensors, label_tensors, masks_tensors = [t.cuda() for t in data]
            with torch.no_grad():
                preds = model(input_ids=tokens_tensors, attention_mask=masks_tensors)

            for pred,label_tensor in zip(preds[0],label_tensors):
                # detach() cuts the tensor out of the autograd graph before
                # it is converted to numpy.
                logit = pred.detach().cpu().numpy()
                label_ids = label_tensor.cpu().numpy()

                predictions.extend(np.argmax(logit, axis=1))
                true_labels.append(label_ids)

        pred_tags = list(np.array(predictions).flatten())
        valid_tags = list(np.array(true_labels).flatten())
        # Concrete tag ids are passed in. sklearn expects (y_true, y_pred);
        # the weighted average depends on which argument is the ground truth.
        print("F1-Score: {}".format(f1_score(valid_tags,pred_tags,average='weighted')))

    else:
        for batch in devloader:
            # Unpack the dev batch itself. The earlier code unpacked the stale
            # `data` variable left over from the training loop, so every
            # validation step re-scored the last training batch.
            tokens_tensors, label_tensors, masks_tensors = [t.cuda() for t in batch]
            with torch.no_grad():
                outputs = model(input_ids=tokens_tensors, attention_mask=masks_tensors,labels=label_tensors)
            logits = outputs[1]  # decoded tag-id sequences, not raw logits
            label_ids = label_tensors.cpu().numpy()

            predictions.extend(logits)
            true_labels.extend(list(label_ids))

        # Flatten everything into one sequence and drop positions whose gold
        # tag is 'pad'.
        pred_tags = [[type_label[p_i] for p, l in zip(predictions, true_labels)
                      for p_i, l_i in zip(p, l) if type_label[l_i] != "pad"]]
        valid_tags = [[type_label[l_i] for l in true_labels
                       for l_i in l if type_label[l_i] != "pad"]]
        # seqeval expects (y_true, y_pred).
        print("Validation F1-Score: {}".format(f1(valid_tags, pred_tags)))


torch.save(model.state_dict(), 'sample_data/bert_lstm_crf.pth')





Some weights of the model checkpoint at bert-base-chinese were not used when initializing BertLstmCrf: ['cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertLstmCrf from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertLstmCrf from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertLstmCrf were not initialized from the model checkpoint at bert-base-chinese and are newly initialized: ['liner.weight', 'liner.bi

Average train loss: 28.61979950041998




Validation F1-Score: 0.9738562091503268
Average train loss: 25.63988749186198
Validation F1-Score: 0.9738562091503268
Average train loss: 23.452751988456363
Validation F1-Score: 0.974025974025974
Average train loss: 26.00460913067772
Validation F1-Score: 0.9607843137254902
Average train loss: 21.062477208319166
Validation F1-Score: 0.961038961038961
Average train loss: 19.2672103927249
Validation F1-Score: 0.974025974025974
Average train loss: 20.661885148002987
Validation F1-Score: 0.974025974025974
Average train loss: 16.21223018850599
Validation F1-Score: 0.9738562091503268
Average train loss: 15.863000744865055
Validation F1-Score: 0.974025974025974
Average train loss: 14.482931227911086
Validation F1-Score: 0.974025974025974


In [None]:
text = '360集团发布一个漏洞'
need_CRF = True

# Tokenize one character at a time, matching how the training ids were built;
# whole-string tokenization can merge characters (e.g. digits) into a single
# token the model never saw in that form. Characters that yield no token fall
# back to the unknown token so positions stay aligned with the text.
test_tokens = []
for ch in text:
    pieces = tokenizer.tokenize(ch)
    test_tokens.append(pieces[0] if pieces else tokenizer.unk_token)
test_ids = tokenizer.convert_tokens_to_ids(test_tokens)
test_tokens_tensor = torch.tensor(test_ids)

# Attention mask: 1 for every id other than 0 (the padding id).
test_masks_tensor = (test_tokens_tensor != 0).long()

# id -> tag name, derived from label_type so that both branches share one
# mapping.
inverse_dict = {val: key for key, val in label_type.items()}

# Inference only: disable dropout and skip gradient tracking.
model.eval()
with torch.no_grad():
    outputs = model(input_ids=test_tokens_tensor.unsqueeze(0).cuda(),
                    attention_mask=test_masks_tensor.unsqueeze(0).cuda())

if not need_CRF:
    # Plain classifier: outputs[0] holds per-token logits for the batch.
    logits = outputs[0]
    preds = []
    for logit in logits:
        preds.extend(np.argmax(logit.detach().cpu().numpy(), axis=1))
    preds = [inverse_dict[i] for i in preds]

else:
    # CRF model without labels returns decoded sequences; take the single
    # sequence of this one-sample batch.
    logits = outputs[0]
    preds = [inverse_dict[i] for i in logits]

print(test_tokens)
print(preds)



NameError: ignored