<a href="https://colab.research.google.com/github/1190303311/AI/blob/main/BiLSTM_CRF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.autograd as autgrad
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(1)

使用 BiLSTM+CRF 完成命名实体识别（NER）任务，模型实现参考 PyTorch 官方教程。
数据集为 CoNLL-2003；以 batch_size=1 训练一轮（1 个 epoch）后，
在验证集上得到的 weighted F1 约为 91.2%。
另有 BERT-BiLSTM-CRF 版本，支持按 batch 训练。

In [None]:
def argmax(vec):
  """Return the column index of the largest entry of a 1 x N tensor.

  The maximum is taken along dimension 1 and the result is converted to a
  plain Python int, so `vec` must have exactly one row.
  """
  best = torch.max(vec, dim=1)
  return best.indices.item()

def prepare_sequence(seq, to_idx):
  """Map the tokens of `seq` to a 1-D long tensor of indices.

  Tokens missing from `to_idx` are mapped to `len(to_idx) - 1`, i.e. the
  mapping is expected to reserve its last index for unknown tokens.
  """
  fallback = len(to_idx) - 1
  indices = []
  for token in seq:
    indices.append(to_idx.get(token, fallback))
  return torch.tensor(indices, dtype=torch.long)


def log_sum_exp(vec):
  """Numerically stable log(sum(exp(vec))) for a 1 x N tensor.

  Uses `torch.logsumexp`, which applies the max-shift trick internally. This
  avoids the extra `.item()` round trip of a hand-rolled version and returns
  -inf (rather than NaN) when every entry is -inf.

  Args:
    vec: tensor with exactly one row, shape (1, N).

  Returns:
    A 0-dim tensor holding the log-sum-exp of the row.
  """
  # reshape(()) keeps the scalar (0-dim) return shape callers rely on and
  # still rejects inputs that have more than one row.
  return torch.logsumexp(vec, dim=1).reshape(())

In [None]:
START_TAG = '<START>'
STOP_TAG = '<STOP>'


class BiLSTM_CRF(nn.Module):
  """BiLSTM encoder with a linear-chain CRF output layer for sequence tagging.

  The model handles one sentence at a time (batch size 1). `tag_to_idx` must
  contain the START_TAG and STOP_TAG entries in addition to the real tags.

  NOTE: the LSTM initial hidden state is drawn from `torch.randn` on every
  call (see `init_hidden`), so emission scores are not deterministic, even in
  eval mode, unless the RNG is seeded.
  """

  def __init__(self, vocab_size, tag_to_idx, embedding_dim, hidden_dim):
    """Build the embedding, BiLSTM, projection and CRF transition parameters.

    Args:
      vocab_size: number of rows of the word embedding table.
      tag_to_idx: dict mapping tag name -> index (includes START/STOP tags).
      embedding_dim: size of each word embedding.
      hidden_dim: size of the concatenated BiLSTM output; each direction
        uses `hidden_dim // 2` units.
    """
    super().__init__()
    self.embedding_dim = embedding_dim
    self.hidden_dim = hidden_dim
    self.vocab_size = vocab_size
    self.tag_to_idx = tag_to_idx
    self.tagset_size = len(tag_to_idx)

    self.word_embeds = nn.Embedding(vocab_size, embedding_dim)
    self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2, num_layers=1, bidirectional=True)

    self.hidden2tag = nn.Linear(hidden_dim, self.tagset_size)
    # CRF transition matrix: entry (i, j) is the score of moving from tag j
    # to tag i.
    self.transitions = nn.Parameter(torch.randn(self.tagset_size, self.tagset_size))
    # Nothing may transition into START, and nothing may transition out of STOP.
    self.transitions.data[tag_to_idx[START_TAG], :] = -10000
    self.transitions.data[:, tag_to_idx[STOP_TAG]] = -10000

    self.hidden = self.init_hidden()

  def init_hidden(self):
    """Return a fresh random (h0, c0) pair for the single-layer BiLSTM.

    The tensors are created on the same device as the model parameters so the
    module keeps working after `.to(device)`.
    """
    device = self.transitions.device
    half = self.hidden_dim // 2
    return (torch.randn(2, 1, half, device=device),
            torch.randn(2, 1, half, device=device))

  def _forward_alg(self, feats):
    """Compute the log partition function with the forward algorithm.

    Args:
      feats: emission scores, one row per word, shape (seq_len, tagset_size).

    Returns:
      A 0-dim tensor: log-sum-exp of the scores of all tag paths.
    """
    init_alphas = torch.full((1, self.tagset_size), -10000., device=feats.device)
    init_alphas[0][self.tag_to_idx[START_TAG]] = 0

    forward_var = init_alphas

    for feat in feats:
      # scores[i, j] = forward_var[j] + transitions[i, j] + feat[i]:
      # reach tag i at this step coming from tag j. All next tags are handled
      # at once instead of looping over them in Python.
      scores = forward_var + self.transitions + feat.view(-1, 1)
      forward_var = torch.logsumexp(scores, dim=1).view(1, -1)

    terminal_var = forward_var + self.transitions[self.tag_to_idx[STOP_TAG]]
    # Log-sum-exp over the scores of every path.
    return torch.logsumexp(terminal_var, dim=1).reshape(())

  def _get_lstm_features(self, sentence):
    """Run embedding -> BiLSTM -> linear projection for one sentence.

    Args:
      sentence: 1-D long tensor of word indices.

    Returns:
      Emission scores of shape (len(sentence), tagset_size).
    """
    self.hidden = self.init_hidden()
    embeds = self.word_embeds(sentence).view(len(sentence), 1, -1)
    lstm_out, self.hidden = self.lstm(embeds, self.hidden)
    lstm_out = lstm_out.view(len(sentence), self.hidden_dim)
    lstm_feats = self.hidden2tag(lstm_out)
    return lstm_feats

  def _score_sentence(self, feats, tags):
    """Score the given tag path: sum of its emission and transition scores.

    Args:
      feats: emission scores, shape (seq_len, tagset_size).
      tags: 1-D long tensor with the tag index of every word.

    Returns:
      Tensor of shape (1,) holding the path score.
    """
    score = torch.zeros(1, device=feats.device)
    # Prepend START so that tags[i + 1] is the tag of feats[i].
    start = torch.tensor([self.tag_to_idx[START_TAG]], dtype=torch.long, device=tags.device)
    tags = torch.cat([start, tags])
    for i, feat in enumerate(feats):
      score += self.transitions[tags[i+1], tags[i]] + feat[tags[i+1]]
    score += self.transitions[self.tag_to_idx[STOP_TAG], tags[-1]]
    return score

  def _viterbi_decode(self, feats):
    """Find the highest-scoring tag path with Viterbi decoding.

    At every step, for each candidate tag, the best previous tag is the one
    maximising (score so far + transition score); it is stored as a
    backpointer. The best path is then recovered by following the
    backpointers from the best final tag.

    Args:
      feats: emission scores, shape (seq_len, tagset_size).

    Returns:
      (path_score, best_path): a 0-dim tensor and a list of tag indices
      (Python ints), one per word.
    """
    backpointers = []

    init_vvars = torch.full((1, self.tagset_size), -10000., device=feats.device)
    # At the first step every path must come from START.
    init_vvars[0][self.tag_to_idx[START_TAG]] = 0

    forward_var = init_vvars
    for feat in feats:
      # Row i holds the scores of reaching tag i from every previous tag; the
      # emission score is added afterwards since it does not affect the argmax.
      next_tag_vars = forward_var + self.transitions
      best_scores, best_prev = torch.max(next_tag_vars, dim=1)
      forward_var = (best_scores + feat).view(1, -1)
      backpointers.append(best_prev.tolist())

    terminal_var = forward_var + self.transitions[self.tag_to_idx[STOP_TAG]]
    best_tag_id = torch.max(terminal_var, dim=1).indices.item()
    path_score = terminal_var[0][best_tag_id]

    # Follow the backpointers from the last word to the first.
    best_path = [best_tag_id]
    for bptrs_t in reversed(backpointers):
      best_tag_id = bptrs_t[best_tag_id]
      best_path.append(best_tag_id)

    # The walk ends on the START tag, which is not part of the answer.
    start = best_path.pop()
    assert start == self.tag_to_idx[START_TAG]
    best_path.reverse()
    return path_score, best_path

  def neg_log_likelihood(self, sentence, tags):
    """Return the CRF loss: log-partition minus the gold path score.

    Minimising it maximises exp(gold_score) / sum over paths of exp(score).

    Args:
      sentence: 1-D long tensor of word indices.
      tags: 1-D long tensor of gold tag indices.

    Returns:
      Tensor of shape (1,).
    """
    feats = self._get_lstm_features(sentence)
    forward_score = self._forward_alg(feats)
    gold_score = self._score_sentence(feats, tags)
    return forward_score - gold_score

  def forward(self, sentence):
    """Tag one sentence; returns (best path score, list of tag indices)."""
    lstm_feats = self._get_lstm_features(sentence)
    score, tag_seq = self._viterbi_decode(lstm_feats)
    return score, tag_seq

In [None]:
!pip install datasets
from datasets import list_datasets, load_dataset
from pprint import pprint
datasets = list_datasets()
print("Number of datasets in the Datasets library: ", len(datasets), "\n\n")
datasets = load_dataset('conll2003')

In [None]:
# Pick the training and validation splits, then pull out the token and NER-tag
# columns. The last example of each split is dropped by the [:-1] slice
# (NOTE(review): the reason is not visible here — confirm it is intentional).
train_set, val_set = datasets['train'], datasets['validation']
train_tokens, train_tags = train_set['tokens'][:-1], train_set['ner_tags'][:-1]
val_tokens, val_tags = val_set['tokens'][:-1], val_set['ner_tags'][:-1]

In [None]:
# Training
# Word embedding size and BiLSTM output size (hidden_dim // 2 per direction).
EMBEDDING_DIM = 100
HIDDEN_DIM = 100
# Number of sentences between two progress reports.
LOG_EVERY = 1000

# Vocabulary: every training word gets the next free index; 'UNK' is appended
# last because prepare_sequence maps unknown words to the last index.
word2idx = {}
for sentence in train_tokens:
  for word in sentence:
    word2idx.setdefault(word, len(word2idx))
word2idx['UNK'] = len(word2idx)
# Indices 0-8 are the NER label ids used by the dataset's `ner_tags` column
# (presumably in this order — verify against the dataset features); the two
# extra entries are the CRF start/stop markers.
tag2idx = {'O': 0, 'B-PER': 1, 'I-PER': 2, 'B-ORG': 3, 'I-ORG': 4, 'B-LOC': 5, 'I-LOC': 6, 'B-MISC': 7, 'I-MISC': 8, START_TAG: 9, STOP_TAG: 10}
model = BiLSTM_CRF(len(word2idx), tag2idx, EMBEDDING_DIM, HIDDEN_DIM)
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=1e-4)

for epoch in range(1):
  losses = 0.
  # One optimisation step per sentence (batch size 1).
  for step, (sentence, tags) in enumerate(zip(train_tokens, train_tags), start=1):
    model.zero_grad()
    # Distinct names: do not shadow the builtin `input` or rebind the loop
    # variable `tags`.
    sentence_in = prepare_sequence(sentence, word2idx)
    targets = torch.tensor(tags, dtype=torch.long)

    loss = model.neg_log_likelihood(sentence_in, targets)

    loss.backward()
    optimizer.step()
    losses += loss.item()
    if step % LOG_EVERY == 0:
      # Mean loss over the last LOG_EVERY sentences.
      print('epoch:', epoch, ' input:', step, ' loss:', losses / LOG_EVERY)
      losses = 0.

# Saves the whole module object (not just its state_dict); loading it back
# requires the BiLSTM_CRF class definition to be importable.
torch.save(model,'save.pt')

epoch: 0  input: 1000  loss: 6.418446699500084
epoch: 0  input: 2000  loss: 3.626508648812771
epoch: 0  input: 3000  loss: 2.3487684562802316
epoch: 0  input: 4000  loss: 1.8104148924350738
epoch: 0  input: 5000  loss: 1.8326457664966582
epoch: 0  input: 6000  loss: 1.5888553948402404
epoch: 0  input: 7000  loss: 1.8805180814862252
epoch: 0  input: 8000  loss: 2.448946821987629
epoch: 0  input: 9000  loss: 1.9829042331874371
epoch: 0  input: 10000  loss: 2.5726412694454193
epoch: 0  input: 11000  loss: 2.0982760944366454
epoch: 0  input: 12000  loss: 2.5124796831011773
epoch: 0  input: 13000  loss: 1.983550221800804
epoch: 0  input: 14000  loss: 2.0402784041762354


In [42]:
# Evaluate on the validation sentences: token-level accuracy, plus the flat
# gold/predicted label lists (`y_true`, `pred`) used for the F1 score below.
model.eval()
total_tokens = 0
right = 0
pred = []
y_true = []
# Inference only: disable autograd so no graph is built for each sentence.
with torch.no_grad():
  for sentence, tags in zip(val_tokens, val_tags):
    sentence_in = prepare_sequence(sentence, word2idx)
    _, tags_pred = model(sentence_in)
    y_true.extend(tags)
    pred.extend(tags_pred)
    total_tokens += len(sentence_in)
    # Compare labels as integer tensors instead of casting them to float.
    gold = torch.tensor(tags, dtype=torch.long)
    guessed = torch.tensor(tags_pred, dtype=torch.long)
    right += torch.sum(gold == guessed)

# `acurracy` (sic): the existing variable name is kept for compatibility.
acurracy = right/total_tokens
print(right, total_tokens, acurracy)



tensor(46742) 51362 tensor(0.9101)


In [53]:
from sklearn.metrics import f1_score
# Weighted F1 over the flat gold / predicted label lists collected during
# evaluation; the bare name on the last line makes the notebook display it.
a = f1_score(y_true=y_true, y_pred=pred, average='weighted')
a

0.9118695407465229