앙상블 학습(BERT)

In [1]:
# 후속문장(이어지는 문장)이면 1, 아니면 0
dataset = [["What music do you like?", "I like Rock music.", 1],
           ["What is your favorite food?", "I like sushi the best", 1],
           ["What is your favorite color?", "I'm going to be a doctor", 0],
           ["What is your favorite song?", "Tokyo olympic game in 2020 was postponed", 0],
           ["Do you like watching TV shows?", "Yeah, I often watch it in my spare time", 1]]

In [28]:
from transformers import BertPreTrainedModel, BertConfig, BertModel, BertTokenizer, AdamW

from torch import nn

#앙상블 학습을 위한 클래스

class BertEnsembleForNextSentencePrediction(BertPreTrainedModel):

  def __init__(self, config, *args, **kwargs):

      super().__init__(config)

      # QA(Question, Answer) BERT 모델

      self.bert_model_1 = BertModel(config)

      # AQ(Answer, Question) BERT 모델

      self.bert_model_2 = BertModel(config)

      # 선형함수

      self.cls = nn.Linear(2 * self.config.hidden_size, 2)

      # 초기 가중치

      self.init_weights()

  def forward(

          self,

          input_ids=None,

          attention_mask=None,

          token_type_ids=None,

          position_ids=None,

          head_mask=None,

          inputs_embeds=None,

          next_sentence_label=None,

  ):

    outputs = []

    # input_ids 첫번째 입력(문장) 저장

    input_ids_1 = input_ids[0]

    # input_ids 첫번째 입력(문장)의 attention_mask 저장

    attention_mask_1 = attention_mask[0]

    # bert_model_1에 input_ids_1 투입한 결과를 outputs에 순차적으로 저장

    outputs.append(self.bert_model_1(input_ids_1,

                                     attention_mask=attention_mask_1))

    # input_ids 두번째 입력(문장) 저장

    input_ids_2 = input_ids[1]

    # input_ids 두번째 입력(문장)의 attention_mask 저장

    attention_mask_2 = attention_mask[1]

    # bert_model_2에 input_ids_2 투입한 결과를 outputs에 순차적으로 저장

    outputs.append(self.bert_model_2(input_ids_2,

                                     attention_mask=attention_mask_2))

    # torch.cat()로 텐서 병합

    last_hidden_states = torch.cat([output[1] for output in outputs], dim=1)

    logits = self.cls(last_hidden_states)

    if next_sentence_label is not None:

      loss_fct = nn.CrossEntropyLoss(ignore_index=-1)

      next_sentence_loss = loss_fct(logits.view(-1, 2), next_sentence_label.view(-1))

      return next_sentence_loss, logits

    else:

      return logits

In [29]:
import torch
from torch.optim import AdamW

# 로컬에서 실행할 경우 메모리 부족으로 cpu에서 실행
device = torch.device("cpu")

# 모델 및 config 설정
config = BertConfig()
model = BertEnsembleForNextSentencePrediction(config)
model.to(device)
tokenizer = BertTokenizer.from_pretrained("bert-base-cased")
learning_rate = 1e-5
no_decay = ["bias", "LayerNorm.weight"]

# 최적화 함수 그룹 파라미터 설정
optimizer_grouped_parameters = [{
  "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
  }]

# 최적화 함수 설정
optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate)

In [30]:
#데이터 증강 처리 함수
# 질문-답변 문장을 답변-질문 순서로 바꿔서 추가

def prepare_data(dataset, qa=True):
  input_ids, attention_masks = [], []
  labels = []
  for point in dataset:
    if qa is True:
      # point에 있는 3개의 원소를 앞에 요소부터 q, a, _ 으로
      q, a, _ = point
    else:
      # point에 있는 3개의 원소를 앞에 요소부터 a, q, _ 으로
      a, q, _ = point
    # q와 a를 토크나이저를 통해 인코딩
    encoded_dict = tokenizer.encode_plus(
      q,  # 문장 1 인코딩
      a,  # 문장 2 인코딩
      add_special_tokens=True,  # 특수 토큰인 [CLS]와 [SEP] 생성
      max_length=128,
      pad_to_max_length=True,
      return_attention_mask=True,  # attention_mask 생성(패딩 처리된 부분은 1로 표시)
      return_tensors='pt',
      truncation=True
    )
    input_ids.append(encoded_dict["input_ids"])
    attention_masks.append(encoded_dict["attention_mask"])
    labels.append(point[-1])

  # input_ids를 첫번째 축(dim=0), 즉 세로 방향으로 병합
  input_ids = torch.cat(input_ids, dim=0)

  # attention_mask를 첫번째 축(dim=0), 즉 세로 방향으로 병합
  attention_masks = torch.cat(attention_masks, dim=0)
  return input_ids, attention_masks, labels

In [31]:
import numpy as np

from torch.utils.data import DataLoader, RandomSampler, Dataset, SequentialSampler

# QADataset 클래스 생성

class QADataset(Dataset):

  def __init__(self, input_ids, attention_masks, labels=None):
    self.input_ids = np.array(input_ids)
    self.attention_masks = np.array(attention_masks)
    self.labels = torch.tensor(labels, dtype=torch.long)

  def __getitem__(self, index):
    return self.input_ids[index], self.attention_masks[index], self.labels[index]

  def __len__(self):
    return self.input_ids.shape[0]

In [32]:
input_ids_qa, attention_masks_qa, labels_qa = prepare_data(dataset)

train_dataset_qa = QADataset(input_ids_qa, attention_masks_qa, labels_qa)

input_ids_aq, attention_masks_aq, labels_aq = prepare_data(dataset, qa=False)

train_dataset_aq = QADataset(input_ids_aq, attention_masks_aq, labels_aq)

dataloader_qa =  DataLoader(dataset=train_dataset_qa,
                            #batch_size=5,
                            batch_size=1,
                            sampler=SequentialSampler(train_dataset_qa))
dataloader_aq =  DataLoader(dataset=train_dataset_aq,
                            #batch_size=5,
                            batch_size=1,
                            sampler=SequentialSampler(train_dataset_aq))

In [33]:
import gc
torch.cuda.empty_cache()
gc.collect()

7828

In [36]:
# 앙상블 학습 파인튜닝
epochs = 3

for epoch in range(epochs):
    # dataloader와 dataloader_aq pair를 반복처리
    for step, combined_batch in enumerate(zip(dataloader_qa, dataloader_aq)):
        batch_1, batch_2 = combined_batch
        model.train()

        inputs = {
            "input_ids" : [batch_1[0], batch_2[0]],
            "attention_mask": [batch_1[1], batch_2[1]],
            "next_sentence_label": batch_1[2]
        }

        outputs = model(**inputs)

        loss = outputs[0]
        loss.backward()
        print(f'epoch: {epoch+1}, loss: {loss:.4f}')
        optimizer.step()
        model.zero_grad()

epoch: 1, loss: 0.0956
epoch: 1, loss: 0.2618
epoch: 1, loss: 1.3374
epoch: 1, loss: 1.0743
epoch: 1, loss: 0.2474
epoch: 2, loss: 0.2099
epoch: 2, loss: 0.3128
epoch: 2, loss: 0.4954
epoch: 2, loss: 0.5793
epoch: 2, loss: 0.6046
epoch: 3, loss: 0.3115
epoch: 3, loss: 0.7186
epoch: 3, loss: 0.3524
epoch: 3, loss: 0.3091
epoch: 3, loss: 0.5405


In [40]:
# 모델 테스트(학습용 데이터)

input_ids_qa, attention_masks_qa, labels_qa = prepare_data(dataset)

test_dataset_qa = QADataset(input_ids_qa, attention_masks_qa, labels_qa)

input_ids_aq, attention_masks_aq, labels_aq = prepare_data(dataset, qa=False)

test_dataset_aq = QADataset(input_ids_aq, attention_masks_aq, labels_aq)

dataloader_qa =  DataLoader(dataset=test_dataset_qa,
                            batch_size=16,
                            sampler=SequentialSampler(test_dataset_qa))

dataloader_aq =  DataLoader(dataset=test_dataset_aq,
                            batch_size=16,
                            sampler=SequentialSampler(test_dataset_aq))
complete_outputs, complete_label_ids = [], []

for step, combined_batch in enumerate(zip(dataloader_qa, dataloader_aq)):
  model.eval()
  batch_1, batch_2 = combined_batch
  batch_1 = tuple(t.to(device) for t in batch_1)
  batch_2 = tuple(t.to(device) for t in batch_2)
  with torch.no_grad():

    inputs = {
        "input_ids": [batch_1[0], batch_2[0]],
        "attention_mask": [batch_1[1], batch_2[1]],
        "next_sentence_label": batch_1[2]
    }

    outputs = model(**inputs)
    tmp_eval_loss, logits = outputs[:2]
    logits = logits.detach().cpu().numpy()
    outputs = np.argmax(logits, axis=1)
    label_ids = inputs["next_sentence_label"].detach().cpu().numpy()
  complete_outputs.extend(outputs)
  complete_label_ids.extend(label_ids)

print(complete_outputs, complete_label_ids)
#예측과 실제값

[1, 1, 0, 0, 1] [1, 1, 0, 0, 1]


In [41]:
# 모델 테스트(새로운 문장)

dataset = [["What music do you like?", "I like Rock music.", 1]]

input_ids_qa, attention_masks_qa, labels_qa = prepare_data(dataset)
test_dataset_qa = QADataset(input_ids_qa, attention_masks_qa, labels_qa)

input_ids_aq, attention_masks_aq, labels_aq = prepare_data(dataset, qa=False)
test_dataset_aq = QADataset(input_ids_aq, attention_masks_aq, labels_aq)

dataloader_qa =  DataLoader(dataset=test_dataset_qa,
                            batch_size=16,
                            sampler=SequentialSampler(test_dataset_qa))

dataloader_aq =  DataLoader(dataset=test_dataset_aq,
                            batch_size=16,
                            sampler=SequentialSampler(test_dataset_aq))

complete_outputs, complete_label_ids = [], []

for step, combined_batch in enumerate(zip(dataloader_qa, dataloader_aq)):
  model.eval()
  batch_1, batch_2 = combined_batch
  batch_1 = tuple(t.to(device) for t in batch_1)
  batch_2 = tuple(t.to(device) for t in batch_2)

  with torch.no_grad():
    inputs = {
        "input_ids": [batch_1[0], batch_2[0]],
        "attention_mask": [batch_1[1], batch_2[1]],
        "next_sentence_label": batch_1[2]
    }

    outputs = model(**inputs)

    tmp_eval_loss, logits = outputs[:2]
    logits = logits.detach().cpu().numpy()
    outputs = np.argmax(logits, axis=1)
    label_ids = inputs["next_sentence_label"].detach().cpu().numpy()

  complete_outputs.extend(outputs)
  complete_label_ids.extend(label_ids)

print(complete_outputs, complete_label_ids)

[1] [1]
