<a href="https://colab.research.google.com/github/gong-aipel/AIFFEL_quest-cr/blob/main/explortion_05_%EC%A0%9C%EC%B6%9C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
import pandas as pd
import re

# Load data
data_path = tf.keras.utils.get_file("ChatbotData.csv",
    origin="https://github.com/songys/Chatbot_data/raw/master/ChatbotData.csv")
data = pd.read_csv(data_path)

# Preprocessing
def preprocess(sentence):
    sentence = sentence.strip()
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = re.sub(r"[^\uAC00-\uD7A3a-zA-Z?.!,]+", " ", sentence)
    sentence = re.sub(r'\s+', " ", sentence)
    return sentence

questions = [preprocess(q) for q in data['Q']]
answers = [preprocess(a) for a in data['A']]

# Tokenizer
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(questions + answers, target_vocab_size=2**13)
VOCAB_SIZE = tokenizer.vocab_size + 2
START_TOKEN, END_TOKEN = [VOCAB_SIZE - 2], [VOCAB_SIZE - 1]
MAX_LENGTH = 40

def encode(sentence):
    return START_TOKEN + tokenizer.encode(sentence) + END_TOKEN

input_tensor = [encode(q) for q in questions]
output_tensor = [encode(a) for a in answers]

input_tensor, output_tensor = zip(*[(inp, out) for inp, out in zip(input_tensor, output_tensor) if len(inp) <= MAX_LENGTH and len(out) <= MAX_LENGTH])

input_tensor = tf.keras.preprocessing.sequence.pad_sequences(input_tensor, maxlen=MAX_LENGTH, padding='post')
output_tensor = tf.keras.preprocessing.sequence.pad_sequences(output_tensor, maxlen=MAX_LENGTH, padding='post')

BATCH_SIZE = 64
BUFFER_SIZE = 20000
train_dataset = tf.data.Dataset.from_tensor_slices((
    {
        'inputs': input_tensor,
        'dec_inputs': output_tensor[:, :-1]
    },
    output_tensor[:, 1:]
)).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

# Positional Encoding
class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, position, d_model):
        super().__init__()
        angle_rads = self.get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        self.pos_encoding = tf.cast(angle_rads[np.newaxis, ...], dtype=tf.float32)

    def get_angles(self, pos, i, d_model):
        return pos / np.power(10000, (2 * (i//2)) / np.float32(d_model))

    def call(self, x):
        return x + self.pos_encoding[:, :tf.shape(x)[1], :]

# Multi-head Attention Encoder/Decoder Layers
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model)
        ])
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training=False, mask=None):
        attn_output = self.mha(x, x, x, attention_mask=mask)
        out1 = self.layernorm1(x + self.dropout1(attn_output, training=training))
        ffn_output = self.ffn(out1)
        return self.layernorm2(out1 + self.dropout2(ffn_output, training=training))

class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()
        self.mha1 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.mha2 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model)
        ])
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training=False, look_ahead_mask=None, padding_mask=None):
        attn1 = self.mha1(x, x, x, attention_mask=look_ahead_mask)
        out1 = self.layernorm1(x + self.dropout1(attn1, training=training))
        attn2 = self.mha2(out1, enc_output, enc_output, attention_mask=padding_mask)
        out2 = self.layernorm2(out1 + self.dropout2(attn2, training=training))
        ffn_output = self.ffn(out2)
        return self.layernorm3(out2 + self.dropout3(ffn_output, training=training))

# Encoder/Decoder
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(maximum_position_encoding, d_model)
        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training=False, mask=None):
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(tf.shape(x)[-1], tf.float32))
        x = self.pos_encoding(x)
        x = self.dropout(x, training=training)
        for layer in self.enc_layers:
            x = layer(x, training=training, mask=mask)
        return x

class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(maximum_position_encoding, d_model)
        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training=False, look_ahead_mask=None, padding_mask=None):
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(tf.shape(x)[-1], tf.float32))
        x = self.pos_encoding(x)
        x = self.dropout(x, training=training)
        for layer in self.dec_layers:
            x = layer(x, enc_output, training=training, look_ahead_mask=look_ahead_mask, padding_mask=padding_mask)
        return x

# Masking 함수 정의 (누락되었던 부분)
def create_masks(inp, tar):
    enc_padding_mask = tf.cast(tf.math.not_equal(inp, 0), tf.float32)[:, tf.newaxis, tf.newaxis, :]
    dec_padding_mask = tf.cast(tf.math.not_equal(inp, 0), tf.float32)[:, tf.newaxis, tf.newaxis, :]
    look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((tf.shape(tar)[1], tf.shape(tar)[1])), -1, 0)
    dec_target_padding_mask = tf.cast(tf.math.not_equal(tar, 0), tf.float32)
    combined_mask = tf.maximum(look_ahead_mask, 1 - dec_target_padding_mask[:, tf.newaxis, tf.newaxis, :])
    return enc_padding_mask, combined_mask, dec_padding_mask

# Transformer
class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input, pe_target, rate=0.1):
        super().__init__()
        self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, pe_input, rate)
        self.decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size, pe_target, rate)
        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inputs, training=False):
        inp, tar = inputs['inputs'], inputs['dec_inputs']
        enc_padding_mask, combined_mask, dec_padding_mask = create_masks(inp, tar)
        enc_output = self.encoder(inp, training=training, mask=enc_padding_mask)
        dec_output = self.decoder(tar, enc_output, training=training, look_ahead_mask=combined_mask, padding_mask=dec_padding_mask)
        return {'outputs': self.final_layer(dec_output)}

# 하이퍼파라미터 및 모델
model = Transformer(
    num_layers=2,
    d_model=256,
    num_heads=8,
    dff=512,
    input_vocab_size=VOCAB_SIZE,
    target_vocab_size=VOCAB_SIZE,
    pe_input=MAX_LENGTH,
    pe_target=MAX_LENGTH,
)

# 손실 및 학습 설정
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(y_true, y_pred):
    mask = tf.math.logical_not(tf.math.equal(y_true, 0))
    loss = loss_object(y_true, y_pred)
    mask = tf.cast(mask, dtype=loss.dtype)
    return tf.reduce_sum(loss * mask) / tf.reduce_sum(mask)

model.compile(optimizer=tf.keras.optimizers.Adam(1e-4), loss=loss_function, metrics=['accuracy'])

# 학습
EPOCHS = 10
model.fit(train_dataset, epochs=EPOCHS)


Downloading data from https://github.com/songys/Chatbot_data/raw/master/ChatbotData.csv
[1m889842/889842[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Epoch 1/10
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2747s[0m 15s/step - accuracy: 0.0459 - loss: 7.5522
Epoch 2/10
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2655s[0m 14s/step - accuracy: 0.0548 - loss: 5.5261
Epoch 3/10
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2707s[0m 15s/step - accuracy: 0.0739 - loss: 4.5072
Epoch 4/10
[1m174/185[0m [32m━━━━━━━━━━━━━━━━━━[0m[37m━━[0m [1m2:41[0m 15s/step - accuracy: 0.0886 - loss: 3.7842

In [None]:
def predict(sentence):
    # 전처리: 공백 제거 + 토크나이징
    sentence = tf.expand_dims(START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)

    # 디코더 입력은 시작 토큰으로 시작
    output = tf.expand_dims(START_TOKEN, 0)

    for i in range(MAX_LENGTH):
        # 입력 마스크 생성
        predictions = model(inputs={
            'inputs': sentence,
            'dec_inputs': output
        }, training=False)

        # 마지막 시퀀스 추출
        predictions = predictions[:, -1:, :]
        predicted_id = tf.argmax(predictions, axis=-1, output_type=tf.int32)

        # 종료 토큰이면 멈춤
        if tf.equal(predicted_id, END_TOKEN[0]):
            break

        # 예측 결과를 디코더 입력에 추가
        output = tf.concat([output, predicted_id], axis=-1)

    # 토크나이저로 디코딩
    predicted_sentence = tokenizer.decode([i for i in tf.squeeze(output, axis=0) if i < tokenizer.vocab_size])

    return predicted_sentence


In [None]:
print(predict("안녕하세요"))
print(predict("오늘 기분 어때?"))
print(predict("너는 누구니?"))
