In [None]:
!pip install -q tensorflow pandas tqdm


In [1]:
from google.colab import files
uploaded = files.upload()


Saving ChatBotData_split.csv to ChatBotData_split.csv


In [1]:
import tensorflow as tf
from tensorflow.keras import layers

class PositionalEncoding(layers.Layer):
    def __init__(self, position, d_model):
        super().__init__()
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, pos, i, d_model):
        angles = pos / tf.pow(10000, (2 * (i//2)) / tf.cast(d_model, tf.float32))
        return angles

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(
            pos=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
            i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
            d_model=d_model)

        sines = tf.math.sin(angle_rads[:, 0::2])
        cosines = tf.math.cos(angle_rads[:, 1::2])
        pos_encoding = tf.concat([sines, cosines], axis=-1)
        return pos_encoding[tf.newaxis, ...]

    def call(self, inputs):
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)
    return output, attention_weights


class MultiHeadAttention(layers.Layer):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0

        self.num_heads = num_heads
        self.depth = d_model // num_heads

        self.wq = layers.Dense(d_model)
        self.wk = layers.Dense(d_model)
        self.wv = layers.Dense(d_model)

        self.dense = layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, attention_mask=None, training=False):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)   #입력 q, k, v에 대해 Dense 통과시켜줌.
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)  #그리고 각각을 멀티헤드 형태로 쪼갬.
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, _ = scaled_dot_product_attention(q, k, v, attention_mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])


        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.num_heads * self.depth))
        output = self.dense(concat_attention)
        return output


class EncoderLayer(layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()


        self.mha = MultiHeadAttention(d_model, num_heads)

        self.ffn = tf.keras.Sequential([
            layers.Dense(dff, activation='relu'),
            layers.Dense(d_model)
        ])

        ## 층 정규화
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)


        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)


    def call(self, x, mask=None, training=False):
        attn_output = self.mha(x, x, x, attention_mask=mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2

class DecoderLayer(layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()

        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = tf.keras.Sequential([
            layers.Dense(dff, activation='relu'),
            layers.Dense(d_model)
        ])

        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
        self.dropout3 = layers.Dropout(rate)

    def call(self, x, enc_output, look_ahead_mask, padding_mask, training):
        attn1 = self.mha1(x, x, x, attention_mask=look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        attn2 = self.mha2(enc_output, enc_output, out1, attention_mask=padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)

        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)

        return out3

class Encoder(layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(maximum_position_encoding, d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = layers.Dropout(rate)

    def call(self, x, mask=None, training=False):
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.pos_encoding(x)
        x = self.dropout(x, training=training)

        for enc_layer in self.enc_layers:
            x = enc_layer(x, mask=mask, training=training)

        return x

class Decoder(layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(maximum_position_encoding, d_model)
        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = layers.Dropout(rate)

    def call(self, x, enc_output, look_ahead_mask=None, padding_mask=None, training=False):
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.pos_encoding(x)
        x = self.dropout(x, training=training)

        for dec_layer in self.dec_layers:
            x = dec_layer(x, enc_output, look_ahead_mask=look_ahead_mask, padding_mask=padding_mask, training=training)

        return x


class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 target_vocab_size, pe_input, pe_target, rate=0.1):
        super().__init__()
        self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, pe_input, rate)
        self.decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size, pe_target, rate)
        self.final_layer = layers.Dense(target_vocab_size)

    def call(self, inputs, training=False):
        inp, tar = inputs
        enc_padding_mask, look_ahead_mask, dec_padding_mask = create_masks(inp, tar)
        enc_output = self.encoder(inp, mask=enc_padding_mask, training=training)
        dec_output = self.decoder(tar, enc_output, look_ahead_mask=look_ahead_mask, padding_mask=dec_padding_mask, training=training)
        final_output = self.final_layer(dec_output)

        return final_output

In [2]:
from sklearn.model_selection import train_test_split

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
import urllib.request
import time
import tensorflow_datasets as tfds
import tensorflow as tf

train_data = pd.read_csv("ChatBotData_split.csv", encoding = 'utf-8')
train_data = train_data.dropna()
train_data = train_data.drop_duplicates(subset = ['res'])

print('질문의 결측치 개수'.format(train_data.isnull().sum()))

questions = []
for sentence in train_data['req']:

    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = sentence.strip()
    questions.append(sentence)

answers = []
for sentence in train_data['res']:

    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = sentence.strip()
    answers.append(sentence)

print(questions[:5], '질문 개수: ',len(questions),'\n')
print(answers[:5], '질문 개수: ',len(questions),'\n')

questions = questions[:60000]
answers = answers[:60000]

질문의 결측치 개수
['너 좋아하는 차 종류 있어 ?', 'ㅋㅋ 마시는 차 말한 거야 !', '완전 곡물류 좋아하네 ㅋㅋ', '그럼 오래 걸리지 않아 ?', '근데 냉침 하는 것도 귀찮겠다 ㅜㅠ'] 질문 개수:  100786 

['무슨 차 ?  자동차 ?  마시는 차 ?', '아하 나 둥글레 ,  옥수수 ,  보리차 좋아해', '야쓰 끓이기 귀찮아서 냉침해 먹어', '끓이는 것보다는 훨씬 오래 걸리지 ㅠ', '응 !  그래서 매일은 안 먹고 가끔 마셔'] 질문 개수:  100786 



In [35]:
print(len(questions))

60000


In [4]:
# 서브워드텍스트인코더를 사용하여 질문, 답변 데이터로부터 단어 집합(Vocabulary) 생성
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(questions + answers, target_vocab_size=2**13)


# 시작 토큰과 종료 토큰에 대한 정수 부여.
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]
#리스트 형태로 만들어둔 이유는 나중에 input + START_TOKEN + END_TOKEN 같이 쉽게 붙이기 위해서.


# 시작 토큰과 종료 토큰을 고려하여 단어 집합의 크기를 + 2
VOCAB_SIZE = tokenizer.vocab_size + 2

print('시작 토큰 번호 :',START_TOKEN)
print('종료 토큰 번호 :',END_TOKEN)
print('단어 집합의 크기 :',VOCAB_SIZE)

print(questions[20])

시작 토큰 번호 : [8142]
종료 토큰 번호 : [8143]
단어 집합의 크기 : 8144
다른 지역으로 가는 것도 많이 생기면 좋겠다 ㅜㅠ


In [None]:
######################################################################################################################

In [5]:
question_token_lens = [len(tokenizer.encode(q)) for q in questions]
answer_token_lens = [len(tokenizer.encode(a)) for a in answers]

print("질문 최대 길이:", max(question_token_lens))
print("질문 평균 길이:", sum(question_token_lens) / len(question_token_lens))
print("응답 최대 길이:", max(answer_token_lens))
print("응답 평균 길이:", sum(answer_token_lens) / len(answer_token_lens))

for q,a in zip(questions,answers):
    lens_a = len(tokenizer.encode(a))
    lens_q = len(tokenizer.encode(q))
    #if lens_q == 64:
       # print(q)
    if lens_a == 62:
       print(a)



질문 최대 길이: 62
질문 평균 길이: 9.857816666666666
응답 최대 길이: 62
응답 평균 길이: 9.293883333333333
군인들에 대한 보상이 특히 약한 이유가 박정희 정권 때 베트남 파병 많이 갔었잖아 그래서 거기서 부상 당하거나 죽은 사람들이 많은데 그거 다  보상 해주려니까 돈이 너무 많이 들어서 법을 그  따위로 만들었다던데 진짜 나쁜 의미로 대단한 대통령이여 으휴


In [6]:
for i,a in enumerate(answers):
    lens_a = len(tokenizer.encode(a))
    if lens_a == 65:
        print('인덱스: {}\n, 질문: {}\n, 답변: {}\n\n'.format(i, questions[i],a))
    else:
        pass



In [7]:
# 최대 길이를 40으로 정의
MAX_LENGTH = 63

# 토큰화 / 정수 인코딩 / 시작 토큰과 종료 토큰 추가 / 패딩
def tokenize_and_filter(inputs, outputs):
  tokenized_inputs, tokenized_outputs = [], []

  for (sentence1, sentence2) in zip(inputs, outputs):
    # encode(토큰화 + 정수 인코딩), 시작 토큰과 종료 토큰 추가
    sentence1 = START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN
    sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN

    tokenized_inputs.append(sentence1)
    tokenized_outputs.append(sentence2)

  # 패딩
  tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences(tokenized_inputs, maxlen=MAX_LENGTH, padding='post')

  tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences(tokenized_outputs, maxlen=MAX_LENGTH, padding='post')

  return tokenized_inputs, tokenized_outputs



questions, answers = tokenize_and_filter(questions, answers)


print('질문 데이터의 크기(shape) :', questions.shape)
print('답변 데이터의 크기(shape) :', answers.shape)

질문 데이터의 크기(shape) : (60000, 63)
답변 데이터의 크기(shape) : (60000, 63)


In [8]:
#데이터를 학습 데이터, 검증 데이터, 시험 데이터로 구분하여 텐서플로우의 데이터셋 형성
q_train, q_v_t, a_train, a_v_t = train_test_split(questions, answers, test_size=0.2, random_state=42)

q_val, q_test, a_val, a_test = train_test_split(q_v_t, a_v_t, test_size=0.5, random_state=42)




decoder_inputs_train = a_train[:, :-1]
decoder_inputs_val = a_val[:, :-1]
decoder_inputs_test = a_test[:, :-1]

# 디코더의 정답은 첫 번째 토큰을 제외한 것 (한 칸 오른쪽으로 shift)
decoder_targets_train = a_train[:, 1:]
decoder_targets_val = a_val[:, 1:]
decoder_targets_test = a_test[:, 1:]
#encoder_input, decoder_input, decoder_target 이 트리플 세트로 묶여서, 각 문장(샘플)마다 하나씩 학습이 진행되는 구조야.


dataset_train = tf.data.Dataset.from_tensor_slices((
    (q_train, decoder_inputs_train),  # input = (encoder_input, decoder_input)
    decoder_targets_train
))

dataset_val = tf.data.Dataset.from_tensor_slices((
    (q_val, decoder_inputs_val),  # input = (encoder_input, decoder_input)
    decoder_targets_val
))

dataset_test = tf.data.Dataset.from_tensor_slices((
    (q_test, decoder_inputs_test),  # input = (encoder_input, decoder_input)
    decoder_targets_test
))
BATCH_SIZE = 64  #일단 컴퓨터가 맛탱이 갈 수도 있으니까 32로 ㄱㄱ
BUFFER_SIZE = 10000  #일반적으로는 데이터 전체 수 이상을 쓰는 게 좋다네요. 예..




dataset_train = dataset_train.shuffle(BUFFER_SIZE)  #shuffle: 학습 데이터만 섞어줘야 일반화에 효과 있음.
dataset_train = dataset_train.batch(BATCH_SIZE)
dataset_train = dataset_train.prefetch(tf.data.experimental.AUTOTUNE)

dataset_val = dataset_val.batch(BATCH_SIZE)
dataset_val = dataset_val.prefetch(tf.data.experimental.AUTOTUNE)

dataset_test = dataset_test.batch(BATCH_SIZE)
dataset_test = dataset_test.prefetch(tf.data.experimental.AUTOTUNE)

In [9]:
#트랜스포머 모델 객체 선언
transformer = Transformer(
    num_layers=3,
    d_model=128,
    num_heads=4,
    dff=1024,
    input_vocab_size=VOCAB_SIZE,
    target_vocab_size=VOCAB_SIZE,
    pe_input=MAX_LENGTH, # 포지셔닝 인코딩의 입력문장의 최대길이
    pe_target=MAX_LENGTH, # # 그럼 이건 포지셔닝 인코딩의 출력문장의 최대길이겠지?
    rate=0.3
)

In [11]:

def create_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return seq[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)

def create_look_ahead_mask(size):
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask  # (seq_len, seq_len)

def create_masks(inp, tar):
    enc_padding_mask = create_padding_mask(inp)
    dec_padding_mask = create_padding_mask(inp)
    look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
    dec_target_padding_mask = create_padding_mask(tar)
    combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)

    return enc_padding_mask, combined_mask, dec_padding_mask


class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000, multiplier=1.0):
        super(CustomSchedule, self).__init__()

        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps
        self.multiplier = multiplier

    def __call__(self, step):
        # 학습률 계산 공식

        step = tf.cast(step, tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)

        return self.multiplier * tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

    def get_config(self):
        # 직렬화를 위한 설정 반환
        return {
            "d_model": self.d_model,
            "warmup_steps": self.warmup_steps,
            "multiplier": self.multiplier
        }



loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    # 패딩 마스크
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask  # 패딩 제외한 토큰만 계산

    return tf.reduce_mean(loss_)

#learning_rate = CustomSchedule(128)  # 네 모델에서 d_model 사용했을 거야
learning_rate = CustomSchedule(d_model=128, warmup_steps=4000, multiplier=1.2)
optimizer = tf.keras.optimizers.AdamW(weight_decay=1e-4 ,learning_rate=learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

def accuracy_function(real, pred):
    real = tf.cast(real, tf.int64)
    accuracies = tf.equal(real, tf.argmax(pred, axis=2))

    mask = tf.math.logical_not(tf.math.equal(real, 0))
    accuracies = tf.math.logical_and(mask, accuracies)

    accuracies = tf.cast(accuracies, dtype=tf.float32)
    mask = tf.cast(mask, dtype=tf.float32)

    return tf.reduce_sum(accuracies) / tf.reduce_sum(mask)



In [12]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint


transformer.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy_function])

early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True)

## 모델체크포인트 콜백 만들기
#checkpoint = ModelCheckpoint(
#    'my_chatbot.keras',        # 저장할 파일 이름
#    save_best_only=True,  # val_loss가 가장 좋을 때만 저장
#    monitor='val_loss',   # 기준은 검증 손실
#)



In [13]:
transformer.fit(dataset_train,  validation_data=dataset_val, epochs=100, callbacks=[early_stop])

Epoch 1/100
[1m750/750[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m101s[0m 57ms/step - accuracy_function: 0.0789 - loss: 1.3740 - val_accuracy_function: 0.1337 - val_loss: 1.1155
Epoch 2/100
[1m750/750[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 50ms/step - accuracy_function: 0.1374 - loss: 1.0981 - val_accuracy_function: 0.1549 - val_loss: 1.0425
Epoch 3/100
[1m750/750[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 51ms/step - accuracy_function: 0.1551 - loss: 1.0374 - val_accuracy_function: 0.1661 - val_loss: 1.0078
Epoch 4/100
[1m750/750[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 50ms/step - accuracy_function: 0.1640 - loss: 1.0016 - val_accuracy_function: 0.1716 - val_loss: 0.9871
Epoch 5/100
[1m750/750[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 50ms/step - accuracy_function: 0.1714 - loss: 0.9750 - val_accuracy_function: 0.1782 - val_loss: 0.9666
Epoch 6/100
[1m750/750[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 51ms/

<keras.src.callbacks.history.History at 0x7851e98918d0>

In [14]:
test_loss, test_accuracy = transformer.evaluate(dataset_test)
print(f"테스트 손실: {test_loss:.4f}")
print(f"테스트 정확도: {test_accuracy:.4f}")

[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 16ms/step - accuracy_function: 0.1947 - loss: 0.9164
테스트 손실: 0.9159
테스트 정확도: 0.1930


In [None]:
#def preprocess_sentence(sentence):
#  # 단어와 구두점 사이에 공백 추가.
#  # ex) 12시 땡! -> 12시 땡 !
#  sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
#  sentence = sentence.strip()
#  return sentence

#def evaluate(sentence):
#  # 입력 문장에 대한 전처리
#  sentence = preprocess_sentence(sentence)

#  # 입력 문장에 시작 토큰과 종료 토큰을 추가
#  sentence = tf.expand_dims(START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)  #인코더 인풋

#  output = tf.expand_dims(START_TOKEN, 0)  #디코더 인풋

#      # 디코더의 예측 시작
#  for i in range(MAX_LENGTH):
#    predictions = transformer(inputs=[sentence, output], training=False) # training=False: 트랜스포머 예측만 수행

#    # 현재 시점의 예측 단어를 받아온다.
#    predictions = predictions[:, -1:, :]  #[1, 1, vocab_size] ==> 마지막 토큰에 대한 확률을 모든 단어에 대해서 가져온다.

#    predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32) #tf.argmax는 확률이 가장 높은 단어 토큰의 인덱스를 빼온다.
                                                # (axis = -1) ==> vocab_size의 차원에서 가져오겠다는 뜻. 단어의 확률분포는 거기에 있으니까.

#    # 만약 현재 시점의 예측 단어가 종료 토큰이라면 예측을 중단
#    if tf.equal(predicted_id, END_TOKEN[0]):
#      break

#    # 현재 시점의 예측 단어를 output(출력)에 연결한다.
#    # output은 for문의 다음 루프에서 디코더의 입력이 된다.
#    output = tf.concat([output, predicted_id], axis=-1)

#  # 단어 예측이 모두 끝났다면 output을 리턴. ==> 한 문장의 토큰들의 배열을 리스트로 리턴.
#  return tf.squeeze(output, axis=0)    ## 배치차원을 없앤다. 왜냐고? 한문장이니깐.


In [15]:
def preprocess_sentence(sentence):
  # 단어와 구두점 사이에 공백 추가.
  # ex) 12시 땡! -> 12시 땡 !
  sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
  sentence = sentence.strip()
  return sentence



import numpy as np

def evaluate_sampled(sentence, top_k=10):
    sentence = preprocess_sentence(sentence)
    sentence = tf.expand_dims(START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)
    output = tf.expand_dims(START_TOKEN, 0)

    for i in range(MAX_LENGTH):
        predictions = transformer([sentence, output], training=False)
        predictions = predictions[:, -1:, :]  # [1, 1, vocab_size]
        predictions = tf.squeeze(predictions, axis=0).numpy()[0]

        # 상위 top_k 개 중 무작위 선택
        top_k_indices = predictions.argsort()[-top_k:]
        top_k_probs = tf.nn.softmax(predictions[top_k_indices]).numpy()
        sampled_index = np.random.choice(top_k_indices, p=top_k_probs)

        predicted_id = tf.constant([[sampled_index]])

        if tf.equal(predicted_id[0][0], END_TOKEN[0]):
            break

        output = tf.concat([output, predicted_id], axis=-1)

    return tf.squeeze(output, axis=0)

In [16]:
def predict(sentence):
  real_predict = evaluate_sampled(sentence)

  # real_predict == 디코더가 리턴한 챗봇의 대답에 해당하는 정수 시퀀스
  # tokenizer.decode()를 통해 정수 시퀀스를 문자열로 디코딩.
  predicted_sentence = tokenizer.decode(
      [i for i in real_predict if i < tokenizer.vocab_size])
  # i < tokenizer.vocab_size 로 필터링 해주는 이유는
  #**특수 토큰(UNK, PAD, START, END 등)**이 tokenizer의 vocab 범위 바깥에 있을 수도 있기 때문

  print('Input: {}'.format(sentence))
  print('Output: {}'.format(predicted_sentence))

  return predicted_sentence

In [33]:
output = predict("오잉??")


Input: 오잉??
Output: 응 근데 내가 보기엔 안됨


In [32]:
output = predict("아니 진짜 뭐하는거야 정말")

Input: 아니 진짜 뭐하는거야 정말
Output: 응 맞아 그래서 그거 엄청 비싸


In [21]:
output = predict("정말 기분 좋은 하루야")

Input: 정말 기분 좋은 하루야
Output: 그래도 그렇게 생각하면 좋겠다 .


In [24]:
output = predict("이 모델의 정확도가 너무 낮아서 나는 굉장히 화가 나")

Input: 이 모델의 정확도가 너무 낮아서 나는 굉장히 화가 나
Output: 나도 진짜 잘 몰라


어떤 짓을 해도 정확도와 손실이 개선이 안된다... 아마 모델 자체에 문제가 있거나 토크나이저를 다른 것을 써봐야 할 것 같다.....