In [None]:
import pandas as pd
import re

# Load the question/answer CSV and shuffle its rows; reset_index(drop=True)
# renumbers the shuffled rows from 0 and discards the original index.
chatbot_data = (
  pd.read_csv('./korean_chatbot_data/ChatbotData.csv')
  .sample(frac=1)
  .reset_index(drop=True)
)

# Preview the first ten shuffled rows.
print(chatbot_data.head(10))


In [None]:
# Number of examples taken per slice in the training loop.
BATCH_SIZE = 64
# Size of the token embedding vectors in both the encoder and the decoder.
EMBEDDING_DIM = 256
# Number of GRU units in both the encoder and the decoder.
UNITS = 1024
# Every tokenized sentence is padded/truncated to this many tokens.
MAX_LEN = 30
# Sequence length used for the symbolic inputs that build the models.
TIME_STEPS = MAX_LEN


In [None]:
from konlpy.tag import Okt
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import tensorflow as tf
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Dropout, Attention
from tensorflow.keras import Model

# Shared KoNLPy Okt instance; process_morph uses it to split sentences into morphemes.
okt = Okt()

def clean_sentence (sentence):
  """Remove every character that is not a digit, Hangul, or a space.

  Kept characters: 0-9, Hangul jamo in the ranges ㄱ-ㅎ and ㅏ-ㅣ, Hangul
  syllables 가-힣, and the space character. Everything else (Latin letters,
  punctuation, other whitespace, ...) is dropped without replacement.

  Args:
    sentence: The raw text to clean.

  Returns:
    The text with all disallowed characters removed.
  """
  return re.sub(r'[^0-9ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', sentence)

def process_morph (sentence):
  """Split a sentence into morphemes and re-join them with single spaces.

  Args:
    sentence: The text to analyse with the module-level Okt instance.

  Returns:
    One string containing the morphemes returned by ``okt.morphs``,
    separated by single spaces.
  """
  morphemes = okt.morphs(sentence)
  return ' '.join(morphemes)

def clean_and_morph(sentence):
  """Clean a sentence, then split it into space-separated morphemes.

  Args:
    sentence: The raw text.

  Returns:
    The result of ``process_morph`` applied to the output of ``clean_sentence``.
  """
  return process_morph(clean_sentence(sentence))

def attach_answer_token (sentence):
  """Build the decoder input/target pair for one answer sentence.

  Args:
    sentence: The preprocessed answer text.

  Returns:
    A 2-tuple ``(decoder_input, decoder_target)`` where the input is the
    sentence prefixed with ``'<START> '`` and the target is the sentence
    suffixed with ``' <END>'``.
  """
  decoder_input = '<START> ' + sentence
  decoder_target = sentence + ' <END>'
  return decoder_input, decoder_target

def preprocess (questions, answers):
  """Clean and morpheme-split the questions and answers.

  Args:
    questions: Iterable of raw question strings.
    answers: Iterable of raw answer strings.

  Returns:
    Three lists ``(questions, answer_ins, answer_outs)``: the processed
    questions, the processed answers prefixed with ``<START>``, and the
    processed answers suffixed with ``<END>``.
  """
  ret_questions = [process_morph(clean_sentence(raw)) for raw in questions]

  # Each entry is the (decoder input, decoder target) pair for one answer.
  tagged_answers = [
    attach_answer_token(process_morph(clean_sentence(raw)))
    for raw in answers
  ]
  ret_answer_ins = [pair[0] for pair in tagged_answers]
  ret_answer_outs = [pair[1] for pair in tagged_answers]

  return ret_questions, ret_answer_ins, ret_answer_outs

class TransformUtils:
  """Converts between sentences and padded token-id sequences.

  Attributes are plain copies of the constructor arguments. ``start_token``
  and ``end_token`` are stored for callers; the methods of this class do not
  read them.
  """
  def __init__ (self, tokenizer, max_len, start_token, end_token):
    """Store the tokenizer and sequence settings.

    Args:
      tokenizer: Fitted tokenizer providing ``texts_to_sequences`` and
        ``index_word``.
      max_len: Length every id sequence is padded/truncated to.
      start_token: Id of the start-of-answer marker.
      end_token: Id of the end-of-answer marker.
    """
    self.tokenizer = tokenizer
    self.max_len = max_len
    self.start_token = start_token
    self.end_token = end_token
  
  def convert_sentences_to_vectors (self, sentences, preprocess=False):
    """Tokenize sentences into a tensor of padded token ids.

    Args:
      sentences: Sequence of strings to tokenize.
      preprocess: When True, each sentence is first passed through
        ``clean_sentence`` and ``process_morph``. Defaults to False because
        the training data is already preprocessed and carries ``<START>`` /
        ``<END>`` markers that ``clean_sentence`` would strip.

    Returns:
      A tensor with one row per sentence, each row ``max_len`` ids long,
      padded and truncated at the end.
    """
    # The previous implementation ran the morphological analyser on every
    # sentence and then tokenized the untouched input, so that work was pure
    # overhead. Preprocessing is now opt-in and, when enabled, actually used.
    texts = sentences
    if preprocess:
      texts = [process_morph(clean_sentence(text)) for text in sentences]
    ret = self.tokenizer.texts_to_sequences(texts)
    ret = pad_sequences(ret, maxlen=self.max_len, truncating='post', padding='post')
    return tf.convert_to_tensor(ret)

  def convert_vectors_to_sentences (self, vectors):
    """Map rows of token ids back to space-separated words.

    Args:
      vectors: Iterable of id sequences; elements may be tensors, numpy
        integers or plain ints.

    Returns:
      A list with one string per row. Every word is followed by a single
      space; ids that are non-positive or missing from the tokenizer's
      ``index_word`` mapping are rendered as ``'<None>'``.
    """
    ret = []
    for vector in vectors:
      words = []
      for vi in vector:
        # int() accepts scalar tensors as well as numpy/Python integers.
        index = int(vi)
        # .get avoids a KeyError for ids the tokenizer has never seen.
        word = self.tokenizer.index_word.get(index) if index > 0 else None
        words.append('<None>' if word is None else word)
      ret.append(''.join(word + ' ' for word in words))
    return ret


In [None]:
# Clean and morpheme-split the Q/A columns, then preview the first ten of each list.
questions, answer_ins, answer_outs = preprocess(chatbot_data['Q'], chatbot_data['A'])
print(questions[:10], answer_ins[:10], answer_outs[:10])

# filters='' and lower=False keep the '<START>' / '<END>' markers intact as tokens.
tokenizer = Tokenizer(filters='', lower=False, oov_token='<OOV>')
all_sentences = [*questions, *answer_ins, *answer_outs]
tokenizer.fit_on_texts(all_sentences)

# Ids of the answer boundary markers.
START_TOKEN, END_TOKEN = (tokenizer.word_index[marker] for marker in ('<START>', '<END>'))
# One more than the number of known words, so the padding id 0 is also a valid row.
VOCAB_SIZE = 1 + len(tokenizer.word_index)

In [None]:
# Shared converter between sentences and padded id tensors.
transform_utils = TransformUtils(
  tokenizer=tokenizer,
  max_len=MAX_LEN,
  start_token=START_TOKEN,
  end_token=END_TOKEN,
)

In [None]:
class Encoder(Model):
  """Embeds token ids and runs them through a GRU.

  Args:
    input_vocab_size: Number of rows in the embedding table.
    embedding_dim: Width of each embedding vector.
    enc_units: Number of GRU units.
  """
  def __init__ (self, input_vocab_size, embedding_dim, enc_units):
    super().__init__()
    layers = tf.keras.layers
    self.input_vocab_size = input_vocab_size
    self.enc_units = enc_units
    self.embedding = layers.Embedding(input_vocab_size, embedding_dim)
    # return_sequences + return_state: the GRU yields the per-step outputs and the final state.
    self.gru = layers.GRU(
      enc_units,
      return_sequences=True,
      return_state=True,
      recurrent_initializer='glorot_uniform',
    )

  def call (self, inputs, state=None):
    """Encode a batch of token-id sequences.

    Args:
      inputs: Token ids to embed.
      state: Optional initial GRU state.

    Returns:
      ``(output, state)``: the GRU output sequence and its final state.
    """
    embedded = self.embedding(inputs)
    sequence_output, final_state = self.gru(embedded, initial_state=state)
    return sequence_output, final_state

In [None]:
class Decoder (Model):
  """GRU decoder that attends over the encoder outputs.

  Args:
    output_vocab_size: Size of the embedding table and of the softmax output.
    embedding_dim: Width of each embedding vector.
    dec_units: Number of GRU units.
  """
  def __init__ (self, output_vocab_size, embedding_dim, dec_units):
    super().__init__()
    layers = tf.keras.layers
    self.output_vocab_size = output_vocab_size
    self.embedding_dim = embedding_dim
    self.dec_units = dec_units
    self.embedding = layers.Embedding(output_vocab_size, embedding_dim)
    self.gru = layers.GRU(
      dec_units,
      return_sequences=True,
      return_state=True,
      recurrent_initializer='glorot_uniform',
    )
    self.attention = Attention()
    # Softmax head: one probability per vocabulary entry at every step.
    self.fc = layers.Dense(output_vocab_size, activation='softmax')
  
  def call (self, inputs, enc_outputs, state=None):
    """Decode a batch of answer token ids.

    Args:
      inputs: Decoder input token ids.
      enc_outputs: Encoder output sequence to attend over.
      state: Optional initial GRU state.

    Returns:
      ``(probabilities, state)``: the softmax output for every step and the
      final GRU state.
    """
    embedded = self.embedding(inputs)
    rnn_output, rnn_output_state = self.gru(embedded, initial_state=state)
    # <variant that feeds initial_state in place of the last rnn_output>
    # attention_inputs = tf.concat([state[:, tf.newaxis, :], rnn_output[:, : -1, :]], axis=1)
    # context_vector = self.attention(inputs=[attention_inputs, enc_outputs])
    # <variant that feeds initial_state in place of the last rnn_output />
    # The GRU outputs are passed first and the encoder outputs second.
    context_vector = self.attention(inputs=[rnn_output, enc_outputs])
    # The context is concatenated with the GRU output along the feature axis before the softmax head.
    token_probabilities = self.fc(tf.concat([context_vector, rnn_output], axis=-1))
    return token_probabilities, rnn_output_state


In [None]:
class Seq2SeqTrainer (Model):
  """Wraps an encoder and a decoder into one trainable model.

  Args:
    encoder: Model returning ``(outputs, state)`` for question ids.
    decoder: Model called as ``decoder(answer_ids, enc_outputs, state=...)``.
  """
  def __init__ (self, encoder, decoder):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder

  def call (self, inputs):
    """Run the encoder, then the decoder seeded with the encoder's state.

    Args:
      inputs: A pair ``(questions, answer_ins)`` of token-id batches.

    Returns:
      The decoder's first output; its final state is discarded.
    """
    question_ids, decoder_input_ids = inputs
    encoder_outputs, encoder_state = self.encoder(question_ids)
    predictions, _ = self.decoder(decoder_input_ids, encoder_outputs, state=encoder_state)
    return predictions


In [None]:
class TrainLoss (tf.keras.losses.Loss):
  """Sparse categorical cross-entropy averaged over non-padding target tokens.

  Target id 0 is treated as padding (the sequences in this notebook are
  padded with ``pad_sequences``' default value) and contributes nothing to
  the loss. Previously the inner loss already reduced to a scalar, so the
  trailing ``reduce_sum`` was a no-op and padding positions were averaged in
  together with real tokens.
  """
  def __init__ (self):
    super(TrainLoss, self).__init__()
    # reduction='none' keeps one loss value per target position so that
    # padding positions can be masked out before reducing.
    self.loss = tf.keras.losses.SparseCategoricalCrossentropy(reduction='none')
  
  def call (self, y_t, y_pred):
    """Compute the masked mean loss.

    Args:
      y_t: Target token ids; id 0 marks padding.
      y_pred: Predicted probabilities over the vocabulary for every position.

    Returns:
      A scalar: the summed loss of the non-padding positions divided by
      their count (or by 1 when the batch holds only padding).
    """
    loss = self.loss(y_t, y_pred)
    # Reshape guards against targets that carry an extra trailing axis.
    is_token = tf.reshape(tf.not_equal(y_t, 0), tf.shape(loss))
    mask = tf.cast(is_token, loss.dtype)
    token_count = tf.maximum(tf.reduce_sum(mask), 1.0)
    return tf.reduce_sum(loss * mask) / token_count


In [None]:
# Convert the preprocessed sentence lists into padded id tensors.
question_vectors = transform_utils.convert_sentences_to_vectors(questions)
answer_in_vectors = transform_utils.convert_sentences_to_vectors(answer_ins)
answer_out_vectors = transform_utils.convert_sentences_to_vectors(answer_outs)

# Each model is called once on symbolic inputs before summary() is printed.
encoder = Encoder(VOCAB_SIZE, EMBEDDING_DIM, UNITS)
encoder(Input(shape=(TIME_STEPS,)))
encoder.summary()

decoder = Decoder(VOCAB_SIZE, EMBEDDING_DIM, UNITS)
decoder(
  Input(shape=(TIME_STEPS,)),
  Input(shape=(TIME_STEPS, UNITS)),
  state=Input(shape=(UNITS,)),
)
decoder.summary()

# The trainer shares the encoder/decoder instances created above.
trainer = Seq2SeqTrainer(encoder, decoder)
trainer([Input(shape=(TIME_STEPS,)), Input(shape=(TIME_STEPS,))])
trainer.summary()
trainer.compile(
  optimizer=tf.optimizers.Adam(learning_rate=0.0001),
  loss=TrainLoss(),
)

# Train one slice of BATCH_SIZE examples at a time so the encoder and decoder
# can be checkpointed every 100 slices.
# The range now runs to the end of the data: the old upper bound of
# len - BATCH_SIZE skipped the final partial slice (and the last full slice
# whenever the data size was an exact multiple of BATCH_SIZE).
for idx in range(0, len(question_vectors), BATCH_SIZE):
  if idx % (BATCH_SIZE * 100) == 0:
    # Forward slashes avoid the invalid '\m', '\e' and '\d' escape sequences
    # of the previous literals and resolve to the same folders on Windows.
    tf.saved_model.save(encoder, './model/encoder')
    tf.saved_model.save(decoder, './model/decoder')
  batch_end = idx + BATCH_SIZE
  with tf.device('/device:GPU:0'):
    # batch_size is passed explicitly; fit() otherwise defaults to 32 and
    # would split every slice into two gradient steps.
    history = trainer.fit(
      [question_vectors[idx: batch_end], answer_in_vectors[idx: batch_end]],
      answer_out_vectors[idx: batch_end],
      batch_size=BATCH_SIZE,
    )
  print(idx, history.history)


In [None]:
# encoder = Encoder(VOCAB_SIZE, EMBEDDING_DIM, UNITS)
# tf.saved_model.load('.\model\encoder')

# decoder = Decoder(VOCAB_SIZE, EMBEDDING_DIM, UNITS)
# tf.saved_model.load('.\model\decoder')


In [None]:
class Translator:
  """Generates an answer for each input sentence by greedy decoding.

  Args:
    encoder: Model returning ``(outputs, state)`` for a batch of token ids.
    decoder: Model called as ``decoder(token_ids, enc_outputs, state=...)``
      and returning ``(probabilities, state)``.
    transform_utils: Object providing ``convert_sentences_to_vectors`` and
      ``convert_vectors_to_sentences``.
    start_token: Id fed to the decoder as the first input.
    end_token: Id that stops decoding once it has been produced.
  """
  def __init__ (self, encoder, decoder, transform_utils, start_token, end_token):
    self.encoder = encoder
    self.decoder = decoder
    self.transform_utils = transform_utils
    self.start_token = start_token
    self.end_token = end_token
  
  def translate (self, inputs):
    """Return the generated answer text for each input sentence.

    Args:
      inputs: Sequence of question strings.
        NOTE(review): these are tokenized as-is; presumably they should be
        morpheme-split the same way as the training questions - confirm.

    Returns:
      A list of strings produced by ``convert_vectors_to_sentences``.
    """
    return self.transform_utils.convert_vectors_to_sentences(self.get_translated_vector(inputs))
    
  def get_translated_vector (self, inputs):
    """Greedily decode an id sequence for each input sentence.

    Decoding of a sentence stops when the end token is produced or after
    ``transform_utils.max_len`` steps (30 if that attribute is missing).

    Args:
      inputs: Sequence of question strings.

    Returns:
      An int64 tensor with one row per sentence. Each row starts with the
      start token; rows shorter than the longest one are right-padded with
      0 so the rows form a rectangular tensor.
    """
    vectors = self.transform_utils.convert_sentences_to_vectors(inputs)
    max_steps = getattr(self.transform_utils, 'max_len', 30)
    ret = []
    for vector in vectors:
      ret_vector = [int(self.start_token)]
      enc_output, enc_state = self.encoder(tf.convert_to_tensor([vector]))
      dec_state = enc_state
      cnt = 0
      while ret_vector[-1] != self.end_token and cnt < max_steps:
        # Feed only the most recent token; the GRU state carries the history.
        dec_result, dec_state = self.decoder(tf.convert_to_tensor([[ret_vector[-1]]]), enc_output, state=dec_state)
        # Store plain ints so the list has one consistent element type.
        ret_vector.append(int(tf.math.argmax(dec_result[0][0]).numpy()))
        cnt += 1
      ret.append(ret_vector)

    # Sentences can finish after different numbers of steps; converting a
    # ragged list to a tensor raises, so pad every row to the longest one.
    longest = max((len(row) for row in ret), default=0)
    padded = [row + [0] * (longest - len(row)) for row in ret]
    return tf.convert_to_tensor(padded, dtype=tf.int64)


In [None]:
# Uncomment to try freshly initialised (untrained) models instead of the trained ones:
# encoder = Encoder(VOCAB_SIZE, EMBEDDING_DIM, UNITS)
# decoder = Decoder(VOCAB_SIZE, EMBEDDING_DIM, UNITS)
translator = Translator(
  encoder=encoder,
  decoder=decoder,
  transform_utils=transform_utils,
  start_token=START_TOKEN,
  end_token=END_TOKEN,
)

# Sample inputs; exactly one assignment should be left uncommented.
# sentences = ['오늘 강 추위 래 요']
sentences = ['커피 마시고 싶다']
# sentences = ['고민 상담 좀']
# sentences = ['인성 문제 있는거 아니야']
# sentences = ['공부 하기 싫다']

# Round-trip the input through the tokenizer to show how the model sees it.
print(transform_utils.convert_vectors_to_sentences(transform_utils.convert_sentences_to_vectors(sentences)))
# Generated answer for each input sentence.
print(translator.translate(sentences))
