In [1]:
!pip install konlpy

Collecting konlpy
  Downloading konlpy-0.6.0-py2.py3-none-any.whl.metadata (1.9 kB)
Collecting JPype1>=0.7.0 (from konlpy)
  Downloading jpype1-1.5.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading konlpy-0.6.0-py2.py3-none-any.whl (19.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.4/19.4 MB[0m [31m36.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading jpype1-1.5.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (493 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m493.8/493.8 kB[0m [31m20.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: JPype1, konlpy
Successfully installed JPype1-1.5.1 konlpy-0.6.0


## 데이터셋 불러오기

In [2]:
import numpy as np
import pandas as pd
import random
import re
from konlpy.tag import Okt
from collections import defaultdict
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data.dataset import Dataset

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

df = pd.read_csv('https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv')
df.head()

Unnamed: 0,Q,A,label
0,12시 땡!,하루가 또 가네요.,0
1,1지망 학교 떨어졌어,위로해 드립니다.,0
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.,0
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.,0
4,PPL 심하네,눈살이 찌푸려지죠.,0


## 데이터 전처리

In [3]:
okt = Okt()

# Define special tokens
PAD_TOKEN = "<PAD>" # Padding Token
SOS_TOKEN = "<SOS>" # Start of Sequence Token
EOS_TOKEN = "<EOS>" # End of Sequence Token

def preprocessing_text(text):
    # 한글, 영어, 숫자, 공백, ?!.,을 제외한 나머지 문자 제거
    result_text = re.sub('[^ ?,.!A-Za-z0-9가-힣+]', ' ', text)
    result_text = okt.morphs(result_text)
    return [SOS_TOKEN] + result_text + [EOS_TOKEN]

In [4]:
questions = [preprocessing_text(text) for text in df['Q'].values]
answers = [preprocessing_text(text) for text in df['A'].values]

## 단어사전(말뭉치) 설정

In [5]:


# 어휘사전 생성을 위한 defaultdict 사용
vocab = defaultdict(lambda: len(vocab))
vocab[PAD_TOKEN] = 0  # PAD 토큰에 0 인덱스 할당
vocab[SOS_TOKEN] = 1  # SOS 토큰에 1 인덱스 할당
vocab[EOS_TOKEN] = 2  # EOS 토큰에 2 인덱스 할당

# 어휘사전에 토큰 추가
for sentence in questions + answers:
    for token in sentence:
        vocab[token]  # 새로운 토큰에 대해 인덱스 자동 할당

# 문자열에서 인덱스로의 변환 사전
str2idx = dict(vocab)  # 기본 사전 형태로 변환하여 사용

# 인덱스에서 문자열로의 변환 사전 생성
idx2str = {idx: token for token, idx in vocab.items()}

## 데이터셋 설정

In [6]:
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
class ChatBotDataset(Dataset):
    def __init__(self):
        self.questions = questions
        self.answers = answers

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        question_text = self.questions[idx]
        answers_text = self.answers[idx]
        question_idx = self.text2idx(question_text)
        answers_idx = self.text2idx(answers_text)
        return torch.tensor(question_idx), torch.tensor(answers_idx)

    def text2idx(self, x):
        return [vocab[token] for token in x]

dataset = ChatBotDataset()

In [7]:
from torch.utils.data import random_split
total_size = len(dataset)
train_size = int(0.8 * total_size)
test_size = total_size - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

In [8]:
def collate_fn(batch):
    # 텍스트 데이터와 레이블 데이터를 분리
    question_data = [item[0] for item in batch]
    answers_data = [item[1] for item in batch]

    # 텍스트 데이터만 패딩 처리
    padded_question_data = pad_sequence(question_data, batch_first=True, padding_value=vocab[PAD_TOKEN])
    padded_answers_data = pad_sequence(answers_data, batch_first=True, padding_value=vocab[PAD_TOKEN])

    return padded_question_data, padded_answers_data


# collate_fn : 데이터를 넘겨주기 전에 사용할 함수
# batch_first = True -> 배치를 첫번째 차원으로 (batch_size, sequence_length, features)
train_loader = DataLoader(train_dataset, batch_size=16, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)

## 인코더

In [9]:
class Encoder(nn.Module):
    def __init__(self, num_vocabs, hidden_size, embedding_dim, num_layers):
        super(Encoder, self).__init__()

        # 단어 사전의 개수 지정
        self.num_vocabs = num_vocabs
        # 임베딩 레이어 정의 (number of vocabs, embedding dimension)
        self.embedding = nn.Embedding(num_vocabs, embedding_dim)
        # GRU (embedding dimension)
        self.gru = nn.GRU(embedding_dim,
                          hidden_size,
                          num_layers=num_layers,
                          bidirectional=False)

    def forward(self, x):
        x = self.embedding(x).permute(1, 0, 2)
        output, hidden = self.gru(x)
        return output, hidden

### 인코더 단계별로 실행

In [10]:
NUM_VOCABS = len(vocab)
# Encoder 정의
encoder = Encoder(NUM_VOCABS,
                  hidden_size=32,
                  embedding_dim=64,
                  num_layers=1)

In [11]:
embedding_dim = 64 # 임베딩 차원
x, y = next(iter(train_loader))
embedding = nn.Embedding(NUM_VOCABS, embedding_dim)

# x :(batch_size, sequence_length)
# embedding(x) :(batch_size, sequence_length, embedding_dim)
# embedding(x).permute(1, 0, 2) : (sequence_length, batch_size, embedding_dim)
embedded = embedding(x).permute(1, 0, 2)

print(x.shape)
print(embedded.shape)
# input:  (batch_size, sequence_length)
# output: (batch_size, sequence_length, embedding_dim)

torch.Size([16, 15])
torch.Size([15, 16, 64])


In [12]:
hidden_size = 32
embedding_dim = 64 # 임베딩 차원
gru = nn.GRU(embedding_dim,
             hidden_size,
             num_layers=1,
             bidirectional=False,
             batch_first=False, # batch_first=False로 지정
            )
# input       : (sequence_length, batch_size, embedding_dim)
# h0          : (Bidirectional(1) x number of layers(1), batch_size, hidden_size)
o, h = gru(embedded, None)

print(o.shape)
print(h.shape)
# output      : (sequence_length, batch_size, hidden_size x bidirectional(1))
# hidden_state: (bidirectional(1) x number of layers(1), batch_size, hidden_size)

torch.Size([15, 16, 32])
torch.Size([1, 16, 32])


In [13]:
# Encoder 정의
encoder = Encoder(NUM_VOCABS,
                  hidden_size=32,
                  embedding_dim=64,
                  num_layers=1)

In [14]:
# Encoder에 x 통과 후 output, hidden_size 의 shape 확인
# input(x)    : (batch_size, sequence_length)
o, h = encoder(x)

print(o.shape)
print(h.shape)
# output      : (sequence_length, batch_size, hidden_size x bidirectional(1))
# hidden_state: (bidirectional(1) x number of layers(1), batch_size, hidden_size)

torch.Size([15, 16, 32])
torch.Size([1, 16, 32])


## 디코더

In [15]:
class Decoder(nn.Module):
    def __init__(self, num_vocabs, hidden_size, embedding_dim, num_layers=1, dropout=0.2):
        super(Decoder, self).__init__()
        # 단어사전 개수
        self.num_vocabs = num_vocabs
        self.embedding = nn.Embedding(num_vocabs, embedding_dim)
        self.dropout = nn.Dropout(dropout)
        self.gru = nn.GRU(embedding_dim,
                          hidden_size,
                          num_layers=num_layers,
                          bidirectional=False,
                          batch_first = False)

        # 최종 출력은 단어사전의 개수
        self.fc = nn.Linear(hidden_size, num_vocabs)

    def forward(self, x, hidden_state):
        x = x.unsqueeze(0) # (1, batch_size) 로 변환 (1)
        embedded = F.relu(self.embedding(x))
        embedded = self.dropout(embedded)
        output, hidden = self.gru(embedded, hidden_state)
        output = self.fc(output.squeeze(0)) # (sequence_length, batch_size, hidden_size(32) x bidirectional(1))
        return output, hidden

### 디코더 단계적으로 실행

Embedding Layer의 입/출력 shape에 대한 이해

In [16]:
x = torch.abs(torch.randn(size=(1, 32)).long())
print(x)
x.shape
# batch_size = 32 이라 가정했을 때,
# (1, batch_size)
# 여기서 batch_size => (1, batch_size) 로 shape 변환을 선행

tensor([[1, 0, 2, 1, 1, 0, 1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1,
         0, 0, 0, 0, 0, 0, 0, 0]])


torch.Size([1, 32])

In [17]:
NUM_VOCABS = len(vocab)
embedding_dim= 64# 임베딩 차원
embedding= nn.Embedding(NUM_VOCABS , embedding_dim)

embedded= embedding(x)
embedded.shape
# embedding 출력
# (1, batch_size, embedding_dim)

torch.Size([1, 32, 64])

GRU Layer의 입/출력 shape에 대한 이해

In [18]:
hidden_size = 32

gru = nn.GRU(embedding_dim,
             hidden_size,
             num_layers=1,
             bidirectional=False,
             batch_first=False, # batch_first=False로 지정
            )

o, h = gru(embedded)

print(o.shape)
# output shape: (sequence_length, batch_size, hidden_size(32) x bidirectional(1))
print(h.shape)
# hidden_state shape: (Bidirectional(1) x number of layers(1), batch_size, hidden_size(32))

torch.Size([1, 32, 32])
torch.Size([1, 32, 32])


최종 출력층(FC) shape에 대한 이해

In [19]:
fc = nn.Linear(32, NUM_VOCABS) # 출력은 단어사전의 개수로 가정

output = fc(o[0])

print(o[0].shape)
print(output.shape)
# input : (batch_size, output from GRU)
# output: (batch_size, output dimension)

torch.Size([32, 32])
torch.Size([32, 12659])


인코더 -> 디코더 입출력 shape

In [20]:
decoder = Decoder(num_vocabs=NUM_VOCABS,
                  hidden_size=32,
                  embedding_dim=64,
                  num_layers=1)

In [21]:
x, y = next(iter(train_loader))

o, h = encoder(x)

print(o.shape, h.shape)
# output      : (sequence_length, batch_size, hidden_size x bidirectional(1))
# hidden_state: (Bidirectional(1) x number of layers(1), batch_size, hidden_size(32))

torch.Size([15, 16, 32]) torch.Size([1, 16, 32])


In [22]:
x = torch.abs(torch.full(size=(16,), fill_value=vocab[SOS_TOKEN]).long())
print(x)
x.shape
# batch_size = 16 이라 가정(16개의 SOS 토큰)

tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1])


torch.Size([16])

In [23]:
decoder_output, decoder_hidden = decoder(x, h)
decoder_output.shape, decoder_hidden.shape
# (batch_size, num_vocabs), (1, batch_size, hidden_size)

(torch.Size([16, 12659]), torch.Size([1, 16, 32]))

## Seq2Seq

In [24]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, inputs, outputs, teacher_forcing_ratio=0.5):
        # inputs : (batch_size, sequence_length)
        # outputs: (batch_size, sequence_length)

        batch_size, output_length = outputs.shape
        output_num_vocabs = self.decoder.num_vocabs

        # 리턴할 예측된 outputs를 저장할 임시 변수
        # (sequence_length, batch_size, num_vocabs)
        predicted_outputs = torch.zeros(output_length, batch_size, output_num_vocabs).to(self.device)

        # 인코더에 입력 데이터 주입, encoder_output은 버리고 hidden_state 만 살립니다.
        # 여기서 hidden_state가 디코더에 주입할 context vector 입니다.
        # (Bidirectional(1) x number of layers(1), batch_size, hidden_size)
        _, decoder_hidden = self.encoder(inputs)

        # (batch_size) shape의 SOS TOKEN으로 채워진 디코더 입력 생성
        decoder_input = torch.full((batch_size,), vocab[SOS_TOKEN], device=self.device)

        # 순회하면서 출력 단어를 생성합니다.
        # 0번째는 SOS TOKEN이 위치하므로, 1번째 인덱스부터 순회합니다.
        for t in range(0, output_length):
            # decoder_input : 디코더 입력 (batch_size) 형태의 SOS TOKEN로 채워진 입력
            # decoder_output: (batch_size, num_vocabs)
            # decoder_hidden: (Bidirectional(1) x number of layers(1), batch_size, hidden_size), context vector와 동일 shape
            decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)

            # t번째 단어에 디코더의 output 저장
            predicted_outputs[t] = decoder_output

            # teacher forcing 적용 여부 확률로 결정
            # teacher forcing 이란: 정답치를 다음 RNN Cell의 입력으로 넣어주는 경우. 수렴속도가 빠를 수 있으나, 불안정할 수 있음
            teacher_force = random.random() < teacher_forcing_ratio

            # top1 단어 토큰 예측
            top1 = decoder_output.argmax(1)

            # teacher forcing 인 경우 ground truth 값을, 그렇지 않은 경우, 예측 값을 다음 input으로 지정
            decoder_input = outputs[:, t] if teacher_force else top1
        return predicted_outputs.permute(1, 0, 2) # (batch_size, sequence_length, num_vocabs)로 변경

### seq2seq 단계적 확인

In [27]:
# Encoder 정의
encoder = Encoder(num_vocabs=len(vocab),
                       hidden_size=32,
                       embedding_dim=64,
                       num_layers=1)
# Decoder 정의
decoder = Decoder(num_vocabs=len(vocab),
                       hidden_size=32,
                       embedding_dim=64,
                       num_layers=1)
# Seq2Seq 정의
seq2seq = Seq2Seq(encoder, decoder, 'cpu')

In [28]:
x, y = next(iter(train_loader))
print(x.shape, y.shape)
# (batch_size, sequence_length), (batch_size, sequence_length)

torch.Size([16, 15]) torch.Size([16, 17])


In [29]:
output= seq2seq(x, y)
print(output.shape)
# (batch_size, sequence_length, num_vocabs)

torch.Size([16, 17, 12659])


## 인코더 디코더 정의

In [30]:
NUM_VOCABS = len(vocab)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

HIDDEN_SIZE = 512
EMBEDDIMG_DIM = 256

print(f'num_vocabs: {NUM_VOCABS}\n======================')

# Encoder 정의
encoder = Encoder(num_vocabs=NUM_VOCABS,
                  hidden_size=HIDDEN_SIZE,
                  embedding_dim=EMBEDDIMG_DIM,
                  num_layers=1)
# Decoder 정의
decoder = Decoder(num_vocabs=NUM_VOCABS,
                  hidden_size=HIDDEN_SIZE,
                  embedding_dim=EMBEDDIMG_DIM,
                  num_layers=1)

# Seq2Seq 생성
# encoder, decoder를 device 모두 지정
model = Seq2Seq(encoder.to(device), decoder.to(device), device)
print(model)

num_vocabs: 12659
Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(12659, 256)
    (gru): GRU(256, 512)
  )
  (decoder): Decoder(
    (embedding): Embedding(12659, 256)
    (dropout): Dropout(p=0.2, inplace=False)
    (gru): GRU(256, 512)
    (fc): Linear(in_features=512, out_features=12659, bias=True)
  )
)


## 학습 함수

In [31]:
LR = 1e-3
optimizer = optim.Adam(model.parameters(), lr=LR)
loss_fn = nn.CrossEntropyLoss(ignore_index=vocab[PAD_TOKEN])

def train(model, data_loader, optimizer, loss_fn, device):
    model.train()
    running_loss = 0

    for x, y in data_loader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()

        # output: (batch_size, sequence_length, num_vocabs)
        output = model(x, y)
        output_dim = output.size(2) # num_vocabs

        # 1번 index 부터 슬라이싱한 이유는 0번 index가 SOS TOKEN 이기 때문
        # (batch_size*sequence_length, num_vocabs) 로 변경
        output = output.reshape(-1, output_dim)

        # (batch_size,sequence_length) -> (batch_size*sequence_length) 로 변경
        y = y.view(-1)

        # Loss 계산
        loss = loss_fn(output, y)
        loss.backward()
        optimizer.step()

				# loss.item() : 배치의 평균 오차
				#  x.size(0) : 배치 수
        running_loss += loss.item() * x.size(0) # 배치의 토탈 오차
        # 위와 같이 하는 이유는 배치 사이즈가 다를 수 있어서
    return running_loss / len(data_loader)

## 평가함수

In [32]:
def evaluate(model, data_loader, loss_fn, device):
    model.eval()

    eval_loss = 0

    with torch.no_grad():
        for x, y in data_loader:
            x, y = x.to(device), y.to(device)
            output = model(x, y)
            output_dim = output.size(2)
            output = output.reshape(-1, output_dim)
            y = y.view(-1)

            # Loss 계산
            loss = loss_fn(output, y)

            eval_loss += loss.item() * x.size(0)

    return eval_loss / len(data_loader)

## 모델의 결과를 한글로 변환

In [33]:
def sequence_to_sentence(sequences, index2word):
    outputs = []
    for p in sequences:

        word = index2word[p]
        if p not in [SOS_TOKEN, EOS_TOKEN, PAD_TOKEN]:
            outputs.append(word)
        if word == EOS_TOKEN:
            break
    return ' '.join(outputs)

## 랜덤으로 샘플링 해서 결과 확인

In [34]:
from torch.utils.data import DataLoader, SubsetRandomSampler
def random_evaluation(model, dataset, index2word, device, n=10):

    n_samples = len(dataset)
    indices = list(range(n_samples))
    np.random.shuffle(indices)      # Shuffle
    sampled_indices = indices[:n]   # Sampling N indices

    # 샘플링한 데이터를 기반으로 DataLoader 생성
    sampler = SubsetRandomSampler(sampled_indices)
    sampled_dataloader = DataLoader(dataset, batch_size=10, sampler=sampler, collate_fn = collate_fn)

    model.eval()
    with torch.no_grad():
        for x, y in sampled_dataloader:
            x, y = x.to(device), y.to(device)
            output = model(x, y, teacher_forcing_ratio=0)
            # output: (number of samples, sequence_length, num_vocabs)

            preds = output.detach().cpu().numpy()
            x = x.detach().cpu().numpy()
            y = y.detach().cpu().numpy()

            for i in range(n):
                print(f'질문   : {sequence_to_sentence(x[i], index2word)}')
                print(f'답변   : {sequence_to_sentence(y[i], index2word)}')
                print(f'예측답변: {sequence_to_sentence(preds[i].argmax(1), index2word)}')
                print('==='*10)

## 모델 학습 및 저장

In [35]:
# NUM_EPOCHS = 10
# STATEDICT_PATH = 'seq2seq-chatbot-kor.pt'

# best_loss = np.inf

# for epoch in range(NUM_EPOCHS):
#     loss = train(model, train_loader, optimizer, loss_fn, device)

#     val_loss = evaluate(model, test_loader, loss_fn, device)

#     if val_loss < best_loss:
#         best_loss = val_loss
#         torch.save(model.state_dict(), STATEDICT_PATH)

#     if epoch % 5 == 0:
#         print(f'epoch: {epoch+1}, loss: {loss:.4f}, val_loss: {val_loss:.4f}')


# torch.save(model, STATEDICT_PATH)

## 모델 불러오기

In [36]:
STATEDICT_PATH = 'seq2seq-chatbot-kor.pt'
model.load_state_dict(torch.load(STATEDICT_PATH))
random_evaluation(model, test_dataset, idx2str, device, n=10)

  model.load_state_dict(torch.load(STATEDICT_PATH))


RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.