In [1]:
!pip install sentencepiece torch torchvision scikit-learn pandas --quiet

In [2]:
import os
import io
import re
import math
import unicodedata
import random
import numpy as np
import pandas as pd
from pathlib import Path

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split
import sentencepiece as spm

In [3]:
# ============================================================
# 0) 재현성(재사용) 확보: 시드 고정
# ------------------------------------------------------------
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

In [4]:
# ============================================================
# 1) 데이터 로드
#   - ChatbotData.csv: Q(질문), A(답변), label(의도, 여기선 사용X)
#   - 인터넷이 되는 환경에서 URL로 직접 로딩
# ------------------------------------------------------------
URL = "https://github.com/songys/Chatbot_data/raw/master/ChatbotData.csv"

print("데이터 로딩 중...")
df = pd.read_csv(URL)
print(df.head(3))
print("원본 샘플 수:", len(df))

데이터 로딩 중...
              Q            A  label
0        12시 땡!   하루가 또 가네요.      0
1   1지망 학교 떨어졌어    위로해 드립니다.      0
2  3박4일 놀러가고 싶다  여행은 언제나 좋죠.      0
원본 샘플 수: 11823


In [5]:
# ============================================================
# 2) 전처리 함수 정의
#   - 한국어/영문 혼용 채팅 텍스트에서 잡음을 줄여 토크나이저/모델 학습 안정화
#   - 규칙:
#       (1) 유니코드 정규화(NFKC): 전각/반각/합성문자 통일
#       (2) 허용 문자만 남기기: 한글/영문/숫자/공백/기본문장부호
#       (3) 다중 공백 → 단일 공백, 양끝 공백 제거
#   - 주의: 학습과 추론에서 "동일" 규칙을 써야 함
# ------------------------------------------------------------
def normalize_text(s: str) -> str:
    s = unicodedata.normalize("NFKC", str(s))                           # (1)
    s = re.sub(r"[^0-9A-Za-z가-힣ㄱ-ㅎㅏ-ㅣ\s.,?!~’'\"()\-\:;@/]", " ", s)  # (2)
    s = re.sub(r"\s+", " ", s).strip()                                   # (3)
    return s

# 결측치 제거(Q/A 둘 중 하나라도 NaN이면 제거), 원본 백업
df = df.dropna(subset=["Q", "A"]).copy()
df["Q_orig"] = df["Q"]
df["A_orig"] = df["A"]

# 전처리 적용
df["Q"] = df["Q"].map(normalize_text)
df["A"] = df["A"].map(normalize_text)

# 너무 짧거나(정보 부족) 너무 긴(학습 불안정) 샘플 제거
MIN_CHARS, MAX_CHARS = 1, 128
mask = (
    df["Q"].str.len().between(MIN_CHARS, MAX_CHARS) &
    df["A"].str.len().between(MIN_CHARS, MAX_CHARS)
)
df = df[mask].drop_duplicates(subset=["Q", "A"]).reset_index(drop=True)
print("전처리 후 샘플 수:", len(df))

# 학습/검증 분리 (검증은 과적합 체크 및 체크포인트 기준)
train_df, valid_df = train_test_split(
    df[["Q", "A"]],
    test_size=0.05,
    random_state=SEED,
    shuffle=True
)
print(f"train: {len(train_df)}  | valid: {len(valid_df)}")

전처리 후 샘플 수: 11750
train: 11162  | valid: 588


In [6]:
# ============================================================
# 3) SentencePiece 서브워드 토크나이저 학습
#   - 형태소 분석기 대신 서브워드(언어 비의존적) 사용
#   - Q/A를 합친 "공동 코퍼스"로 1회 학습 → 어휘공간 공유
#   - 특수 토큰 ID 고정: PAD=0, UNK=1, BOS=2, EOS=3
# ------------------------------------------------------------
ARTIFACTS = Path("artifacts")
ARTIFACTS.mkdir(parents=True, exist_ok=True)

corpus_path = ARTIFACTS / "spm_corpus.txt"
with io.open(corpus_path, "w", encoding="utf-8") as f:
    # 질문과 답변을 하나의 코퍼스로 합쳐 학습 → 양 방향 어휘를 모두 커버
    for s in pd.concat([train_df["Q"], train_df["A"]], axis=0).astype(str):
        if s:
            f.write(s + "\n")

spm_model_prefix = str(ARTIFACTS / "spm_ko")
VOCAB_SIZE = 8000  # 데이터 크기/도메인에 따라 4k~16k 탐색 권장

# 이미 학습된 모델이 없으면 학습
if not Path(spm_model_prefix + ".model").exists():
    print("SentencePiece 학습 시작...")
    spm.SentencePieceTrainer.Train(
        input=str(corpus_path),
        model_prefix=spm_model_prefix,
        vocab_size=VOCAB_SIZE,
        model_type="unigram",            # "bpe"도 가능
        character_coverage=0.9995,       # 한/영 혼용 텍스트에서 적절
        pad_id=0, unk_id=1, bos_id=2, eos_id=3,
        pad_piece="[PAD]", unk_piece="[UNK]", bos_piece="<s>", eos_piece="</s>",
        user_defined_symbols=[],
        input_sentence_size=2000000,     # (옵션) 대규모 코퍼스 샘플링 크기
        shuffle_input_sentence=True,
        train_extremely_large_corpus=False
    )
    print("SentencePiece 학습 완료.")
else:
    print("기존 SentencePiece 모델 발견 → 재사용.")

# 학습된 토크나이저 로드
sp = spm.SentencePieceProcessor()
sp.load(spm_model_prefix + ".model")

PAD_ID = sp.pad_id()   # 0
UNK_ID = sp.unk_id()   # 1
BOS_ID = sp.bos_id()   # 2
EOS_ID = sp.eos_id()   # 3

print("Vocab size:", sp.get_piece_size(), "| PAD/BOS/EOS:", PAD_ID, BOS_ID, EOS_ID)

기존 SentencePiece 모델 발견 → 재사용.
Vocab size: 8000 | PAD/BOS/EOS: 0 2 3


In [7]:
# ============================================================
# 4) PyTorch Dataset / DataLoader
#   - encode_text: 문자열 → 서브워드 IDs (최대 길이 컷오프)
#   - ChatDataset: Transformer 입력/출력 쌍 구성
#       * src      = [BOS] + Q + [EOS]        → 인코더 입력
#       * tgt_in   = [BOS] + A                → 디코더 입력(teacher forcing)
#       * tgt_out  =       A + [EOS]          → 디코더 정답
#   - collate_fn: 배치 내 최대길이 기준 오른쪽 PAD로 맞춤
#     ※ DataLoader는 (batch, seq)의 텐서를 뱉고,
#        모델로 넣기 직전에 (seq, batch)로 전치합니다.
# ------------------------------------------------------------
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", DEVICE)

MAX_TOKENS = 64  # 질문/답변 모두 동일한 최대 서브워드 길이(실험으로 조정)

def encode_text(s: str, spm_processor, max_len=MAX_TOKENS):
    # 주의: 학습과 추론에서 동일 전처리(norm) + 동일 토크나이저 사용
    ids = spm_processor.encode(s, out_type=int)
    ids = ids[:max_len - 2]  # BOS/EOS 자리를 위해 -2
    return ids

class ChatDataset(Dataset):
    def __init__(self, frame: pd.DataFrame, spm_processor, max_len=MAX_TOKENS):
        self.q = frame["Q"].tolist()
        self.a = frame["A"].tolist()
        self.sp = spm_processor
        self.max_len = max_len

    def __len__(self):
        return len(self.q)

    def __getitem__(self, i):
        # 문자열 → 서브워드 ID 시퀀스
        src_ids = encode_text(self.q[i], self.sp, self.max_len)
        tgt_ids = encode_text(self.a[i], self.sp, self.max_len)

        # 인코더/디코더 시퀀스 구성(시작/종료 토큰 부착, 1칸 시프트)
        src = [BOS_ID] + src_ids + [EOS_ID]
        tgt_in  = [BOS_ID] + tgt_ids
        tgt_out = tgt_ids + [EOS_ID]

        return torch.tensor(src, dtype=torch.long), \
               torch.tensor(tgt_in, dtype=torch.long), \
               torch.tensor(tgt_out, dtype=torch.long)

def collate_fn(batch):
    # batch: List[ (src, tgt_in, tgt_out) ]
    srcs, tgts_in, tgts_out = zip(*batch)

    def pad_right(seqs, pad_id=PAD_ID):
        # (batch, max_len)로 오른쪽 패딩
        maxlen = max(x.size(0) for x in seqs)
        out = torch.full((len(seqs), maxlen), pad_id, dtype=torch.long)
        for i, s in enumerate(seqs):
            out[i, :s.size(0)] = s
        return out

    return pad_right(srcs), pad_right(tgts_in), pad_right(tgts_out)

train_ds = ChatDataset(train_df, sp, MAX_TOKENS)
valid_ds = ChatDataset(valid_df, sp, MAX_TOKENS)

train_loader = DataLoader(train_ds, batch_size=128, shuffle=True, collate_fn=collate_fn, num_workers=0)
valid_loader = DataLoader(valid_ds, batch_size=128, shuffle=False, collate_fn=collate_fn, num_workers=0)

Device: cuda


In [8]:
# ============================================================
# 5) Positional Encoding (사인/코사인)
#   - Transformer는 RNN처럼 순서를 내재적으로 알지 못하므로
#     위치 정보를 임베딩에 더해줘야 함
#   - 입력/타깃 시퀀스 모두에 적용
# ------------------------------------------------------------
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 2048, dropout: float = 0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        # pe: (max_len, d_model)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float32) *
                             (-math.log(10000.0) / d_model))
        # 짝수/홀수 차원에 각각 sin/cos
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # Buffer로 등록하면 학습 파라미터가 아닌 "상수"로 취급됨
        self.register_buffer('pe', pe.unsqueeze(1))  # (max_len, 1, d_model)

    def forward(self, x):
        # x: (seq_len, batch, d_model) 이어야 함
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [9]:
# ============================================================
# 6) Transformer 모델 정의
#   - 임베딩 → 포지셔널 인코딩 → (인코더/디코더 스택) → Linear(어휘 크기 로짓)
#   - nn.Transformer 기본 규약: (seq, batch, dim), batch_first=False
# ------------------------------------------------------------
class TransformerChatbot(nn.Module):
    def __init__(self, vocab_size, d_model=256, nhead=8,
                 num_layers=4, dim_ff=1024, dropout=0.1):
        super().__init__()
        self.tok_emb = nn.Embedding(vocab_size, d_model, padding_idx=PAD_ID)
        self.pos_enc = PositionalEncoding(d_model, dropout=dropout)
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dim_feedforward=dim_ff,
            dropout=dropout,
            batch_first=False  # 입력/출력은 (seq, batch, dim)
        )
        self.generator = nn.Linear(d_model, vocab_size)

    def forward(self, src, tgt_in,
                src_key_padding_mask, tgt_key_padding_mask, memory_key_padding_mask):
        """
        src  : (S, B)  인코더 입력
        tgt_in: (T, B) 디코더 입력(teacher forcing)
        *_key_padding_mask: (B, S or T), True=가려라(PAD)
        반환: logits (T, B, V)
        """
        # 임베딩 + 스케일링(√d_model)
        src = self.tok_emb(src) * math.sqrt(self.tok_emb.embedding_dim)  # (S, B, d_model)
        tgt = self.tok_emb(tgt_in) * math.sqrt(self.tok_emb.embedding_dim)  # (T, B, d_model)

        # 위치 인코딩 부착
        src = self.pos_enc(src)
        tgt = self.pos_enc(tgt)

        # 디코더 미래 가림 마스크 생성: (T, T)
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size(0)).to(src.device)

        # Transformer 전파
        out = self.transformer(
            src=src,
            tgt=tgt,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_key_padding_mask,
            tgt_key_padding_mask=tgt_key_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask
        )  # (T, B, d_model)

        # 어휘 분포 로짓
        logits = self.generator(out)  # (T, B, V)
        return logits

VOCAB = sp.get_piece_size()
model = TransformerChatbot(VOCAB).to(DEVICE)

# 손실함수: PAD 무시 + 라벨 스무딩(과신 방지)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_ID, label_smoothing=0.1)
# 옵티마이저: Adam (Transformer 권장 세팅)
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4, betas=(0.9, 0.98), eps=1e-9)



In [10]:
# ============================================================
# 7) 학습 유틸(마스크/전치 등)
#   - key_padding_mask: (batch, seq) 형태, 값 True=PAD=가려라
#   - Transformer는 (seq, batch, dim)을 쓰므로, DataLoader 출력
#     (batch, seq)을 (seq, batch)로 전치해서 넣음
# ------------------------------------------------------------
def make_padding_mask(batch_ids):  # (batch, seq) → (batch, seq) bool
    return (batch_ids == PAD_ID)

def to_seq_first(x):  # (batch, seq) → (seq, batch)
    return x.transpose(0, 1).contiguous()

In [11]:
# ============================================================
# 8) 학습 루프
#   - Teacher Forcing: 디코더 입력은 "정답의 이전 토큰"
#   - PAD 무시, Label Smoothing, Gradient Clipping
#   - ppl(Perplexity) = exp(토큰당 평균 손실)
# ------------------------------------------------------------
def run_epoch(dataloader, train=True):
    model.train(train)
    total_loss, total_tokens = 0.0, 0

    for src, tgt_in, tgt_out in dataloader:
        # DataLoader는 (batch, seq) → (seq, batch)로 전치 후 장치로 이동
        src = to_seq_first(src).to(DEVICE)      # (S, B)
        tgt_in = to_seq_first(tgt_in).to(DEVICE)  # (T, B)
        tgt_out = to_seq_first(tgt_out).to(DEVICE)  # (T, B)

        # key_padding_mask 생성: (B, seq), True=PAD=가려라
        src_kpm = make_padding_mask(src.transpose(0, 1)).to(DEVICE)     # (B, S)
        tgt_kpm = make_padding_mask(tgt_in.transpose(0, 1)).to(DEVICE)  # (B, T)

        with torch.set_grad_enabled(train):
            logits = model(src, tgt_in, src_kpm, tgt_kpm, src_kpm)  # (T, B, V)

            # CE 입력 형태: (N, C) vs (N,)
            loss = criterion(
                logits.view(-1, logits.size(-1)),  # (T*B, V)
                tgt_out.reshape(-1)                # (T*B,)
            )
            if train:
                optimizer.zero_grad(set_to_none=True)
                loss.backward()
                # 그래디언트 폭주 방지
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                optimizer.step()

        # PAD를 제외한 실제 토큰 수로 가중 평균 손실 계산
        n_tokens = (tgt_out != PAD_ID).sum().item()
        total_loss += loss.item() * n_tokens
        total_tokens += n_tokens

    avg_loss = total_loss / max(1, total_tokens)
    ppl = math.exp(avg_loss)
    return avg_loss, ppl

EPOCHS = 150
# 에폭 조정에 따라 답변 성능이 달라짐
# 10 > 기본적인 인사만 할 수 있음 (ex 안녕하세요.)
# 50 > 상태의 대한 질문에 공감을 할 수 있음 (ex 싸우면서 정 들 거예요. 아무 생각 하지 말고 푹 주무세요.)
# 100 > 채팅봇 본인의 상태에 대한 대답을 할 수 있음 (ex 위로봇이요. 저는 배터리가 밥이예요.)

best_val = float("inf")
ckpt_path = ARTIFACTS / "best_transformer.pt"

print("학습 시작...")
for epoch in range(1, EPOCHS + 1):
    tr_loss, tr_ppl = run_epoch(train_loader, train=True)
    va_loss, va_ppl = run_epoch(valid_loader, train=False)
    print(f"[{epoch:02d}] train loss {tr_loss:.4f} | ppl {tr_ppl:.2f}  ||  valid loss {va_loss:.4f} | ppl {va_ppl:.2f}")

    # 검증 손실이 가장 낮은 시점의 가중치 저장(조기중단 대용)
    if va_loss < best_val:
        best_val = va_loss
        torch.save(model.state_dict(), ckpt_path)

print("학습 종료. Best valid loss:", best_val)

학습 시작...




[01] train loss 6.4373 | ppl 624.70  ||  valid loss 5.8864 | ppl 360.12
[02] train loss 5.7244 | ppl 306.26  ||  valid loss 5.5409 | ppl 254.90
[03] train loss 5.4169 | ppl 225.18  ||  valid loss 5.2995 | ppl 200.23
[04] train loss 5.1649 | ppl 175.02  ||  valid loss 5.1064 | ppl 165.07
[05] train loss 4.9516 | ppl 141.40  ||  valid loss 4.9709 | ppl 144.16
[06] train loss 4.7600 | ppl 116.74  ||  valid loss 4.8509 | ppl 127.85
[07] train loss 4.5966 | ppl 99.15  ||  valid loss 4.7403 | ppl 114.46
[08] train loss 4.4435 | ppl 85.07  ||  valid loss 4.6622 | ppl 105.87
[09] train loss 4.3039 | ppl 73.98  ||  valid loss 4.5984 | ppl 99.32
[10] train loss 4.1674 | ppl 64.55  ||  valid loss 4.5135 | ppl 91.24
[11] train loss 4.0434 | ppl 57.02  ||  valid loss 4.4720 | ppl 87.53
[12] train loss 3.9223 | ppl 50.52  ||  valid loss 4.4223 | ppl 83.29
[13] train loss 3.8125 | ppl 45.26  ||  valid loss 4.3670 | ppl 78.81
[14] train loss 3.6980 | ppl 40.37  ||  valid loss 4.3289 | ppl 75.86
[15] t

In [12]:
# ============================================================
# 9) 추론(디코딩) - Greedy
#   - 인코더: 질문 문맥 벡터화
#   - 디코더: [BOS]로 시작 → 한 토큰씩 argmax로 이어 붙임
#   - 종료 조건: EOS 생성 or 최대 길이 도달
#   - 한계: 단순/빠르지만 반복/짧은 답 위험 → 필요 시 빔/탑-p로 확장
# ------------------------------------------------------------
@torch.no_grad()
def greedy_decode(question: str, max_len=MAX_TOKENS):
    model.eval()

    # (1) 전처리/토큰화: 학습과 "동일" 규칙 적용
    q_norm = normalize_text(question)
    src_ids = [BOS_ID] + sp.encode(q_norm, out_type=int)[:max_len-2] + [EOS_ID]
    src = torch.tensor(src_ids, dtype=torch.long).unsqueeze(1).to(DEVICE)  # (S, 1)

    # key_padding_mask: (B=1, S), True=PAD
    src_kpm = make_padding_mask(src.transpose(0, 1)).to(DEVICE)

    # (2) 디코더 시작 토큰: BOS (T=1, B=1)
    ys = torch.tensor([BOS_ID], dtype=torch.long, device=DEVICE).unsqueeze(1)

    # (3) 한 토큰씩 생성
    for _ in range(max_len - 1):
        tgt_kpm = make_padding_mask(ys.transpose(0, 1)).to(DEVICE)  # (1, T)
        logits = model(src, ys, src_kpm, tgt_kpm, src_kpm)          # (T, 1, V)
        next_token = logits[-1, 0].argmax(-1).item()                # 마지막 스텝의 분포에서 argmax
        ys = torch.cat([ys, torch.tensor([[next_token]], device=DEVICE)], dim=0)
        if next_token == EOS_ID:
            break

    # (4) BOS 제거, EOS 이전까지만 디코드
    out_ids = ys.squeeze(1).tolist()[1:]
    if EOS_ID in out_ids:
        out_ids = out_ids[:out_ids.index(EOS_ID)]
    return sp.decode(out_ids)

# 학습된 베스트 체크포인트 로드(가능할 때만)
if ckpt_path.exists():
    model.load_state_dict(torch.load(ckpt_path, map_location=DEVICE))
    print("베스트 체크포인트 로드 완료:", ckpt_path)
else:
    print("체크포인트가 없어 현재 가중치로 추론합니다(테스트 목적).")

베스트 체크포인트 로드 완료: artifacts/best_transformer.pt


In [13]:
# ============================================================
# 10) 샘플 질의 테스트
# ------------------------------------------------------------
samples = [
    "안녕하세요",
    "너 이름이 뭐야?",
    "오늘 너무 피곤하다",
    "여자친구랑 싸웠어",
    "공부하기 싫어",
    "1+1이 뭐야?", # 연산을 학습하지 않았으므로 올바른 답변을 하지 못함
    "양자역학에 대해서 설명해줘", # 일상 대화가 아닌 전문지식은 올바른 답변을 하지 못함
    "밥 먹었어?",
    "너 똑똑해?"
]

print("\n=== 샘플 질의 결과 ===")
for q in samples:
    a = greedy_decode(q)
    print("Q:", q)
    print("A:", a)
    print("-" * 60)

# ============================================================
# [요약]
# - 전처리: 학습/추론 동일 규칙 유지(중요)
# - SentencePiece: 공동 코퍼스, 특수토큰 ID 고정
# - DataLoader: (batch, seq) → (seq, batch) 전치
# - key_padding_mask: (batch, seq), True=PAD=무시
# - 디코더: tgt_mask(미래 가림) 반드시 적용
# - 타깃 시프트: tgt_in = [BOS]+A  / tgt_out = A+[EOS]
# - Loss: PAD 무시 + label_smoothing
# - 추론: model.eval() + @torch.no_grad()
# ============================================================


=== 샘플 질의 결과 ===
Q: 안녕하세요
A: 안녕하세요.
------------------------------------------------------------
Q: 너 이름이 뭐야?
A: 저는 위로봇입니다.
------------------------------------------------------------
Q: 오늘 너무 피곤하다
A: 아무 생각 하지 말고 푹 주무세요.
------------------------------------------------------------
Q: 여자친구랑 싸웠어
A: 싸우면서 정 들 거예요.
------------------------------------------------------------
Q: 공부하기 싫어
A: 잠시 쉬어도 돼요.
------------------------------------------------------------
Q: 1+1이 뭐야?
A: 저도 해보고 싶은 알바예요.
------------------------------------------------------------
Q: 양자역학에 대해서 설명해줘
A: 게 대화를 이어나가는게 좋겠어요.
------------------------------------------------------------
Q: 밥 먹었어?
A: 저는 배터리가 밥이예요.
------------------------------------------------------------
Q: 너 똑똑해?
A: 저는 위로해드리는 로봇이에요.
------------------------------------------------------------
