In [375]:
import tensorflow as tf
import tensorflow_datasets as tfds
import os
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


# 데이터 로드
df = pd.read_csv(r"/content/ChatbotData.csv")
questions = df['Q'].tolist()
answers = df['A'].tolist()
print(f"전체 샘플 수: {len(questions)}")

전체 샘플 수: 11823


In [None]:
df

In [None]:
print('전체 샘플 수 :', len(questions))

#전처리 함수

In [None]:
# 1) 전처리 함수
def preprocess_sentence(sentence):
  # 단어와 구두점 사이에 공백 추가
  sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
  sentence = re.sub(r'\s+', ' ', sentence)
  sentence = re.sub(r'[" "]+', " ", sentence)
  # 한글, 영어, 숫자, 그리고 일부 구두점(?, !, ., ,)을 제외한 모든 문자 제거
  sentence = re.sub(r"[^가-힣a-zA-Z0-9?.!,]+", " ", sentence)
  sentence = sentence.strip()
  return sentence

In [None]:
# 2) 토크나이저 및 단어 집합 생성

MAX_LENGTH = 40  # 문장의 최대 길이 설정

In [None]:
# 질문과 답변 데이터셋에 대해서 Vocabulary 생성
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    questions + answers, target_vocab_size=2**13)

# 특수 토큰인 START_TOKEN과 END_TOKEN을 고려하여 +2
VOCAB_SIZE = tokenizer.vocab_size + 2
START_TOKEN = tokenizer.vocab_size
END_TOKEN = tokenizer.vocab_size + 1

In [None]:
print(f'단어 집합의 크기 (VOCAB_SIZE): {VOCAB_SIZE}')
print(f'START_TOKEN 인덱스: {START_TOKEN}')
print(f'END_TOKEN 인덱스: {END_TOKEN}')

In [None]:
# 3) 질문/답변을 정수 인코딩, 패딩, 필터링
def tokenize_and_filter(inputs, outputs):
  tokenized_inputs, tokenized_outputs = [], []

  for (sentence1, sentence2) in zip(inputs, outputs):
    # 정수 인코딩
    sentence1 = [START_TOKEN] + tokenizer.encode(preprocess_sentence(sentence1)) + [END_TOKEN]
    sentence2 = [START_TOKEN] + tokenizer.encode(preprocess_sentence(sentence2)) + [END_TOKEN]

    # 길이 필터링: MAX_LENGTH 이하인 경우만 추가
    if len(sentence1) <= MAX_LENGTH and len(sentence2) <= MAX_LENGTH:
      tokenized_inputs.append(sentence1)
      tokenized_outputs.append(sentence2)


  # 패딩: 최대 길이로 맞춤
  tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences(
      tokenized_inputs, maxlen=MAX_LENGTH, padding='post', dtype='int32')
  tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences(
      tokenized_outputs, maxlen=MAX_LENGTH, padding='post',dtype='int32')

  # return tokenized_inputs, tokenized_outputs
  return np.array(tokenized_inputs, dtype=np.int32), np.array(tokenized_outputs, dtype=np.int32)

questions, answers = tokenize_and_filter(questions, answers)

print('단어 집합의 크기 (VOCAB_SIZE) :', VOCAB_SIZE)
print('인코딩된 질문의 크기 (shape) :', questions.shape)
print('인코딩된 답변의 크기 (shape) :', answers.shape)


In [None]:
# questions = np.array(questions, dtype=np.int32) # 형식변환
# answers = np.array(answers, dtype=np.int32)
# questions_tensor = tf.constant(questions, dtype=tf.int32)
# answers_tensor = tf.constant(answers, dtype=tf.int32)

In [None]:
# 4) 텐서플로우 Dataset 구성
BATCH_SIZE = 64
BUFFER_SIZE = 20000

#넘피 배열을 텐서플로우 텐서로 변환

dataset = tf.data.Dataset.from_tensor_slices(
    (
        {
            'inputs': questions,
            'dec_inputs': answers[:, :-1]  # 디코더 입력: <start>부터 시작
        },
        {
            'outputs': answers[:, 1:]  # 디코더 레이블: 다음 토큰부터 시작
        }
    )
)
def cast_to_int32(features, labels):
    features['inputs'] = tf.cast(features['inputs'], tf.int32)
    features['dec_inputs'] = tf.cast(features['dec_inputs'], tf.int32)
    labels['outputs'] = tf.cast(labels['outputs'], tf.int32)
    return features, labels

dataset = dataset.map(cast_to_int32, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.cache()
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)
# dataset = dataset.prefetch(tf.data.AUTOTUNE)# tf.data.experimental.AUTOTUNE 대신 표준 tf.data.AUTOTUNE 사용 (최적화)
dataset = dataset.prefetch(tf.data.AUTOTUNE)


# 마스킹


In [None]:
def create_padding_mask(x):
  # 마스킹할 값을 1로 표시 (패딩 토큰인 0인 경우)
  mask = tf.cast(tf.math.equal(x, 0), tf.float32)
  # (batch_size, 1, 1, sequence_length) 형태로 리턴
  return mask[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(x):
    seq_len = tf.shape(x)[1]

    # Look-ahead mask: (1, 1, seq_len, seq_len)
    look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
    look_ahead_mask = look_ahead_mask[tf.newaxis, tf.newaxis, :, :]

    # Padding mask: (batch_size, 1, 1, seq_len)
    padding_mask = create_padding_mask(x)

    # Combine with broadcasting
    combined_mask = tf.maximum(look_ahead_mask, padding_mask)
    return combined_mask

# 트랜스포머 모델


In [None]:
# 1) 스케일드 닷 프로덕트 어텐션 함수
def scaled_dot_product_attention(query, key, value, mask):
  # 어텐션 가중치는 Q와 K의 전치 행렬의 곱
  matmul_qk = tf.matmul(query, key, transpose_b=True)

  # 스케일링: depth의 제곱근으로 나눈다
  depth = tf.cast(tf.shape(key)[-1], tf.float32)
  logits = matmul_qk / tf.math.sqrt(depth)

  # 마스킹 (패딩 마스크 또는 룩-어헤드 마스크)
  if mask is not None:
    logits += (mask * -1e9)

  # Softmax를 통해 어텐션 가중치 획득
  attention_weights = tf.nn.softmax(logits, axis=-1)
  output = tf.matmul(attention_weights, value)

  return output, attention_weights

In [None]:
# 2) 멀티 헤드 어텐션 클래스
class MultiHeadAttention(tf.keras.layers.Layer):

  def __init__(self, d_model, num_heads, name="multi_head_attention"):
    super(MultiHeadAttention, self).__init__(name=name)
    self.num_heads = num_heads
    self.d_model = d_model

    assert d_model % self.num_heads == 0

    self.depth = d_model // self.num_heads

    self.query_dense = tf.keras.layers.Dense(units=d_model)
    self.key_dense = tf.keras.layers.Dense(units=d_model)
    self.value_dense = tf.keras.layers.Dense(units=d_model)

    self.dense = tf.keras.layers.Dense(units=d_model)

  def split_heads(self, inputs, batch_size):
    inputs = tf.reshape(
        inputs, shape=(batch_size, -1, self.num_heads, self.depth))
    return tf.transpose(inputs, perm=[0, 2, 1, 3])

  def call(self, inputs):
    query, key, value, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask']
    batch_size = tf.shape(query)[0]

    # Q, K, V에 각각 Dense Layer 적용
    query = self.query_dense(query)
    key = self.key_dense(key)
    value = self.value_dense(value)

    # Multi-Head로 나누기
    query = self.split_heads(query, batch_size)
    key = self.split_heads(key, batch_size)
    value = self.split_heads(value, batch_size)

    # 스케일드 닷 프로덕트 어텐션 적용
    scaled_attention, _ = scaled_dot_product_attention(
        query, key, value, mask)

    scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

    # Head들을 다시 concatenate
    concat_attention = tf.reshape(scaled_attention,
                                  (batch_size, -1, self.d_model))

    # Final Dense Layer
    outputs = self.dense(concat_attention)

    return outputs

In [None]:

# 3) 포지션 와이즈 피드 포워드 네트워크 함수
def feed_forward_network(d_model, units):
  return tf.keras.Sequential([
      tf.keras.layers.Dense(units=units, activation='relu'),
      tf.keras.layers.Dense(units=d_model)
  ])

In [None]:
# 4) 포지셔널 인코딩 레이어

class PositionalEncoding(tf.keras.layers.Layer):
  def __init__(self, position, d_model):
    super(PositionalEncoding, self).__init__()
    self.pos_encoding = self.positional_encoding(position, d_model)

  def get_angles(self, position, i, d_model):
    angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
    return position * angles

  def positional_encoding(self, position, d_model):
    angle_rads = self.get_angles(
        position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
        i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
        d_model=d_model)

    sines = tf.math.sin(angle_rads[:, 0::2])
    cosines = tf.math.cos(angle_rads[:, 1::2])

    pos_encoding = tf.stack([sines, cosines], axis=0)
    pos_encoding = tf.transpose(pos_encoding,[1, 2, 0])
    pos_encoding = tf.reshape(pos_encoding, [position, d_model])

    pos_encoding = pos_encoding[tf.newaxis, ...]
    return tf.cast(pos_encoding, tf.float32)

  def call(self, inputs): #수정
     if isinstance(inputs, tf.SparseTensor):
        inputs = tf.sparse.to_dense(inputs)
     return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

# 인코더, 디코더 레이어

In [None]:
# 1) 인코더 레이어
def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  # Multi-Head Attention (Self-Attention)
  attention = MultiHeadAttention(
      d_model, num_heads, name="multi_head_attention")({
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': padding_mask
      })
  attention = tf.keras.layers.Dropout(rate=dropout)(attention)
  attention = tf.keras.layers.LayerNormalization(epsilon=1e-6)(inputs + attention)

  # Feed Forward Network
  outputs = feed_forward_network(d_model, units)(attention)
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention + outputs)

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)

In [None]:
# 2) 인코더
def encoder(vocab_size, num_layers, units, d_model, num_heads, dropout, name="encoder"):
  inputs = tf.keras.Input(shape=(None,), name="inputs")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  # 임베딩 레이어
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))

  # 포지셔널 인코딩
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  # N개의 인코더 레이어 쌓기
  for i in range(num_layers):
    outputs = encoder_layer(
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout,
        name="encoder_layer_{}".format(i)
    )([outputs, padding_mask])

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)

In [None]:
# 3) 디코더 레이어
def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
  enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
  look_ahead_mask = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  # Masked Multi-Head Attention (Self-Attention)
  attention1 = MultiHeadAttention(
      d_model, num_heads, name="multi_head_attention_1")({
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': look_ahead_mask
      })
  attention1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention1 + inputs)

  # Multi-Head Attention (Encoder-Decoder Attention)
  attention2 = MultiHeadAttention(
      d_model, num_heads, name="multi_head_attention_2")({
          'query': attention1,
          'key': enc_outputs,
          'value': enc_outputs,
          'mask': padding_mask
      })
  attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
  attention2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention2 + attention1)

  # Feed Forward Network
  outputs = feed_forward_network(d_model, units)(attention2)
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention2 + outputs)

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)

In [None]:
# 4) 디코더
def decoder(vocab_size, num_layers, units, d_model, num_heads, dropout, name='decoder'):
  inputs = tf.keras.Input(shape=(None,), name='inputs')
  enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
  look_ahead_mask = tf.keras.Input(shape=(1, None, None), name='look_ahead_mask')
  padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

  # 임베딩 레이어
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))

  # 포지셔널 인코딩
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  # N개의 디코더 레이어 쌓기
  for i in range(num_layers):
    outputs = decoder_layer(
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout,
        name='decoder_layer_{}'.format(i),
    )(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)

In [369]:
# 5) 트랜스포머 모델
def transformer(vocab_size, num_layers, units, d_model, num_heads, dropout, name="transformer"):
  # 입력 정의
  inputs = tf.keras.Input(shape=(None,), name="inputs") # 질문
  dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs") # 답변의 시작

  # 마스크 정의
  enc_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None), name='enc_padding_mask')(inputs)
  look_ahead_mask = tf.keras.layers.Lambda(
      create_look_ahead_mask, output_shape=(1, None, None), name='look_ahead_mask')(dec_inputs)
  dec_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None), name='dec_padding_mask')(inputs)

  # 인코더
  encoder_instance = encoder(
      vocab_size=vocab_size, num_layers=num_layers, units=units, d_model=d_model,
      num_heads=num_heads, dropout=dropout)
  enc_outputs = encoder_instance(inputs=[inputs, enc_padding_mask])

  # 디코더
  decoder_instance = decoder(
      vocab_size=vocab_size, num_layers=num_layers, units=units, d_model=d_model,
      num_heads=num_heads, dropout=dropout)
  dec_outputs = decoder_instance(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])

  # 최종 출력 레이어
  outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)

  # 모델 인스턴스 생성 시 인코더와 디코더를 속성으로 추가하여 예측 단계에서 사용 가능하도록 함
  model = tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)
  model.encoder = encoder_instance
  model.decoder = decoder_instance
  model.final_layer = model.layers[-1]

  return model

# 5. 모델 학습 설정

In [370]:
# 1) 학습률 스케줄러
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):

  def __init__(self, d_model, warmup_steps=4000):
    super(CustomSchedule, self).__init__()
    self.d_model = tf.cast(d_model, tf.float32)
    self.warmup_steps = tf.cast(warmup_steps, tf.float32)

  def __call__(self, step):
    step = tf.cast(step, tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps**-1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)


In [371]:
# 2) 모델 파라미터 및 인스턴스화
tf.keras.backend.clear_session()

# 하이퍼파라미터
NUM_LAYERS = 2  # 인코더와 디코더의 층 수
D_MODEL = 256   # 임베딩 차원
NUM_HEADS = 8   # 어텐션 헤드의 수
UNITS = 512     # Feed Forward Network의 은닉층 크기
DROPOUT = 0.1   # 드롭아웃 비율
EPOCHS = 10     # 학습 에폭

model = transformer(
    vocab_size=VOCAB_SIZE,
    num_layers=NUM_LAYERS,
    units=UNITS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT)

In [372]:
# 3) 손실 함수 및 옵티마이저
def loss_function(y_true, y_pred):
  # 패딩 토큰에 대한 손실을 무시하는 로직 (최적화)
  y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none')(y_true, y_pred)
  mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
  loss = tf.multiply(loss, mask)
  return tf.reduce_sum(loss) / tf.reduce_sum(mask)

def accuracy(y_true, y_pred):
  y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
  return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)

# 옵티마이저 설정
learning_rate = CustomSchedule(D_MODEL)
optimizer = tf.keras.optimizers.Adam(
    learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])



In [373]:
# 4) 모델 학습
print(f"모델 학습 시작 (EPOCHS: {EPOCHS})")

model.fit(dataset, epochs=EPOCHS, verbose=1)

모델 학습 시작 (EPOCHS: 10)
Epoch 1/10


TypeError: Expected float32, but got outputs of type 'str'.

# 예측


In [None]:
# 1) 디코더 추론 함수
def decoder_inference(sentence):
  sentence = preprocess_sentence(sentence)

  # 입력 문장을 정수 인코딩 후 배치 차원 추가
  sentence = tf.expand_dims(
      START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)

  # 인코더 출력은 한 번만 계산
  enc_padding_mask = create_padding_mask(sentence)
  enc_outputs = model.encoder(
      [sentence, enc_padding_mask], training=False)

  # 디코더 입력: START_TOKEN만 있는 상태에서 시작
  dec_sequence = tf.expand_dims(START_TOKEN, axis=0)

  for _ in range(MAX_LENGTH):
    # 디코더의 두 번째 어텐션을 위한 패딩 마스크
    dec_padding_mask = create_padding_mask(sentence)
    # 디코더의 첫 번째 어텐션을 위한 룩-어헤드 마스크
    look_ahead_mask = create_look_ahead_mask(dec_sequence)

    # 디코더 예측 (다음 토큰 예측)
    predictions = model.decoder(
        [dec_sequence, enc_outputs, look_ahead_mask, dec_padding_mask], training=False)

    # 최종 출력 레이어 적용
    predictions = model.final_layer(predictions)
    # 현재 시점의 예측 결과 (가장 마지막 토큰)
    predictions = predictions[:, -1:, :]
    predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

    # 만약 현재 예측한 단어가 종료 토큰이라면 for문을 종료
    if tf.equal(predicted_id, END_TOKEN[0]):
      break

    # 예측한 단어들은 지속적으로 output_sequence에 추가됩니다.
    # 이 output_sequence는 다시 디코더의 입력이 됩니다.
    dec_sequence = tf.concat([dec_sequence, predicted_id], axis=-1)

  return tf.squeeze(dec_sequence, axis=0)

In [None]:
# 2) 문장 생성 함수
def sentence_generation(sentence):
  # 입력 문장에 대해서 디코더를 동작 시켜 예측된 정수 시퀀스를 리턴받습니다.
  prediction = decoder_inference(sentence)

  # 정수 시퀀스를 다시 텍스트 시퀀스로 변환합니다.
  predicted_sentence = tokenizer.decode(
      [i for i in prediction if i < tokenizer.vocab_size])
  return predicted_sentence


  print('입력 : {}'.format(sentence))
  print('출력 : {}'.format(predicted_sentence))



# 테스트

In [None]:
sentence_generation("안녕")
sentence_generation("배가 너무 고파")