In [3]:
!pip install pytorch-crf
!pip install peft
# !huggingface-cli login --token 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

#clean output
from IPython.display import clear_output
clear_output()

In [4]:
import torch
import torch.nn as nn
from torchcrf import CRF
from peft import TaskType, LoraConfig, get_peft_model
from transformers import AutoModelForCausalLM, BertModel, AutoTokenizer, AutoModel, BertConfig

In [5]:
# Fixed seed for PyTorch's global random number generator.
SEED = 0
# Assigning the result to `_` keeps the returned generator out of the cell output.
_ = torch.manual_seed(SEED)

In [6]:
class LoRATrainBert_BiLSTM_CRF_TEST(nn.Module):
    """Experimental tagger: pretrained backbone -> BiLSTM -> dropout -> linear -> CRF.

    On construction the backbone is frozen and, optionally, its last encoder
    layers are switched back to ``requires_grad=True``.

    Args:
        tag_to_ix: mapping from tag to integer index; its length is the number
            of CRF tags.
        embedding_dim: feature size fed to the LSTM.
        hidden_dim: BiLSTM output width (``hidden_dim // 2`` per direction).
        num_layers_to_finetune: number of trailing encoder layers to unfreeze.
            Defaults to 0, i.e. the backbone stays fully frozen.

    NOTE(review): the backbone is loaded through ``AutoModelForCausalLM``;
    confirm that the first element it returns has ``embedding_dim`` features
    (an LM head would normally emit vocabulary-sized logits), otherwise the
    LSTM input size will not match.
    """

    def __init__(self, tag_to_ix, embedding_dim=768, hidden_dim=256,
                 num_layers_to_finetune=0):
        super(LoRATrainBert_BiLSTM_CRF_TEST, self).__init__()
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim
        # fine_tune_layers() reads this attribute, so it must exist before the
        # call below; it used to be missing, which made construction fail with
        # AttributeError.
        self.num_layers_to_finetune = num_layers_to_finetune

        self.bert = AutoModelForCausalLM.from_pretrained(
            'UJForSchool/Bert_base_chinese_LoRA', is_decoder=True, return_dict=False)

        self.fine_tune_layers()

        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim//2,
                            num_layers=2, bidirectional=True, batch_first=True)
        self.dropout = nn.Dropout(p=0.1)
        self.linear = nn.Linear(hidden_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)

    def fine_tune_layers(self):
        """Freeze the backbone, then unfreeze its last ``num_layers_to_finetune`` encoder layers."""
        # Freeze every backbone parameter.
        for param in self.bert.parameters():
            param.requires_grad = False

        if self.num_layers_to_finetune <= 0:
            return

        # Models wrapped with a task head keep the encoder on their inner base
        # model rather than on the wrapper itself.
        backbone = self.bert if hasattr(self.bert, 'encoder') else self.bert.base_model
        layers = backbone.encoder.layer
        # Use the real depth instead of assuming a 12-layer encoder.
        first = max(len(layers) - self.num_layers_to_finetune, 0)
        for layer in layers[first:]:
            for param in layer.parameters():
                param.requires_grad = True

    def _get_features(self, sentence):
        """Return per-token emission scores for ``sentence``.

        The backbone runs under ``torch.no_grad()``, so no gradient reaches it
        through this path even for layers unfrozen by ``fine_tune_layers``.
        """
        with torch.no_grad():
            embeds, _ = self.bert(sentence)
        enc, _ = self.lstm(embeds)
        enc = self.dropout(enc)
        feats = self.linear(enc)
        return feats

    def forward(self, sentence, tags, mask, is_test=False):
        """Return the training loss, or the decoded tag sequences when ``is_test`` is true.

        Training: the negated result of ``crf.forward(..., reduction='mean')``.
        Testing: the result of ``crf.decode(emissions, mask)``.
        """
        emissions = self._get_features(sentence)
        if not is_test:  # Training, return loss
            loss = -self.crf.forward(emissions, tags, mask, reduction='mean')
            return loss
        else:  # Testing, return decoding
            decode = self.crf.decode(emissions, mask)
            return decode

In [7]:
class LoRATrainBert_BiLSTM_CRF(nn.Module):
    """Sequence tagger: pretrained backbone -> BiLSTM -> dropout -> linear -> CRF.

    Args:
        tag_to_ix: mapping from tag to integer index; its length is the number
            of CRF tags.
        embedding_dim: feature size fed to the LSTM.
        hidden_dim: BiLSTM output width (``hidden_dim // 2`` per direction).

    NOTE(review): the backbone is loaded through ``AutoModelForCausalLM``;
    confirm that the first element it returns has ``embedding_dim`` features
    (an LM head would normally emit vocabulary-sized logits).
    NOTE(review): ``mask`` is not forwarded to the backbone, so padded
    positions are presumably attended to -- confirm this is intended.
    """

    def __init__(self, tag_to_ix, embedding_dim=768, hidden_dim=256):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)

        # Pretrained backbone, configured to return plain tuples.
        self.bert = AutoModelForCausalLM.from_pretrained(
            'UJForSchool/Bert_base_chinese_LoRA',
            is_decoder=True,
            return_dict=False,
        )
        # Two stacked bidirectional layers; both directions together give hidden_dim.
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim // 2,
            num_layers=2,
            bidirectional=True,
            batch_first=True,
        )
        self.dropout = nn.Dropout(p=0.1)
        self.linear = nn.Linear(hidden_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)

    def _get_features(self, sentence):
        """Return per-token emission scores; the backbone runs without gradients."""
        with torch.no_grad():
            hidden, _ = self.bert(sentence)
        lstm_out, _ = self.lstm(hidden)
        return self.linear(self.dropout(lstm_out))

    def forward(self, sentence, tags, mask, is_test=False):
        """Return decoded tag sequences when ``is_test`` is true, else the negated mean CRF score."""
        emissions = self._get_features(sentence)
        if is_test:
            return self.crf.decode(emissions, mask)
        return -self.crf.forward(emissions, tags, mask, reduction='mean')

In [8]:
class MacBERT_BiLSTM_CRF(nn.Module):
    """Sequence tagger: MacBERT encoder -> BiLSTM -> dropout -> linear -> CRF.

    Args:
        tag_to_ix: mapping from tag to integer index; its length is the number
            of CRF tags.
        embedding_dim: feature size fed to the LSTM.
        hidden_dim: BiLSTM output width (``hidden_dim // 2`` per direction).

    NOTE(review): ``mask`` is not forwarded to the encoder, so padded
    positions are presumably attended to -- confirm this is intended.
    """

    def __init__(self, tag_to_ix, embedding_dim=768, hidden_dim=256):
        super(MacBERT_BiLSTM_CRF, self).__init__()
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim

        # A tokenizer is not a model configuration, so it is no longer loaded
        # and passed as `config`. Only `last_hidden_state` is consumed below,
        # so `output_hidden_states` is not needed either.
        self.bert = AutoModel.from_pretrained("hfl/chinese-macbert-base")

        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim//2,
                            num_layers=2, bidirectional=True, batch_first=True)
        self.dropout = nn.Dropout(p=0.1)
        self.linear = nn.Linear(hidden_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)

    def _get_features(self, sentence):
        """Return per-token emission scores; the encoder runs without gradients."""
        with torch.no_grad():
            embeds = self.bert(sentence)['last_hidden_state']
        enc, _ = self.lstm(embeds)
        enc = self.dropout(enc)
        feats = self.linear(enc)
        return feats

    def forward(self, sentence, tags, mask, is_test=False):
        """Return the training loss, or the decoded tag sequences when ``is_test`` is true.

        Training: the negated result of ``crf.forward(..., reduction='mean')``.
        Testing: the result of ``crf.decode(emissions, mask)``.
        """
        emissions = self._get_features(sentence)
        if not is_test:  # Training, return loss
            loss = -self.crf.forward(emissions, tags, mask, reduction='mean')
            return loss
        else:  # Testing, return decoding
            decode = self.crf.decode(emissions, mask)
            # The decoded paths used to be computed and then dropped, so the
            # test branch returned None.
            return decode

In [9]:
class Bert_LoRA_BiLSTM_CRF(nn.Module):
    """Sequence tagger: BERT wrapped with LoRA adapters -> BiLSTM -> dropout -> linear -> CRF.

    Args:
        tag_to_ix: mapping from tag to integer index; its length is the number
            of CRF tags.
        num_layers_to_finetune: number of trailing encoder layers that
            ``fine_tune_layers`` switches back to ``requires_grad=True``.
        r, lora_alpha, lora_dropout: forwarded unchanged to ``LoraConfig``.
        embedding_dim: feature size fed to the LSTM.
        hidden_dim: BiLSTM output width (``hidden_dim // 2`` per direction).

    NOTE(review): ``get_peft_model`` is applied after ``fine_tune_layers`` and
    the adapter is built with ``inference_mode=True``; PEFT is expected to
    freeze the base weights (and, in inference mode, the adapters), so the
    unfreezing may not survive -- confirm with ``count_parameters``. The
    encoder also runs under ``torch.no_grad()`` in ``_get_features``.
    """

    def __init__(self, tag_to_ix, num_layers_to_finetune, r, lora_alpha, lora_dropout, embedding_dim=768, hidden_dim=256):
        super(Bert_LoRA_BiLSTM_CRF, self).__init__()
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)
        self.num_layers_to_finetune = num_layers_to_finetune
        self.r = r
        self.lora_alpha = lora_alpha
        self.lora_dropout = lora_dropout
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim

        # Load the pretrained BERT encoder.
        self.bert = AutoModel.from_pretrained('bert-base-chinese')

        # Freeze the encoder except for its last layers.
        self.fine_tune_layers()

        # task_type stays None: the task-specific settings tried here
        # (SEQ_CLS, CAUSAL_LM, TOKEN_CLS, SEQ_2_SEQ_LM) raised TypeError for
        # unexpected forward() keyword arguments ('labels' / 'decoder_input_ids').
        self.lora_config = LoraConfig(
            r=self.r,
            lora_alpha=self.lora_alpha,
            lora_dropout=self.lora_dropout,
            task_type=None,
            inference_mode=True,
        )

        self.bert = get_peft_model(self.bert, self.lora_config)

        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim // 2,
                            num_layers=2, bidirectional=True, batch_first=True)
        self.dropout = nn.Dropout(p=0.1)
        self.linear = nn.Linear(hidden_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)

    def fine_tune_layers(self):
        """Freeze the encoder, then unfreeze its last ``num_layers_to_finetune`` layers."""
        # Freeze every encoder parameter.
        for param in self.bert.parameters():
            param.requires_grad = False

        if self.num_layers_to_finetune <= 0:
            return

        # Unfreeze the trailing layers, using the real depth of the encoder
        # instead of assuming 12 layers.
        layers = self.bert.encoder.layer
        first = max(len(layers) - self.num_layers_to_finetune, 0)
        for layer in layers[first:]:
            for param in layer.parameters():
                param.requires_grad = True

    def _get_features(self, sentence, mask):
        """Return per-token emission scores; ``mask`` is passed to the encoder as its attention mask."""
        with torch.no_grad():
            embeds = self.bert(sentence, attention_mask=mask)['last_hidden_state']
        enc, _ = self.lstm(embeds)
        enc = self.dropout(enc)
        feats = self.linear(enc)
        return feats

    def forward(self, sentence, tags, mask, is_test=False):
        """Return the training loss, or the decoded tag sequences when ``is_test`` is true.

        Training: the negated result of ``crf.forward(..., reduction='mean')``.
        Testing: the result of ``crf.decode(emissions, mask)``.
        """
        emissions = self._get_features(sentence, mask)
        if not is_test:  # Training, return loss
            loss = -self.crf.forward(emissions, tags, mask, reduction='mean')
            return loss
        else:  # Testing, return decoding
            decode = self.crf.decode(emissions, mask)
            return decode

In [10]:
class ClinicalDistilBERT_BiLSTM_CRF(nn.Module):
    """Sequence tagger: ClinicalBERT encoder -> BiLSTM -> dropout -> linear -> CRF.

    Args:
        tag_to_ix: mapping from tag to integer index; its length is the number
            of CRF tags.
        embedding_dim: feature size fed to the LSTM.
        hidden_dim: BiLSTM output width (``hidden_dim // 2`` per direction).

    NOTE(review): ``mask`` is not forwarded to the encoder, so padded
    positions are presumably attended to -- confirm this is intended.
    """

    def __init__(self, tag_to_ix, embedding_dim=768, hidden_dim=256):
        super(ClinicalDistilBERT_BiLSTM_CRF, self).__init__()
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim

        # Load the pretrained encoder. A tokenizer is not a model
        # configuration, so it is no longer loaded and passed as `config`.
        # Only `last_hidden_state` is consumed below, so `output_hidden_states`
        # is not needed either.
        self.bert = AutoModel.from_pretrained("medicalai/ClinicalBERT")

        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim//2,
                            num_layers=2, bidirectional=True, batch_first=True)
        self.dropout = nn.Dropout(p=0.1)
        self.linear = nn.Linear(hidden_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)

    def _get_features(self, sentence):
        """Return per-token emission scores; the encoder runs without gradients."""
        with torch.no_grad():
            embeds = self.bert(sentence)['last_hidden_state']
        enc, _ = self.lstm(embeds)
        enc = self.dropout(enc)
        feats = self.linear(enc)
        return feats

    def forward(self, sentence, tags, mask, is_test=False):
        """Return the training loss, or the decoded tag sequences when ``is_test`` is true.

        Training: the negated result of ``crf.forward(..., reduction='mean')``.
        Testing: the result of ``crf.decode(emissions, mask)``.
        """
        emissions = self._get_features(sentence)
        if not is_test:  # Training, return loss
            loss = -self.crf.forward(emissions, tags, mask, reduction='mean')
            return loss
        else:  # Testing, return decoding
            decode = self.crf.decode(emissions, mask)
            return decode

In [11]:
class Bert_BiLSTM_CRF(nn.Module):
    """Sequence tagger: BERT encoder -> BiLSTM -> dropout -> linear -> CRF.

    Args:
        tag_to_ix: mapping from tag to integer index; its length is the number
            of CRF tags.
        embedding_dim: feature size fed to the LSTM.
        hidden_dim: BiLSTM output width (``hidden_dim // 2`` per direction).

    NOTE(review): ``mask`` is not forwarded to the encoder, so padded
    positions are presumably attended to -- confirm this is intended.
    """

    def __init__(self, tag_to_ix, embedding_dim=768, hidden_dim=256):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)

        # Pretrained encoder, configured to return plain tuples.
        self.bert = BertModel.from_pretrained('bert-base-chinese', return_dict=False)
        # Two stacked bidirectional layers; both directions together give hidden_dim.
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim // 2,
            num_layers=2,
            bidirectional=True,
            batch_first=True,
        )
        self.dropout = nn.Dropout(p=0.1)
        self.linear = nn.Linear(hidden_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)

    def _get_features(self, sentence):
        """Return per-token emission scores; the encoder runs without gradients."""
        with torch.no_grad():
            hidden, _ = self.bert(sentence)
        lstm_out, _ = self.lstm(hidden)
        return self.linear(self.dropout(lstm_out))

    def forward(self, sentence, tags, mask, is_test=False):
        """Return decoded tag sequences when ``is_test`` is true, else the negated mean CRF score."""
        emissions = self._get_features(sentence)
        if is_test:
            return self.crf.decode(emissions, mask)
        return -self.crf.forward(emissions, tags, mask, reduction='mean')

In [12]:
class Bert_CRF(nn.Module):
    """Sequence tagger: BERT encoder -> dropout -> linear -> CRF (no LSTM).

    Args:
        tag_to_ix: mapping from tag to integer index; its length is the number
            of CRF tags.
        embedding_dim: stored on the instance; not used to size any layer here.
        hidden_dim: input width of the linear projection applied to the
            encoder output.

    NOTE(review): ``mask`` is not forwarded to the encoder, so padded
    positions are presumably attended to -- confirm this is intended.
    """

    def __init__(self, tag_to_ix, embedding_dim=768, hidden_dim=768):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)

        # Pretrained encoder, configured to return plain tuples.
        self.bert = BertModel.from_pretrained('bert-base-chinese', return_dict=False)
        self.dropout = nn.Dropout(p=0.1)
        self.linear = nn.Linear(hidden_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)

    def _get_features(self, sentence):
        """Return per-token emission scores; the encoder runs without gradients."""
        with torch.no_grad():
            hidden, _ = self.bert(sentence)
        return self.linear(self.dropout(hidden))

    def forward(self, sentence, tags, mask, is_test=False):
        """Return decoded tag sequences when ``is_test`` is true, else the negated mean CRF score."""
        emissions = self._get_features(sentence)
        if is_test:
            return self.crf.decode(emissions, mask)
        return -self.crf.forward(emissions, tags, mask, reduction='mean')

In [13]:
class Bert_LoRA_CRF(nn.Module):
    """Sequence tagger: BERT wrapped with LoRA adapters -> dropout -> linear -> CRF (no LSTM).

    Args:
        tag_to_ix: mapping from tag to integer index; its length is the number
            of CRF tags.
        num_layers_to_finetune: number of trailing encoder layers that
            ``fine_tune_layers`` switches back to ``requires_grad=True``.
        r, lora_alpha, lora_dropout: forwarded unchanged to ``LoraConfig``.
        embedding_dim: stored on the instance; not used to size any layer here.
        hidden_dim: input width of the linear projection applied to the
            encoder output.

    NOTE(review): ``get_peft_model`` is applied after ``fine_tune_layers`` and
    the adapter is built with ``inference_mode=True``; PEFT is expected to
    freeze the base weights (and, in inference mode, the adapters), so the
    unfreezing may not survive -- confirm with ``count_parameters``. The
    encoder also runs under ``torch.no_grad()`` in ``_get_features``.
    """

    def __init__(self, tag_to_ix, num_layers_to_finetune, r, lora_alpha, lora_dropout, embedding_dim=768, hidden_dim=768):
        super(Bert_LoRA_CRF, self).__init__()
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)
        self.num_layers_to_finetune = num_layers_to_finetune
        self.r = r
        self.lora_alpha = lora_alpha
        self.lora_dropout = lora_dropout
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim

        # Load the pretrained BERT encoder.
        self.bert = AutoModel.from_pretrained('bert-base-chinese')

        # Freeze the encoder except for its last layers.
        self.fine_tune_layers()

        # task_type stays None: the task-specific settings tried here
        # (SEQ_CLS, CAUSAL_LM, TOKEN_CLS, SEQ_2_SEQ_LM) raised TypeError for
        # unexpected forward() keyword arguments ('labels' / 'decoder_input_ids').
        self.lora_config = LoraConfig(
            r=self.r,
            lora_alpha=self.lora_alpha,
            lora_dropout=self.lora_dropout,
            task_type=None,
            inference_mode=True,
        )

        self.bert = get_peft_model(self.bert, self.lora_config)
        self.dropout = nn.Dropout(p=0.1)
        self.linear = nn.Linear(hidden_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)

    def fine_tune_layers(self):
        """Freeze the encoder, then unfreeze its last ``num_layers_to_finetune`` layers."""
        # Freeze every encoder parameter.
        for param in self.bert.parameters():
            param.requires_grad = False

        if self.num_layers_to_finetune <= 0:
            return

        # Unfreeze the trailing layers, using the real depth of the encoder
        # instead of assuming 12 layers.
        layers = self.bert.encoder.layer
        first = max(len(layers) - self.num_layers_to_finetune, 0)
        for layer in layers[first:]:
            for param in layer.parameters():
                param.requires_grad = True

    def _get_features(self, sentence, mask):
        """Return per-token emission scores; ``mask`` is passed to the encoder as its attention mask."""
        with torch.no_grad():
            # Select the hidden-state tensor: the encoder call returns an
            # output container, which dropout/linear cannot consume directly.
            embeds = self.bert(sentence, attention_mask=mask)['last_hidden_state']
        enc = self.dropout(embeds)
        feats = self.linear(enc)
        return feats

    def forward(self, sentence, tags, mask, is_test=False):
        """Return the training loss, or the decoded tag sequences when ``is_test`` is true.

        Training: the negated result of ``crf.forward(..., reduction='mean')``.
        Testing: the result of ``crf.decode(emissions, mask)``.
        """
        emissions = self._get_features(sentence, mask)
        if not is_test:  # Training, return loss
            loss = -self.crf.forward(emissions, tags, mask, reduction='mean')
            return loss
        else:  # Testing, return decoding
            decode = self.crf.decode(emissions, mask)
            return decode

In [14]:
class Roberta_wwm_BiLSTM_CRF(nn.Module):
    """Sequence tagger: Chinese RoBERTa-wwm encoder -> BiLSTM -> dropout -> linear -> CRF.

    Args:
        tag_to_ix: mapping from tag to integer index; its length is the number
            of CRF tags.
        embedding_dim: feature size fed to the LSTM.
        hidden_dim: BiLSTM output width (``hidden_dim // 2`` per direction).

    NOTE(review): ``mask`` is not forwarded to the encoder, so padded
    positions are presumably attended to -- confirm this is intended.
    """

    def __init__(self, tag_to_ix, embedding_dim=768, hidden_dim=256):
        super(Roberta_wwm_BiLSTM_CRF, self).__init__()
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim

        # Load the pretrained encoder. A tokenizer is not a model
        # configuration, so it is no longer loaded and passed as `config`.
        # Only `last_hidden_state` is consumed below, so `output_hidden_states`
        # is not needed either.
        self.bert = AutoModel.from_pretrained("hfl/chinese-roberta-wwm-ext")

        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim//2,
                            num_layers=2, bidirectional=True, batch_first=True)
        self.dropout = nn.Dropout(p=0.1)
        self.linear = nn.Linear(hidden_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)

    def _get_features(self, sentence):
        """Return per-token emission scores; the encoder runs without gradients."""
        with torch.no_grad():
            embeds = self.bert(sentence)['last_hidden_state']
        enc, _ = self.lstm(embeds)
        enc = self.dropout(enc)
        feats = self.linear(enc)
        return feats

    def forward(self, sentence, tags, mask, is_test=False):
        """Return the training loss, or the decoded tag sequences when ``is_test`` is true.

        Training: the negated result of ``crf.forward(..., reduction='mean')``.
        Testing: the result of ``crf.decode(emissions, mask)``.
        """
        emissions = self._get_features(sentence)
        if not is_test:  # Training, return loss
            loss = -self.crf.forward(emissions, tags, mask, reduction='mean')
            return loss
        else:  # Testing, return decoding
            decode = self.crf.decode(emissions, mask)
            return decode

In [15]:
def count_parameters(model):
    """Return the total element count of the parameters of ``model`` that require gradients."""
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()
    return trainable

def format_parameters(num_params):
    """Format ``num_params`` with comma thousands separators, e.g. 1234567 -> '1,234,567'."""
    return f"{num_params:,}"

In [16]:
# Tag inventory: three special tokens, the outside tag 'O', then a B-/I- pair
# for each entity type. The position of a tag in this tuple is its index.
VOCAB = (
    '<PAD>', '[CLS]', '[SEP]', 'O',
    'B-BODY', 'I-BODY',
    'B-SYMP', 'I-SYMP',
    'B-INST', 'I-INST',
    'B-EXAM', 'I-EXAM',
    'B-CHEM', 'I-CHEM',
    'B-DISE', 'I-DISE',
    'B-DRUG', 'I-DRUG',
    'B-SUPP', 'I-SUPP',
    'B-TREAT', 'I-TREAT',
    'B-TIME', 'I-TIME',
)

# Map each tag to its position in VOCAB.
tag2idx = dict(zip(VOCAB, range(len(VOCAB))))

In [None]:
# Instantiate every model variant with the shared tag vocabulary.
# Both LoRA variants are built with the same hyperparameters.
LORA_KWARGS = dict(num_layers_to_finetune=2, r=4, lora_alpha=16, lora_dropout=0.05)

LTbert_bilstm_model = LoRATrainBert_BiLSTM_CRF(tag_to_ix=tag2idx)
Macbert_bilstm_model = MacBERT_BiLSTM_CRF(tag_to_ix=tag2idx)
bert_lora_bilstm_model = Bert_LoRA_BiLSTM_CRF(tag2idx, **LORA_KWARGS)
clinical_distilbert_model = ClinicalDistilBERT_BiLSTM_CRF(tag_to_ix=tag2idx)
bert_bilstm_model = Bert_BiLSTM_CRF(tag_to_ix=tag2idx)
bert_model = Bert_CRF(tag_to_ix=tag2idx)
bert_lora_model = Bert_LoRA_CRF(tag2idx, **LORA_KWARGS)
RobertaWWM_model = Roberta_wwm_BiLSTM_CRF(tag_to_ix=tag2idx)

In [None]:
# Count the parameters of each model. count_parameters() only counts
# parameters with requires_grad=True, so frozen weights are excluded.
LTbert_bilstm_params = count_parameters(LTbert_bilstm_model)
Macbert_bilstm_params = count_parameters(Macbert_bilstm_model)
bert_lora_bilstm_params = count_parameters(bert_lora_bilstm_model)
clinical_distilbert_params = count_parameters(clinical_distilbert_model)
bert_bilstm_params = count_parameters(bert_bilstm_model)
bert_params = count_parameters(bert_model)
bert_lora_params = count_parameters(bert_lora_model)
RobertaWWM_params = count_parameters(RobertaWWM_model)

# One report line per model, labelled with the model's class name. The first
# label used to read "TrainningBert_BiLSTM_CRF", which matched no class.
param_report = {
    "LoRATrainBert_BiLSTM_CRF": LTbert_bilstm_params,
    "MacBERT_BiLSTM_CRF": Macbert_bilstm_params,
    "Bert_LoRA_BiLSTM_CRF": bert_lora_bilstm_params,
    "ClinicalDistilBERT_BiLSTM_CRF": clinical_distilbert_params,
    "Bert_BiLSTM_CRF": bert_bilstm_params,
    "Bert_CRF": bert_params,
    "Bert_LoRA_CRF": bert_lora_params,
    "Roberta_wwm_BiLSTM_CRF": RobertaWWM_params,
}
print("\n".join(
    f"{name} parameters: {format_parameters(total)}"
    for name, total in param_report.items()
))