## Seq2Seq O&A Chatbot 구현 실습

### 데이터 취득

In [42]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sentencepiece as spm
import os
import tensorflow as tf

In [7]:
df = pd.read_csv("https://raw.githubusercontent.com/songys/Chatbot_data/refs/heads/master/ChatbotData.csv")
df = df[['Q', 'A']]
df

Unnamed: 0,Q,A
0,12시 땡!,하루가 또 가네요.
1,1지망 학교 떨어졌어,위로해 드립니다.
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.
4,PPL 심하네,눈살이 찌푸려지죠.
...,...,...
11818,훔쳐보는 것도 눈치 보임.,티가 나니까 눈치가 보이는 거죠!
11819,훔쳐보는 것도 눈치 보임.,훔쳐보는 거 티나나봐요.
11820,흑기사 해주는 짝남.,설렜겠어요.
11821,힘든 연애 좋은 연애라는게 무슨 차이일까?,잘 헤어질 수 있는 사이 여부인 거 같아요.


### 데이터 전처리

In [16]:
# 학습용 텍스트 파일 생성 (Q + A)
train_text_path = "chatbot_data.txt"

with open(train_text_path, "w", encoding="utf-8") as f:
    for Q, A in zip(df["Q"], df["A"]):
        f.write(Q.strip() + "\t" + A.strip() + "\n")

토크나이저 학습 (sentencepiece 활용)
* 접두사, 접미사 처리

학습용 데이터 Q_input, A_input, A_target 생성

In [17]:
txt_file = "chatbot_data.txt" 


Q_list, A_list = [], []
with open(txt_file, "r", encoding="utf-8") as f:
    for line in f:
        q, a = line.strip().split("\t")
        Q_list.append(q)
        A_list.append(a)

len(Q_list), len(A_list)

(11823, 11823)

In [18]:
spm_model_prefix = "chatbot_spm"
VOCAB_SIZE = 10000

spm_train_cmd = (
    f"--input={txt_file} --model_prefix={spm_model_prefix} "
    f"--vocab_size={VOCAB_SIZE} --character_coverage=0.9995 "
    f"--model_type=bpe --pad_id=0 --bos_id=1 --eos_id=2 --unk_id=3"
)

spm.SentencePieceTrainer.Train(spm_train_cmd)

In [19]:
sp = spm.SentencePieceProcessor()
sp.load(f"{spm_model_prefix}.model")

Q_input, A_input, A_target = [], [], []

for q, a in zip(Q_list, A_list):
    Q_input.append(sp.encode(q, out_type=int)) 
    A_input.append([1] + sp.encode(a, out_type=int))
    A_target.append(sp.encode(a, out_type=int) + [2]) 

##### 패딩 처리

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

max_len = max(max(map(len, Q_input)), max(map(len, A_input)), max(map(len, A_target)))

Q_input_padded = pad_sequences(Q_input, maxlen=max_len, padding="post", value=0)
A_input_padded = pad_sequences(A_input, maxlen=max_len, padding="post", value=0)
A_target_padded = pad_sequences(A_target, maxlen=max_len, padding="post", value=0)

### 모델 생성

In [None]:
for batch in dataset.take(1): 
    print(batch)

((<tf.Tensor: shape=(64, 30), dtype=int32, numpy=
array([[  61,   89,  186, ...,    0,    0,    0],
       [ 459, 9112,  979, ...,    0,    0,    0],
       [1766,  216,  371, ...,    0,    0,    0],
       ...,
       [3331, 9058, 8923, ...,    0,    0,    0],
       [ 888, 8003, 5286, ...,    0,    0,    0],
       [ 226,   77, 3443, ...,    0,    0,    0]])>, <tf.Tensor: shape=(64, 30), dtype=int32, numpy=
array([[   1, 4195, 2608, ...,    0,    0,    0],
       [   1,   86, 4899, ...,    0,    0,    0],
       [   1,  234, 1149, ...,    0,    0,    0],
       ...,
       [   1, 2851, 3561, ...,    0,    0,    0],
       [   1, 1406,   91, ...,    0,    0,    0],
       [   1, 6233,   29, ...,    0,    0,    0]])>), <tf.Tensor: shape=(64, 30), dtype=int32, numpy=
array([[4195, 2608, 1055, ...,    0,    0,    0],
       [  86, 4899, 4605, ...,    0,    0,    0],
       [ 234, 1149, 1520, ...,    0,    0,    0],
       ...,
       [2851, 3561,   56, ...,    0,    0,    0],
       [140

##### 인코더 생성

In [47]:
from tensorflow.keras import layers, models

# === 하이퍼파라미터 설정 ===
LATENT_DIM = 512  # LSTM 차원
EMBEDDING_DIM = 256  # 임베딩 차원
VOCAB_SIZE = 10000  # SentencePiece 단어 사전 크기

encoder_inputs = layers.Input(shape=(Q_input_padded.shape[1],))
encoder_embedding = layers.Embedding(VOCAB_SIZE, EMBEDDING_DIM, mask_zero=True)(encoder_inputs)
encoder_outputs, state_h, state_c = layers.LSTM(LATENT_DIM, return_state=True)(encoder_embedding)

encoder_states = [state_h, state_c]
encoder_model = models.Model(inputs=encoder_inputs, outputs=encoder_states)


encoder_model.summary()


##### 디코더 (teacher-forcing 모델 )생성

In [48]:
decoder_inputs = layers.Input(shape=(A_input_padded.shape[1],))
decoder_embedding = layers.Embedding(VOCAB_SIZE, EMBEDDING_DIM, mask_zero=True)(decoder_inputs)
decoder_lstm = layers.LSTM(LATENT_DIM, return_sequences=True, return_state=True)

# ⚠️ encoder_states를 직접 전달하는 것이 아니라 모델 내부에서 처리하도록 수정
decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)

decoder_dense = layers.Dense(VOCAB_SIZE, activation="softmax")
decoder_outputs = decoder_dense(decoder_outputs)

# 디코더 모델
decoder_model = models.Model(inputs=[decoder_inputs, encoder_inputs], outputs=decoder_outputs)

# === 4️⃣ Seq2Seq 모델 통합 ===
seq2seq_model = models.Model([encoder_inputs, decoder_inputs], decoder_outputs)

seq2seq_model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"]
)

seq2seq_model.summary()


### 모델 학습

##### 학습

In [None]:
# Dataset 변환
BUFFER_SIZE = len(Q_input_padded)
BATCH_SIZE = 64

dataset = tf.data.Dataset.from_tensor_slices((Q_input_padded, A_input_padded, A_target_padded))

# map_func을 올바르게 수정
def map_func(encoder_input, decoder_input, decoder_target):
    return (encoder_input, decoder_input), decoder_target

dataset = dataset.map(map_func)

# drop_remainder=False로 변경하여 데이터 손실 방지
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=False).prefetch(tf.data.AUTOTUNE)


# === 7️⃣ 모델 학습 실행 ===
EPOCHS = 10
seq2seq_model.fit(dataset, epochs=EPOCHS, verbose=1)


✅ 데이터셋 샘플 확인: ((<tf.Tensor: shape=(64, 30), dtype=int32, numpy=
array([[3270,   74,  179, ...,    0,    0,    0],
       [4147, 2366, 8983, ...,    0,    0,    0],
       [2593, 4358,    0, ...,    0,    0,    0],
       ...,
       [1316, 1858, 9077, ...,    0,    0,    0],
       [3842, 3460,   96, ...,    0,    0,    0],
       [ 181, 1618,   51, ...,    0,    0,    0]])>, <tf.Tensor: shape=(64, 30), dtype=int32, numpy=
array([[   1,  277, 3014, ...,    0,    0,    0],
       [   1, 6652, 8916, ...,    0,    0,    0],
       [   1, 1674,  623, ...,    0,    0,    0],
       ...,
       [   1, 1291,   68, ...,    0,    0,    0],
       [   1, 3831, 1036, ...,    0,    0,    0],
       [   1,  129,   29, ...,    0,    0,    0]])>), <tf.Tensor: shape=(64, 30), dtype=int32, numpy=
array([[ 277, 3014, 8915, ...,    0,    0,    0],
       [6652, 8916,    2, ...,    0,    0,    0],
       [1674,  623, 2902, ...,    0,    0,    0],
       ...,
       [1291,   68, 5059, ...,    0,    0,    0

<keras.src.callbacks.history.History at 0x21a3ae8eb10>

##### 디코더 (추론 모델) 생성

In [71]:
A_target_text = [str(text) for text in A_target]
A_target_text

['[4489, 211, 5936, 8916, 2]',
 '[1619, 6422, 8916, 2]',
 '[5136, 1357, 379, 8916, 2]',
 '[5136, 1357, 379, 8916, 2]',
 '[208, 4722, 2060, 9558, 8992, 4071, 8916, 2]',
 '[202, 2566, 1002, 88, 52, 236, 107, 8916, 2]',
 '[202, 2566, 1002, 88, 52, 236, 107, 8916, 2]',
 '[37, 2425, 72, 219, 112, 8916, 2]',
 '[765, 7965, 166, 8916, 2]',
 '[765, 7965, 166, 8916, 2]',
 '[36, 8951, 10, 2585, 290, 8916, 2]',
 '[12, 1056, 292, 21, 8916, 2]',
 '[12, 1056, 292, 21, 8916, 2]',
 '[6330, 2316, 8916, 2]',
 '[4817, 202, 5050, 21, 8916, 2]',
 '[5644, 7273, 137, 8916, 2]',
 '[2232, 1478, 3832, 374, 8916, 2]',
 '[1001, 748, 3418, 8953, 2050, 8927, 8153, 8916, 2]',
 '[1001, 748, 3418, 8953, 2050, 8927, 8153, 8916, 2]',
 '[684, 261, 328, 62, 2786, 2007, 8916, 2]',
 '[2876, 1057, 9063, 2]',
 '[684, 261, 328, 62, 2786, 2007, 8916, 2]',
 '[788, 1360, 1495, 6705, 8916, 5132, 6001, 2362, 6625, 765, 5232, 151, 8916, 2]',
 '[804, 3391, 910, 614, 1329, 1214, 678, 8916, 2]',
 '[804, 3391, 910, 614, 1329, 1214, 678, 

In [67]:

from tensorflow.keras.preprocessing.text import Tokenizer
import numpy as np

# target_tokenizer 생성 (이전 학습 데이터를 활용)
target_tokenizer = Tokenizer()
target_tokenizer.fit_on_texts(A_target)

# 최대 디코더 시퀀스 길이 설정
max_decoder_seq_length = max(len(txt.split()) for txt in target_texts)
print("Max decoder sequence length:", max_decoder_seq_length)

# 디코딩 함수 수정
def decode_sequence(input_seq, encoder_model, decoder_inference_model, target_tokenizer, max_decoder_seq_length):
    """
    입력 문장을 받아 디코더를 통해 예측된 번역 문장을 반환하는 함수
    """
    # 1️⃣ 인코더를 통해 초기 상태값을 얻기
    states_value = encoder_model.predict(input_seq)

    # 2️⃣ 시작 토큰 설정 (보통 "<sos>" 같은 시작 토큰 사용)
    target_seq = np.zeros((1, 1))  # (batch_size=1, step=1)
    target_seq[0, 0] = target_tokenizer.word_index.get("<sos>", 1)  # 시작 토큰 인덱스

    stop_condition = False
    decoded_sentence = []

    while not stop_condition:
        # 3️⃣ 디코더 실행
        output_tokens, h, c = decoder_inference_model.predict([target_seq] + states_value)

        print("Output tokens shape:", output_tokens.shape)  # 디버깅용 출력
        sampled_token_index = np.argmax(output_tokens[0])  # 2D 배열에 맞게 인덱싱 수정
        sampled_word = target_tokenizer.index_word.get(sampled_token_index, "<unk>")  # 없는 단어는 <unk>로 처리

        decoded_sentence.append(sampled_word)

        # 5️⃣ 종료 조건: 종료 토큰이 나오거나 최대 길이를 초과하면 종료
        if (sampled_word == "<eos>") or (len(decoded_sentence) > max_decoder_seq_length):
            stop_condition = True

        # 6️⃣ 다음 입력 준비: 디코더에 예측된 단어를 다시 입력
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = sampled_token_index

        # 7️⃣ 상태값 업데이트
        states_value = [h, c]

    return " ".join(decoded_sentence)

AttributeError: 'int' object has no attribute 'lower'

### 추론 함수

In [62]:
# 입력 데이터 변환 후 실행
input_text = "안녕"

# 입력을 정수 인덱스 시퀀스로 변환
input_seq = target_tokenizer.texts_to_sequences([input_text])  # [[2]] 같은 형태
input_seq = np.array(input_seq)  # NumPy 배열 변환

# 추론 실행
decoded_sentence = decode_sequence(input_seq, encoder_model, decoder_inference_model, target_tokenizer, max_decoder_seq_length)
print("Predicted sentence:", decoded_sentence)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
Output tokens shape: (1, 10000)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
Output tokens shape: (1, 10000)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
Output tokens shape: (1, 10000)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
Output tokens shape: (1, 10000)
Predicted sentence: <unk> <unk> <unk> <unk>


In [64]:
print(target_tokenizer.word_index)  # 단어 인덱스 확인
print(target_tokenizer.index_word)  # 인덱스-단어 매핑 확인


{'sos': 1, '안녕하세요': 2, '반가워요': 3, '잘': 4, '지내요': 5, '안녕': 6, 'eos': 7}
{1: 'sos', 2: '안녕하세요', 3: '반가워요', 4: '잘', 5: '지내요', 6: '안녕', 7: 'eos'}
