# 한국어 데이터로 챗봇 만들기
프로젝트 제출 루브릭
| **학습 목표** | **평가 기준** |
|----------------|----------------|
| 한국어 전처리를 통해 학습 데이터셋을 구축하였다. | 공백과 특수문자 처리, 토크나이징, 병렬데이터 구축의 과정이 적절히 진행되었다. |
| 트랜스포머 모델을 구현하여 한국어 챗봇 모델 학습을 정상적으로 진행하였다. | 구현한 트랜스포머 모델이 한국어 병렬 데이터 학습 시 안정적으로 수렴하였다. |
| 한국어 입력문장에 대해 한국어로 답변하는 함수를 구현하였다. | 한국어 입력문장에 맥락에 맞는 한국어로 답변을 리턴하였다. |

# Step 0. Library

In [112]:
!pip install sentencepiece



In [113]:
import pandas as pd
import sentencepiece as spm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F
import math
import time
import re
import os

# Step 1. 데이터 수집하기

In [114]:
#!wget https://github.com/songys/Chatbot_data/raw/master/ChatbotData.csv
#!mv ChatbotData.csv data/chatbot

데이터 자체가 이런식으로 표현되어 있다.

| **Q** | **A** | **Label** |
|----------------|----------------|----------------|
|12시 땡!, |하루가 또 가네요., |0|

---

데이터의 Label은 일상다반사 0, 이별(부정) 1, 사랑(긍정) 2로 레이블링로 되어있다.

--- 



# Step 2. 데이터 전처리하기

In [115]:
url = 'https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv'
df = pd.read_csv(url)

In [116]:
def  preprocess_sentence(sentence):
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = sentence.strip()
    
    return sentence

questions = [preprocess_sentence(q) for q in df['Q']]
answers   = [preprocess_sentence(a) for a in df['A']]

# Step 3. SentencePiece 사용하기

In [117]:
corpus = questions + answers
print('전체 샘플 수 :', len(corpus)/2)
with open('data/chatbot/corpus.txt', 'w', encoding='utf-8') as f:
    for s in corpus:
        f.write(s + '\n')

전체 샘플 수 : 11823.0


In [118]:
spm.SentencePieceTrainer.Train(
    input='data/chatbot/corpus.txt',
    model_prefix='data/chatbot/chatbot_tokenizer',
    vocab_size=6000, # size 2000 4000 6000 8000 성능 차이가 안보임
    model_type='unigram',
    character_coverage=0.9995,
    pad_id=0, unk_id=1, bos_id=2, eos_id=3
)

sp = spm.SentencePieceProcessor(model_file='data/chatbot/chatbot_tokenizer.model')

sentencepiece_trainer.cc(78) LOG(INFO) Starts training with : 
trainer_spec {
  input: data/chatbot/corpus.txt
  input_format: 
  model_prefix: data/chatbot/chatbot_tokenizer
  model_type: UNIGRAM
  vocab_size: 6000
  self_test_sample_size: 0
  character_coverage: 0.9995
  input_sentence_size: 0
  shuffle_input_sentence: 1
  seed_sentencepiece_size: 1000000
  shrinking_factor: 0.75
  max_sentence_length: 4192
  num_threads: 16
  num_sub_iterations: 2
  max_sentencepiece_length: 16
  split_by_unicode_script: 1
  split_by_number: 1
  split_by_whitespace: 1
  split_digits: 0
  pretokenization_delimiter: 
  treat_whitespace_as_suffix: 0
  allow_whitespace_only_pieces: 0
  required_chars: 
  byte_fallback: 0
  vocabulary_output_piece_score: 1
  train_extremely_large_corpus: 0
  seed_sentencepieces_file: 
  hard_vocab_limit: 1
  use_all_vocab: 0
  unk_id: 1
  bos_id: 2
  eos_id: 3
  pad_id: 0
  unk_piece: <unk>
  bos_piece: <s>
  eos_piece: </s>
  pad_piece: <pad>
  unk_surface:  ⁇ 
  enable

In [119]:
# 토크나이징 + BOS/EOS
tokenized_questions = [sp.encode_as_ids(q) for q in questions]
tokenized_answers   = [[sp.bos_id()] + sp.encode_as_ids(a) + [sp.eos_id()] for a in answers]

In [120]:
def sequence_len(data):
    import numpy as np
    """
    문장길이의 평균값, 최대값, 표준편차를 계산해 본다.
    """
    num_tokens = [len(tokens) for tokens in data]
    num_tokens = np.array(num_tokens)

    sequence_mean = np.mean(num_tokens)
    sequence_max = np.max(num_tokens)
    sequence_std = np.std(num_tokens)
    
    print('문장길이 평균 : ', sequence_mean)
    print('문장길이 최대 : ', sequence_max)
    print('문장길이 표준편차 : ', sequence_std)

    max_tokens = sequence_mean + 2 * sequence_std
    maxlen = int(max_tokens)
    print(f'적정 문장 길이 : {maxlen}')
    print(f'전체 문장의 {np.sum([len(tokens) for tokens in data] < max_tokens) / len(num_tokens):.2f}%가 maxlen 설정값 이내에 포함됩니다. ')
    return sequence_mean, sequence_max, sequence_std, maxlen

print("Q")
sequence_len(tokenized_questions)
print("\nA")
sequence_len(tokenized_answers)

Q
문장길이 평균 :  5.945699061151992
문장길이 최대 :  24
문장길이 표준편차 :  2.7859609074600438
적정 문장 길이 : 11
전체 문장의 0.96%가 maxlen 설정값 이내에 포함됩니다. 

A
문장길이 평균 :  9.181510614903155
문장길이 최대 :  38
문장길이 표준편차 :  3.0336364369070785
적정 문장 길이 : 15
전체 문장의 0.96%가 maxlen 설정값 이내에 포함됩니다. 


(np.float64(9.181510614903155),
 np.int64(38),
 np.float64(3.0336364369070785),
 15)

In [121]:
def truncate(seq, max_len):
    return seq[:max_len]

MAX_LEN = 100   # 그냥 높게 잡음
                # sequence_len함수에 따라, 적정문장 길이 기준으로 진행을 해도 성능 차이가 없음

tokenized_questions = [truncate(sp.encode_as_ids(q), MAX_LEN) for q in questions]
tokenized_answers   = [truncate([sp.bos_id()] + sp.encode_as_ids(a) + [sp.eos_id()], MAX_LEN) for a in answers]

print(f"Using MAX_LEN = {MAX_LEN} (all sequences truncated to this length)")

Using MAX_LEN = 100 (all sequences truncated to this length)


# Step 4. 모델 구성하기

## Step 4-1. Model class 선언
- node 학습시 코드와 거의 유사
- 주석은 더 자세히 알기위해 LLM 거치고 봄

In [122]:
class PositionalEncoding(nn.Module):
    """
    입력 시퀀스에 위치 정보를 인코딩(Positional Encoding)으로 더해주는 모듈.
    트랜스포머(Transformer) 모델에서 단어의 순서를 반영하기 위해 사용됨.

    Args:
        position (int): 입력 시퀀스의 최대 길이 (예: 1000)
        d_model (int): 임베딩 벡터의 차원 수 (모델 차원, 예: 512)

    Attributes:
        pos_encoding (torch.Tensor): [1, position, d_model] 형태의 위치 인코딩 텐서
    """

    def __init__(self, position, d_model):
        super().__init__()
        # 사인/코사인 함수를 이용해 위치 인코딩 행렬 생성
        self.pos_encoding = self._build_pos_encoding(position, d_model)

    def _get_angles(self, pos, i, d_model):
        """
        각 위치(pos)와 차원(i)에 대한 angle 값을 계산.

        angle = pos / (10000^(2*(i//2)/d_model))

        Args:
            pos (torch.Tensor): [position, 1] 형태의 위치 벡터
            i (torch.Tensor): [1, d_model] 형태의 차원 인덱스 벡터
            d_model (int): 모델 차원 수

        Returns:
            torch.Tensor: 각 (pos, i)에 대응하는 angle 값
        """
        return pos / (10000 ** ((2 * (i // 2)) / d_model))

    def _build_pos_encoding(self, position, d_model):
        """
        위치 인코딩(Positional Encoding) 행렬을 생성하는 함수.

        각 위치와 차원에 대해 sin, cos 값을 교차로 적용하여
        단어의 순서를 모델이 학습할 수 있도록 함.

        Args:
            position (int): 시퀀스의 최대 길이
            d_model (int): 임베딩 차원 수

        Returns:
            torch.Tensor: [1, position, d_model] 형태의 위치 인코딩 텐서
        """
        # 각 위치와 차원에 대응하는 angle 값 계산
        angle_rads = self._get_angles(
            torch.arange(position, dtype=torch.float32).unsqueeze(1),  # [position, 1]
            torch.arange(d_model, dtype=torch.float32).unsqueeze(0),  # [1, d_model]
            d_model
        )

        # 짝수 인덱스(0, 2, 4, ...)에는 sin, 홀수 인덱스(1, 3, 5, ...)에는 cos 적용
        sines = torch.sin(angle_rads[:, 0::2])
        cosines = torch.cos(angle_rads[:, 1::2])

        # sin과 cos을 번갈아 채워 넣은 위치 인코딩 행렬 구성
        pos_encoding = torch.zeros(position, d_model)
        pos_encoding[:, 0::2] = sines
        pos_encoding[:, 1::2] = cosines

        # 배치 차원(1)을 추가하여 [1, position, d_model] 형태로 반환
        return pos_encoding.unsqueeze(0)

    def forward(self, x):
        """
        입력 임베딩 텐서에 위치 인코딩을 더하는 함수.

        Args:
            x (torch.Tensor): 입력 임베딩 텐서 [batch_size, seq_len, d_model]

        Returns:
            torch.Tensor: 위치 인코딩이 더해진 텐서 [batch_size, seq_len, d_model]
        """
        # 입력 시퀀스 길이에 맞춰 위치 인코딩을 잘라내고, 입력 텐서에 더함
        return x + self.pos_encoding[:, :x.size(1), :].to(x.device)


In [123]:
def scaled_dot_product_attention(query, key, value, mask=None):
    """
    Scaled Dot-Product Attention 계산 함수.

    트랜스포머(Transformer) 구조에서 Query, Key, Value 벡터 간의 유사도를 계산하고,
    그 유사도(Attention Score)를 바탕으로 Value의 가중합을 구함.

    수식:
        Attention(Q, K, V) = softmax( (QK^T) / sqrt(d_k) ) V

    Args:
        query (torch.Tensor): Query 텐서 [batch_size, num_heads, seq_len_q, depth]
        key (torch.Tensor): Key 텐서 [batch_size, num_heads, seq_len_k, depth]
        value (torch.Tensor): Value 텐서 [batch_size, num_heads, seq_len_v, depth_v]
        mask (torch.Tensor, optional): 마스크 텐서 (padding 또는 look-ahead mask). 
                                       0인 위치는 무시됨. 기본값 None.

    Returns:
        tuple[torch.Tensor, torch.Tensor]:
            - output: Attention이 적용된 결과 [batch_size, num_heads, seq_len_q, depth_v]
            - attention_weights: Attention 가중치 행렬 [batch_size, num_heads, seq_len_q, seq_len_k]
    """

    # (1) Query와 Key의 내적 계산 → [batch_size, num_heads, seq_len_q, seq_len_k]
    matmul_qk = torch.matmul(query, key.transpose(-1, -2))

    # (2) 차원 수로 나누어 스케일 조정 (값이 너무 커지는 것을 방지)
    depth = key.size(-1)
    logits = matmul_qk / math.sqrt(depth)

    # (3) 마스크(mask)가 주어지면, 해당 위치는 매우 작은 값(-1e9)으로 채워 softmax 영향 제거
    if mask is not None:
        logits = logits.masked_fill(mask == 0, -1e9)

    # (4) 각 Query에 대해 Key와의 유사도 확률 분포 계산 (softmax)
    attention_weights = F.softmax(logits, dim=-1)

    # (5) Attention 가중치로 Value를 가중합 → 문맥(Context) 벡터 생성
    output = torch.matmul(attention_weights, value)

    return output, attention_weights


In [124]:
class MultiHeadAttention(nn.Module):
    """
    멀티헤드 어텐션(Multi-Head Attention) 모듈.

    트랜스포머의 핵심 구성요소로, 입력 벡터를 여러 개의 '헤드(head)'로 분할하여
    병렬적으로 어텐션을 수행함으로써 서로 다른 의미 공간에서의 관계를 학습하도록 함.

    수식:
        MultiHead(Q, K, V) = Concat(head_1, ..., head_h) W^O
        where head_i = Attention(QW_i^Q, KW_i^K, VW_i^V)

    Args:
        d_model (int): 입력 임베딩 차원 수 (예: 512)
        num_heads (int): 어텐션 헤드의 개수 (예: 8)

    Attributes:
        depth (int): 각 헤드의 임베딩 차원 (d_model / num_heads)
        query_dense (nn.Linear): Query 변환용 선형 레이어
        key_dense (nn.Linear): Key 변환용 선형 레이어
        value_dense (nn.Linear): Value 변환용 선형 레이어
        out_dense (nn.Linear): 여러 헤드의 출력을 다시 합쳐주는 출력 레이어
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        # (1) d_model이 num_heads로 나누어 떨어져야 함 (각 헤드의 차원이 동일해야 하므로)
        assert d_model % num_heads == 0, "d_model은 num_heads로 나누어 떨어져야 합니다."

        # (2) 각 헤드별 차원 (depth)
        self.depth = d_model // num_heads

        # (3) Query, Key, Value를 위한 선형 변환 레이어 정의
        self.query_dense = nn.Linear(d_model, d_model)
        self.key_dense   = nn.Linear(d_model, d_model)
        self.value_dense = nn.Linear(d_model, d_model)

        # (4) 여러 헤드의 출력을 결합한 후 최종 선형 변환
        self.out_dense   = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        """
        입력 텐서를 여러 개의 헤드로 분리하는 함수.

        입력 형태: [batch_size, seq_len, d_model]
        출력 형태: [batch_size, num_heads, seq_len, depth]

        Args:
            x (torch.Tensor): 입력 텐서
            batch_size (int): 배치 크기

        Returns:
            torch.Tensor: 헤드 분할 및 차원 변환된 텐서
        """
        # [batch_size, seq_len, d_model] → [batch_size, seq_len, num_heads, depth]
        x = x.view(batch_size, -1, self.num_heads, self.depth)

        # 어텐션 계산을 위해 차원 순서를 변경
        # [batch_size, num_heads, seq_len, depth]
        return x.permute(0, 2, 1, 3)

    def forward(self, query, key, value, mask=None):
        """
        멀티헤드 어텐션 연산을 수행하는 함수.

        Args:
            query (torch.Tensor): Query 입력 [batch_size, seq_len_q, d_model]
            key (torch.Tensor): Key 입력 [batch_size, seq_len_k, d_model]
            value (torch.Tensor): Value 입력 [batch_size, seq_len_v, d_model]
            mask (torch.Tensor, optional): 어텐션 마스크 (padding 또는 look-ahead). 기본값 None.

        Returns:
            torch.Tensor: 어텐션이 적용된 출력 [batch_size, seq_len_q, d_model]
        """
        batch_size = query.size(0)

        # (1) 입력 벡터를 각각 선형 변환 후, 여러 헤드로 분리
        query = self.split_heads(self.query_dense(query), batch_size)
        key   = self.split_heads(self.key_dense(key), batch_size)
        value = self.split_heads(self.value_dense(value), batch_size)

        # (2) 스케일 조정된 닷프로덕트 어텐션 수행 (각 헤드별로 병렬 계산)
        scaled_attention, _ = scaled_dot_product_attention(query, key, value, mask)

        # (3) 어텐션 결과를 원래 차원 순서로 되돌림
        # [batch_size, num_heads, seq_len_q, depth] → [batch_size, seq_len_q, num_heads, depth]
        scaled_attention = scaled_attention.permute(0, 2, 1, 3).contiguous()

        # (4) 모든 헤드를 하나로 결합 (concat)
        concat_attention = scaled_attention.view(batch_size, -1, self.d_model)

        # (5) 최종 선형 변환을 통해 출력 생성
        output = self.out_dense(concat_attention)

        return output


In [125]:
def create_padding_mask(x, pad_id=0):
    """
    입력 시퀀스 내 패딩 토큰을 감지하여 마스크를 생성하는 함수.

    트랜스포머에서 **패딩 토큰(PAD)** 은 실제 의미가 없으므로,  
    Attention 계산 시 이 위치의 값이 다른 토큰의 연산에 영향을 주지 않도록  
    마스크(mask)를 만들어줌.

    Args:
        x (torch.Tensor): 입력 시퀀스 텐서 [batch_size, seq_len]
        pad_id (int, optional): 패딩 토큰의 ID 값 (기본값: 0)

    Returns:
        torch.Tensor: 패딩 마스크 텐서 [batch_size, 1, 1, seq_len]
                      (1: 유효 토큰, 0: 패딩 위치)
    """
    # pad_id가 아닌 위치는 1(True), pad_id인 위치는 0(False)
    mask = (x != pad_id).unsqueeze(1).unsqueeze(2).float()
    # shape: [batch_size, 1, 1, seq_len]
    return mask


def create_look_ahead_mask(seq_len):
    """
    디코더(Decoder)에서 미래 토큰을 보지 못하도록 하는 마스크 생성 함수.

    트랜스포머의 디코더는 **오토리그레시브(Autoregressive)** 하게 동작해야 하므로,  
    현재 시점 이후의 단어(토큰)를 참고하지 않도록 상삼각 부분을 0으로 채움.

    Args:
        seq_len (int): 시퀀스 길이 (입력 문장의 토큰 개수)

    Returns:
        torch.Tensor: Look-ahead 마스크 [seq_len, seq_len]
                      (하삼각 1, 상삼각 0)
    """
    # torch.tril() → 하삼각(lower triangular) 행렬을 1로 채움
    mask = torch.tril(torch.ones(seq_len, seq_len))
    # shape: [seq_len, seq_len]
    return mask


In [126]:
class EncoderLayer(nn.Module):
    """
    트랜스포머 인코더의 기본 단위 레이어 (Encoder Layer).

    한 층의 인코더는 다음 세 단계로 구성됨:
        1) 멀티헤드 어텐션 (Multi-Head Attention)
        2) 잔차 연결 + 레이어 정규화 (Residual Connection + LayerNorm)
        3) 피드포워드 네트워크 (Position-wise Feed-Forward Network)

    수식:
        Out1 = LayerNorm(x + Dropout(MHA(x, x, x)))
        Out2 = LayerNorm(Out1 + Dropout(FFN(Out1)))

    Args:
        d_model (int): 입력 임베딩 차원 (예: 512)
        num_heads (int): 멀티헤드 어텐션의 헤드 개수 (예: 8)
        ff_dim (int): 피드포워드 네트워크 내부 차원 (예: 2048)
        dropout (float, optional): 드롭아웃 비율 (기본값: 0.1)
    """

    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super().__init__()

        # (1) 멀티헤드 어텐션 블록
        self.mha = MultiHeadAttention(d_model, num_heads)

        # (2) 포지션별 피드포워드 네트워크
        self.ffn = nn.Sequential(
            nn.Linear(d_model, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, d_model)
        )

        # (3) 잔차 연결 후 정규화
        self.norm1 = nn.LayerNorm(d_model, eps=1e-6)
        self.norm2 = nn.LayerNorm(d_model, eps=1e-6)

        # (4) 드롭아웃
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        """
        인코더 한 층의 순전파(forward) 연산 수행.

        Args:
            x (torch.Tensor): 입력 텐서 [batch_size, seq_len, d_model]
            mask (torch.Tensor): 패딩 마스크 [batch_size, 1, 1, seq_len]

        Returns:
            torch.Tensor: 인코더 레이어의 출력 [batch_size, seq_len, d_model]
        """
        # (1) 멀티헤드 어텐션 수행
        attn_out = self.mha(x, x, x, mask)
        attn_out = self.dropout1(attn_out)

        # (2) 잔차 연결 및 정규화
        out1 = self.norm1(x + attn_out)

        # (3) 피드포워드 네트워크 적용
        ffn_out = self.ffn(out1)
        ffn_out = self.dropout2(ffn_out)

        # (4) 잔차 연결 및 정규화
        out2 = self.norm2(out1 + ffn_out)

        return out2


class Encoder(nn.Module):
    """
    트랜스포머 인코더 전체 구조 (N개의 Encoder Layer로 구성됨).

    전체 인코더는 다음 단계를 거침:
        1) 임베딩 + 위치 인코딩(Positional Encoding)
        2) 드롭아웃
        3) N개의 EncoderLayer 반복 적용

    Args:
        vocab_size (int): 단어 사전 크기
        num_layers (int): 인코더 레이어 수 (예: 6)
        ff_dim (int): 피드포워드 내부 차원 (예: 2048)
        d_model (int): 임베딩 차원 (예: 512)
        num_heads (int): 어텐션 헤드 수 (예: 8)
        max_len (int, optional): 최대 시퀀스 길이 (기본값: 40)
        dropout (float, optional): 드롭아웃 비율 (기본값: 0.1)
    """

    def __init__(self, vocab_size, num_layers, ff_dim, d_model, num_heads, max_len=40, dropout=0.1):
        super().__init__()

        self.d_model = d_model

        # (1) 단어 임베딩 레이어 (PAD 토큰은 0으로 설정)
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)

        # (2) 위치 인코딩(Positional Encoding)
        self.pos_encoding = PositionalEncoding(max_len, d_model)

        # (3) 드롭아웃 레이어
        self.dropout = nn.Dropout(dropout)

        # (4) N개의 인코더 레이어를 리스트로 구성
        self.enc_layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, ff_dim, dropout)
            for _ in range(num_layers)
        ])

    def forward(self, x, mask):
        """
        인코더 전체의 순전파(forward) 연산 수행.

        Args:
            x (torch.Tensor): 입력 시퀀스 [batch_size, seq_len]
            mask (torch.Tensor): 패딩 마스크 [batch_size, 1, 1, seq_len]

        Returns:
            torch.Tensor: 인코더 출력 [batch_size, seq_len, d_model]
        """
        # (1) 임베딩 후 스케일 조정 (√d_model)
        x = self.embedding(x) * math.sqrt(self.d_model)

        # (2) 위치 인코딩 추가
        x = self.pos_encoding(x)

        # (3) 드롭아웃 적용
        x = self.dropout(x)

        # (4) 각 인코더 레이어를 순차적으로 통과
        for layer in self.enc_layers:
            x = layer(x, mask)

        return x


In [127]:
# ==============================================
# 디코더의 기본 구성 단위 (DecoderLayer)
# ==============================================
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super().__init__()

        # (1) 첫 번째: 디코더의 Self-Attention (미래 토큰 가리기)
        self.self_mha = MultiHeadAttention(d_model, num_heads)

        # (2) 두 번째: 인코더-디코더 어텐션 (인코더 출력과 결합)
        self.encdec_mha = MultiHeadAttention(d_model, num_heads)

        # (3) 세 번째: Position-wise Feed Forward Network
        self.ffn = nn.Sequential(
            nn.Linear(d_model, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, d_model)
        )

        # (4) 각 블록 뒤의 Layer Normalization
        self.norm1 = nn.LayerNorm(d_model, eps=1e-6)
        self.norm2 = nn.LayerNorm(d_model, eps=1e-6)
        self.norm3 = nn.LayerNorm(d_model, eps=1e-6)

        # (5) Dropout (과적합 방지용)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, enc_output, look_ahead_mask, padding_mask):
        """
        x: 디코더 입력 (target embedding)
        enc_output: 인코더 출력 (context)
        look_ahead_mask: 미래 단어 마스킹
        padding_mask: 인코더 패딩 마스크
        """

        # -------------------------------
        # (1) Masked Self-Attention
        #    - 자기 자신 문장에서 미래 단어를 보지 않도록 마스크 적용
        # -------------------------------
        self_attn = self.self_mha(x, x, x, look_ahead_mask)
        self_attn = self.dropout1(self_attn)
        out1 = self.norm1(x + self_attn)  # 잔차연결 + 정규화

        # -------------------------------
        # (2) Encoder-Decoder Attention
        #    - 인코더 출력(enc_output)을 Query로 활용
        #    - 디코더가 인코더 정보에 집중할 수 있게 함
        # -------------------------------
        encdec_attn = self.encdec_mha(out1, enc_output, enc_output, padding_mask)
        encdec_attn = self.dropout2(encdec_attn)
        out2 = self.norm2(out1 + encdec_attn)  # 잔차연결 + 정규화

        # -------------------------------
        # (3) Feed Forward Network
        #    - 각 위치별 독립적 비선형 변환
        # -------------------------------
        ffn_out = self.ffn(out2)
        ffn_out = self.dropout3(ffn_out)
        out3 = self.norm3(out2 + ffn_out)  # 잔차연결 + 정규화

        return out3


# ==============================================
# 디코더 전체 (여러 DecoderLayer 쌓은 형태)
# ==============================================
class Decoder(nn.Module):
    def __init__(self, vocab_size, num_layers, ff_dim, d_model, num_heads, max_len=200, dropout=0.1):
        super().__init__()

        self.d_model = d_model

        # (1) 단어 임베딩 레이어
        #   - 토큰 인덱스를 d_model 차원의 벡터로 변환
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)

        # (2) 위치 인코딩 (PositionalEncoding)
        #   - 문장 내 단어의 순서 정보를 벡터에 추가
        self.pos_encoding = PositionalEncoding(max_len, d_model)

        # (3) Dropout
        self.dropout = nn.Dropout(dropout)

        # (4) 디코더 층 여러 개 쌓기
        self.dec_layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, ff_dim, dropout)
            for _ in range(num_layers)
        ])

    def forward(self, x, enc_output, look_ahead_mask, padding_mask):
        """
        x: 디코더 입력 (target 문장, [batch_size, tgt_seq_len])
        enc_output: 인코더 출력 (context)
        look_ahead_mask: 디코더용 마스크 (미래 단어 가리기)
        padding_mask: 인코더 입력의 패딩 위치 마스크
        """

        # (1) 임베딩 + √d_model 스케일링 (논문 방식)
        x = self.embedding(x) * math.sqrt(self.d_model)

        # (2) 위치 인코딩 추가
        x = self.pos_encoding(x)

        # (3) 드롭아웃 적용
        x = self.dropout(x)

        # (4) 여러 개의 디코더 레이어를 순차적으로 통과
        for layer in self.dec_layers:
            x = layer(x, enc_output, look_ahead_mask, padding_mask)

        # (5) 최종 디코더 출력 반환
        return x


In [128]:
# ==============================================
# 전체 Transformer 모델 클래스
#  - Encoder + Decoder + Final Linear Projection
# ==============================================
class Transformer(nn.Module):
    def __init__(
        self,
        vocab_size,        # 단어 집합 크기 (어휘 수)
        num_layers=6,      # 인코더/디코더 층 수
        units=2048,        # FFN 내부 차원 (hidden dimension)
        d_model=512,       # 임베딩 및 모델 차원
        num_heads=8,       # 멀티헤드 어텐션의 헤드 개수
        dropout=0.1,       # 드롭아웃 비율
        max_len=200        # 입력 시퀀스의 최대 길이
    ):
        super().__init__()

        # -------------------------------
        # (1) 인코더 (Encoder)
        # -------------------------------
        # 입력 문장을 인코딩하여 문맥(context) 표현을 생성
        self.encoder = Encoder(
            vocab_size=vocab_size,
            num_layers=num_layers,
            ff_dim=units,
            d_model=d_model,
            num_heads=num_heads,
            max_len=max_len,
            dropout=dropout
        )

        # -------------------------------
        # (2) 디코더 (Decoder)
        # -------------------------------
        # 이전 단어들을 바탕으로 다음 단어를 예측
        self.decoder = Decoder(
            vocab_size=vocab_size,
            num_layers=num_layers,
            ff_dim=units,
            d_model=d_model,
            num_heads=num_heads,
            max_len=max_len,
            dropout=dropout
        )

        # -------------------------------
        # (3) 출력 변환층 (Final Linear Layer)
        # -------------------------------
        # 디코더 출력을 어휘 크기(vocab_size)로 투영하여
        # 각 단어의 예측 확률 분포를 구함
        self.final_linear = nn.Linear(d_model, vocab_size)


    def forward(self, inputs, dec_inputs):
        """
        inputs:      인코더 입력 (source sentence)
        dec_inputs:  디코더 입력 (target sentence shifted right)
        """

        # ======================================================
        # (1) 마스크 생성 단계
        # ======================================================

        # 인코더 입력의 패딩 토큰을 무시하기 위한 마스크
        enc_padding_mask = create_padding_mask(inputs, sp.pad_id())

        # 디코더용 look-ahead 마스크 (미래 단어 가리기)
        #   - seq_len 크기의 하삼각 행렬 생성 (torch.tril)
        look_ahead_mask = create_look_ahead_mask(dec_inputs.size(1)).to(inputs.device)
        look_ahead_mask = look_ahead_mask.unsqueeze(0).unsqueeze(1)  # [1, 1, seq_len, seq_len]

        # 인코더의 패딩 토큰을 디코더 cross-attention에서 무시하기 위한 마스크
        dec_padding_mask = create_padding_mask(inputs, sp.pad_id())

        # ======================================================
        # (2) 인코더 단계 (Encoder)
        # ======================================================
        # 입력 문장을 벡터로 인코딩
        enc_output = self.encoder(inputs, enc_padding_mask)
        # 출력 형태: [batch_size, src_seq_len, d_model]

        # ======================================================
        # (3) 디코더 단계 (Decoder)
        # ======================================================
        # 인코더 출력과 디코더 입력을 결합하여 예측
        dec_output = self.decoder(
            dec_inputs,        # 이전 단어 입력
            enc_output,        # 인코더 컨텍스트
            look_ahead_mask,   # 자기회귀 마스크
            dec_padding_mask   # 패딩 마스크
        )
        # 출력 형태: [batch_size, tgt_seq_len, d_model]

        # ======================================================
        # (4) 출력 변환 (Final Linear)
        # ======================================================
        # 디코더 출력을 단어 확률 분포(logits)로 변환
        logits = self.final_linear(dec_output)
        # 출력 형태: [batch_size, tgt_seq_len, vocab_size]

        return logits


## Step 4-2. Dataset & DataLoader

In [129]:
class ChatbotDataset(Dataset):
    def __init__(self, qs, ans):
        """
        Args:
            qs (list[list[int]]): 토큰화된 질문 시퀀스 (입력 문장)
            ans (list[list[int]]): 토큰화된 답변 시퀀스 (목표 문장)
        """
        self.qs = qs   # 질문(입력) 시퀀스 리스트
        self.ans = ans # 답변(출력) 시퀀스 리스트

    def __len__(self):
        return len(self.qs)

    def __getitem__(self, i):
        """
        i번째 (질문, 답변) 샘플을 텐서 형태로 반환
        Returns:
            tuple(torch.Tensor, torch.Tensor): (질문 텐서, 답변 텐서)
        """
        return torch.tensor(self.qs[i]), torch.tensor(self.ans[i])


def collate(batch):
    """
    DataLoader가 batch 단위로 데이터를 꺼낼 때 호출되는 함수.

    Args:
        batch (list[tuple[Tensor, Tensor]]): [(src1, tgt1), (src2, tgt2), ...]

    Returns:
        tuple[Tensor, Tensor]:
            src (Tensor): [batch_size, max_src_len] 형태의 인코더 입력
            tgt (Tensor): [batch_size, max_tgt_len] 형태의 디코더 입력
    """
    # batch 내의 (src, tgt) 쌍을 각각 분리
    src, tgt = zip(*batch)

    # -------------------------------
    # 질문 시퀀스 패딩 (Encoder 입력)
    # -------------------------------
    # 길이가 다른 문장들을 동일한 길이로 맞추기 위해 패딩 추가
    # sp.pad_id() : SentencePiece 토크나이저의 패딩 토큰 ID
    src = pad_sequence(src, batch_first=True, padding_value=sp.pad_id())

    # -------------------------------
    # 답변 시퀀스 패딩 (Decoder 입력)
    # -------------------------------
    tgt = pad_sequence(tgt, batch_first=True, padding_value=sp.pad_id())

    # 패딩된 텐서 반환
    return src, tgt



# 전체 데이터셋
full_dataset = ChatbotDataset(tokenized_questions, tokenized_answers)

# 80% train, 10% valid, 10% test
train_size = int(0.8 * len(full_dataset))
valid_size = int(0.1 * len(full_dataset))
test_size = len(full_dataset) - train_size - valid_size

train_dataset, valid_dataset, test_dataset = random_split(
    full_dataset, [train_size, valid_size, test_size],
    generator=torch.Generator().manual_seed(42)
)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=collate)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False, collate_fn=collate)
test_loader  = DataLoader(test_dataset,  batch_size=64, shuffle=False, collate_fn=collate)

print(f"Train: {len(train_dataset)} | Valid: {len(valid_dataset)} | Test: {len(test_dataset)}")

Train: 9458 | Valid: 1182 | Test: 1183


## Step 4-3. Model Train

In [130]:
def get_lr_lambda(d_model, warmup_steps=4000):
    """
    Transformer 논문의 학습률 스케줄링
    lr = d_model^(-0.5) * min(step^(-0.5), step * warmup_steps^(-1.5))
    """
    d_model = float(d_model)
    def lr_lambda(step):
        step = step + 1  # 0-based -> 1-based
        return (d_model ** -0.5) * min(step ** -0.5, step * (warmup_steps ** -1.5))
    return lr_lambda

In [131]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
VOCAB_SIZE = sp.get_piece_size()

model = Transformer(
    vocab_size=VOCAB_SIZE,
    num_layers=2,
    units=512,
    d_model=128,
    num_heads=8,
    dropout=0.4, # 과적합으로 규제 강화
    max_len=MAX_LEN
).to(device)

optimizer = optim.AdamW(model.parameters(), lr=1.0, betas=(0.9, 0.98), eps=1e-9)  # lr은 scheduler가 조절
scheduler = optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=get_lr_lambda(d_model=128, warmup_steps=1000))

criterion = nn.CrossEntropyLoss(ignore_index=sp.pad_id(), label_smoothing=0.15) # 과적합으로 규제 강화

In [132]:
# 모델 저장 경로
os.makedirs("data/chatbot/checkpoints", exist_ok=True)
best_val_loss = float('inf')

In [133]:
@torch.no_grad()  # 그래디언트 계산 비활성화 (메모리 절약, 속도 향상)
def evaluate(model, loader, crit):
    """
    Args:
        model: 평가할 Transformer 모델
        loader: 검증용 DataLoader
        crit: 손실 함수 (예: nn.CrossEntropyLoss)
    
    Returns:
        float: 검증 데이터셋 평균 손실
    """

    model.eval()

    total_loss = 0.0  # 배치 손실 누적 변수

    for src, tgt in loader:
        src, tgt = src.to(device), tgt.to(device)

        tgt_input = tgt[:, :-1] # 디코더 입력: 마지막 토큰 제외 (teacher forcing)
        tgt_target = tgt[:, 1:].contiguous().view(-1)  # 디코더 목표: 첫 번째 토큰 제외 (예측할 정답)
                                                       # view(-1) → 1차원으로 펼쳐 CrossEntropyLoss 계산 준비
        
        logits = model(src, tgt_input)  # [batch, tgt_seq_len, vocab_size]
        loss = crit(logits.view(-1, VOCAB_SIZE), tgt_target)
        total_loss += loss.item()

    return total_loss / len(loader)


In [134]:
def train_one_epoch(model, loader, opt, scheduler, crit):
    """
    Args:
        model: 학습할 Transformer 모델
        loader: 학습용 DataLoader
        opt: 옵티마이저 (예: AdamW)
        scheduler: 학습률 스케줄러
        crit: 손실 함수 (예: nn.CrossEntropyLoss)

    Returns:
        float: epoch 단위 평균 손실
    """
    model.train()

    total_loss = 0.0  # 배치 손실 누적 변수

    for step, (src, tgt) in enumerate(loader):
        src, tgt = src.to(device), tgt.to(device)

        tgt_input = tgt[:, :-1] # 디코더 입력: 마지막 토큰 제외 (teacher forcing)

        
        tgt_target = tgt[:, 1:].contiguous().view(-1)  # 디코더 목표: 첫 번째 토큰 제외 (예측할 정답)
                                                       # view(-1) → 1차원으로 펼쳐 CrossEntropyLoss 계산 준비

        opt.zero_grad()
        logits = model(src, tgt_input)  # [batch, tgt_seq_len, vocab_size]
        loss = crit(logits.view(-1, VOCAB_SIZE), tgt_target)
        loss.backward()

        # -------------------------------
        # gradient clipping
        #   - exploding gradient 방지 (최대 L2 norm 1.0)
        # -------------------------------
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt.step()
        scheduler.step()

        total_loss += loss.item()

        if (step + 1) % 100 == 0:
            print(f"  Step {step+1} | Loss: {loss.item():.4f} | LR: {scheduler.get_last_lr()[0]:.6f}")

    return total_loss / len(loader)


In [135]:
N_EPOCHS = 100
for epoch in range(1, N_EPOCHS + 1):
    start = time.time()
    train_loss = train_one_epoch(model, train_loader, optimizer, scheduler, criterion)
    val_loss = evaluate(model, valid_loader, criterion)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "data/chatbot/checkpoints/best_transformer.pth")
        print(f"  [SAVE] Best model saved | Val Loss: {val_loss:.4f}")

    print(f'Epoch {epoch:02d} | Train: {train_loss:.4f} | Val: {val_loss:.4f} | '
          f'PPL: {math.exp(val_loss):6.2f} | {time.time()-start:.1f}s\n')

  Step 100 | Loss: 6.5827 | LR: 0.000282
  [SAVE] Best model saved | Val Loss: 6.2387
Epoch 01 | Train: 7.3831 | Val: 6.2387 | PPL: 512.17 | 3.1s

  Step 100 | Loss: 6.0713 | LR: 0.000696
  [SAVE] Best model saved | Val Loss: 5.8425
Epoch 02 | Train: 6.0766 | Val: 5.8425 | PPL: 344.63 | 3.0s

  Step 100 | Loss: 5.4650 | LR: 0.001110
  [SAVE] Best model saved | Val Loss: 5.5452
Epoch 03 | Train: 5.7200 | Val: 5.5452 | PPL: 256.00 | 3.0s

  Step 100 | Loss: 5.5579 | LR: 0.001523
  [SAVE] Best model saved | Val Loss: 5.3296
Epoch 04 | Train: 5.4399 | Val: 5.3296 | PPL: 206.35 | 3.0s

  Step 100 | Loss: 5.4318 | LR: 0.001937
  [SAVE] Best model saved | Val Loss: 5.1864
Epoch 05 | Train: 5.2167 | Val: 5.1864 | PPL: 178.82 | 3.0s

  Step 100 | Loss: 5.0335 | LR: 0.002351
  [SAVE] Best model saved | Val Loss: 5.1004
Epoch 06 | Train: 5.0345 | Val: 5.1004 | PPL: 164.08 | 3.1s

  Step 100 | Loss: 4.9812 | LR: 0.002764
  [SAVE] Best model saved | Val Loss: 5.0406
Epoch 07 | Train: 4.8959 | Val: 

# Step 5. 모델 평가하기

In [136]:
model.load_state_dict(torch.load("data/chatbot/checkpoints/best_transformer.pth"))
print("Best model loaded.\n")

test_loss = evaluate(model, test_loader, criterion)
print(f"=== FINAL TEST LOSS: {test_loss:.4f} | PPL: {math.exp(test_loss):6.2f} ===\n")

Best model loaded.

=== FINAL TEST LOSS: 4.7472 | PPL: 115.27 ===



In [137]:
@torch.no_grad()  # 추론 시 그래디언트 계산 비활성화 (메모리 절약, 속도 향상)
def respond(model, sp, sentence, max_len=50):
    """
    Args:
        model: 학습된 Transformer 모델
        sp: SentencePiece 토크나이저 객체
        sentence: 사용자 입력 문장 (str)
        max_len: 생성할 최대 토큰 길이
    
    Returns:
        str: 모델이 생성한 답변 문장
    """
    
    model.eval()
    src = torch.tensor([sp.encode_as_ids(sentence)], device=device)  # [1, src_seq_len]
    enc_padding_mask = create_padding_mask(src, sp.pad_id())
    enc_output = model.encoder(src, enc_padding_mask)  # [1, src_seq_len, d_model]

    dec_input = torch.tensor([[sp.bos_id()]], device=device)  # [1, 1]
    output_ids = []  # 생성된 토큰 ID를 저장할 리스트

    for _ in range(max_len):

 
        look_ahead_mask = create_look_ahead_mask(dec_input.size(1)).to(device)
        look_ahead_mask = look_ahead_mask.unsqueeze(0).unsqueeze(1)  # [1, 1, seq_len, seq_len]
        dec_padding_mask = create_padding_mask(src, sp.pad_id())
        dec_output = model.decoder(dec_input, enc_output, look_ahead_mask, dec_padding_mask)  # [1, seq_len, d_model]
        logits = model.final_linear(dec_output[:, -1, :])  # [1, vocab_size]
        next_id = logits.argmax(dim=-1).item()  # 가장 확률 높은 토큰 선택 (greedy decoding)
        output_ids.append(next_id)

        if next_id == sp.eos_id():
            break
        dec_input = torch.cat([dec_input, torch.tensor([[next_id]], device=device)], dim=1)  # [1, seq_len+1]

    return sp.decode_ids(output_ids)

In [138]:
test_indices = test_dataset.indices  # 원본 df에서의 인덱스 리스트
test_questions = [df.Q[idx] for idx in test_indices]
test_answers   = [df.A[idx] for idx in test_indices]

for i in range(len(test_questions)):
    q = test_questions[i]
    real_a = test_answers[i]
    pred_a = respond(model, sp, q)

    print(f"\nQ: {q}")
    print(f"실제 A: {real_a}")
    print(f"모델 A: {pred_a}")


Q: 둘 다 바보였다
실제 A: 바보의 사랑이네요.
모델 A: 좋은 생각이에요 .

Q: 일만 하고 살았는데 이 모양
실제 A: 자신의 인생을 사세요.
모델 A: 뭘 꿨는지 궁금하네요 .

Q: 쉬는날인데 왜 만나자고 안하지
실제 A: 직접 만나자고 연락해보세요.
모델 A: 그건 좀 아닌건 좀 아닌건 아닐 거예요 .

Q: 비정규직이라 차별받아
실제 A: 차별이 가장 안 좋은 건데 슬프네요.
모델 A: 뭐든 잘하면 기술이죠 .

Q: 오늘도 공기가 뿌얘
실제 A: 마스크 착용 하시고 외출하세요.
모델 A: 좋은 생각이에요 .

Q: 만난지 얼마 안됐는데 고백받았어
실제 A: 정말 좋아했나봐요.
모델 A: 사랑은 알다가도 모르니까요 .

Q: 파마할까
실제 A: 한번 해보세요.
모델 A: 좋은 생각이에요 .

Q: 젊어지고 싶어
실제 A: 마음이라도 어리게 사세요.
모델 A: 뭘 먹든 맛있게 드세요 .

Q: 자기 소개 또 어떻게 해
실제 A: 강렬하고 자신감있게 하면 되지 않을까싶어요.
모델 A: 뭘 꿨는지 궁금하네요 .

Q: 그게 잘 안되네.
실제 A: 마음이 쉽지 않죠.
모델 A: 이제 그만 놓아주세요 .

Q: 그녀는 나를 어떻게 생각할까?
실제 A: 아직은 잘 모르겠어요.
모델 A: 좋은 생각이에요 .

Q: 답정녀 좀 싫어
실제 A: 못 이기니 피할 수 있으면 피하세요.
모델 A: 그게 인생이죠 .

Q: 지금이 겨울이라 참 다행이이네
실제 A: 제가 곁에 있을게요.
모델 A: 좋은 생각이에요 .

Q: 씁쓸하네.
실제 A: 제가 있잖아요.
모델 A: 금방 지나갈 거예요 .

Q: 짝남의 여사친들 보기 질투나.
실제 A: 질투난다고 살짝 말해봐요.
모델 A: 그건 소액이 아니에요 .

Q: 그녀의 환심을 살 수 있는 방법이 뭐가 있을까?
실제 A: 그녀가 좋아하는게 뭔지 찾아봐요.
모델 A: 뭐든 잘하면 기술이죠 .

Q: 사랑해서 너무 행복해
실제 A: 어쩐지 행복해 보이더라구요.
모델 A: 뭐든 잘하면 기술이죠 .

Q: 미친것같아
실제 A: 