In [None]:
import os
import json
import numpy as np

from tqdm import tqdm
from pathlib import Path
from transformers import BertTokenizerFast

In [None]:
# Download the KorQuAD v1.0 train and dev JSON files into the working directory
# (-O fixes the local file name, so re-running overwrites the previous download).
!wget https://korquad.github.io/dataset/KorQuAD_v1.0_train.json -O KorQuAD_v1.0_train.json
!wget https://korquad.github.io/dataset/KorQuAD_v1.0_dev.json -O KorQuAD_v1.0_dev.json

In [None]:
def read_squad(path):
  """Flatten a SQuAD-style JSON file into three parallel lists.

  One entry is produced per answer, so a question that has several answers
  appears (together with its context) once for each of them.

  Args:
    path: Location of the JSON file (string or path-like).

  Returns:
    A tuple (contexts, questions, answers) of equally long lists. Each
    element of `answers` is the answer dict exactly as stored in the file.
  """
  with open(Path(path), 'rb') as handle:
    articles = json.load(handle)['data']

  # Collect (context, question, answer) rows first, then split into columns.
  rows = []
  for article in articles:
    for paragraph in article['paragraphs']:
      context = paragraph['context']
      for qa in paragraph['qas']:
        question = qa['question']
        rows.extend((context, question, answer) for answer in qa['answers'])

  contexts = [row[0] for row in rows]
  questions = [row[1] for row in rows]
  answers = [row[2] for row in rows]

  return contexts, questions, answers

In [None]:
# Load both downloaded files as parallel context / question / answer lists.
# The KorQuAD dev file is what this notebook calls the "test" split.
train_contexts, train_questions, train_answers = read_squad('KorQuAD_v1.0_train.json')
test_contexts, test_questions, test_answers = read_squad('KorQuAD_v1.0_dev.json')

In [None]:
# Training split: number of contexts, questions and answers
# (read_squad appends to all three lists together, so they should match).
print('훈련 데이터')
print('본문 개수 :', len(train_contexts))
print('질문 개수 :', len(train_questions))
print('답변 개수 :', len(train_answers))
print('-' * 100)

# Same counts for the test (dev) split.
print('테스트 데이터')
print('본문 개수 :', len(test_contexts))
print('질문 개수 :', len(test_questions))
print('답변 개수 :', len(test_answers))

In [None]:
# Show the context (passage) of the first training sample.
print(train_contexts[0])

In [None]:
# Show the question of the first training sample.
print(train_questions[0])

In [None]:
# Show the raw answer dict of the first training sample (before 'answer_end' is added).
print(train_answers[0])

In [None]:
def add_end_idx(answers, contexts):
  """Add an exclusive character end offset to every answer dict, in place.

  For each (answer, context) pair the answer text is right-stripped and
  written back, then 'answer_end' is set to answer_start + len(text).

  Args:
    answers: List of dicts carrying 'text' and 'answer_start'; mutated.
    contexts: List of context strings, aligned with `answers`.

  Raises:
    AssertionError: If the context slice [answer_start:answer_end] does not
      equal the stripped answer text.
  """
  for entry, passage in zip(answers, contexts):
    stripped = entry['text'].rstrip()
    entry['text'] = stripped

    begin = entry['answer_start']
    finish = begin + len(stripped)

    # The recorded offset must point at the answer text inside the passage.
    assert passage[begin:finish] == stripped, 'Calculate Error'

    entry['answer_end'] = finish

In [None]:
# Add 'answer_end' to every answer dict of both splits (the dicts are modified in place).
add_end_idx(train_answers, train_contexts)
add_end_idx(test_answers, test_contexts)

In [None]:
# Show the first training answer again; it now also carries 'answer_end'.
print(train_answers[0])

In [None]:
# Load the fast tokenizer of the klue/bert-base checkpoint.
tokenizer = BertTokenizerFast.from_pretrained('klue/bert-base')

# Encode each sample as a (context, question) pair: context is the first sequence,
# question the second. Truncation and padding are on, with max_length = 256.
train_encoding = tokenizer(train_contexts, train_questions, truncation = True, padding = True, max_length = 256)
test_encoding = tokenizer(test_contexts, test_questions, truncation = True, padding = True, max_length = 256)

In [None]:
# Show the token strings of the first encoded training sample.
print('첫 번째 샘플의 토큰화 결과 :', train_encoding[0].tokens)

In [None]:
# Show how many tokens the first encoded training sample has.
print('첫 번째 샘플의 길이 :', len(train_encoding[0].tokens))

In [None]:
# Show the attention mask of the first encoded training sample.
print('첫 번째 샘플의 어텐션 마스크 :', train_encoding[0].attention_mask)

In [None]:
def add_token_position(encodings, answers):
  """Convert character answer spans to token positions and store them on `encodings`.

  For sample i the first and the last answer character ('answer_start' and
  'answer_end' - 1) are mapped with `encodings.char_to_token`. When either
  lookup returns None (the answer is not present in the encoded passage),
  the missing position is filled with `tokenizer.model_max_length` as a
  placeholder and the sample index is reported for later removal.

  Relies on the notebook-level `tokenizer` for the placeholder value.

  Args:
    encodings: Tokenizer output exposing `char_to_token(batch_index,
      char_index)` and `update(dict)`; receives 'start_positions' and
      'end_positions'.
    answers: List of dicts with 'answer_start' and 'answer_end', aligned
      with the samples in `encodings`.

  Returns:
    List of sample indices (ascending, no duplicates) whose start or end
    position could not be resolved.
  """
  start_positions = []
  end_positions = []
  deleting_list = []

  for i in tqdm(range(len(answers))):
    start_token = encodings.char_to_token(i, answers[i]['answer_start'])
    end_token = encodings.char_to_token(i, answers[i]['answer_end'] - 1)

    # One combined check flags the sample exactly once, without scanning
    # the list of already flagged indices.
    if start_token is None or end_token is None:
      deleting_list.append(i)

    # Unresolved positions get the placeholder so the lists stay aligned with the samples.
    start_positions.append(tokenizer.model_max_length if start_token is None else start_token)
    end_positions.append(tokenizer.model_max_length if end_token is None else end_token)

  encodings.update({'start_positions' : start_positions, 'end_positions' : end_positions})
  return deleting_list

In [None]:
# Attach token-level start/end positions to both encodings and keep the
# indices of the samples whose answer could not be located.
deleting_list_for_train = add_token_position(train_encoding, train_answers)
deleting_list_for_test = add_token_position(test_encoding, test_answers)

In [None]:
# List the sample indices flagged for deletion in each split.
print('삭제 예정인 훈련 샘플 :\n', deleting_list_for_train)
print('삭제 예정인 테스트 샘플 :\n', deleting_list_for_test)

In [None]:
# Inspect the original context and question of training sample 761
# (presumably one of the flagged samples; confirm against the list printed above).
print('761번 샘플의 기존 원문 :\n', train_contexts[761])
print('-' * 200)
print('761번 샘플의 질문 :\n', train_questions[761])

In [None]:
# Show the answer dict (text and character offsets) of training sample 761.
print('761번 샘플의 기존 정답 :', train_answers[761])

In [None]:
# Decode the encoded input_ids of sample 761 back to text to see what the model input contains.
print('761번 샘플 전처리 후 :\n', tokenizer.decode(train_encoding['input_ids'][761]))

##### 슬라이딩 윈도우는 구현이 복잡하고 속도가 느리기 때문에 정답이 잘린 데이터는 삭제

In [None]:
def delete_samples(encodings, deleting_list):
  """Remove the flagged samples and return the model inputs and targets as arrays.

  Args:
    encodings: Mapping with 'input_ids', 'attention_mask', 'start_positions'
      and 'end_positions', each indexed by sample along the first axis.
    deleting_list: Sample indices to drop.

  Returns:
    (X_data, y_data), where X_data is [input_ids, attention_masks] and
    y_data is [start_positions, end_positions], all NumPy arrays with the
    flagged rows removed.
  """
  def _without_flagged(key):
    # Convert the field to an array and drop the flagged rows along axis 0.
    return np.delete(np.array(encodings[key]), deleting_list, axis = 0)

  X_data = [_without_flagged('input_ids'), _without_flagged('attention_mask')]
  y_data = [_without_flagged('start_positions'), _without_flagged('end_positions')]

  return X_data, y_data

In [None]:
# Drop the flagged samples from each split.
# X_* = [input_ids, attention_masks], y_* = [start_positions, end_positions].
X_train, y_train = delete_samples(train_encoding, deleting_list_for_train)
X_test, y_test = delete_samples(test_encoding, deleting_list_for_test)

In [None]:
# Sample counts before the flagged samples were removed.
print('-------------삭제전-------------')
print('훈련 데이터 샘플의 개수 :', len(train_contexts))
print('테스트 데이터 샘플의 개수 :', len(test_contexts))
print()

# Sample counts after removal (rows of the input_ids arrays).
print('-------------삭제후-------------')
print('훈련 데이터 샘플의 개수 :', len(X_train[0]))
print('테스트 데이터 샘플의 개수 :', len(X_test[0]))

# BERT를 이용한 Question Answering

In [None]:
import torch
import torch.nn as nn

from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader
from transformers import BertTokenizer, BertModel

In [None]:
class BertForQuestionAnswering(nn.Module):
  """BERT encoder with a two-unit linear head for extractive question answering.

  The head maps every token's hidden state to two scores: one for "the
  answer starts here" and one for "the answer ends here".

  Args:
    model_name: Checkpoint name or path passed to `BertModel.from_pretrained`.
  """

  def __init__(self, model_name):
    super(BertForQuestionAnswering, self).__init__()
    self.bert = BertModel.from_pretrained(model_name)

    # Two output units: one scores the start index, the other the end index.
    self.qa_outputs = nn.Linear(self.bert.config.hidden_size, 2)

  def forward(self, input_ids, attention_mask = None):
    """Score every token position as answer start and as answer end.

    Args:
      input_ids: Token ids, expected as (batch_size, sequence_length).
      attention_mask: Optional mask forwarded to the encoder.

    Returns:
      (start_logits, end_logits), each with one raw, unnormalized score per
      token position (the trailing size-1 dimension is squeezed away).
      Raw logits are returned on purpose: `nn.CrossEntropyLoss` applies
      log-softmax itself, so feeding it already-softmaxed values would
      normalize twice and flatten the loss. `argmax` over logits picks the
      same position as `argmax` over probabilities; apply
      `torch.softmax(..., dim = -1)` explicitly if probabilities are needed.
    """
    outputs = self.bert(input_ids, attention_mask = attention_mask)

    # Last-layer hidden states of all tokens: (batch_size, sequence_length, hidden_size).
    sequence_output = outputs[0]

    # Project each token to the two scores, then split them apart.
    logits = self.qa_outputs(sequence_output)
    start_logits, end_logits = logits.split(1, dim = -1)

    start_logits = start_logits.squeeze(-1)
    end_logits = end_logits.squeeze(-1)

    return start_logits, end_logits

In [None]:
# Build the QA model on top of the klue/bert-base checkpoint.
model = BertForQuestionAnswering('klue/bert-base')
# Used for both the start and the end target.
# NOTE: nn.CrossEntropyLoss applies log-softmax internally, so its input
# is expected to be unnormalized scores (logits).
loss = nn.CrossEntropyLoss()
# Adam over all model parameters (encoder and QA head) with learning rate 5e-5.
optimizer = Adam(model.parameters(), lr = 5e-5)

In [None]:
def create_dataset(X_data, y_data):
  """Wrap model inputs and span targets in a `TensorDataset` of int64 tensors.

  Args:
    X_data: Pair (input_ids, attention_masks).
    y_data: Pair (start_positions, end_positions).

  Returns:
    TensorDataset yielding (input_ids, attention_mask, start_position,
    end_position) per sample, all with dtype torch.long.
  """
  input_ids, attention_masks = X_data
  start_positions, end_positions = y_data

  # Same conversion for all four fields, in the order the dataset yields them.
  fields = (input_ids, attention_masks, start_positions, end_positions)
  tensors = [torch.tensor(values, dtype = torch.long) for values in fields]

  return TensorDataset(*tensors)

In [None]:
# Number of samples per mini-batch for both loaders.
batch_size = 32

# Tensor datasets built from the filtered arrays.
train_data = create_dataset(X_train, y_train)
test_data = create_dataset(X_test, y_test)

# Only the training loader shuffles; the test loader keeps dataset order.
train_loader = DataLoader(train_data, batch_size = batch_size, shuffle = True)
test_loader = DataLoader(test_data, batch_size = batch_size)

# Use the GPU when one is available, otherwise the CPU, and move the model there.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
print(device)

# 평가 함수

In [None]:
def evaluation(model, loader, loss, device):
  """Compute the mean per-batch loss of `model` over `loader` without gradients.

  The model is switched to eval mode and left in that mode.

  Args:
    model: Callable as model(input_ids, attention_mask = ...) returning a
      (start, end) pair of per-position outputs.
    loader: Sized iterable of (input_ids, attention_masks, start_positions,
      end_positions) batches.
    loss: Criterion called as loss(output, target).
    device: Device every batch tensor is moved to.

  Returns:
    Sum over batches of (start loss + end loss) / 2, divided by len(loader).
  """
  model.eval()
  running_loss = 0.0

  with torch.no_grad():
    for batch in loader:
      input_ids, attention_masks, start_positions, end_positions = (
        tensor.to(device) for tensor in batch
      )

      start_out, end_out = model(input_ids, attention_mask = attention_masks)

      # Average the start-position and end-position losses for this batch.
      start_term = loss(start_out, start_positions)
      end_term = loss(end_out, end_positions)
      running_loss += ((start_term + end_term) / 2).item()

  return running_loss / len(loader)

# 학습

In [None]:
# Number of full passes over the training set.
epochs = 3

# NOTE: this loop runs at notebook top level, so its loop variables (input_ids,
# start_positions, ...) stay defined in the notebook namespace afterwards.
for epoch in range(epochs):

  total_loss = 0.0
  # evaluation() below leaves the model in eval mode, so re-enable train mode every epoch.
  model.train()

  for input_ids, attention_masks, start_positions, end_positions in tqdm(train_loader, total = len(train_loader)):
    # Move the batch to the device the model was placed on.
    input_ids = input_ids.to(device)
    attention_masks = attention_masks.to(device)
    start_positions = start_positions.to(device)
    end_positions = end_positions.to(device)

    start_probs, end_probs = model(input_ids, attention_mask = attention_masks)

    # Batch loss is the average of the start-position and end-position losses.
    start_loss = loss(start_probs, start_positions)
    end_loss = loss(end_probs, end_positions)
    batch_loss = (start_loss + end_loss) / 2

    # Clear old gradients, backpropagate, then update the weights.
    optimizer.zero_grad()
    batch_loss.backward()
    optimizer.step()

    total_loss += batch_loss.item()

  # Mean of the per-batch training losses for this epoch.
  avg_loss = total_loss / len(train_loader)
  print(f'Epoch : {epoch + 1} | Loss : {avg_loss}')

  # Loss on the test loader after each epoch.
  val_loss = evaluation(model, test_loader, loss, device)
  print(f'Epoch : {epoch + 1} | Validation Loss : {val_loss}')

# 예측

In [None]:
def predict(model, input_ids, attention_mask):
  """Predict the answer span of a single encoded sample.

  The model is switched to eval mode (and left there) and run without
  gradients on a batch of size one.

  Args:
    model: Module called as model(input_ids, attention_mask = ...) that
      returns per-position (start, end) outputs.
    input_ids: 1-D tensor of token ids for one sample.
    attention_mask: 1-D attention mask for the same sample.

  Returns:
    (start_index, end_index): argmax positions of the start and end outputs
    as Python ints. The two are chosen independently, so end_index may be
    smaller than start_index.
  """
  # Run on the device the model's weights actually live on instead of
  # depending on a notebook-level `device` variable; a model without
  # parameters falls back to the CPU.
  first_param = next(model.parameters(), None)
  target_device = first_param.device if first_param is not None else torch.device('cpu')

  model.eval()
  with torch.no_grad():
    # unsqueeze(0) adds the batch dimension the model expects.
    start_probs, end_probs = model(
      input_ids.unsqueeze(0).to(target_device),
      attention_mask = attention_mask.unsqueeze(0).to(target_device),
    )

  start_index = torch.argmax(start_probs).item()
  end_index = torch.argmax(end_probs).item()

  return start_index, end_index

In [None]:
def display_output(test_data, index, tokenizer):
  """Print context, question, gold answer and predicted answer for one test sample.

  Relies on the notebook-level `model` and on `predict` for the prediction.

  Args:
    test_data: Indexable dataset whose items are (input_ids, attention_mask,
      start_position, end_position).
    index: Position of the sample to display.
    tokenizer: Tokenizer whose `decode` turns token ids back into text.
  """
  # Fetch the sample stored at `index`.
  input_ids, attention_mask, start_position, end_position = test_data[index]

  # decoded_text has the form "[CLS] context [SEP] question [SEP]".
  decoded_text = tokenizer.decode(input_ids)

  # Gold answer: decode the tokens between this sample's own start and end
  # positions (inclusive). int() turns the stored scalars into plain slice bounds.
  gold_start = int(start_position)
  gold_end = int(end_position)
  true_answer = tokenizer.decode(input_ids[gold_start : gold_end + 1])

  # Split the decoded text into the context and the question.
  segments = decoded_text.split('[SEP]')
  context = segments[0].replace('[CLS]', '').strip()
  question = segments[1].strip()

  # Predicted answer: decode the tokens between the predicted positions (inclusive).
  start_index, end_index = predict(model, input_ids, attention_mask)
  predicted_answer = tokenizer.decode(input_ids[start_index : end_index + 1])

  print(f'본문 : {context}')
  print(f'질문 : {question}')
  print(f'정답 : {true_answer}')
  print(f'예측 : {predicted_answer}')

In [None]:
# Show context, question, gold answer and model prediction for test sample 15.
display_output(test_data, 15, tokenizer)