<a href="https://colab.research.google.com/github/JaeGwon-Lee/Flex_Study/blob/main/Transformer_Chatbot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 1. 데이터 로드

#### 데이터 로드

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re                    # re (regular expression) : 정규 표현식 지원
import urllib.request
import time
import tensorflow_datasets as tfds
import tensorflow as tf

In [None]:
# Download the chatbot Q/A corpus into the working directory and load it.
DATA_URL = 'https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv'
DATA_PATH = 'ChatBotData.csv'

urllib.request.urlretrieve(DATA_URL, filename=DATA_PATH)
train = pd.read_csv(DATA_PATH)
train.head()

Unnamed: 0,Q,A,label
0,12시 땡!,하루가 또 가네요.,0
1,1지망 학교 떨어졌어,위로해 드립니다.,0
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.,0
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.,0
4,PPL 심하네,눈살이 찌푸려지죠.,0


In [None]:
# Number of rows (question/answer pairs) in the loaded corpus.
len(train)

11823

In [None]:
# Count missing values per column.
train.isnull().sum()

Q        0
A        0
label    0
dtype: int64

In [None]:
# Distinct values stored in the `label` column.
train['label'].unique()

array([0, 1, 2])

#### 구두점 구분

- 학습 기반의 토크나이저 사용  
- 구두점(특수기호) 앞에 공백을 추가해서 다른 문자들과 구분  

In [None]:
# Insert a space in front of each of ? . ! , so the tokenizer can tell
# punctuation apart from the word before it, then trim surrounding whitespace.
# In the replacement, `\1` is the punctuation mark captured by the ( ) group;
# the r"" prefix keeps the backslash literal.
questions = [
    re.sub(r"([?.!,])", r" \1", sentence).strip()
    for sentence in train['Q']
]

In [None]:
# Same punctuation spacing as for the questions, applied to the answer column.
answers = [
    re.sub(r"([?.!,])", r" \1", sentence).strip()
    for sentence in train['A']
]

In [None]:
# Show the first five preprocessed questions and answers, one list per line.
print(questions[:5], answers[:5], sep='\n')

['12시 땡 !', '1지망 학교 떨어졌어', '3박4일 놀러가고 싶다', '3박4일 정도 놀러가고 싶다', 'PPL 심하네']
['하루가 또 가네요 .', '위로해 드립니다 .', '여행은 언제나 좋죠 .', '여행은 언제나 좋죠 .', '눈살이 찌푸려지죠 .']


## 2. 단어 집합 생성

In [None]:
# SubwordTextEncoder: a tokenizer that splits text into frequently used subword units.
# Here it is built from the preprocessed questions and answers, with a target
# vocabulary size of 2**13.
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(questions + answers, target_vocab_size=2**13)

In [None]:
# Start/end token ids are the first two ids after the tokenizer's own vocabulary.
# They are kept as one-element lists so they can be concatenated to encoded sentences.
START_TOKEN = [tokenizer.vocab_size]
END_TOKEN = [tokenizer.vocab_size + 1]

# Vocabulary size including the start and end tokens.
VOCAB_SIZE = tokenizer.vocab_size + 2

In [None]:
# Show the start/end token ids and the total vocabulary size
# (the last label reads "vocabulary size").
print('SOS :', START_TOKEN)
print('EOS :', END_TOKEN)
print('단어 집합 크기 :', VOCAB_SIZE)

SOS : [8176]
EOS : [8177]
단어 집합 크기 : 8178


## 3. 정수 인코딩과 패딩

#### 정수 인코딩 예시

In [None]:
# Pick one preprocessed question to illustrate integer encoding.
sample_string = questions[20]

In [None]:
# Round-trip the sample through the tokenizer: text -> subword ids -> text.
tokenized_string = tokenizer.encode(sample_string)
original_string = tokenizer.decode(tokenized_string)

# First line: "sentence after integer encoding"; second line: "original sentence".
print('정수 인코딩 후의 문장 : {}'.format(tokenized_string))
print('기존 문장 : {}'.format(original_string))

정수 인코딩 후의 문장 : [5765, 610, 3507, 141, 684, 3745, 848]
기존 문장 : 가스비 비싼데 감기 걸리겠어


In [None]:
# Decode each id on its own to see which subword it stands for.
for token_id in tokenized_string:
  subword = tokenizer.decode([token_id])
  print('{} ---> {}'.format(token_id, subword))

5765 ---> 가스
610 ---> 비 
3507 ---> 비싼
141 ---> 데 
684 ---> 감기 
3745 ---> 걸리
848 ---> 겠어


#### 정수 인코딩

In [None]:
MAX_LENGTH = 40


def tokenize_and_filter(inputs, outputs):
  """Encode paired sentences to integer ids and pad them to MAX_LENGTH.

  Each sentence is encoded with the module-level `tokenizer` and wrapped in
  START_TOKEN / END_TOKEN. Despite the name, no pair is dropped: every
  (input, output) pair yielded by zip() ends up in the result.

  Args:
    inputs: iterable of input (question) strings.
    outputs: iterable of output (answer) strings, paired with `inputs`.

  Returns:
    A tuple (padded_inputs, padded_outputs) as returned by pad_sequences,
    padded with zeros at the end up to MAX_LENGTH.
  """
  def _encode(text):
    return START_TOKEN + tokenizer.encode(text) + END_TOKEN

  def _pad(sequences):
    # padding='post' appends the zeros after the sentence.
    # NOTE(review): pad_sequences also cuts sequences longer than maxlen; with
    # its default settings that appears to remove the leading ids - confirm.
    return tf.keras.preprocessing.sequence.pad_sequences(
        sequences, maxlen=MAX_LENGTH, padding='post')

  pairs = list(zip(inputs, outputs))
  encoded_inputs = [_encode(source) for source, _ in pairs]
  encoded_outputs = [_encode(target) for _, target in pairs]

  return _pad(encoded_inputs), _pad(encoded_outputs)

In [None]:
# Replace the lists of sentences with their padded integer-id arrays.
# NOTE: `questions` / `answers` are rebound from strings to arrays here, so this
# cell is not idempotent - re-run the preprocessing cells before running it again.
questions, answers = tokenize_and_filter(questions, answers)

In [None]:
# Shapes of the padded arrays: (number of sentences, MAX_LENGTH).
questions.shape, answers.shape

((11823, 40), (11823, 40))

In [None]:
# First encoded pair: start id, subword ids, end id, then zero padding.
questions[0], answers[0]

(array([8176, 7914, 4205, 3058,   41, 8177,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0], dtype=int32),
 array([8176, 3842,   74, 7893,    1, 8177,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0], dtype=int32))

## 4. 인코더와 디코더의 입력, 레이블 만들기

In [None]:
BATCH_SIZE = 64
BUFFER_SIZE = 20000

# Teacher forcing: the decoder input drops the last position of each answer and
# the label drops the first position (the start token), so label[t] is the
# token that follows dec_inputs[t].
dataset = (
    tf.data.Dataset.from_tensor_slices((
        {
            'inputs': questions,
            'dec_inputs': answers[:, :-1],
        },
        {
            'outputs': answers[:, 1:],
        },
    ))
    .cache()                                    # keep the dataset cached after the first pass
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.experimental.AUTOTUNE)    # prepare upcoming batches ahead of time; AUTOTUNE lets tf.data pick the buffer size
)

In [None]:
# Compare one full answer with its decoder-input slice (last position dropped)
# and its label slice (first position dropped).
print(answers[0])
print(answers[:1][:, :-1])
print(answers[:1][:, 1:])

[8176 3842   74 7893    1 8177    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0]
[[8176 3842   74 7893    1 8177    0    0    0    0    0    0    0    0
     0    0    0    0    0    0    0    0    0    0    0    0    0    0
     0    0    0    0    0    0    0    0    0    0    0]]
[[3842   74 7893    1 8177    0    0    0    0    0    0    0    0    0
     0    0    0    0    0    0    0    0    0    0    0    0    0    0
     0    0    0    0    0    0    0    0    0    0    0]]


## 5. 트랜스포머 만들기

#### 트랜스포머 모델

In [None]:
class PositionalEncoding(tf.keras.layers.Layer):
  """Layer that adds fixed sinusoidal positional encodings to its input.

  The table is computed once in the constructor for `position` positions and
  `d_model` channels; on call it is sliced to the input's sequence length and
  added to the input.

  Args:
    position: number of positions (rows) to precompute.
    d_model: number of channels (columns) of the encoding.
  """

  def __init__(self, position, d_model):
    super(PositionalEncoding, self).__init__()
    self.pos_encoding = self.positional_encoding(position, d_model)

  def get_angles(self, position, i, d_model):
    """Return position / 10000^(2*(i//2) / d_model) for the given index grids."""
    # tf.pow: power; tf.cast: make d_model a float so the division is a float division.
    angles = 1 / tf.pow(10000, (2*(i//2)) / tf.cast(d_model, tf.float32))
    return position * angles

  def positional_encoding(self, position, d_model):
    """Build the (1, position, d_model) float32 encoding table."""
    # tf.newaxis adds a dimension, e.g. (4,) -> (4, 1), so that the position
    # column and the channel row broadcast to a (position, d_model) grid.
    angle_rads = self.get_angles(
        position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
        i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
        d_model=d_model)

    sines = tf.math.sin(angle_rads[:, 0::2])      # even channels (0, 2, 4, ...)
    cosines = tf.math.cos(angle_rads[:, 1::2])    # odd channels (1, 3, 5, ...)

    # Interleave: sine values on even channels, cosine values on odd channels.
    angle_rads = np.zeros(angle_rads.shape)
    angle_rads[:, 0::2] = sines
    angle_rads[:, 1::2] = cosines
    pos_encoding = tf.constant(angle_rads)          # constant tensor
    pos_encoding = pos_encoding[tf.newaxis, ...]    # leading axis to broadcast over the batch

    print(pos_encoding.shape)
    return tf.cast(pos_encoding, tf.float32)

  # `call` must be a method of the layer: defined at module level it is never
  # used, and the layer would not add the encoding to its input.
  def call(self, inputs):
    """Add the encodings for the first `seq_len` positions to `inputs`."""
    return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

In [None]:
def scaled_dot_product_attention(query, key, value, mask):
  """Compute softmax(Q K^T / sqrt(d_k) + mask * -1e9) V.

  Args:
    query, key, value: tensors whose last two axes are multiplied with tf.matmul.
    mask: tensor broadcastable to the score matrix with 1 at positions to
      hide, or None for no masking.

  Returns:
    A tuple (output, attention_weights).
  """
  # Scale the raw Q.K^T scores by the square root of the key's last dimension.
  key_depth = tf.cast(tf.shape(key)[-1], tf.float32)
  scores = tf.matmul(query, key, transpose_b=True) / tf.math.sqrt(key_depth)

  # Masked positions get a very large negative score, so softmax sends them to ~0.
  if mask is not None:
    scores = scores + mask * -1e9

  attention_weights = tf.nn.softmax(scores, axis=-1)    # attention distribution
  return tf.matmul(attention_weights, value), attention_weights

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
  """Multi-head attention built on scaled_dot_product_attention.

  Args:
    d_model: width of the projections; must be divisible by `num_heads`.
    num_heads: number of attention heads.
    name: Keras layer name.
  """

  def __init__(self, d_model, num_heads, name='multi_head_attention'):
    super().__init__(name=name)
    self.num_heads = num_heads
    self.d_model = d_model

    # Every head gets an equal slice of d_model, so it must divide evenly.
    assert d_model % num_heads == 0
    self.depth = d_model // num_heads

    # Projections for query, key and value (W_q, W_k, W_v).
    self.query_dense = tf.keras.layers.Dense(units=d_model)
    self.key_dense = tf.keras.layers.Dense(units=d_model)
    self.value_dense = tf.keras.layers.Dense(units=d_model)

    # Output projection applied to the concatenated heads (W_o).
    self.dense = tf.keras.layers.Dense(units=d_model)

  def split_heads(self, inputs, batch_size):
    """Reshape to (batch_size, -1, num_heads, depth) and move the head axis to position 1."""
    per_head = tf.reshape(inputs, shape=(batch_size, -1, self.num_heads, self.depth))
    return tf.transpose(per_head, perm=[0, 2, 1, 3])

  def call(self, inputs):
    """Run attention on a dict with the keys 'query', 'key', 'value' and 'mask'."""
    mask = inputs['mask']
    batch_size = tf.shape(inputs['query'])[0]

    # 1-2. Project each input, then split the projection into heads.
    heads_q = self.split_heads(self.query_dense(inputs['query']), batch_size)
    heads_k = self.split_heads(self.key_dense(inputs['key']), batch_size)
    heads_v = self.split_heads(self.value_dense(inputs['value']), batch_size)

    # 3. Scaled dot-product attention per head; the attention weights are not needed here.
    attended = scaled_dot_product_attention(heads_q, heads_k, heads_v, mask)[0]

    # 4. Move the head axis back next to depth and merge the two into d_model.
    attended = tf.transpose(attended, perm=[0, 2, 1, 3])
    merged = tf.reshape(attended, (batch_size, -1, self.d_model))

    # 5. Final output projection.
    return self.dense(merged)

In [None]:
def create_padding_mask(x):
  """Return a float mask with 1.0 where `x` equals 0 (padding) and 0.0 elsewhere.

  Two singleton axes are inserted after the first axis, so a 2-D `x` of shape
  (batch_size, length) yields a mask of shape (batch_size, 1, 1, length).
  """
  is_padding = tf.math.equal(x, 0)
  return tf.cast(is_padding, tf.float32)[:, tf.newaxis, tf.newaxis, :]

In [None]:
def encoder_layer(dff, d_model, num_heads, dropout, name='encoder_layer'):
  """Build one encoder layer as a functional Keras model.

  The layer is self-attention followed by a two-layer feed-forward network;
  each sub-layer output goes through dropout, a residual add and layer
  normalization.

  Args:
    dff: units of the hidden (ReLU) feed-forward Dense layer.
    d_model: width of the layer's input and output.
    num_heads: number of attention heads.
    dropout: dropout rate used after each sub-layer.
    name: name of the returned model.

  Returns:
    A tf.keras.Model taking [inputs, padding_mask].
  """
  inputs = tf.keras.Input(shape=(None, d_model), name='input')

  padding_mask = tf.keras.Input(shape=(1,1,None), name='padding_mask')

  # Self-attention: query, key and value are all the layer input.
  attention = MultiHeadAttention(d_model, num_heads, name='attention')({
          'query': inputs, 'key': inputs, 'value': inputs,
          'mask': padding_mask
      })
  
  # Dropout, then residual connection + layer normalization.
  attention = tf.keras.layers.Dropout(rate=dropout)(attention)
  attention = tf.keras.layers.LayerNormalization(epsilon=1e-6)(inputs + attention)

  # Position-wise feed-forward network: d_model -> dff (ReLU) -> d_model.
  outputs = tf.keras.layers.Dense(units=dff, activation='relu')(attention)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)

  # Dropout, then residual connection + layer normalization.
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention + outputs)

  return tf.keras.Model(inputs=[inputs,padding_mask], outputs=outputs, name=name)

In [None]:
def encoder(vocab_size, num_layers, dff, d_model, num_heads, dropout, name='encoder'):
  """Build the encoder: embedding + positional encoding + `num_layers` encoder layers.

  Args:
    vocab_size: size of the embedding vocabulary.
    num_layers: number of stacked encoder layers.
    dff, d_model, num_heads, dropout: forwarded to each encoder_layer.
    name: name of the returned model.

  Returns:
    A tf.keras.Model taking [inputs, padding_mask].
  """
  inputs = tf.keras.Input(shape=(None,), name='inputs')

  padding_mask = tf.keras.Input(shape=(1,1,None), name='padding_mask')

  # Embed the token ids and scale the embeddings by sqrt(d_model).
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
  # The positional table is built with vocab_size rows (vocab_size is passed as `position`).
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  # Stack the encoder layers; each one receives the same padding mask.
  for i in range(num_layers):
    outputs = encoder_layer(dff=dff, d_model=d_model, num_heads=num_heads, 
                            dropout=dropout, name='encoder_layer_{}'.format(i))([outputs, padding_mask])
  
  return tf.keras.Model(inputs=[inputs,padding_mask], outputs=outputs, name=name)

In [None]:
def create_look_ahead_mask(x):
  """Return a mask that hides future positions and padding positions of `x`.

  The result is the element-wise maximum of a strictly-upper-triangular mask
  (1 above the diagonal) and the padding mask of `x`.
  """
  length = tf.shape(x)[1]
  # band_part(..., -1, 0) keeps the lower triangle (including the diagonal) as 1
  # and zeroes the rest; subtracting from 1 leaves 1 exactly at future positions.
  lower_triangle = tf.linalg.band_part(tf.ones((length, length)), -1, 0)
  future_mask = 1 - lower_triangle
  return tf.maximum(future_mask, create_padding_mask(x))

In [None]:
def decoder_layer(dff, d_model, num_heads, dropout, name='decoder_layer'):
  """Build one decoder layer as a functional Keras model.

  The layer is masked self-attention, attention over the encoder outputs, and
  a two-layer feed-forward network, each followed by a residual add and layer
  normalization.

  Args:
    dff: units of the hidden (ReLU) feed-forward Dense layer.
    d_model: width of the layer's input and output.
    num_heads: number of attention heads.
    dropout: dropout rate.
    name: name of the returned model.

  Returns:
    A tf.keras.Model taking [inputs, enc_outputs, look_ahead_mask, padding_mask].
  """
  inputs = tf.keras.Input(shape=(None, d_model), name='inputs')
  enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')

  look_ahead_mask = tf.keras.Input(shape=(1,None,None), name='look_ahead_mask')
  padding_mask = tf.keras.Input(shape=(1,1,None), name='padding_mask')

  # Masked self-attention over the decoder input.
  attention1 = MultiHeadAttention(d_model, num_heads, name='attention_1')(inputs={
      'query': inputs, 'key': inputs, 'value': inputs,
      'mask': look_ahead_mask
  })

  # NOTE(review): unlike the two sub-layers below, no Dropout is applied to
  # attention1 before the residual add - confirm this is intended.
  attention1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention1 + inputs)

  # Encoder-decoder attention: queries come from the decoder, keys and values
  # from the encoder outputs.
  attention2 = MultiHeadAttention(d_model, num_heads, name='attention_2')(inputs={
      'query': attention1, 'key': enc_outputs, 'value': enc_outputs,
      'mask': padding_mask
  })

  attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
  attention2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention2 + attention1)

  # Position-wise feed-forward network: d_model -> dff (ReLU) -> d_model.
  outputs = tf.keras.layers.Dense(units=dff, activation='relu')(attention2)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)

  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(outputs + attention2)

  return tf.keras.Model(inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
                        outputs=outputs, name=name)

In [None]:
def decoder(vocab_size, num_layers, dff, d_model, num_heads, dropout, name='decoder'):
  """Build the decoder: embedding + positional encoding + `num_layers` decoder layers.

  Args:
    vocab_size: size of the embedding vocabulary.
    num_layers: number of stacked decoder layers.
    dff, d_model, num_heads, dropout: forwarded to each decoder_layer.
    name: name of the returned model.

  Returns:
    A tf.keras.Model taking [inputs, enc_outputs, look_ahead_mask, padding_mask].
  """
  inputs = tf.keras.Input(shape=(None,), name='inputs')
  enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')

  look_ahead_mask = tf.keras.Input(shape=(1,None,None), name='look_ahead_mask')
  padding_mask = tf.keras.Input(shape=(1,1,None), name='padding_mask')

  # Embed the token ids and scale the embeddings by sqrt(d_model).
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
  # The positional table is built with vocab_size rows (vocab_size is passed as `position`).
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  # Stack the decoder layers; each one receives the same encoder outputs and masks.
  for i in range(num_layers):
    outputs = decoder_layer(dff=dff, d_model=d_model, num_heads=num_heads,
                            dropout=dropout, name='decoder_layer_{}'.format(i)
                            )(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])
  
  return tf.keras.Model(inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
                        outputs=outputs, name=name)

In [None]:
def transformer(vocab_size, num_layers, dff, d_model, num_heads, dropout, name='transformer'):
  """Assemble the full model: encoder, decoder and a final vocabulary projection.

  Args:
    vocab_size: vocabulary size, used for the embeddings and the output layer.
    num_layers, dff, d_model, num_heads, dropout: forwarded to encoder() and decoder().
    name: name of the returned model.

  Returns:
    A tf.keras.Model taking [inputs, dec_inputs]; its output is a Dense layer
    with `vocab_size` units and no activation.
  """
  inputs = tf.keras.Input(shape=(None,), name='inputs')
  dec_inputs = tf.keras.Input(shape=(None,), name='dec_inputs')

  # Padding mask of the encoder input, used by the encoder's self-attention.
  enc_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1,1,None), name='enc_padding_mask')(inputs)
  # Look-ahead (plus padding) mask of the decoder input, used by the decoder's self-attention.
  look_ahead_mask = tf.keras.layers.Lambda(
      create_look_ahead_mask, output_shape=(1,None,None), name='look_ahead_mask')(dec_inputs)
  # Built from the encoder input: it masks the encoder positions in the
  # decoder's second attention block, whose keys are the encoder outputs.
  dec_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1,1,None), name='dec_padding_mask')(inputs)
      
  enc_outputs = encoder(vocab_size=vocab_size, num_layers=num_layers, dff=dff, d_model=d_model,
                        num_heads=num_heads, dropout=dropout)(inputs=[inputs, enc_padding_mask])
  dec_outputs = decoder(vocab_size=vocab_size, num_layers=num_layers, dff=dff, d_model=d_model,
                        num_heads=num_heads, dropout=dropout)(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])

  # Project every decoder position onto the vocabulary.
  outputs = tf.keras.layers.Dense(units=vocab_size, name='outputs')(dec_outputs)

  return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)

#### 모델 학습

In [None]:
# Hyperparameters passed to transformer() below.
D_MODEL = 256     # width of the embeddings and of every layer's output
NUM_LAYERS = 2    # number of encoder layers and of decoder layers
NUM_HEADS = 8     # attention heads; D_MODEL must be divisible by this
DFF = 512         # units of the hidden feed-forward Dense layer
DROPOUT = 0.1     # dropout rate

# Building the model prints the positional-encoding table shape once for the
# encoder and once for the decoder.
model = transformer(
    vocab_size=VOCAB_SIZE,
    num_layers=NUM_LAYERS,
    dff=DFF,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT
)

(1, 8178, 256)
(1, 8178, 256)


In [None]:
def loss_function(y_true, y_pred):
  """Sparse categorical cross-entropy on logits, with padding positions zeroed.

  `y_true` is reshaped to (-1, MAX_LENGTH - 1). The loss at positions whose
  label is 0 is multiplied by 0, and the mean is then taken over all
  positions, zeroed ones included.
  """
  y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))

  # reduction='none' keeps one loss value per position so padding can be masked.
  cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none')
  per_position = cross_entropy(y_true, y_pred)

  not_padding = tf.cast(tf.not_equal(y_true, 0), tf.float32)
  return tf.reduce_mean(tf.multiply(per_position, not_padding))

In [None]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Learning-rate schedule with warm-up.

  lr(step) = rsqrt(d_model) * min(rsqrt(step), step * warmup_steps**-1.5)

  Args:
    d_model: model width used to scale the learning rate.
    warmup_steps: constant used in the `step * warmup_steps**-1.5` term.
  """

  def __init__(self, d_model, warmup_steps=4000):
    super(CustomSchedule, self).__init__()
    self.d_model = tf.cast(d_model, tf.float32)
    self.warmup_steps = warmup_steps

  def __call__(self, step):
    # The optimizer may pass its integer iteration counter; rsqrt and the
    # multiplication below need a float, so cast explicitly.
    step = tf.cast(step, tf.float32)
    arg1 = tf.math.rsqrt(step)    # reciprocal square root, i.e. step ** -0.5
    arg2 = step * (self.warmup_steps**-1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

In [None]:
# Adam optimizer driven by the warm-up learning-rate schedule.
learning_rate = CustomSchedule(D_MODEL)
optimizer = tf.keras.optimizers.Adam(
    learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)


def accuracy(y_true, y_pred):
  """Per-position sparse categorical accuracy; labels are reshaped to (-1, MAX_LENGTH-1).

  No padding mask is applied, so padded positions count towards the metric.
  """
  labels = tf.reshape(y_true, shape=(-1, MAX_LENGTH-1))
  return tf.keras.metrics.sparse_categorical_accuracy(labels, y_pred)


model.compile(loss=loss_function, optimizer=optimizer, metrics=[accuracy])

In [None]:
# Train on the batched dataset for EPOCHS passes over the data.
EPOCHS = 50
model.fit(dataset, epochs=EPOCHS)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.callbacks.History at 0x7fb752e4b710>

## 6. 챗봇 평가하기

In [None]:
def preprocess_sentence(sentence):
  """Apply the same punctuation spacing that was used on the training sentences.

  A single space is inserted in front of each of ? . ! , and surrounding
  whitespace is stripped. No space is added after the mark, so punctuation in
  the middle of a sentence is spaced exactly as in the training data.

  Args:
    sentence: raw user input string.

  Returns:
    The preprocessed string.
  """
  sentence = re.sub(r"([?.!,])", r" \1", sentence)
  return sentence.strip()

In [None]:
def evaluate(sentence):
  """Greedily decode a reply to `sentence` and return its token ids.

  The sentence is preprocessed, encoded and wrapped in start/end tokens. The
  decoder starts from the start token and, for at most MAX_LENGTH steps,
  appends the arg-max token of the last position, stopping as soon as the end
  token is predicted.

  Returns:
    A 1-D tensor of ids that begins with the start token; the end token is
    not included.
  """
  cleaned = preprocess_sentence(sentence)
  encoder_ids = START_TOKEN + tokenizer.encode(cleaned) + END_TOKEN
  encoder_input = tf.expand_dims(encoder_ids, axis=0)    # add a batch axis
  decoded = tf.expand_dims(START_TOKEN, 0)
  end_id = END_TOKEN[0]

  for _ in range(MAX_LENGTH):
    logits = model(inputs=[encoder_input, decoded], training=False)
    # Only the prediction for the last position is needed.
    next_id = tf.cast(tf.argmax(logits[:, -1:, :], axis=-1), tf.int32)

    if tf.equal(next_id, end_id):
      break
    decoded = tf.concat([decoded, next_id], axis=-1)

  return tf.squeeze(decoded, axis=0)

In [None]:
def predict(sentence):
  """Generate a reply to `sentence`, print the input and the reply, and return the reply."""
  token_ids = evaluate(sentence)
  # Ids at or above tokenizer.vocab_size are the start/end tokens; the tokenizer cannot decode them.
  subword_ids = [token for token in token_ids if token < tokenizer.vocab_size]
  predicted_sentence = tokenizer.decode(subword_ids)

  print('Input: {}'.format(sentence))
  print('Output: {}'.format(predicted_sentence))

  return predicted_sentence

In [None]:
# Sample query; predict() prints the input and the generated reply.
output = predict("영화 볼래?")

Input: 영화 볼래?
Output: 최신 영화가 좋을 것 같아요 .


In [None]:
# Sample query; predict() prints the input and the generated reply.
output = predict("고민이 있어")

Input: 고민이 있어
Output: 제가 들어드릴게요 .


In [None]:
# Sample query; predict() prints the input and the generated reply.
output = predict("너무 화가나")

Input: 너무 화가나
Output: 자신을 비난하지 마세요 .


In [None]:
# Sample query; predict() prints the input and the generated reply.
output = predict("게임하고싶당")

Input: 게임하고싶당
Output: 어서 충전 하세요 .


In [None]:
# Sample query; predict() prints the input and the generated reply.
output = predict("공부 해야하나?")

Input: 공부 해야하나?
Output: 지금도 늦지 않았어요 .
