# **12. 어텐션이면 충분한 막강한 트랜스포머**
---
* 출판사 : 생능 출판사( http://www.booksr.co.kr/ )
* 으뜸 파이썬 저자 : 강영민, 박동규, 김성수
*  소스코드 저장소 : https://github.com/dknife/ML2nd
*  저작권 : 본 노트북 코드는 자유롭게 배포가능하지만 위의 출판사, 저서, 저자표기와 함께 배포해 주십시오.
---

### **LAB 12-1 트랜스포머를 이용한 챗봇**

In [None]:
# ================================
# 1. 설치 및 Import
# ================================
# !pip install transformers tensorflow pandas -q

import tensorflow as tf
from tensorflow.keras import layers, optimizers
import numpy as np
import pandas as pd
import re
from transformers import AutoTokenizer

In [None]:
# ================================
# 2. Download & load the data
# ================================
# Host prefix and repository-relative path of the chatbot CSV; the two strings
# are concatenated below to form the download URL.
site = "https://raw.githubusercontent.com/"
filepath = "songys/Chatbot_data/master/ChatbotData.csv"

# pandas reads the CSV straight from the URL (requires network access).
df = pd.read_csv(site+filepath)
print(f"총 데이터 수: {len(df)}")
# Last expression of the cell: displays the first 10 rows in the notebook.
df[:10]

총 데이터 수: 11823


Unnamed: 0,Q,A,label
0,12시 땡!,하루가 또 가네요.,0
1,1지망 학교 떨어졌어,위로해 드립니다.,0
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.,0
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.,0
4,PPL 심하네,눈살이 찌푸려지죠.,0
5,SD카드 망가졌어,다시 새로 사는 게 마음 편해요.,0
6,SD카드 안돼,다시 새로 사는 게 마음 편해요.,0
7,SNS 맞팔 왜 안하지ㅠㅠ,잘 모르고 있을 수도 있어요.,0
8,SNS 시간낭비인 거 아는데 매일 하는 중,시간을 정하고 해보세요.,0
9,SNS 시간낭비인데 자꾸 보게됨,시간을 정하고 해보세요.,0


In [None]:
# ================================
# 3. Preprocessing
# ================================
def clean_text(t):
    """Return ``str(t)`` without ``?``, ``.``, ``!`` or ``,`` and with outer whitespace stripped.

    Any value is accepted because it is converted with ``str`` first.
    """
    raw = str(t)
    without_marks = re.sub(r"[?.!,]", "", raw)
    return without_marks.strip()

# Cleaned questions, and cleaned answers wrapped in the <START>/<END> markers.
questions = [clean_text(q) for q in df['Q']]
answers = [f"<START> {clean_text(a)} <END>" for a in df['A']]

In [None]:
# ================================
# 4. KoGPT2 tokenizer
# ================================
tokenizer = AutoTokenizer.from_pretrained("skt/kogpt2-base-v2")
# The encoding helpers below request padding, so a pad token must exist; fall
# back to the EOS token when the loaded tokenizer does not define one.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
# Register the sequence markers used throughout this notebook as special tokens.
tokenizer.add_special_tokens({"additional_special_tokens": ["<START>", "<END>", "[SEP]"]})

# Measured after the markers were registered; later passed to the model as its
# vocabulary size.
vocab_size = len(tokenizer)
# Ids of the padding and sequence-marker tokens, stored as np.int32 scalars.
PAD_ID = np.int32(tokenizer.pad_token_id)
START_ID = np.int32(tokenizer.convert_tokens_to_ids("<START>"))
END_ID = np.int32(tokenizer.convert_tokens_to_ids("<END>"))

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

In [None]:
# ================================
# 5. Compute MAX_LEN (95th percentile, capped at 40)
# ================================
def get_length(text):
    """Return the number of token ids that ``"<START> {text} [SEP]"`` encodes to."""
    wrapped = f"<START> {text} [SEP]"
    token_ids = tokenizer.encode(wrapped, add_special_tokens=False)
    return len(token_ids)

# Token counts of every question and of every answer; the answers have their
# <START>/<END> markers removed first because get_length adds its own wrapper.
q_lens = list(map(get_length, questions))
a_lens = [
    get_length(a.replace("<START> ", "").replace(" <END>", ""))
    for a in answers
]
all_lens = q_lens + a_lens
# Sequence length used everywhere: the 95th-percentile length, at most 40.
MAX_LEN = min(int(np.percentile(all_lens, 95)), 40)
print(f"MAX_LEN: {MAX_LEN}")

MAX_LEN: 15


In [None]:
# ================================
# 6. Encode the data
# ================================
def encode_input(text):
    """Encode a question as ``"<START> {text} [SEP]"``, truncated/padded to MAX_LEN ids."""
    prompt = f"<START> {text} [SEP]"
    token_ids = tokenizer.encode(
        prompt,
        add_special_tokens=False,
        truncation=True,
        max_length=MAX_LEN,
        padding='max_length',
    )
    return token_ids

def encode_output(text):
    """Encode an answer string as given, truncated/padded to MAX_LEN ids.

    Unlike ``encode_input`` no markers are added here; callers pass text that
    already carries them.
    """
    token_ids = tokenizer.encode(
        text,
        truncation=True,
        max_length=MAX_LEN,
        padding='max_length',
    )
    return token_ids

In [None]:
print("인코딩 중...")
# Questions become one int32 matrix; answers stay a list of id lists because
# they are split into decoder input/target afterwards.
encoder_input = np.array(list(map(encode_input, questions)), dtype=np.int32)
full_answers = list(map(encode_output, answers))
print(f"인코딩 완료...{encoder_input.shape} : {len(full_answers)}")

인코딩 중...
인코딩 완료...(11823, 15) : 11823


In [None]:
# Build teacher-forcing pairs: at position t the decoder input holds the token
# the model has just seen and the decoder output holds the token it must
# predict next, i.e. output[t] == input[t + 1].
decoder_input_list = []
decoder_output_list = []

for seq in full_answers:
    # NOTE(review): skipping a row here would leave decoder arrays shorter than
    # encoder_input and break the row alignment; with padding to MAX_LEN this
    # branch is not expected to trigger.
    if len(seq) <= 1:
        continue
    # The answer text already starts with "<START>", so the encoded sequence
    # normally begins with START_ID.  Unconditionally prepending another
    # START_ID (the previous behaviour) produced [START, START, t1, ...] as
    # input against [t1, t2, ...] as target: a two-position shift that made
    # the model predict t2 without ever seeing t1.  Only add START_ID when it
    # is actually missing.
    tokens = list(seq)
    if tokens[0] != START_ID:
        tokens = [START_ID] + tokens
    dec_in = tokens[:-1][:MAX_LEN]     # everything except the last token
    dec_out = tokens[1:][:MAX_LEN]     # the same sequence shifted left by one
    # Dropping one token leaves each list short of MAX_LEN; pad back up.
    dec_in += [PAD_ID] * (MAX_LEN - len(dec_in))
    dec_out += [PAD_ID] * (MAX_LEN - len(dec_out))
    decoder_input_list.append(dec_in)
    decoder_output_list.append(dec_out)

decoder_input = np.array(decoder_input_list, dtype=np.int32)
decoder_output = np.array(decoder_output_list, dtype=np.int32)

print(f"Shapes: {encoder_input.shape}, {decoder_input.shape}, {decoder_output.shape}")

Shapes: (11823, 15), (11823, 15), (11823, 15)


In [None]:
# ================================
# 7. Fixed sin/cos positional encoding (zero trainable parameters)
# ================================
def get_positional_encoding(max_len, d_model):
    """Build the fixed sinusoidal positional-encoding table.

    Even columns hold ``sin(pos * w_k)`` and odd columns ``cos(pos * w_k)``
    with ``w_k = exp(-ln(10000) * 2k / d_model)``.

    Args:
        max_len: Number of positions (rows) in the table.
        d_model: Embedding width (columns); may be even or odd.

    Returns:
        A constant float32 tensor of shape ``(max_len, d_model)``.
    """
    pe = np.zeros((max_len, d_model))
    position = np.arange(0, max_len)[:, np.newaxis]
    div_term = np.exp(np.arange(0, d_model, 2) * (-np.log(10000.0) / d_model))
    angles = position * div_term            # (max_len, ceil(d_model / 2))
    pe[:, 0::2] = np.sin(angles)
    # For odd d_model there is one fewer odd column than even columns, so trim
    # the angles to fit; for even d_model the slice is a no-op.
    pe[:, 1::2] = np.cos(angles[:, :d_model // 2])
    return tf.constant(pe, dtype=tf.float32)  # (max_len, d_model)

# Module-level positional-encoding table: MAX_LEN positions, width 256.
pos_encoding = get_positional_encoding(MAX_LEN, 256)

In [None]:
def create_padding_mask(seq):
    """Return a float32 mask that is 1.0 where ``seq`` equals PAD_ID, else 0.0.

    Two singleton axes are inserted after the first axis so the result can
    broadcast against attention scores.
    """
    is_pad = tf.math.equal(seq, PAD_ID)
    mask = tf.cast(is_pad, tf.float32)
    return mask[:, tf.newaxis, tf.newaxis, :]


In [None]:

def create_look_ahead_mask(seq_len):
    """Return a ``(seq_len, seq_len)`` mask with 1.0 strictly above the diagonal.

    Entry ``[i, j]`` is 1.0 when ``j > i`` (a future position) and 0.0 otherwise.
    """
    lower_triangle = tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
    return 1 - lower_triangle



In [None]:
def create_decoder_mask(dec_input):
    """Combine the look-ahead mask and the padding mask for the decoder.

    Both masks are tiled to ``(batch, 1, seq_len, seq_len)`` and merged with an
    element-wise maximum, so a position is blocked (1.0) when it is either in
    the future or a PAD token.
    """
    dims = tf.shape(dec_input)
    n_batch, n_steps = dims[0], dims[1]

    # Causal part: identical for every sample, replicated along the batch axis.
    causal = create_look_ahead_mask(n_steps)
    causal = tf.tile(causal[tf.newaxis, tf.newaxis, :, :], [n_batch, 1, 1, 1])

    # Padding part: one row per sample, replicated along the query axis.
    padding = tf.tile(create_padding_mask(dec_input), [1, 1, n_steps, 1])

    return tf.maximum(causal, padding)

In [None]:
# ================================
# 9. Multi-Head Attention
# ================================
class MultiHeadAttention(layers.Layer):
    """Scaled dot-product attention evaluated over several heads in parallel.

    Queries, keys and values are linearly projected to ``d_model`` features,
    split into ``num_heads`` slices of ``d_model // num_heads`` features each,
    attended independently, concatenated and projected once more.

    Args:
        d_model: Feature width of the projections and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not a multiple of ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        # Without this check the reshape in split_heads (which uses -1 for the
        # sequence axis) could fail late or silently mix features across
        # positions.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // num_heads
        self.wq = layers.Dense(d_model, use_bias=False)
        self.wk = layers.Dense(d_model, use_bias=False)
        self.wv = layers.Dense(d_model, use_bias=False)
        self.wo = layers.Dense(d_model)

    def split_heads(self, x, batch):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, num_heads, seq, depth)``."""
        x = tf.reshape(x, (batch, -1, self.num_heads, self.depth))
        return tf.transpose(x, [0, 2, 1, 3])

    def call(self, q, k, v, mask=None):
        """Attend from ``q`` over ``k``/``v``.

        Args:
            q, k, v: Query, key and value tensors; the last axis is projected
                to ``d_model`` by the dense layers.
            mask: Optional tensor broadcastable to the attention scores in
                which 1.0 marks positions that must not be attended to.

        Returns:
            The attended values projected by ``wo``, reshaped to
            ``(batch, -1, d_model)``.
        """
        batch = tf.shape(q)[0]
        q, k, v = self.wq(q), self.wk(k), self.wv(v)
        q, k, v = self.split_heads(q, batch), self.split_heads(k, batch), self.split_heads(v, batch)

        # Raw similarity scores, scaled by sqrt(per-head depth).
        matmul = tf.matmul(q, k, transpose_b=True)
        dk = tf.cast(tf.shape(k)[-1], tf.float32)
        scaled = matmul / tf.math.sqrt(dk)

        if mask is not None:
            # A large negative offset drives masked scores to ~0 after softmax.
            scaled = scaled + (mask * -1e9)

        attn = tf.nn.softmax(scaled, axis=-1)
        out = tf.matmul(attn, v)
        # Undo split_heads: back to (batch, seq, d_model).
        out = tf.transpose(out, [0, 2, 1, 3])
        out = tf.reshape(out, (batch, -1, self.d_model))
        return self.wo(out)

In [None]:
# ================================
# 10. Encoder / Decoder Layer
# ================================
class EncoderLayer(layers.Layer):
    """One encoder block: self-attention then a feed-forward net.

    Each sub-layer is wrapped in a residual connection followed by layer
    normalisation.
    """

    def __init__(self, d_model, num_heads, dff):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        # Position-wise feed-forward net: d_model -> dff (ReLU) -> d_model.
        self.ffn = tf.keras.Sequential([
            layers.Dense(dff, activation='relu'),
            layers.Dense(d_model),
        ])
        self.ln1 = layers.LayerNormalization(epsilon=1e-6)
        self.ln2 = layers.LayerNormalization(epsilon=1e-6)

    def call(self, x, mask):
        """Apply the block to ``x``; ``mask`` is forwarded to the self-attention."""
        attended = self.mha(x, x, x, mask)
        x = self.ln1(x + attended)
        transformed = self.ffn(x)
        return self.ln2(x + transformed)

In [None]:
class DecoderLayer(layers.Layer):
    """One decoder block: masked self-attention, encoder attention, feed-forward.

    Each of the three sub-layers is wrapped in a residual connection followed
    by layer normalisation.
    """

    def __init__(self, d_model, num_heads, dff):
        super().__init__()
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        # Position-wise feed-forward net: d_model -> dff (ReLU) -> d_model.
        self.ffn = tf.keras.Sequential([
            layers.Dense(dff, activation='relu'),
            layers.Dense(d_model),
        ])
        self.ln1 = layers.LayerNormalization(epsilon=1e-6)
        self.ln2 = layers.LayerNormalization(epsilon=1e-6)
        self.ln3 = layers.LayerNormalization(epsilon=1e-6)

    def call(self, x, enc, look_mask, pad_mask):
        """Apply the block.

        ``look_mask`` restricts the self-attention over ``x``; ``pad_mask``
        restricts the attention from ``x`` over the encoder output ``enc``.
        """
        self_attended = self.mha1(x, x, x, look_mask)
        x = self.ln1(x + self_attended)
        cross_attended = self.mha2(x, enc, enc, pad_mask)
        x = self.ln2(x + cross_attended)
        transformed = self.ffn(x)
        return self.ln3(x + transformed)


In [None]:
# ================================
# 11. Model definition (fixes the KerasTensor error)
# ================================
def build_transformer(vocab_size, num_layers=3, d_model=256, num_heads=8, dff=512, max_len=MAX_LEN):
    """Assemble the encoder-decoder Transformer as a functional Keras model.

    Args:
        vocab_size: Size of the token vocabulary (embedding rows and number of
            output logits).
        num_layers: Number of encoder layers and of decoder layers.
        d_model: Embedding / hidden width.
        num_heads: Attention heads per layer.
        dff: Hidden width of the feed-forward sub-layers.
        max_len: Fixed length of both input sequences. The default is the
            value ``MAX_LEN`` had when this function was defined.

    Returns:
        A ``tf.keras.Model`` mapping ``[encoder_ids, decoder_ids]`` to
        unnormalised logits of shape ``(batch, max_len, vocab_size)``.
    """
    enc_in = layers.Input((max_len,), name='enc')
    dec_in = layers.Input((max_len,), name='dec')

    # The mask helpers use raw TF ops, so they are wrapped in Lambda layers to
    # be applicable to symbolic Keras tensors.
    enc_mask = layers.Lambda(create_padding_mask, output_shape=(1, 1, max_len))(enc_in)
    dec_mask = layers.Lambda(create_decoder_mask, output_shape=(1, max_len, max_len))(dec_in)

    # Token embedding, shared by encoder and decoder
    token_emb = layers.Embedding(vocab_size, d_model)
    enc_emb = token_emb(enc_in)
    dec_emb = token_emb(dec_in)

    # Build the positional table from this call's own max_len/d_model instead
    # of relying on a module-level table with a hard-coded width, so that
    # non-default d_model or max_len values produce matching shapes.
    pe_table = get_positional_encoding(max_len, d_model)

    # Add the fixed positional encoding (wrapped in a Lambda layer)
    def add_pe(x, pe):
        seq_len = tf.shape(x)[1]
        return x + pe[:seq_len, :]

    enc_emb = layers.Lambda(lambda x: add_pe(x, pe_table))(enc_emb)
    dec_emb = layers.Lambda(lambda x: add_pe(x, pe_table))(dec_emb)

    # Encoder stack
    enc = enc_emb
    for _ in range(num_layers):
        enc = EncoderLayer(d_model, num_heads, dff)(enc, enc_mask)

    # Decoder stack; every layer attends over the final encoder output
    dec = dec_emb
    for _ in range(num_layers):
        dec = DecoderLayer(d_model, num_heads, dff)(dec, enc, dec_mask, enc_mask)

    logits = layers.Dense(vocab_size)(dec)
    return tf.keras.Model([enc_in, dec_in], logits)

# Build the model with explicit hyper-parameters and print its layer summary.
model = build_transformer(vocab_size, num_layers=3, d_model=256, num_heads=8, dff=512, max_len=MAX_LEN)
model.summary()

In [None]:
# Learning rate follows a cosine decay with warm restarts, starting from 5e-4;
# the first decay cycle lasts 1000 optimizer steps.
initial_learning_rate = 5e-4
lr_schedule = tf.keras.optimizers.schedules.CosineDecayRestarts(
    initial_learning_rate,
    first_decay_steps=1000
)
optimizer = tf.keras.optimizers.Adam(lr_schedule)


# The model emits raw logits, hence from_logits=True.
# NOTE(review): no sample_weight / padding mask is supplied, so PAD positions
# in decoder_output count toward both the loss and the reported accuracy.
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

# Teacher forcing: the model receives the question and the decoder input and
# is trained to reproduce decoder_output.
history = model.fit([encoder_input, decoder_input], decoder_output,
          batch_size=64,
          epochs=20)


Epoch 1/20
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m51s[0m 137ms/step - accuracy: 0.3690 - loss: 6.3911
Epoch 2/20
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 87ms/step - accuracy: 0.4843 - loss: 3.4509
Epoch 3/20
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 91ms/step - accuracy: 0.5517 - loss: 3.1058
Epoch 4/20
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 88ms/step - accuracy: 0.5576 - loss: 3.0046
Epoch 5/20
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 88ms/step - accuracy: 0.5630 - loss: 2.9391
Epoch 6/20
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 91ms/step - accuracy: 0.5575 - loss: 2.9673
Epoch 7/20
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 98ms/step - accuracy: 0.5624 - loss: 2.9391
Epoch 8/20
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 92ms/step - accuracy: 0.5702 - loss: 2.8636
Epoch 9/20
[1m185/185

In [None]:
import numpy as np

def predict(sentence, max_len=MAX_LEN):
    """Generate a reply to ``sentence`` with greedy decoding.

    The question is cleaned and encoded once. The decoder sequence is then
    grown one token at a time: each step re-runs the model on the sequence
    built so far and appends the highest-scoring token, stopping at ``<END>``
    or when the buffer is full.

    Args:
        sentence: The user's question as raw text.
        max_len: Length of the decoder buffer. The model is built for a fixed
            sequence length, so this should stay equal to ``MAX_LEN``.

    Returns:
        The decoded reply with special tokens removed. All generated tokens
        are included, also when no ``<END>`` token was produced.
    """
    # 1) Preprocess and tokenize the question; add a batch axis.
    sentence = clean_text(sentence)
    enc_input = np.expand_dims(encode_input(sentence), 0)

    # 2) The decoder starts with only <START>; the rest is padding.
    dec_seq = np.full((1, max_len), PAD_ID, dtype=np.int32)
    dec_seq[0, 0] = START_ID

    # Collect the output explicitly rather than slicing dec_seq with the loop
    # index afterwards: that slice dropped the final token whenever the loop
    # ran to completion, and the index was unbound if the loop never ran.
    generated = []

    # 3) Generate one token per step.
    for i in range(1, max_len):
        preds = model.predict([enc_input, dec_seq], verbose=0)  # (1, max_len, vocab)
        # The output at position i-1 is the prediction for position i.
        next_id = int(np.argmax(preds[0, i-1]))                 # greedy choice

        if next_id == END_ID:                                   # stop at <END>
            break

        dec_seq[0, i] = next_id                                 # feed back as input
        generated.append(next_id)

    # 4) Token ids -> text
    return tokenizer.decode(generated, skip_special_tokens=True)

In [None]:
# Interactive chat: answer every line until the user types 'exit'.
input_string = input(">>")
while input_string != 'exit':
    print(predict(input_string))
    input_string = input(">>")

>>안녕
저도을 말고하봐 
>>오늘은 어때?
저도 쉬보면 마요
>>하하 재미있는 말이구나.
저도 쉬도 마요
>>저기 여행은 어디가 좋을까?
그은 흘네 
>>말을 아직 잘 못 하는구나
저도 쉬보면 마요
>>날씨가 춥다
저도을 말고 마봐 
>>exit


In [None]:
# temperature 조정, 반복 패널티, top-k 샘플링 적용
import numpy as np
import tensorflow as tf

def predict2(sentence,
            max_len=MAX_LEN,
            temperature=0.7,
            top_k=20,
            repetition_penalty=1.2):
    """Generate a reply to ``sentence`` with temperature / top-k sampling.

    Works like greedy decoding, but at every step the logits of already
    generated tokens are penalised, the logits are divided by ``temperature``
    and the next token is sampled from the ``top_k`` most likely candidates.

    Args:
        sentence: The user's question as raw text.
        max_len: Length of the decoder buffer. The model is built for a fixed
            sequence length, so this should stay equal to ``MAX_LEN``.
        temperature: Softmax temperature; values below 1 sharpen the
            distribution.
        top_k: Number of highest-scoring candidates to sample from; clamped to
            the vocabulary size.
        repetition_penalty: Factor (> 1) by which the scores of previously
            generated tokens are reduced.

    Returns:
        The decoded reply with special tokens removed.
    """
    # 1) Question -> token ids, with a batch axis.
    sentence = clean_text(sentence)
    enc_in = np.expand_dims(encode_input(sentence), 0)

    # 2) Initial decoder sequence: <START> followed by padding.
    dec_seq = np.full((1, max_len), PAD_ID, dtype=np.int32)
    dec_seq[0, 0] = START_ID

    generated = []          # token ids produced so far

    # 3) Generate one token per step.
    for i in range(1, max_len):
        preds = model.predict([enc_in, dec_seq], verbose=0)  # (1, max_len, vocab)
        logits = np.array(preds[0, i-1], dtype=np.float32)   # private copy, (vocab,)

        # Repetition penalty, applied once per distinct token.  Dividing a
        # negative logit would move it towards zero and make the token MORE
        # likely, so negative logits are multiplied instead.
        for token_id in set(generated):
            if logits[token_id] > 0:
                logits[token_id] /= repetition_penalty
            else:
                logits[token_id] *= repetition_penalty

        # Temperature + top-k sampling
        logits = logits / temperature
        k = min(top_k, logits.shape[-1])      # top_k must not exceed the vocabulary
        top_vals, top_inds = tf.nn.top_k(logits, k=k)
        probs = tf.nn.softmax(top_vals).numpy().ravel()
        next_id = int(np.random.choice(top_inds.numpy(), p=probs))

        if next_id == END_ID:        # stop at <END>
            break

        # Feed the sampled token back as the next decoder input.
        dec_seq[0, i] = next_id
        generated.append(next_id)

    # 4) Token ids -> text
    text = tokenizer.decode(generated, skip_special_tokens=True)
    return text

In [None]:
# Interactive chat using the sampling decoder; type 'exit' to stop.
input_string = input(">>")
while input_string != 'exit':
    print(predict2(input_string))
    input_string = input(">>")

>>안녕?
또하 좋봐 니 
>>오늘은 날씨가 춥네
혼자하나어 찮요
>>늦게 퇴근하게 되었어
혼자는 관리 쉬 건에 
>>내일을 일찍 퇴근해야지
다른도하겠요
>>학교에 일찍 등교하려면 이제 퇴근해야지
사람 사람이니까 찮요
>>밖은 추우려나?
오늘도가요
>>exit
