### 한국어 기반 챗봇을 개발하기 위한 선행 과정 (baseline code)

In [None]:
# library import
import math
import numpy as np
import pandas as pd
import random
import re
import torch
import urllib.request
from torch.utils.data import DataLoader, Dataset
from transformers import PreTrainedTokenizerFast
from collections import deque

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# dataset download
# urllib.request.urlretrieve(
#     "https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv",
#     filename="ChatBotData.csv",
# )

Chatbot_Data = pd.read_csv("/content/drive/MyDrive/전남대/수업/24년도/자연어처리/cleaned_ko_nlp.csv")
Chatbot_Data = Chatbot_Data.dropna(axis=0, how="any")
# Chatbot_Data = pd.read_csv("ChatBotData.csv")

# 데이터 중 300개만 취함
# Chatbot_Data = Chatbot_Data[:1000]
Chatbot_Data.head(10)

Unnamed: 0.1,Unnamed: 0,Q,A
0,0,나지금밥머거2시간걸어서<ENTER>번화가찾았어..<ENTER>잉,헐 <ENTER>언넝호텔들가<ENTER>엄청피건할첸데<ENTER>나는인낫러요<ENT...
1,1,헐 <ENTER>언넝호텔들가<ENTER>엄청피건할첸데<ENTER>나는인낫러요<ENT...,오좋겠네
2,2,오좋겠네,잘잣어??
3,3,잘잣어??,아니<ENTER>자지도못했어<ENTER>진짜피씨방에서30분?<ENTER>너가좋아하는...
4,4,아니<ENTER>자지도못했어<ENTER>진짜피씨방에서30분?<ENTER>너가좋아하는...,잉<ENTER>나도줘...<ENTER>내돈가쓰...
5,5,잉<ENTER>나도줘...<ENTER>내돈가쓰...,맛있어<ENTER>고로케도존맛탱
6,6,맛있어<ENTER>고로케도존맛탱,사진찍엇어....???
7,7,사진찍엇어....???,먼사진?
8,8,먼사진?,돈따스!!!!!!!!!!
9,9,돈따스!!!!!!!!!!,안보내줫어?<ENTER>엥보낸줄알았는대<ENTER>사진#<ENTER>이거


In [None]:
# tokenizer
# hugging face 생태계에서는 tokenizer를 자동으로 생성한다. (엄청 좋은 기능)

BOS = "</s>" # 문장의 시작
EOS = "</s>" # 문장의 끝
UNK = "<unk>" # 모르는 단어
PAD = "<pad>" # 배치 크기를 맞추기 위함
MASK = "<unused0>"
ENTER = "<ENTER>"

Q_TKN = "<usr>"
A_TKN = "<sys>"
SENT = "<unused1>"


# 토크나이저 선언
koGPT2_TOKENIZER = PreTrainedTokenizerFast.from_pretrained("skt/kogpt2-base-v2",
                                                           bos_token=BOS, eos_token=EOS, unk_token=UNK, pad_token=PAD, mask_token=MASK)

# NOTE - 토크나이저에 새로운 토큰을 추가하는 방법
# # 새로운 토큰 리스트

# 카톡에서의 ENTER 토큰을 추가함
# new_tokens = ['<ENTER>']

# # 새로운 토큰을 토크나이저에 추가
# koGPT2_TOKENIZER.add_tokens(new_tokens)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer.json:   0%|          | 0.00/2.83M [00:00<?, ?B/s]



config.json:   0%|          | 0.00/1.00k [00:00<?, ?B/s]

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'GPT2Tokenizer'. 
The class this function is called from is 'PreTrainedTokenizerFast'.


In [None]:
# 데이터 셋 구성
class ChatbotDataset(Dataset):
    def __init__(self, chats, max_len=40): # 데이터 전처리 수행
        self._data = chats
        self.max_len = max_len
        self.q_token = Q_TKN # 나중에 정의됨. 질문토큰
        self.a_token = A_TKN # 나중에 정의됨. 답변토큰
        self.sent_token = SENT # 전송
        self.eos = EOS
        self.mask = MASK
        self.tokenizer = koGPT2_TOKENIZER

    def __len__(self):
        return len(self._data)

    def __getitem__(self, idx):
        # (return1, return2): return1 - Question, return2 - Answer
        turn = self._data.iloc[idx]
        q = turn["Q"] # 질문 가져온다
        # q = re.sub(r"([?.!,])", r" ", q)

        a = turn["A"]
        # a = re.sub(r"([?.!,])", r" ", a)

        # 일종의 프로토콜
        q_toked = self.tokenizer.tokenize(self.q_token + q + self.sent_token)
        q_len = len(q_toked)

        a_toked = self.tokenizer.tokenize(self.a_token + a + self.eos)
        a_len = len(a_toked)

        # 질문의 길이가 최대 길이보다 큰 경우
        if q_len > self.max_len:
            a_len = self.max_len - q_len # 답변 길이 = 최대길이 - 질문길이

            # 질문의 길이가 최대 길이를 초과한 경우
            if a_len <= 0:
                q_toked = q_toked[-(int(self.max_len / 2)):] # 질문 길이를 최대 길이의 절반으로 줄임 (뒷 내용 절반만 취함)
                q_len = len(q_toked)
                a_len = self.max_len - q_len

            a_toked = a_toked[:a_len]
            a_len = len(a_toked)

        # 질문의 길이 + 답변의 길이가 최대 길이보다 크면
        if q_len + a_len > self.max_len:
            a_len = self.max_len - q_len

            # 질문의 길이가 최대 길이를 초과한다면
            if a_len <= 0:
                q_toked = q_toked[-(int(self.max_len / 2)):]
                q_len = len(q_toked)
                a_len = self.max_len - q_len

            a_toked = a_toked[:a_len]
            a_len = len(a_toked)

        # 답변
        labels = [self.mask,] * q_len + a_toked[1:]

        # mask = 질문길이: 0 + 답변길이: 1 + 나머지: 0
        mask = [0] * q_len + [1] * a_len + [0] * (self.max_len - q_len - a_len)

        # 답변을 수치화 함
        labels_ids = self.tokenizer.convert_tokens_to_ids(labels)

        # 최대 길이만큼 패딩
        while len(labels_ids) < self.max_len:
            labels_ids += [self.tokenizer.pad_token_id]

        # 질문 + 답변을 index로 만듦
        token_ids = self.tokenizer.convert_tokens_to_ids(q_toked + a_toked)

        while len(token_ids) < self.max_len:
            token_ids += [self.tokenizer.pad_token_id]

        return (token_ids, np.array(mask), labels_ids)

In [None]:
# 배치 데이터를 생성함
def collate_batch(batch):
    data = [item[0] for item in batch]
    mask = [item[1] for item in batch]
    label = [item[2] for item in batch]

    return torch.LongTensor(data), torch.LongTensor(mask), torch.LongTensor(label)

In [None]:
train_set = ChatbotDataset(Chatbot_Data, max_len=60)

train_dataloader = DataLoader(train_set, batch_size=32, num_workers=2, shuffle=True, collate_fn=collate_batch)

In [None]:
# 데이터 셋 생성
for batch_idx, samples in enumerate(train_dataloader):
    token_ids, mask, label = samples
    print("token ids: ", token_ids)
    print("mask: ", mask)
    print("label: ", label)
    break

  return torch.LongTensor(data), torch.LongTensor(mask), torch.LongTensor(label)
  return torch.LongTensor(data), torch.LongTensor(mask), torch.LongTensor(label)


token ids:  tensor([[    2, 10394,  6949,  ..., 29382,   405, 30899],
        [    2,   739,   378,  ...,     3,     3,     3],
        [    2,   739,   378,  ..., 33828, 14807,  9320],
        ...,
        [    2,  9265,  7162,  ...,     3,     3,     3],
        [    2,   739, 10765,  ...,     3,     3,     3],
        [    2, 38962, 11097,  ...,     3,     3,     3]])
mask:  tensor([[0, 0, 0,  ..., 1, 1, 1],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 1, 1, 1],
        ...,
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]])
label:  tensor([[    9,     9,     9,  ...,   405, 30899,     3],
        [    9,     9,     9,  ...,     3,     3,     3],
        [    9,     9,     9,  ..., 14807,  9320,     3],
        ...,
        [    9,     9,     9,  ...,     3,     3,     3],
        [    9,     9,     9,  ...,     3,     3,     3],
        [    9,     9,     9,  ...,     3,     3,     3]])


### ChatBot 개발

In [None]:
# library import
import torch
from transformers import GPT2LMHeadModel
from transformers import PreTrainedTokenizerFast

In [None]:
tokenizer = PreTrainedTokenizerFast.from_pretrained("skt/kogpt2-base-v2",
                                                    bos_token=BOS, eos_token=EOS, unk_token=UNK, pad_token=PAD, mask_token=MASK)

tokenizer.tokenize("안녕하세요. 만나서 반갑습니다.") # 특별한 방식의 토큰화를 수행함

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'GPT2Tokenizer'. 
The class this function is called from is 'PreTrainedTokenizerFast'.


['▁안녕', '하', '세', '요.', '▁만나', '서', '▁반', '갑', '습니다.']

In [None]:
# 모델 정의
model = GPT2LMHeadModel.from_pretrained("skt/kogpt2-base-v2")

pytorch_model.bin:   0%|          | 0.00/513M [00:00<?, ?B/s]

In [None]:
# 샘플 문장 생성
text = "안녕하세요"
input_ids = tokenizer.encode(text)
gen_ids = model.generate(torch.tensor([input_ids]),
                         max_length=128,
                         repetition_penalty=2.0,
                         pad_token_id=tokenizer.pad_token_id,
                         eos_token_id=tokenizer.eos_token_id,
                         bos_token_id=tokenizer.bos_token_id,
                         use_cache=True)

generated = tokenizer.decode(gen_ids[0, :].tolist())
print(generated)

안녕하세요?"
"그럼, 그건 뭐예요, 아저씨. 저는 지금 이 순간에도 괜찮아요. 그리고 제가 할 수 있는 일은 아무것도 없어요.
이제 그만 돌아가고 싶어요.
제가 하는 일이 무엇인지, 제게 주어진 일을 다 해내야 해요.
그리고 이제부터라도 내가 해야할 일들이 무엇인지 생각해봐.
그러면 당신은 어떻게 될까요?
당신이 원하는 것은 무엇이든 이룰 거예요.
하지만 그런 것들에 대해선 아무 것도 하지 않을 거에요.
그래서 나는 당신의 모든 것을 포기해요.
나는 내일도 다시 시작할 겁니다.
내가 하고 싶은 일, 나의 꿈은 어떤 것이든, 그것은 모두


In [None]:
# from pytorch_lightning import Trainer
# from pytorch_lightning.callbacks import ModelCheckpoint
# from pytorch_lightning.core.lightning import LightningModule
# from pytorch_lightning.core.module import LightningModule
from transformers.optimization import AdamW, get_cosine_schedule_with_warmup
import tqdm

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

# model copy to gpu
model = model.to(device)

In [None]:
# trainig - 후에는 학습된 가중치 불러올 것임
learning_rate = 3e-5
criterion = torch.nn.CrossEntropyLoss(reduction="none")
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

epoch = 20
Sneg = -1e18 # overflow 방지 코드

for epoch in range(epoch):
    for token_ids, mask, label in tqdm.tqdm(train_dataloader):
        token_ids, mask, label = token_ids.to(device), mask.to(device), label.to(device)
        out = model(token_ids)
        out = out.logits

        mask_3d = mask.unsqueeze(dim=2).repeat_interleave(repeats=out.shape[2], dim=2)
        mask_out = torch.where(mask_3d == 1, out, Sneg * torch.ones_like(out))

        loss = criterion(mask_out.transpose(2, 1), label)
        avg_loss = loss.sum() / mask.sum()

        optimizer.zero_grad()
        avg_loss.backward()
        optimizer.step()
    print("LOSS", avg_loss)

  return torch.LongTensor(data), torch.LongTensor(mask), torch.LongTensor(label)
  return torch.LongTensor(data), torch.LongTensor(mask), torch.LongTensor(label)
We strongly recommend passing in an `attention_mask` since your input_ids may be padded. See https://huggingface.co/docs/transformers/troubleshooting#incorrect-output-when-padding-tokens-arent-masked.
  0%|          | 5/91459 [00:06<32:50:38,  1.29s/it]


KeyboardInterrupt: 

In [None]:
state_dict = torch.load("/content/drive/MyDrive/전남대/수업/24년도/자연어처리 Project/fintuning_4.pt", map_location=device)

new_state_dict = {}
for k, v in state_dict.items():
    name = k[7:] if k.startswith('module.') else k  # remove `module.`
    new_state_dict[name] = v

# 학습된 모델 로드
model.load_state_dict(new_state_dict)

<All keys matched successfully>

### 시도 1. 기본 챗 봇

In [None]:
with torch.no_grad():
    while True:
        question = input("user (quit를 입력하면 종료됩니다.) > ").strip()

        # 중단 명령
        if question == "quit":
            break

        answer = ""
        while True:
            input_ids = torch.LongTensor(koGPT2_TOKENIZER.encode(Q_TKN + question + SENT + A_TKN + answer)).unsqueeze(dim=0)
            input_ids = input_ids.to(device)
            pred = model(input_ids)
            pred = pred.logits
            gen = koGPT2_TOKENIZER.convert_ids_to_tokens(torch.argmax(pred, dim=-1).squeeze().cpu().detach().numpy().tolist())[-1]

            if (gen == EOS) or (gen == PAD):
                break

            answer += gen.replace("▁", " ")
            input_ids = torch.cat([input_ids, torch.LongTensor(koGPT2_TOKENIZER.encode(gen)).unsqueeze(dim=0)], dim=1)

        print(f"Chatbot > {answer.strip()}")

user (quit를 입력하면 종료됩니다.) > 안녕
Chatbot > 이모티콘
user (quit를 입력하면 종료됩니다.) > ㅋㅋㅋㅋㅋㅋ<ENTER>뭐해?
Chatbot > 그냥 있어<ENTER>아무것도 안해 ᄏᄏᄏ
user (quit를 입력하면 종료됩니다.) > 이런..<ENTER>뭐라도 해야지!
Chatbot > 이모티콘
user (quit를 입력하면 종료됩니다.) > 에휴
Chatbot > 에휴
user (quit를 입력하면 종료됩니다.) > 오늘 점심 뭐먹지?
Chatbot > 몰라!<ENTER>어디가지?
user (quit를 입력하면 종료됩니다.) > 맛있는거 먹자!<ENTER>나 배고파
Chatbot > 뭐먹지?<ENTER>ᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏ
user (quit를 입력하면 종료됩니다.) > 음..<ENTER>롤링 파스타 어때?
Chatbot > 좋아!
user (quit를 입력하면 종료됩니다.) > 구래!<ENTER>언제 갈래?
Chatbot > 너 편할때!<ENTER>6시 30분?
user (quit를 입력하면 종료됩니다.) > 좋아! 그 때 만나장
Chatbot > 그래!
user (quit를 입력하면 종료됩니다.) > quit


### 시도 2. deque를 이용해 대화 history 만들기

In [None]:
# 대화 히스토리 최대 길이 설정
max_history_length = 5
conversation_history = deque(maxlen=max_history_length)

with torch.no_grad():
    while True:
        question = input("user (quit를 입력하면 종료됩니다.) > ").strip()

        # 중단 명령
        if question == "quit":
            break

        # 사용자 대화 내용 업데이트
        conversation_history.append(question)

        # model 입력 생성
        conversation_str = " ".join(conversation_history)
        # print("queue > ", conversation_str)

        answer = ""
        while True:
            input_ids = torch.LongTensor(koGPT2_TOKENIZER.encode(Q_TKN + conversation_str + SENT + A_TKN + answer)).unsqueeze(dim=0)
            input_ids = input_ids.to(device)
            pred = model(input_ids)
            pred = pred.logits
            gen = koGPT2_TOKENIZER.convert_ids_to_tokens(torch.argmax(pred, dim=-1).squeeze().cpu().detach().numpy().tolist())[-1]

            if (gen == EOS) or (gen == PAD):
                break

            answer += gen.replace("▁", " ")
            input_ids = torch.cat([input_ids, torch.LongTensor(koGPT2_TOKENIZER.encode(gen)).unsqueeze(dim=0)], dim=1)

        print(f"Chatbot > {answer.strip()}")

user (quit를 입력하면 종료됩니다.) > 안녕!
Chatbot > 이모티콘<ENTER>우와!<ENTER>방금일어났다
user (quit를 입력하면 종료됩니다.) > 학교 가기 싫다...
Chatbot > 나도..<ENTER>근데.. 학교 언제까지 있을래?
user (quit를 입력하면 종료됩니다.) > 모르겠어...
Chatbot > 나도..<ENTER>슬슬 출발해야겠다..
user (quit를 입력하면 종료됩니다.) > 오늘 밥 뭐먹을까?<ENTER>나 배고파
Chatbot > 나는 배고파<ENTER>지금 밥 먹는거 아니야!
user (quit를 입력하면 종료됩니다.) > 왜...밥 먹지마?
Chatbot > 헐..<ENTER>왜그러지..<ENTER>뭐 먹었어?
user (quit를 입력하면 종료됩니다.) > 롤링파스타 가자!
Chatbot > 오호라<ENTER>좋아<ENTER>학교예요?
user (quit를 입력하면 종료됩니다.) > 웅! 나 학교 도착했어!<ENTER>너는?
Chatbot > 좋아! 맛있게 먹어!!<EN
user (quit를 입력하면 종료됩니다.) > quit


### 시도 3. Mapping Table 추가

In [None]:
# 대화 히스토리 최대 길이 설정
max_history_length = 5

conversation_history = deque(maxlen=max_history_length)

# 매핑 딕셔너리 초기화
mapping_dict = {}

def extract_key_value(question):
    if ("는" in question or "은" in question) and ("이야" in question or "야" in question):
        try:
            key, value = (question.split("는")[0].strip(), question.split("는")[1].split("이야")[0].strip()) if "는" in question and "이야" in question else (question.split("은")[0].strip(), question.split("은")[1].split("이야")[0].strip()) if "은" in question and "이야" in question else (question.split("는")[0].strip(), question.split("는")[1].split("야")[0].strip()) if "는" in question and "야" in question else (question.split("은")[0].strip(), question.split("은")[1].split("야")[0].strip())
            return key, value
        except:
            return None, None
    else:
        return None, None

with torch.no_grad():
    while True:
        question = input("user (quit를 입력하면 종료됩니다.) > ").strip()

        # 중단 명령
        if question == "quit":
            break

        # 사용자 대화 내용 업데이트
        conversation_history.append(question)

        # 매핑 딕셔너리 업데이트
        key, value = extract_key_value(question)
        if key is not None and value is not None:
            mapping_dict[key] = value
            answer = f"{key}은 {value}이야."
            print(f"Chatbot > {answer}")
            continue

        # 매핑 딕셔너리에 해당하는 답변이 있는지 확인
        for key in mapping_dict.keys():
            if key in question:
                answer = mapping_dict[key]
                print(f"Chatbot > {key}은 {answer}이야!")
                break
        else:
            # model 입력 생성
            conversation_str = " ".join(conversation_history)
            # print("queue > ", conversation_str)

            answer = ""
            while True:
                input_ids = torch.LongTensor(koGPT2_TOKENIZER.encode(Q_TKN + conversation_str + SENT + A_TKN + answer)).unsqueeze(dim=0)
                input_ids = input_ids.to(device)
                pred = model(input_ids)
                pred = pred.logits
                gen = koGPT2_TOKENIZER.convert_ids_to_tokens(torch.argmax(pred, dim=-1).squeeze().cpu().detach().numpy().tolist())[-1]

                if (gen == EOS) or (gen == PAD):
                    break

                answer += gen.replace("▁", " ")
                input_ids = torch.cat([input_ids, torch.LongTensor(koGPT2_TOKENIZER.encode(gen)).unsqueeze(dim=0)], dim=1)

            print(f"Chatbot > {answer.strip()}")

        # 매핑 테이블 출력
        print("Mapping Table:")
        for key, value in mapping_dict.items():
            print(f"{key}: {value}")

user (quit를 입력하면 종료됩니다.) > 뭐하고 있어?
Chatbot > 세미나 준비 후딱 준비!
Mapping Table:
user (quit를 입력하면 종료됩니다.) > ㅋㅋㅋㅋ 내가 문제하나 낼게
Chatbot > 아니!<ENTER>너가 손해인 것 같어!<ENTER>글은 안 써도 괜찮아
Mapping Table:
user (quit를 입력하면 종료됩니다.) > 울 교수님은 조영준 교수님이야.
Chatbot > 울 교수님은 조영준 교수님이야.
user (quit를 입력하면 종료됩니다.) > 자연어 처리 교수님은 김미수 교수님!
Chatbot > ᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏ<ENTER>넹!<
Mapping Table:
울 교수님: 조영준 교수님
user (quit를 입력하면 종료됩니다.) > 교수님이 누구라고?
Chatbot > ᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏ<EN
Mapping Table:
울 교수님: 조영준 교수님
user (quit를 입력하면 종료됩니다.) > 울 교수님은 누구게
Chatbot > 울 교수님은 조영준 교수님이야.
Mapping Table:
울 교수님: 조영준 교수님
user (quit를 입력하면 종료됩니다.) > 오 맞췄어!
Chatbot > 울 교수님26!<ENTER>써줘!
Mapping Table:
울 교수님: 조영준 교수님
user (quit를 입력하면 종료됩니다.) > 자연어 처리 교수님은 김미수 교수님이야
Chatbot > 자연어 처리 교수님은 김미수 교수님이야.
user (quit를 입력하면 종료됩니다.) > 자연어 처리 교수님이 누구라고?
Chatbot > 자연어 처리 교수님은 김미수 교수님이야.
Mapping Table:
울 교수님: 조영준 교수님
자연어 처리 교수님: 김미수 교수님
user (quit를 입력하면 종료됩니다.) > 축하행~
Chatbot > ᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏᄏ<ENTER>화욜부터 지금까지
Mapping Table:
울 교수님: 조영준 교수님
자연어 처리 교수님: 김미수 교수님


KeyboardInterrupt: Interrupted by user

In [None]:
text = "재밌는 이야기 해주세요<ENTER>"
input_ids = tokenizer.encode(text)
gen_ids = model.generate(torch.tensor([input_ids]).to(device),
                         max_length=64,
                         repetition_penalty=2.0,
                         pad_token_id=tokenizer.pad_token_id,
                         eos_token_id=tokenizer.eos_token_id,
                         bos_token_id=tokenizer.bos_token_id,
                         use_cache=True)

generated = tokenizer.decode(gen_ids[0, :].tolist())
print(generated)