# Seq2Seq Q&A Chatbot 구현
- 실행시 모델을 로드하는 부분("<여기서부터 시작해주세요>")부터 실행해 주시면 되겠습니다!

## 데이터 취득

In [1]:
import numpy as np
import pandas as pd

df = pd.read_csv('https://raw.githubusercontent.com/songys/Chatbot_data/refs/heads/master/ChatbotData.csv')
df = df[['Q','A']]
df

Unnamed: 0,Q,A
0,12시 땡!,하루가 또 가네요.
1,1지망 학교 떨어졌어,위로해 드립니다.
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.
4,PPL 심하네,눈살이 찌푸려지죠.
...,...,...
11818,훔쳐보는 것도 눈치 보임.,티가 나니까 눈치가 보이는 거죠!
11819,훔쳐보는 것도 눈치 보임.,훔쳐보는 거 티나나봐요.
11820,흑기사 해주는 짝남.,설렜겠어요.
11821,힘든 연애 좋은 연애라는게 무슨 차이일까?,잘 헤어질 수 있는 사이 여부인 거 같아요.


### 데이터 전처리

##### 토커나이저 학습 (sentencepiece 활용)

- 접두사, 접미사 처리 (bos, eos)

In [10]:
q_inputs = []
a_inputs = []
a_targets = []

q_inputs = df['Q'].astype(str).tolist()
a_inputs  = ['<bos> ' + a for a in df['A'].astype(str)]
a_targets = [a + ' <eos>' for a in df['A'].astype(str)]

In [11]:
print(q_inputs[5000:5005])
print(a_inputs[5000:5005])
print(a_targets[5000:5005])

['학원폭력 짜증나', '학폭 없어졌으면', '학회 가고 있어', '학회 가는 중', '한 가지를 보면 다른 것은 보지 않아도 알지?']
['<bos> 학교 폭력은 범죄에요.', '<bos> 학교 폭력은 범죄에요.', '<bos> 잘 다녀 오세요.', '<bos> 잘 다녀 오세요.', '<bos> 한가지만 보면 몰라요.']
['학교 폭력은 범죄에요. <eos>', '학교 폭력은 범죄에요. <eos>', '잘 다녀 오세요. <eos>', '잘 다녀 오세요. <eos>', '한가지만 보면 몰라요. <eos>']


코퍼스 만들기
- 인코더용 (Q)
- 디코더용 (A)

In [13]:
from pathlib import Path

encoder_corpus_path = Path("Q_corpus.txt")
with encoder_corpus_path.open("w", encoding="utf-8") as f:
    for q in q_inputs:   # 질문 리스트
        f.write(q.strip() + "\n")

In [14]:
decoder_corpus_path = Path("A_corpus.txt")
with decoder_corpus_path.open("w", encoding="utf-8") as f:
    for a_in, a_tgt in zip(a_inputs, a_targets):
        f.write(a_in.strip() + "\n")   # <bos> + 답변
        f.write(a_tgt.strip() + "\n")  # 답변 + <eos>

##### 학습용 데이터 Q_input, A_input, A_target 생성 (토큰화 및 시퀀싱)

- Q 토큰화

In [None]:
import sentencepiece as spt

input_path = 'Q_corpus.txt'

spt.SentencePieceTrainer.Train(
    input=input_path,
    model_prefix="sp_enc",
    vocab_size=10_000,
    model_type="unigram",
    character_coverage=1.0,     # 한글이면 1.0 권장
    pad_id=0, pad_piece="<pad>",# Keras pad=0, mask_zero=True와 안전하게 호환
    unk_id=1,                   # 기본 0→1로 이동
    bos_id=-1, eos_id=-1,       # (선택) 인코더에선 BOS/EOS 비활성화
    hard_vocab_limit=False      # 코퍼스 작을 때 안전장치(권장)
)

In [29]:
# 토커나이저 모델 로드 & 토큰화 및 시퀀싱
sp_enc = spt.SentencePieceProcessor(model_file="sp_enc.model")

q_inputs_seqs = [sp_enc.encode(s, out_type=int) for s in q_inputs]

print(sp_enc.id_to_piece(0))
print(q_inputs[0]) 
print(sp_enc.encode_as_pieces(q_inputs[0]))
print(q_inputs_seqs[0]) # 토큰화 및 시퀀싱 확인

<pad>
12시 땡!
['▁12', '시', '▁땡', '!']
[2569, 524, 2013, 125]


- A 토큰화

In [24]:
spt.SentencePieceTrainer.Train(
    input="A_corpus.txt",
    model_prefix="sp_dec",
    vocab_size=10000,
    model_type="unigram",
    character_coverage=1.0,          # 한글 권장
    user_defined_symbols=["<bos>", "<eos>"],
    pad_id=0, pad_piece="<pad>",     # 패딩은 0번 → mask_zero=True와 호환
    unk_id=1,                        # UNK는 1번으로 이동(0과 충돌 방지)
    bos_id=-1, eos_id=-1,            # SP 기본 <s>, </s> 비활성화(우리는 <bos>/<eos> 사용)
    hard_vocab_limit=False
)

In [44]:
sp_dec = spt.SentencePieceProcessor(model_file="sp_dec.model")

PAD_ID = sp_dec.piece_to_id("<pad>")
BOS_ID = sp_dec.piece_to_id("<bos>")
EOS_ID = sp_dec.piece_to_id("<eos>")

# 1) 원본 A만 토큰화
y_ids_list = [sp_dec.encode(a, out_type=int) for a in df['A'].astype(str).tolist()]

# 2) 디코더 입력/타깃 구성 (ID 직접 부착)
a_inputs_seqs  = [[BOS_ID] + y for y in y_ids_list]     # <bos> + A
a_targets_seqs = [y + [EOS_ID] for y in y_ids_list]     # A + <eos>

# 3) 무결성 체크
i = 0
assert a_inputs_seqs[i][0] == BOS_ID
assert a_targets_seqs[i][-1] == EOS_ID
assert a_inputs_seqs[i][1:] == a_targets_seqs[i][:-1]   # shift OK


print(sp_dec.id_to_piece(0), sp_dec.id_to_piece(1), sp_dec.id_to_piece(2), sp_dec.id_to_piece(3))

<pad> <unk> <bos> <eos>


패딩
- q_inputs_seqs
- a_inputs_seqs
- a_targets_seqs

In [45]:
VOCAB_SIZE = 10000

# ── vocab size (SP는 학습 시점에 고정됨: 반드시 모델의 piece_size 사용!)
enc_vocab_size = sp_enc.get_piece_size()   # 인코더용 vocab 크기
dec_vocab_size = sp_dec.get_piece_size()   # 디코더용 vocab 크기


# ── max length (패딩 길이)
enc_max_len = max(len(seq) for seq in q_inputs_seqs)                 # 인코더 입력 길이
dec_max_len = max(
    max(len(seq) for seq in a_inputs_seqs),                       # "<bos> …"
    max(len(seq) for seq in a_targets_seqs)                       # "… <eos>"
)

# ⚠️ 주의: Keras Tokenizer처럼 min(VOCAB_SIZE, …)로 줄이면
# 실제 ID가 임베딩 input_dim보다 커져서 에러납니다.
# → Embedding(input_dim=enc_vocab_size / dec_vocab_size) 로 그대로 쓰세요.

In [46]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

# PAD ID 확인 (둘 다 0이면 mask_zero=True 쓰기 좋음)
enc_PAD = sp_enc.piece_to_id("<pad>") if "<pad>" in {sp_enc.id_to_piece(i) for i in range(enc_vocab_size)} else 0
dec_PAD = sp_dec.piece_to_id("<pad>")  # 우리가 학습 때 0으로 설정했다면 0

enc_inputs_padded  = pad_sequences(q_inputs_seqs,  maxlen=enc_max_len, padding='pre', value=enc_PAD)
dec_inputs_padded  = pad_sequences(a_inputs_seqs,  maxlen=dec_max_len, padding='post',value=dec_PAD)
dec_targets_padded = pad_sequences(a_targets_seqs, maxlen=dec_max_len, padding='post', value=dec_PAD)


In [47]:
# 최종 확인
print(enc_inputs_padded[0])
print(dec_inputs_padded[0])
print(dec_targets_padded[0])

[   0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0 2569  524 2013  125]
[  2 371  11 307   4  11  12   5   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0]
[371  11 307   4  11  12   5   3   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0]


In [48]:
# shift 정렬 확인
i = 0
assert dec_inputs_padded[i][0] == sp_dec.piece_to_id("<bos>")
# 마지막 유효 토큰이 <eos>인지 확인 (패딩 제외)
last_non_pad = int((dec_targets_padded[i] != PAD_ID).sum()) - 1
assert dec_targets_padded[i][last_non_pad] == sp_dec.piece_to_id("<eos>")


### 모델 생성 및 학습
- `encoder + decoder(teach_forcing)` 구조의 모델 생성 및 학습

In [49]:
EMBED_DIM  = 256
LATENT_DIM = 256  # 인코더/디코더 동일 차원 권장

##### 인코더 생성

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

# enc_max_len         # 인코더 입력 시퀀스 최대 길이 (pre-padding 기준)
# sp_enc              # SentencePiece 인코더 토크나이저
# PAD_ID = sp_enc.piece_to_id("<pad>")  # 0 이어야 mask_zero=True가 정상 동작

assert PAD_ID == 0, "Embedding(mask_zero=True)를 쓰려면 PAD_ID가 0이어야 합니다."

EMBEDDING_DIM = 256
LATENT_DIM    = 512   # 강사님 코드와 동일

# ── 인코더 모델 ─────────────────────────────────────────────────────────────
encoder_inputs = layers.Input(shape=(enc_max_len,))  # 강사님처럼 고정 길이
# (원한다면 shape=(None,)로 해도 됨)

en_embedding_layer = layers.Embedding(
    input_dim=enc_vocab_size,      # SP vocab 그대로 ( +1 하지 않기 )
    output_dim=EMBEDDING_DIM,
    mask_zero=True,                # PAD=0 마스킹
)
x = en_embedding_layer(encoder_inputs)

encoder_outputs, h, c = layers.LSTM(
    LATENT_DIM, return_state=True
)(x)
encoder_states = [h, c]

encoder_model = models.Model(inputs=encoder_inputs, outputs=encoder_states)
encoder_model.summary()

##### 디코더 생성 (teacher-forcing 모델) 생성

In [None]:
decoder_inputs = layers.Input(shape=(None,))  # (B, T_dec)

dec_embedding  = layers.Embedding(
    input_dim=dec_vocab_size,
    output_dim=EMBED_DIM,
    mask_zero=True,
    name="dec_embedding"
)
y = dec_embedding(decoder_inputs)

decoder_lstm = layers.LSTM(
    LATENT_DIM, return_sequences=True, return_state=True
)
y, _, _ = decoder_lstm(y, initial_state=encoder_states)

decoder_dense = layers.Dense(dec_vocab_size, activation="softmax")
decoder_outputs = decoder_dense(y)

# 학습용 모델(인코더 + 디코더 결합) 
decoder_teacher_forcing_model = models.Model(
    inputs=[encoder_inputs, decoder_inputs],
    outputs=decoder_outputs,
)

decoder_teacher_forcing_model.summary()

##### 학습

In [57]:
decoder_teacher_forcing_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy']
)


history = decoder_teacher_forcing_model.fit(
    [enc_inputs_padded, dec_inputs_padded],  # 인코더 입력, 디코더 입력
    dec_targets_padded,                      # 디코더 타깃 (정수 라벨, shape: (N, T_dec))
    batch_size=64,
    epochs=30,
    validation_split=0.2
)

Epoch 1/30
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m69s[0m 451ms/step - accuracy: 0.2201 - loss: 0.0080 - val_accuracy: 0.0865 - val_loss: 7.7730
Epoch 2/30
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 448ms/step - accuracy: 0.2197 - loss: 0.0176 - val_accuracy: 0.0856 - val_loss: 7.7229
Epoch 3/30
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m67s[0m 451ms/step - accuracy: 0.2198 - loss: 0.0131 - val_accuracy: 0.0864 - val_loss: 7.7711
Epoch 4/30
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 444ms/step - accuracy: 0.2201 - loss: 0.0088 - val_accuracy: 0.0865 - val_loss: 7.8260
Epoch 5/30
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 449ms/step - accuracy: 0.2201 - loss: 0.0068 - val_accuracy: 0.0874 - val_loss: 7.8476
Epoch 6/30
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 447ms/step - accuracy: 0.2201 - loss: 0.0062 - val_accuracy: 0.0869 - val_loss: 7.8992
Epoch 7/30

In [58]:
# 모델 저장
decoder_teacher_forcing_model.save('chatbot_teaching_force_model.keras')

---
---

## 여기서부터 시작해주세요

In [59]:
from tensorflow.keras.models import load_model
chatbot_teaching_force_model = load_model('chatbot_teaching_force_model.keras')

### 모델 추론
`encoder + decoder(inference)` 구조의 모델로 추론

##### 디코더 (추론 모델) 생성

In [None]:
enc_len = enc_inputs_padded.shape[1]
dec_len = dec_inputs_padded.shape[1]

# 디코더 추론 모델 (강사님 포맷 그대로)
decoder_hidden_state = layers.Input(shape=(LATENT_DIM,), name="dec_state_h")
decoder_cell_state  = layers.Input(shape=(LATENT_DIM,), name="dec_state_c")
decoder_states_inputs = [decoder_hidden_state, decoder_cell_state]

decoder_single_input = layers.Input(shape=(1,), name="dec_token_in")

x = dec_embedding(decoder_single_input)                     # 공유 임베딩
x, h, c = decoder_lstm(x, initial_state=decoder_states_inputs)  # 공유 LSTM
decoder_states = [h, c]

decoder_outputs_ = decoder_dense(x)                         # 공유 Dense

decoder_inference_model = models.Model(
    inputs=[decoder_single_input] + decoder_states_inputs,
    outputs=[decoder_outputs_] + decoder_states,
    name="decoder_infer"
)
decoder_inference_model.summary()

##### 추론 함수

In [None]:
# translate 함수 (강사님 흐름 그대로, SP에 맞게) 
def translate(input_seq):
    # 1) 인코더 상태
    encoder_states_value = encoder_model.predict(input_seq, verbose=0)
    decoder_states_value = encoder_states_value

    sos_index = BOS_ID
    eos_index = EOS_ID

    # 2) 디코더 시작 토큰
    target_seq = np.zeros((1, 1), dtype="int32")
    target_seq[0, 0] = sos_index

    # 3) 생성 루프
    out_ids = []
    for _ in range(dec_len):  # 최대 디코더 길이만큼
        output_tokens, h, c = decoder_inference_model.predict(
            [target_seq] + decoder_states_value, verbose=0
        )
        pred_proba = output_tokens[0, -1, :]      # (vocab,)
        pred_index = int(np.argmax(pred_proba))   # 그리디

        if pred_index == eos_index:
            break

        if pred_index != PAD_ID and pred_index != sos_index:
            out_ids.append(pred_index)

        target_seq[0, 0] = pred_index
        decoder_states_value = [h, c]

    # 4) SentencePiece 복원
    return sp_dec.decode(out_ids)

##### 테스트

In [None]:
# 헬퍼: 질문 문자열을 input_seq로 변환 (학습 때와 같은 패딩 방향, pre)
def make_input_seq(q_text, padding='pre'):
    q_ids = sp_enc.encode(q_text, out_type=int)
    q_pad = pad_sequences([q_ids], maxlen=enc_len, padding=padding, value=PAD_ID).astype("int32")
    return q_pad


input_seq = make_input_seq("안녕하세요", padding='pre')
print(translate(input_seq))

안녕하세요.


### 간단한 Chatbot 구현

1. 사용자의 입력을 받아 (처리)
2. 추론 함수에 전달해서
3. 응답을 출력
4. 1~3 '종료' 전까지 반복

In [71]:
import builtins

while True:
    input_q = builtins.input("질문을 입력해주세요:")
    print("Q:", input_q)
    
    if input_q == '종료':
        break
    
    # 입력 시퀀싱 처리
    input_q_seq = make_input_seq(input_q)
    
    # 추론함수에 전달 & 응답 출력
    ans = translate(input_q_seq)
    print("A:", ans)
    print()

Q: 안녕
A: 안녕하세요.

Q: 나 배고파
A: 얼른 맛난 음식 드세요.

Q: 무엇을 하면 좋을까
A: 애교 하나주세요.

Q: 종료
