# Q1. Seq2seq 모델 구현을 통해 간단한 챗봇을 구현해봅시다!

**하단의 패키지를 모두 실행해주세요!**


In [None]:
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random

from keras import Input, Model
from keras.layers import Embedding, LSTM, Dropout, Dense
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import RMSprop

from torch.utils.data.dataset import Dataset
import torch.nn.functional as F
import torch
import torch.nn as nn
from torch import optim

import warnings
warnings.filterwarnings("ignore")

<br>

**데이터를 불러옵시다**

In [None]:
df=pd.read_csv('/content/dialogs.txt',sep='\t',names=['question','answer'])
df.head()

Unnamed: 0,question,answer
0,"hi, how are you doing?",i'm fine. how about yourself?
1,i'm fine. how about yourself?,i'm pretty good. thanks for asking.
2,i'm pretty good. thanks for asking.,no problem. so how have you been?
3,no problem. so how have you been?,i've been great. what about you?
4,i've been great. what about you?,i've been good. i'm in school right now.


<br>

**1.데이터의 각 컬럼들을 .values를 이용해 다음의 변수에 저장해주세요. (1점)**

In [None]:
data_q =df['question'].values
data_a =df['answer'].values

<br>

**2.텍스트 데이터 전처리를 진행해보겠습니다. 하단의 빈칸을 올바르게 채워주세요 (3점)**

In [None]:
from collections import Counter

#정규표현식을 이용해 특수문자 정제
def clean_text(sent):
    """Return ``sent`` with every character matched by the pattern removed.

    The pattern covers common punctuation and the curly double quotes;
    everything else (letters, digits, whitespace) is left untouched.
    """
    punctuation = re.compile(r'[!“”"#$%&\'()*+,-./:;<=>?@[\]^_`{|}~]')
    return punctuation.sub('', sent)

# Number of whitespace-separated words in each question once punctuation
# has been stripped, plus a histogram of those lengths.
count_words_ques = [len(clean_text(ques).split()) for ques in data_q]
counter_words_ques = Counter(count_words_ques)

# Keep only the (question, answer) pairs whose question has at most 15 words.
is_short_enough = [n_words <= 15 for n_words in count_words_ques]
sorted_q = [ques for ques, keep in zip(data_q, is_short_enough) if keep]
sorted_a = [answ for answ, keep in zip(data_a, is_short_enough) if keep]

In [None]:
#답변 데이터에 대해 문장의 시작, 끝 식별자를 추가합니다.
sorted_a = ['<START> '+ answ + ' <END>' for answ in sorted_a]

In [None]:
tokenizer = Tokenizer(filters='', lower=False)
txt = sorted_q + sorted_a

#단어 집합 생성해주세요!
tokenizer.fit_on_texts(txt)

In [None]:
VOCAB_SIZE = len(tokenizer.word_index) + 1
print(f'Vocabulary size : {VOCAB_SIZE}')

Vocabulary size : 4038


<br>

**3.인코더와 디코더를 만들어보겠습니다. 하단의 빈칸을 올바르게 채워주세요(3점)**

In [None]:
# encoder
tokenized_questions = tokenizer.texts_to_sequences(sorted_q)
maxlen_questions = 15
encoder_inp = pad_sequences(tokenized_questions,
                            maxlen=maxlen_questions,
                            padding='post')

print(encoder_inp.shape)
print(sorted_q[0])
print(tokenized_questions[0])
print(encoder_inp[0])

(3713, 15)
hi, how are you doing?
[1787, 34, 16, 4, 479]
[1787   34   16    4  479    0    0    0    0    0    0    0    0    0
    0]


In [None]:
# decoder
tokenized_answers =  tokenizer.texts_to_sequences(sorted_a)
maxlen_answers = np.max([len(x) for x in tokenized_answers])
decoder_inp = pad_sequences(tokenized_answers,
                            maxlen=maxlen_answers,
                            padding='post')

print(decoder_inp.shape)
print(sorted_a[0])
print(tokenized_answers[0])
print(decoder_inp[0])

(3713, 21)
<START> i'm fine. how about yourself? <END>
[1, 28, 1016, 34, 33, 825, 2]
[   1   28 1016   34   33  825    2    0    0    0    0    0    0    0
    0    0    0    0    0    0    0]


In [None]:
# Drop the first id (the <START> marker) from every answer so the target
# sequence is the decoder input shifted one step to the left.
tokenized_answers = [token_ids[1:] for token_ids in tokenized_answers]

padded_answers = pad_sequences(tokenized_answers,
                               maxlen=maxlen_answers,
                               padding='post')
# One-hot encode the targets for the categorical cross-entropy loss.
decoder_final_output = to_categorical(padded_answers, num_classes=VOCAB_SIZE)

print(decoder_final_output.shape)
print(tokenized_answers[0])
print(padded_answers[0])
print(decoder_final_output[0])

(3713, 21, 4038)
[28, 1016, 34, 33, 825, 2]
[  28 1016   34   33  825    2    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0]
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]]


In [None]:
# Encoder: embed the question token ids and keep only the final LSTM states
# (hidden state and cell state); the per-step outputs are discarded.
enc_inputs = Input(shape=(None,))
enc_embedding = Embedding(input_dim=VOCAB_SIZE,
                           output_dim=200, mask_zero=True)(enc_inputs)
_, state_h, state_c = LSTM(200, return_state=True)(enc_embedding)
enc_states = [state_h, state_c]

# Decoder: embed the answer token ids and run an LSTM that starts from the
# encoder states. dec_embedding, dec_lstm and dec_dense are kept in variables
# because make_inference_models() reuses them.
dec_inputs = Input(shape=(None,))
dec_embedding = Embedding(input_dim=VOCAB_SIZE,
                          output_dim=200, mask_zero=True)(dec_inputs)
dec_lstm = LSTM(200, return_state=True, return_sequences=True)

dec_outputs, _, _ = dec_lstm(dec_embedding, initial_state=enc_states)
# Softmax over the whole vocabulary at every decoder time step.
dec_dense = Dense(VOCAB_SIZE, activation='softmax')
output = dec_dense(dec_outputs)

<br>

**4.모델 학습을 진행해봅시다 (2점)**

In [None]:
model = Model([enc_inputs, dec_inputs], output)
model.compile(optimizer=RMSprop(), loss='categorical_crossentropy')
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, None)]               0         []                            
                                                                                                  
 input_2 (InputLayer)        [(None, None)]               0         []                            
                                                                                                  
 embedding (Embedding)       (None, None, 200)            807600    ['input_1[0][0]']             
                                                                                                  
 embedding_1 (Embedding)     (None, None, 200)            807600    ['input_2[0][0]']             
                                                                                              

In [None]:
#에포크와 배치크기를 자유롭게 지정해주세요! 에포크는 클 수록 결과가 좋습니다
model.fit([encoder_inp, decoder_inp],
           decoder_final_output,
           batch_size=64,
           epochs=1)



<keras.src.callbacks.History at 0x7bf2933c7d60>

In [None]:
#그대로 실행시켜주세요!
def make_inference_models():
    """Build the encoder and decoder models used for step-by-step decoding.

    Reuses the module-level tensors and layers of the training graph
    (``enc_inputs``, ``enc_states``, ``dec_inputs``, ``dec_embedding``,
    ``dec_lstm``, ``dec_dense``), so the returned models share its weights.
    A summary of each model is printed.

    Returns:
        A tuple ``(enc_model, dec_model)``. ``enc_model`` maps a question to
        the encoder states; ``dec_model`` maps ``[tokens, h, c]`` to
        ``[word probabilities, new h, new c]``.
    """
    # Size the state inputs from the decoder LSTM itself instead of repeating
    # its unit count as a literal, so the two cannot drift apart.
    state_dim = dec_lstm.units
    dec_state_input_h = Input(shape=(state_dim,))
    dec_state_input_c = Input(shape=(state_dim,))
    dec_states_inputs = [dec_state_input_h, dec_state_input_c]
    dec_outputs, state_h, state_c = dec_lstm(dec_embedding,
                                             initial_state=dec_states_inputs)
    dec_states = [state_h, state_c]
    dec_outputs = dec_dense(dec_outputs)
    dec_model = Model(
        inputs=[dec_inputs] + dec_states_inputs,
        outputs=[dec_outputs] + dec_states)
    print('Inference decoder:')
    dec_model.summary()
    print('Inference encoder:')
    enc_model = Model(inputs=enc_inputs, outputs=enc_states)
    enc_model.summary()
    return enc_model, dec_model

def str_to_tokens(sentence):
    """Convert a raw question into a padded sequence of vocabulary ids.

    Words that are not in ``tokenizer.word_index`` are skipped. The lookup
    is case-sensitive and keeps punctuation attached to words, because the
    tokenizer was fitted with ``lower=False`` and ``filters=''``.

    Args:
        sentence: Whitespace-separated question text.

    Returns:
        The result of ``pad_sequences`` for this single sentence, post-padded
        to ``maxlen_questions``.
    """
    # word_index.get() returns None (never '') for unknown words, so test
    # membership explicitly; a None id would make pad_sequences fail.
    tokens_list = [
        tokenizer.word_index[word]
        for word in sentence.split()
        if word in tokenizer.word_index
    ]
    return pad_sequences([tokens_list], maxlen=maxlen_questions, padding='post')

enc_model, dec_model = make_inference_models()

Inference decoder:
Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, None)]               0         []                            
                                                                                                  
 embedding_1 (Embedding)     (None, None, 200)            807600    ['input_2[0][0]']             
                                                                                                  
 input_3 (InputLayer)        [(None, 200)]                0         []                            
                                                                                                  
 input_4 (InputLayer)        [(None, 200)]                0         []                            
                                                                         

In [None]:
#역시 그대로 실행해주세요!
def chatbot():
    """Run an interactive console chat until the user types ``bye``.

    Each question is encoded with ``enc_model``; the answer is then decoded
    greedily, one word at a time, with ``dec_model`` until ``<END>`` is
    produced or the step budget is exhausted.
    """
    print('Bot: Hi, good to see you')

    while True:
        input_question = input('Question: ')

        if input_question == 'bye':
            print('Bot answer: Have a wonderful day :)')
            break

        # The encoder's final states seed the decoder.
        states_values = enc_model.predict(str_to_tokens(input_question))
        empty_target_seq = np.zeros((1, 1))
        empty_target_seq[0, 0] = tokenizer.word_index['<START>']
        decoded_words = []

        # Bound the number of decoding *steps* rather than emitted words:
        # if the model keeps predicting an id with no word (e.g. padding
        # id 0), a word-count-based stop condition would never trigger.
        for _ in range(maxlen_answers + 1):
            dec_outputs, h, c = dec_model.predict([empty_target_seq] + states_values)
            sampled_word_index = int(np.argmax(dec_outputs[0, -1, :]))
            # index_word is the tokenizer's reverse mapping (id -> word).
            sampled_word = tokenizer.index_word.get(sampled_word_index)

            if sampled_word == '<END>':
                break
            if sampled_word is not None:
                decoded_words.append(sampled_word)

            # Feed the predicted id and the new states back into the decoder.
            empty_target_seq = np.zeros((1, 1))
            empty_target_seq[0, 0] = sampled_word_index
            states_values = [h, c]

        decoded_translation = ''.join(f'{word} ' for word in decoded_words)
        print('Bot answer:', decoded_translation, '\n')

<br>

**5.챗봇을 실행시켜 대화를 시도해보세요! (1점)**

(단, 영어만 가능합니다)

In [None]:
chatbot()

# Q2. GRU를 활용해 간단한 챗봇 만들기

간단한 원리를 배우는 느낌이니까 편하게 실행해주세요!

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import tqdm
import torch.nn.functional as F

## 1. 단어 집합을 만들어봅시다. (빈칸을 채워주세요)

In [None]:
SOS_token = 0
EOS_token = 1

class Lang:
    """Vocabulary mapping words to integer indices and back.

    Indices 0 and 1 are reserved for the "SOS" and "EOS" marker tokens.
    """

    def __init__(self):
        # The SOS token is index 0 and the EOS token is index 1.
        self.word2index = {"SOS": 0, "EOS": 1}
        self.word2count = {}
        self.index2word = {0: "SOS", 1: "EOS"}
        # The vocabulary starts out with the two reserved tokens.
        self.n_words = 2

    def addSentence(self, sentence):
        """Register every single-space-separated word of ``sentence``."""
        for word in sentence.split(' '):
            self.addWord(word)

    def addWord(self, word):
        """Give an unseen word the next free index and count the occurrence."""
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.index2word[self.n_words] = word
            self.n_words += 1
        # "SOS"/"EOS" are pre-registered in word2index but not in word2count,
        # so use .get() instead of += to avoid a KeyError when they appear.
        self.word2count[word] = self.word2count.get(word, 0) + 1

def prepareData(questions, answers):
    """Build one shared vocabulary and the list of question/answer pairs.

    Args:
        questions: Indexable collection of question sentences.
        answers: Indexable collection of answers, aligned with ``questions``
            by position.

    Returns:
        A tuple ``(lang, pairs)``: the ``Lang`` vocabulary holding the words
        of both sides, and a list of ``[question, answer]`` lists.
    """
    vocab = Lang()
    qa_pairs = []
    for idx in range(len(questions)):
        question, answer = questions[idx], answers[idx]
        for sentence in (question, answer):
            vocab.addSentence(sentence)
        qa_pairs.append([question, answer])
    return vocab, qa_pairs

questions = ['안녕', '넌 누구니', '누구','몇 살이야', '오늘 날씨 어때','오늘 어때' , '몇','날씨','몇 살']
answers = ['안녕하세요', '저는 챗봇이에요','챗봇', '저는 2살입니다.', '오늘은 춥습니다.','오늘 좋아','2','추워','2살']
lang, pairs = prepareData(questions, answers)


## 2. Encoder와 Decoder를 정의해 봅시다. (빈칸을 채워주세요!)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    """GRU encoder that consumes one token index per call."""

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # The embedding width equals the GRU width, so no projection is needed.
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input, hidden):
        """Embed a single token and advance the GRU by one step.

        Returns the GRU's ``(output, hidden)`` pair.
        """
        # Reshape to (seq_len=1, batch=1, features) as the GRU call expects.
        step_input = self.embedding(input).view(1, 1, -1)
        return self.gru(step_input, hidden)

    def initHidden(self):
        """Return an all-zero initial hidden state of shape (1, 1, hidden_size)."""
        return torch.zeros((1, 1, self.hidden_size))

class Decoder(nn.Module):
    """GRU decoder that emits log-probabilities over the vocabulary per step."""

    def __init__(self, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        # The decoder embedding is indexed by output-vocabulary ids.
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)
        # Project the GRU output (hidden_size) onto the vocabulary.
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        """Advance one decoding step.

        Returns ``(log_probs, hidden)`` where ``log_probs`` has shape
        (1, output_size).
        """
        embedded = F.relu(self.embedding(input).view(1, 1, -1))
        gru_out, hidden = self.gru(embedded, hidden)
        log_probs = self.softmax(self.out(gru_out[0]))
        return log_probs, hidden

    def initHidden(self):
        """Return an all-zero initial hidden state of shape (1, 1, hidden_size)."""
        return torch.zeros((1, 1, self.hidden_size))



## 3. 훈련함수를 정의해봅시다. (빈칸을 채워주세요!)

In [None]:
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion):
    """Run one optimisation step on a single question/answer pair.

    The decoder is fed its own greedy prediction at every step (no teacher
    forcing) and decoding stops early once it predicts EOS.

    Args:
        input_tensor: Token indices of the question (the caller passes a
            1-D tensor ending in EOS).
        target_tensor: Token indices of the answer, same layout.
        encoder, decoder: Models to update.
        encoder_optimizer, decoder_optimizer: Their optimizers.
        criterion: Loss applied to each decoder output and target token.

    Returns:
        The summed loss divided by the number of target tokens, as a float.
    """
    optimizers = (encoder_optimizer, decoder_optimizer)
    for optimizer in optimizers:
        optimizer.zero_grad()

    # Add a trailing dimension so each step is a tensor of shape (1,);
    # dimension 0 is the sequence length.
    source_steps = input_tensor.unsqueeze(1)
    target_steps = target_tensor.unsqueeze(1)
    target_length = target_steps.size(0)

    # Encode the whole question; only the final hidden state is kept.
    hidden = encoder.initHidden()
    for source_token in source_steps:
        _, hidden = encoder(source_token, hidden)

    # Decoding starts from the SOS token and the encoder's final hidden state.
    decoder_input = torch.tensor([[SOS_token]])
    total_loss = 0
    for expected_token in target_steps:
        log_probs, hidden = decoder(decoder_input, hidden)
        # Greedy choice becomes the next input; detach it from the graph.
        decoder_input = log_probs.topk(1)[1].squeeze().detach()

        total_loss += criterion(log_probs, expected_token)
        if decoder_input.item() == EOS_token:
            break

    total_loss.backward()

    for optimizer in optimizers:
        optimizer.step()

    return total_loss.item() / target_length


## 4. 실제로 훈련을 시켜보고 test를 직접해봅시다. (빈칸을 채워주세요!)

In [None]:
hidden_size = 256
encoder = Encoder(lang.n_words, hidden_size)
decoder = Decoder(hidden_size, lang.n_words)

encoder_optimizer = optim.AdamW(encoder.parameters(), lr=0.01)
decoder_optimizer = optim.AdamW(decoder.parameters(), lr=0.01)
criterion = nn.NLLLoss()

# Convert every sentence pair to index tensors once, up front: the tensors
# are identical in every epoch, so rebuilding them inside the loop repeats
# the same work 1000 times.
training_tensors = [
    (
        torch.tensor([lang.word2index[word] for word in question.split(' ')] + [EOS_token]),
        torch.tensor([lang.word2index[word] for word in answer.split(' ')] + [EOS_token]),
    )
    for question, answer in pairs
]

for epoch in tqdm.tqdm(range(1000)):
    for input_tensor, target_tensor in training_tensors:
        loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion)

# 테스트
def evaluate(sentence, max_length=10):
    """Generate a reply for ``sentence`` with greedy decoding.

    Args:
        sentence: Single-space-separated question; every word must already
            be in ``lang.word2index``.
        max_length: Upper bound on the number of decoding steps.

    Returns:
        The decoded words as a list. The last element is ``'<EOS>'`` when
        the decoder produced EOS within ``max_length`` steps.

    Raises:
        KeyError: If ``sentence`` contains a word missing from the vocabulary.
    """
    # Inference only: disable gradient tracking.
    with torch.no_grad():
        input_tensor = torch.tensor([lang.word2index[word] for word in sentence.split(' ')] + [EOS_token])
        encoder_hidden = encoder.initHidden()

        for token in input_tensor:
            _, encoder_hidden = encoder(token, encoder_hidden)

        decoder_input = torch.tensor([[SOS_token]])
        decoder_hidden = encoder_hidden
        decoded_words = []

        # Bound decoding by max_length, not by the question length: an answer
        # may be longer than its question and would otherwise be cut short.
        for _ in range(max_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            _, topi = decoder_output.topk(1)
            if topi.item() == EOS_token:
                decoded_words.append('<EOS>')
                break
            decoded_words.append(lang.index2word[topi.item()])

            # The greedy prediction becomes the next decoder input.
            decoder_input = topi.squeeze().detach()

        return decoded_words
# 실제로 테스트를 재미삼아 해보시기 바랍니다!!
for question in ['안녕하세요' ,'누구' ,'몇 살','날씨 어때']:
    print('Q: ', question)
    answer_words = evaluate(question)
    # Strip the end marker only when it is actually there; slicing off the
    # last element unconditionally would drop a real word whenever decoding
    # stopped without producing <EOS>.
    if answer_words and answer_words[-1] == '<EOS>':
        answer_words = answer_words[:-1]
    print('A: ', ' '.join(answer_words))



100%|██████████| 1000/1000 [02:06<00:00,  7.88it/s]

Q:  안녕하세요
A:  추워
Q:  누구
A:  챗봇
Q:  몇 살
A:  2살
Q:  날씨 어때
A:  오늘은 춥습니다.



