## 사용 데이터셋
- 링크 : https://github.com/songys/Chatbot_data/blob/master/ChatbotData.csv
- 무응답으로 일관하여 실패한 것 같음.

In [1]:
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt
import os
import re
import pandas as pd
import urllib.request

https://wikidocs.net/31379

### Step 1. 데이터 수집하기
한국어 챗봇 데이터는 송영숙님이 공개한 챗봇 데이터를 사용합니다.  

이 데이터는 아래의 링크에서 다운로드할 수 있습니다.  
- 링크 : https://github.com/songys/Chatbot_data/blob/master/ChatbotData.csv

터미널 경로 설정  
- mkdir -p ~/aiffel/transformer_chatbot/data/  
- ln -s ~/data/* ~/aiffel/transformer_chatbot/data/

In [2]:
# GitHub에서 데이터 다운로드하기
url = "https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv"
urllib.request.urlretrieve(url, filename="ChatBotData.csv")


('ChatBotData.csv', <http.client.HTTPMessage at 0x7ad3fcedf070>)

### Step 2. 데이터 전처리하기
영어 데이터와는 전혀 다른 데이터인 만큼 영어 데이터에 사용했던 전처리와 일부 동일한 전처리도 필요하겠지만 전체적으로는 다른 전처리를 수행해야 할 수도 있습니다.


In [3]:
# Step 2: 데이터 전처리하기
def load_data(file_path):
    data = pd.read_csv(file_path)
    questions = data['Q'].apply(preprocess_sentence).tolist()
    answers = data['A'].apply(preprocess_sentence).tolist()
    return questions, answers

def preprocess_sentence(sentence):
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = re.sub(r"[^가-힣?.!,]+", r" ", sentence)
    sentence = sentence.strip()
    return sentence

questions, answers = load_data('ChatBotData.csv')


### Step 3. SubwordTextEncoder 사용하기
한국어 데이터는 형태소 분석기를 사용하여 토크나이징을 해야 한다고 많은 분이 알고 있습니다. 하지만 여기서는 형태소 분석기가 아닌 위 실습에서 사용했던 내부 단어 토크나이저인 SubwordTextEncoder를 그대로 사용해보세요.

In [4]:
# 데이터가 없는 경우를 대비하여 예외 처리 추가
if len(questions) == 0 or len(answers) == 0:
    raise ValueError("데이터가 비어 있습니다. 데이터 파일을 확인해주세요.")

# SubwordTextEncoder를 사용하여 데이터 토큰화
subword_encoder = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    questions + answers, target_vocab_size=2**13
)

def tokenize_and_encode(sentences):
    return [subword_encoder.encode(sentence) for sentence in sentences]

questions_tokenized = tokenize_and_encode(questions)
answers_tokenized = tokenize_and_encode(answers)

# 패딩 추가
MAX_LENGTH = 40
def pad_sequences(tokenized_sentences):
    return tf.keras.preprocessing.sequence.pad_sequences(
        tokenized_sentences, maxlen=MAX_LENGTH, padding='post'
    )

questions_padded = pad_sequences(questions_tokenized)
answers_padded = pad_sequences(answers_tokenized)

# Train 데이터셋 생성
target_start_token = subword_encoder.vocab_size
target_end_token = subword_encoder.vocab_size + 1

inputs = questions_padded
outputs = np.array([([target_start_token] + answer + [target_end_token]) for answer in answers_tokenized])
outputs_padded = pad_sequences(outputs)

train_data = tf.data.Dataset.from_tensor_slices((inputs, outputs_padded))
train_data = train_data.shuffle(len(questions)).batch(64)


  outputs = np.array([([target_start_token] + answer + [target_end_token]) for answer in answers_tokenized])


### Step 4. 모델 구성하기
위 실습 내용을 참고하여 트랜스포머 모델을 구현합니다.

In [5]:
# Step 4: 모델 구성하기
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, units, vocab_size, dropout):
        super(Encoder, self).__init__()
        self.num_layers = num_layers
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model)
        self.enc_layers = [
            tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
            for _ in range(num_layers)
        ]
        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, x, training, mask):
        x = self.embedding(x)
        for i in range(self.num_layers):
            x = self.enc_layers[i](x, x, x, attention_mask=mask)
            x = self.dropout(x, training=training)
        return x

class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, units, vocab_size, dropout):
        super(Decoder, self).__init__()
        self.num_layers = num_layers
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model)
        self.dec_layers = [
            tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
            for _ in range(num_layers)
        ]
        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        x = self.embedding(x)
        for i in range(self.num_layers):
            x = self.dec_layers[i](x, enc_output, enc_output, attention_mask=look_ahead_mask)
            x = self.dropout(x, training=training)
        return x, None

In [6]:
# 트랜스포머 모델 구성하기
class Transformer(tf.keras.Model):
    def __init__(self, vocab_size, num_layers, units, d_model, num_heads, dropout):
        super(Transformer, self).__init__()
        self.encoder = Encoder(num_layers, d_model, num_heads, units, vocab_size, dropout)
        self.decoder = Decoder(num_layers, d_model, num_heads, units, vocab_size, dropout)
        self.final_layer = tf.keras.layers.Dense(vocab_size)
    
    def call(self, inputs, targets, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        enc_output = self.encoder(inputs, training, enc_padding_mask)  # (batch_size, inp_seq_len, d_model)
        dec_output, _ = self.decoder(targets, enc_output, training, look_ahead_mask, dec_padding_mask)
        final_output = self.final_layer(dec_output)  # (batch_size, tar_seq_len, vocab_size)
        return final_output

# 트랜스포머 하이퍼파라미터 설정
num_layers = 4
d_model = 128
num_heads = 8
units = 512
dropout = 0.1
vocab_size = subword_encoder.vocab_size + 2

# 트랜스포머 모델 인스턴스 생성
transformer = Transformer(vocab_size, num_layers, units, d_model, num_heads, dropout)

# # Optimizer 및 손실 함수 설정
# learning_rate = tf.keras.optimizers.schedules.ExponentialDecay(
#     initial_learning_rate=1e-4, decay_steps=10000, decay_rate=0.9
# )
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_mean(loss_)


In [7]:
# 학습 단계 정의
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

@tf.function
def train_step(inp, tar):
    tar_inp = tar[:, :-1]
    tar_real = tar[:, 1:]

    enc_padding_mask, look_ahead_mask, dec_padding_mask = None, None, None

    with tf.GradientTape() as tape:
        predictions = transformer(inp, tar_inp, True, enc_padding_mask, look_ahead_mask, dec_padding_mask)
        loss = loss_function(tar_real, predictions)

    gradients = tape.gradient(loss, transformer.trainable_variables)
    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))

    train_accuracy.update_state(tar_real, predictions)

    return loss

In [8]:
# 모델 학습하기
EPOCHS = 10
for epoch in range(EPOCHS):
    total_loss = 0

    train_accuracy.reset_states()

    for (batch, (inp, tar)) in enumerate(train_data):
        batch_loss = train_step(inp, tar)
        total_loss += batch_loss

    avg_loss = total_loss / (batch + 1)
    avg_accuracy = train_accuracy.result()

    print(f'Epoch {epoch + 1}, Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.4f}')

Epoch 1, Loss: 1.1688, Accuracy: 0.0254
Epoch 2, Loss: 1.1149, Accuracy: 0.0254
Epoch 3, Loss: 1.1138, Accuracy: 0.0255
Epoch 4, Loss: 1.1128, Accuracy: 0.0253
Epoch 5, Loss: 1.1114, Accuracy: 0.0254
Epoch 6, Loss: 1.1108, Accuracy: 0.0256
Epoch 7, Loss: 1.1103, Accuracy: 0.0255
Epoch 8, Loss: 1.1099, Accuracy: 0.0256
Epoch 9, Loss: 1.1096, Accuracy: 0.0256
Epoch 10, Loss: 1.1091, Accuracy: 0.0257


### Step 5. 모델 평가하기
Step 1에서 선택한 전처리 방법을 고려하여 입력된 문장에 대해서 대답을 얻는 예측 함수를 만듭니다.

In [9]:
# Step 5: 모델 평가하기
def evaluate(sentence):
    sentence = preprocess_sentence(sentence)
    inputs = [subword_encoder.encode(sentence)]
    inputs = tf.keras.preprocessing.sequence.pad_sequences(inputs, maxlen=MAX_LENGTH, padding='post')
    inputs = tf.convert_to_tensor(inputs)

    output = tf.convert_to_tensor([[subword_encoder.vocab_size]])
    for i in range(MAX_LENGTH):
        predictions = transformer(inputs, output, False, None, None, None)
        predictions = predictions[:, -1:, :]
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)
        if predicted_id == subword_encoder.vocab_size + 1:
            break
        output = tf.concat([output, predicted_id], axis=-1)

    predicted_sentence = subword_encoder.decode([i for i in output.numpy()[0] if i < subword_encoder.vocab_size])
    return predicted_sentence


In [10]:
# 예시 문장 예측
def predict(sentence):
    response = evaluate(sentence)
    print(f'User: {sentence}')
    print(f'Bot: {response}')

# 대화 예시
predict("안녕하세요. 반갑습니다")


User: 안녕하세요. 반갑습니다
Bot: 


In [11]:
# 대화 예시
predict("안녕하세요.")

User: 안녕하세요.
Bot: 
