# 단어 레벨 챗봇 (함수형 API)

이 챗봇은 데이터를 매우 적게 학습하였습니다. 더 많은 데이터를 학습할수록 일반화 능력이 높아집니다.

In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import re
import MeCab
import urllib

class Mecab:
    def pos(self, text):
        p = re.compile(".+\t[A-Z]+")
        return [tuple(p.match(line).group().split("\t")) for line in MeCab.Tagger().parse(text).splitlines()[:-1]]
    
    def morphs(self, text):
        p = re.compile(".+\t[A-Z]+")
        return [p.match(line).group().split("\t")[0] for line in MeCab.Tagger().parse(text).splitlines()[:-1]]
    
    def nouns(self, text):
        p = re.compile(".+\t[A-Z]+")
        temp = [tuple(p.match(line).group().split("\t")) for line in MeCab.Tagger().parse(text).splitlines()[:-1]]
        nouns=[]
        for word in temp:
            if word[1] in ["NNG", "NNP", "NNB", "NNBC", "NP", "NR"]:
                nouns.append(word[0])
        return nouns
    
mcb = Mecab()

In [None]:
# urllib.request.urlretrieve("https://raw.githubusercontent.com/songys/data/master/ChatbotData%20.csv", filename="ChatbotData.csv")

In [None]:
# 태그 단어
PAD = "<PAD>"   # 패딩
STA = "<SOS>"     # 시작
END = "<EOS>"       # 끝
UNK = "<UNK>"       # 없는 단어(Out of Vocabulary)

# 태그 인덱스
PAD_INDEX = 0
STA_INDEX = 1
END_INDEX = 2
UNK_INDEX = 3

# 데이터 타입
enc_input  = 0
dec_input  = 1
dec_output = 2

In [None]:
# 한 문장에서 단어 시퀀스의 최대 개수
max_len = 30
# 임베딩 벡터 차원
emb_dim = 100

# LSTM 히든레이어 차원
h_size = 128

# 정규 표현식 필터
RE_FILTER = re.compile(r"[.,!?\'':;~()]")

In [None]:
data = pd.read_csv("ChatbotData .csv")

In [None]:
data.head()

Unnamed: 0,Q,A,label
0,12시 땡!,하루가 또 가네요.,0
1,1지망 학교 떨어졌어,위로해 드립니다.,0
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.,0
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.,0
4,PPL 심하네,눈살이 찌푸려지죠.,0


In [None]:
len(data)

11823

In [None]:
Qs = data["Q"].tolist()[:100]
As = data["A"].tolist()[:100]

In [None]:
# # 형태소분석 함수
# def pos_tag(sentences):
    
#     # KoNLPy 형태소분석기 설정
#     tagger = Okt()
    
#     # 문장 품사 변수 초기화
#     sentences_pos = []
    
#     # 모든 문장 반복
#     for sentence in sentences:
#         # 특수기호 제거
#         sentence = re.sub(RE_FILTER, "", sentence)
        
#         # 배열인 형태소분석의 출력을 띄어쓰기로 구분하여 붙임
#         sentence = " ".join(tagger.morphs(sentence))
#         sentences_pos.append(sentence)
        
#     return sentences_pos

# # 형태소분석 수행
# Q = pos_tag(Q)
# A = pos_tag(A)

# # 형태소분석으로 변환된 챗봇 데이터 출력
# for i in range(10):
#     print('Q : ' + Q[i])
#     print('A : ' + A[i])
#     print()

In [None]:
# # 질문과 대답 문장들을 하나로 합침
# sentences = []
# sentences.extend(Qs)
# sentences.extend(As)

In [None]:
QAs = Qs + As

tkn = tf.keras.preprocessing.text.Tokenizer(oov_token="UNK")
tkn.fit_on_texts(QAs)

word2idx = tkn.word_index
idx2word = tkn.index_word

In [None]:
# 태그 단어
PAD = "<PAD>"   # 패딩
STA = "<SOS>"     # 시작
END = "<EOS>"       # 끝
UNK = "<UNK>"       # 없는 단어(Out of Vocabulary)

# 태그 인덱스
PAD_INDEX = 0
STA_INDEX = 1
END_INDEX = 2
UNK_INDEX = 3

In [None]:
QAs[2]

'3박4일 놀러가고 싶다'

In [None]:
tkn.texts_to_sequences(['3박4일 놀러가고 싶다'])

[[63, 64, 31]]

In [None]:
max_len = 30
# 문장을 인덱스로 변환
def convert_text_to_idx(sents, word2idx, type): 
    
    sents_idx = []
    
    # 모든 문장에 대해서 반복
    for sent in sents:
        sent_idx = []
        # 디코더 입력일 경우 맨 앞에 START 태그 추가
        if type == "dec_input":
            sent_idx.extend([word2idx["<SOS>"]])
        # 문장의 단어들을 띄어쓰기로 분리
        for word in sent.split():
            # 사전에 있는 단어면 해당 인덱스를 추가
            if word2idx[word] != None:
                sent_idx.append(word2idx[word])
            # 사전에 없는 단어면 UNK 인덱스를 추가
            else:
                sent_idx.append(word2idx["<UNK>"])

        # 최대 길이 검사
        if type == "dec_output":
            # 디코더 목표일 경우 맨 뒤에 END 태그 추가
            # 문장의 최대 길이 이상일 경우
            if len(sent_idx) >= max_len:
                sent_idx = sent_idx[:max_len-1] + [word2idx["<EOS>"]]
            else:
                sent_idx += [word2idx["<EOS>"]]
        else:
            if len(sent_idx) > max_len:
                sent_idx = sent_idx[:max_len]
            
        # 최대 길이에 없는 공간은 패딩 인덱스로 채움
        sent_idx += (max_len - len(sent_idx)) * [word2idx["<PAD>"]]
        
        # 문장의 인덱스 배열을 추가
        sents_idx.append(sent_idx)

    return np.array(sents_idx)

In [None]:
# 인코더 입력 인덱스 변환
x_encoder = convert_text_to_idx(Qs, word_to_index, "enc_input")

# 첫 번째 인코더 입력 출력 (12시 땡)
x_encoder[0]

array([ 2775, 20868,     0,     0,     0,     0,     0,     0,     0,
           0,     0,     0,     0,     0,     0,     0,     0,     0,
           0,     0,     0,     0,     0,     0,     0,     0,     0,
           0,     0,     0])

In [None]:
# 디코더 입력 인덱스 변환
x_decoder = convert_text_to_idx(As, word_to_index, "dec_input")

# 첫 번째 디코더 입력 출력 (START 하루 가 또 가네요)
x_decoder[0]

array([    1, 19616,  5970,  1688,     0,     0,     0,     0,     0,
           0,     0,     0,     0,     0,     0,     0,     0,     0,
           0,     0,     0,     0,     0,     0,     0,     0,     0,
           0,     0,     0])

In [None]:
# 디코더 목표 인덱스 변환
y_decoder = convert_text_to_idx(As, word_to_index, "dec_output")

# 첫 번째 디코더 목표 출력 (하루 가 또 가네요 END)
y_decoder[0]

array([19616,  5970,  1688,     2,     0,     0,     0,     0,     0,
           0,     0,     0,     0,     0,     0,     0,     0,     0,
           0,     0,     0,     0,     0,     0,     0,     0,     0,
           0,     0,     0])

In [None]:
# 원핫인코딩 초기화
one_hot_data = np.zeros((len(y_decoder), max_len, len(words)))

# 디코더 목표를 원핫인코딩으로 변환
# 학습시 입력은 인덱스이지만, 출력은 원핫인코딩 형식임
for i, sequence in enumerate(y_decoder):
    for j, index in enumerate(sequence):
        one_hot_data[i, j, index] = 1

# 디코더 목표 설정
y_decoder = one_hot_data

# 첫 번째 디코더 목표 출력
y_decoder[0]

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [1., 0., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.]])

In [None]:

#--------------------------------------------
# 훈련 모델 인코더 정의
#--------------------------------------------

# 입력 문장의 인덱스 시퀀스를 입력으로 받음
enc_inputs = layers.Input(shape=(None,))

# 임베딩 레이어
encoder_outputs = layers.Embedding(len(words), emb_dim)(enc_inputs)

# return_state가 True면 상태값 리턴
# LSTM은 state_h(hidden state)와 state_c(cell state) 2개의 상태 존재
encoder_outputs, state_h, state_c = layers.LSTM(h_size,
                                                dropout=0.1,
                                                recurrent_dropout=0.5,
                                                return_state=True)(encoder_outputs)

# 히든 상태와 셀 상태를 하나로 묶음
encoder_states = [state_h, state_c]



#--------------------------------------------
# 훈련 모델 디코더 정의
#--------------------------------------------

# 목표 문장의 인덱스 시퀀스를 입력으로 받음
dec_inputs = layers.Input(shape=(None,))

# 임베딩 레이어
decoder_embedding = layers.Embedding(len(words), emb_dim)
decoder_outputs = decoder_embedding(dec_inputs)

# 인코더와 달리 return_sequences를 True로 설정하여 모든 타임 스텝 출력값 리턴
# 모든 타임 스텝의 출력값들을 다음 레이어의 Dense()로 처리하기 위함
decoder_lstm = layers.LSTM(h_size,
                           dropout=0.1,
                           recurrent_dropout=0.5,
                           return_state=True,
                           return_sequences=True)

# initial_state를 인코더의 상태로 초기화
decoder_outputs, _, _ = decoder_lstm(decoder_outputs,
                                     initial_state=encoder_states)

# 단어의 개수만큼 노드의 개수를 설정하여 원핫 형식으로 각 단어 인덱스를 출력
decoder_dense = layers.Dense(len(words), activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)



#--------------------------------------------
# 훈련 모델 정의
#--------------------------------------------

# 입력과 출력으로 함수형 API 모델 생성
model = models.Model([enc_inputs, dec_inputs], decoder_outputs)

# 학습 방법 설정
model.compile(optimizer='rmsprop',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

NameError: name 'layers' is not defined

In [None]:
#--------------------------------------------
#  예측 모델 인코더 정의
#--------------------------------------------

# 훈련 모델의 인코더 상태를 사용하여 예측 모델 인코더 설정
encoder_model = models.Model(enc_inputs, encoder_states)



#--------------------------------------------
# 예측 모델 디코더 정의
#--------------------------------------------

# 예측시에는 훈련시와 달리 타임 스텝을 한 단계씩 수행
# 매번 이전 디코더 상태를 입력으로 받아서 새로 설정
decoder_state_input_h = layers.Input(shape=(h_size,))
decoder_state_input_c = layers.Input(shape=(h_size,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]    

# 임베딩 레이어
decoder_outputs = decoder_embedding(dec_inputs)

# LSTM 레이어
decoder_outputs, state_h, state_c = decoder_lstm(decoder_outputs,
                                                 initial_state=decoder_states_inputs)

# 히든 상태와 셀 상태를 하나로 묶음
decoder_states = [state_h, state_c]

# Dense 레이어를 통해 원핫 형식으로 각 단어 인덱스를 출력
decoder_outputs = decoder_dense(decoder_outputs)

# 예측 모델 디코더 설정
decoder_model = models.Model([dec_inputs] + decoder_states_inputs,
                      [decoder_outputs] + decoder_states)

In [None]:
# 인덱스를 문장으로 변환
def convert_index_to_text(indexs, vocabulary): 
    
    sentence = ''
    
    # 모든 문장에 대해서 반복
    for index in indexs:
        if index == END_INDEX:
            # 종료 인덱스면 중지
            break;
        if vocabulary.get(index) is not None:
            # 사전에 있는 인덱스면 해당 단어를 추가
            sentence += vocabulary[index]
        else:
            # 사전에 없는 인덱스면 UNK 단어를 추가
            sentence.extend([vocabulary[UNK_INDEX]])
            
        # 빈칸 추가
        sentence += ' '

    return sentence

In [None]:
# 에폭 반복
for epoch in range(20):
    print('Total Epoch :', epoch + 1)

    # 훈련 시작
    history = model.fit([x_encoder, x_decoder],
                        y_decoder,
                        epochs=100,
                        batch_size=64,
                        verbose=0)
    
    # 정확도와 손실 출력
    print('accuracy :', history.history['accuracy'][-1])
    print('loss :', history.history['loss'][-1])
    
    # 문장 예측 테스트
    # (3 박 4일 놀러 가고 싶다) -> (여행 은 언제나 좋죠)
    input_encoder = x_encoder[2].reshape(1, x_encoder[2].shape[0])
    input_decoder = x_decoder[2].reshape(1, x_decoder[2].shape[0])
    results = model.predict([input_encoder, input_decoder])
    
    # 결과의 원핫인코딩 형식을 인덱스로 변환
    # 1축을 기준으로 가장 높은 값의 위치를 구함
    indexs = np.argmax(results[0], 1) 
    
    # 인덱스를 문장으로 변환
    sentence = convert_index_to_text(indexs, index_to_word)
    print(sentence)
    print()

Total Epoch : 1
accuracy : 0.9266666769981384
loss : 0.35812312364578247
맛있게 은 언제나 좋죠 

Total Epoch : 2
accuracy : 0.968999981880188
loss : 0.14514704048633575
가세 은 언제나 좋죠 

Total Epoch : 3
accuracy : 0.9739999771118164
loss : 0.08765653520822525
가세 은 언제나 좋죠 

Total Epoch : 4
accuracy : 0.9783333539962769
loss : 0.06297517567873001
가세 은 언제나 좋죠 

Total Epoch : 5
accuracy : 0.9853333234786987
loss : 0.044016480445861816
가세 은 언제나 좋죠 

Total Epoch : 6
accuracy : 0.9919999837875366
loss : 0.027679210528731346
여행 은 언제나 좋죠 

Total Epoch : 7
accuracy : 0.9953333139419556
loss : 0.017140144482254982
여행 은 언제나 좋죠 

Total Epoch : 8
accuracy : 0.996999979019165
loss : 0.011556596495211124
여행 은 언제나 좋죠 

Total Epoch : 9
accuracy : 0.9990000128746033
loss : 0.005027387291193008
여행 은 언제나 좋죠 

Total Epoch : 10
accuracy : 0.999666690826416
loss : 0.0017524176510050893
여행 은 언제나 좋죠 

Total Epoch : 11
accuracy : 1.0
loss : 0.0006122245686128736
여행 은 언제나 좋죠 

Total Epoch : 12
accuracy : 0.9990000128746033
lo

In [None]:
# 예측을 위한 입력 생성
def make_predict_input(sentence):

    sentences = []
    sentences.append(sentence)
    sentences = pos_tag(sentences)
    input_seq = convert_text_to_index(sentences, word_to_index, enc_input)
    
    return input_seq

In [None]:
# 텍스트 생성
def generate_text(input_seq):
    
    # 입력을 인코더에 넣어 마지막 상태 구함
    states = encoder_model.predict(input_seq)

    # 목표 시퀀스 초기화
    target_seq = np.zeros((1, 1))
    
    # 목표 시퀀스의 첫 번째에 <SOS> 태그 추가
    target_seq[0, 0] = STA_INDEX
    
    # 인덱스 초기화
    indexs = []
    
    # 디코더 타임 스텝 반복
    while 1:
        # 디코더로 현재 타임 스텝 출력 구함
        # 처음에는 인코더 상태를, 다음부터 이전 디코더 상태로 초기화
        decoder_outputs, state_h, state_c = decoder_model.predict(
                                                [target_seq] + states)

        # 결과의 원핫인코딩 형식을 인덱스로 변환
        index = np.argmax(decoder_outputs[0, 0, :])
        indexs.append(index)
        
        # 종료 검사
        if index == END_INDEX or len(indexs) >= max_len:
            break

        # 목표 시퀀스를 바로 이전의 출력으로 설정
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = index
        
        # 디코더의 이전 상태를 다음 디코더 예측에 사용
        states = [state_h, state_c]

    # 인덱스를 문장으로 변환
    sentence = convert_index_to_text(indexs, index_to_word)
        
    return sentence

In [None]:
# 문장을 인덱스로 변환
input_seq = make_predict_input('3박4일 놀러가고 싶다')
input_seq

array([[372, 366, 236, 244, 412, 183,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0]])

In [None]:
# 예측 모델로 텍스트 생성
sentence = generate_text(input_seq)
sentence

'여행 은 언제나 좋죠 '

In [None]:
# 문장을 인덱스로 변환
input_seq = make_predict_input('3박4일 같이 놀러가고 싶다')
input_seq

array([[372, 366, 236, 153, 244, 412, 183,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0]])

In [None]:
# 예측 모델로 텍스트 생성
sentence = generate_text(input_seq)
sentence

'여행 은 언제나 좋죠 '