In [1]:
"""
[텍스트 블럭] 과제 평가기준 대응 요약

1) "Transformer(Encoder-Decoder) → GPT(Decoder-only)" 변경사항
- 인코더 삭제: 질문(Q)과 답변(A)을 하나의 시퀀스로 합쳐 한 개의 디코더 스택만 사용합니다.
- 자기회귀(causal) 마스킹: 미래 토큰을 가리는 상삼각 마스크를 전 블록에 적용합니다.
- 포지션 인코딩: GPT 논문 방식에 맞춰 "학습 가능한 위치 임베딩"으로 교체했습니다.
- 입력 포맷: [BOS] + Q + [SEP] + A + [EOS] 로 단일 시퀀스 구성, 다음 토큰 예측으로 학습.
- 손실 계산: 기본은 전체 토큰에 대해 LM loss(다음 토큰 예측)를 적용하되,
LOSS_ON_ANSWER_ONLY=True으로 Q 구간 손실을 무시하고 A 구간만 학습하도록 선택 가능하게 했습니다.

2) 전처리 및 데이터 변형
- 기존 전처리(normalize_text) 유지: 학습/추론 동일 규칙을 강제합니다.
- SentencePiece 공통 어휘: Q/A를 합친 코퍼스로 학습. GPT 입력을 위해 [SEP] 심볼을 추가했습니다.
- Dataset: (input_ids, labels) 한 쌍을 반환합니다. labels는 input_ids를 1칸 오른쪽으로 민 형태이며,
패딩 Q+SEP 구간은 IGNORE_INDEX(-100)로 채워 손실에서 제외합니다.

3) GPT 입력 블럭
- TokenEmbedding + PositionEmbedding + Dropout →
[Decoder 블록 × L]: (Pre-LN) LayerNorm → Masked MHA → Residual → LayerNorm → MLP(GELU) → Residual →
Final LayerNorm → Vocab Linear
- 위치 정보는 각 시퀀스 길이만큼 0..L-1 인덱스를 부여해 학습 가능한 임베딩으로 더합니다.

4) 모델 구성/확인 (summary, fit)
- torchinfo가 있으면 summary를 출력합니다(없으면 파라미터 수/텐서 크기만 출력).
- 학습 루프(run_epoch)를 제공하여 model.fit에 준하는 에폭별 loss/ppl을 출력합니다.
- 체크포인트(best valid loss) 저장/로딩 로직 포함.

5) 추론 및 동작 확인
- 입력: "[BOS] + Q + [SEP]"를 넣고, EOS 또는 최대 길이까지 오토리그레시브로 생성합니다.
- 출력: [SEP] 이후부터 EOS 전까지의 토큰을 디코드하여 답변 문자열로 반환합니다.
"""

!pip install sentencepiece torch torchvision scikit-learn pandas --quiet

In [2]:
# ========================
# 1) 패키지 임포트
# ========================
import os
import io
import re
import math
import unicodedata
import random
from pathlib import Path
from dataclasses import dataclass

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split
import sentencepiece as spm

In [3]:
# ========================
# 2) 재현성(시드 고정)
# ========================
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", DEVICE)

Device: cuda


In [4]:
# ========================
# 3) 데이터 로드 & 전처리
# ========================
URL = "https://github.com/songys/Chatbot_data/raw/master/ChatbotData.csv"
print("데이터 로딩 중...")
df = pd.read_csv(URL)
print(df.head(3))
print("원본 샘플 수:", len(df))

# 정규화: 학습/추론 동일 규칙 유지(중요)
def normalize_text(s: str) -> str:
    s = unicodedata.normalize("NFKC", str(s))
    s = re.sub(r"[^0-9A-Za-z가-힣ㄱ-ㅎㅏ-ㅣ\s.,?!~’'\"()\-\:;@/]", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

# 결측/중복/길이 필터링
MIN_CHARS, MAX_CHARS = 1, 128
df = df.dropna(subset=["Q", "A"]).copy()
df["Q"] = df["Q"].astype(str).map(normalize_text)
df["A"] = df["A"].astype(str).map(normalize_text)

mask = (
    df["Q"].str.len().between(MIN_CHARS, MAX_CHARS) &
    df["A"].str.len().between(MIN_CHARS, MAX_CHARS)
)
df = df[mask].drop_duplicates(subset=["Q", "A"]).reset_index(drop=True)
print("전처리 후 샘플 수:", len(df))

train_df, valid_df = train_test_split(
    df[["Q", "A"]],
    test_size=0.05,
    random_state=SEED,
    shuffle=True
)
print(f"train: {len(train_df)} | valid: {len(valid_df)}")

데이터 로딩 중...
              Q            A  label
0        12시 땡!   하루가 또 가네요.      0
1   1지망 학교 떨어졌어    위로해 드립니다.      0
2  3박4일 놀러가고 싶다  여행은 언제나 좋죠.      0
원본 샘플 수: 11823
전처리 후 샘플 수: 11750
train: 11162 | valid: 588


In [5]:
# ========================
# 4) SentencePiece 학습(+ [SEP])
# ========================
ARTIFACTS = Path("artifacts")
ARTIFACTS.mkdir(parents=True, exist_ok=True)

corpus_path = ARTIFACTS / "spm_corpus.txt"
with io.open(corpus_path, "w", encoding="utf-8") as f:
    for s in pd.concat([train_df["Q"], train_df["A"]], axis=0).astype(str):
        if s:
            f.write(s + "\n")

spm_model_prefix = str(ARTIFACTS / "spm_ko_gpt")
VOCAB_SIZE = 8000

need_train_spm = True
if Path(spm_model_prefix + ".model").exists():
    # 이미 있으면 [SEP] 존재 여부 체크 → 없으면 재학습
    sp_tmp = spm.SentencePieceProcessor()
    sp_tmp.load(spm_model_prefix + ".model")
    need_train_spm = (sp_tmp.piece_to_id("[SEP]") == -1)

if need_train_spm:
    print("SentencePiece 학습 시작([SEP] 추가 포함)...")
    spm.SentencePieceTrainer.Train(
        input=str(corpus_path),
        model_prefix=spm_model_prefix,
        vocab_size=VOCAB_SIZE,
        model_type="unigram",
        character_coverage=0.9995,
        pad_id=0, unk_id=1, bos_id=2, eos_id=3,
        pad_piece="[PAD]", unk_piece="[UNK]", bos_piece="<s>", eos_piece="</s>",
        user_defined_symbols=["[SEP]"], # ★ 추가: GPT 입력을 위한 구분자
        input_sentence_size=2000000,
        shuffle_input_sentence=True,
        train_extremely_large_corpus=False
    )
    print("SentencePiece 학습 완료.")
else:
    print("기존 SentencePiece 모델 발견 → 재사용.")

sp = spm.SentencePieceProcessor()
sp.load(spm_model_prefix + ".model")

PAD_ID = sp.pad_id()   # 0
UNK_ID = sp.unk_id()   # 1
BOS_ID = sp.bos_id()   # 2
EOS_ID = sp.eos_id()   # 3
VOCAB  = sp.get_piece_size()

SEP_ID = sp.piece_to_id("[SEP]")
if SEP_ID == -1:
    raise ValueError("SentencePiece vocab에 [SEP]이 없습니다. user_defined_symbols=['[SEP]']로 재학습 필요.")

print("Vocab size:", VOCAB, "| PAD/BOS/EOS/SEP:", PAD_ID, BOS_ID, EOS_ID, SEP_ID)


기존 SentencePiece 모델 발견 → 재사용.
Vocab size: 8000 | PAD/BOS/EOS/SEP: 0 2 3 4


In [6]:
# ========================
# 5) Dataset / DataLoader (Decoder-only용)
# ========================
MAX_TOKENS = 128
IGNORE_INDEX = -100
LOSS_ON_ANSWER_ONLY = True  # Q+SEP 구간 손실 무시

def encode_text(s: str, spm_processor, max_len=MAX_TOKENS):
    ids = spm_processor.encode(s, out_type=int)
    return ids[: max_len - 2]

class GPTChatDataset(Dataset):
    """Q, A → [BOS] Q [SEP] A [EOS] 단일 시퀀스
    input_ids = seq[:-1], labels = seq[1:]
    (옵션) Q+SEP 구간은 IGNORE_INDEX로 손실 제외
    """
    def __init__(self, frame: pd.DataFrame, spm_processor, max_len=MAX_TOKENS):
        self.q = frame["Q"].tolist()
        self.a = frame["A"].tolist()
        self.sp = spm_processor
        self.max_len = max_len

    def __len__(self):
        return len(self.q)

    def __getitem__(self, i):
        q_ids = encode_text(self.q[i], self.sp, self.max_len)
        a_ids = encode_text(self.a[i], self.sp, self.max_len)

        seq = [BOS_ID] + q_ids + [SEP_ID] + a_ids + [EOS_ID]
        seq = seq[: max(2, self.max_len)]  # 최소 2토큰 유지

        input_ids = seq[:-1]
        labels    = seq[1:]

        if LOSS_ON_ANSWER_ONLY and (SEP_ID in seq):
            sep_pos = seq.index(SEP_ID)
            for t in range(0, min(sep_pos, len(labels))):  # ← 여기서 끝! (a₁은 학습에 포함)
                labels[t] = IGNORE_INDEX


        return (
            torch.tensor(input_ids, dtype=torch.long),
            torch.tensor(labels,    dtype=torch.long),
        )

def collate_fn(batch):
    input_seqs, label_seqs = zip(*batch)
    maxlen = max(x.size(0) for x in input_seqs)

    inputs = torch.full((len(batch), maxlen), PAD_ID,       dtype=torch.long)
    labels = torch.full((len(batch), maxlen), IGNORE_INDEX, dtype=torch.long)

    for i, (inp, lab) in enumerate(zip(input_seqs, label_seqs)):
        L = inp.size(0)
        inputs[i, :L] = inp
        labels[i, :L] = lab
    return inputs, labels  # (B, T)

train_ds = GPTChatDataset(train_df, sp, MAX_TOKENS)
valid_ds = GPTChatDataset(valid_df, sp, MAX_TOKENS)

train_loader = DataLoader(train_ds, batch_size=128, shuffle=True, collate_fn=collate_fn, num_workers=0)
valid_loader = DataLoader(valid_ds, batch_size=128, shuffle=False, collate_fn=collate_fn, num_workers=0)

print(next(iter(train_loader))[0].shape)  # (B, T) 확인용

torch.Size([128, 36])


In [7]:
# ========================
# 6) GPT 블록/모델 (Decoder-only)
# ========================
@dataclass
class GPTConfig:
    vocab_size: int
    max_seq_len: int = 128
    d_model: int = 256
    n_heads: int = 8
    n_layers: int = 6
    attn_pdrop: float = 0.1
    resid_pdrop: float = 0.1
    emb_pdrop: float = 0.1
    mlp_ratio: int = 4

class GPTBlock(nn.Module):
    """Pre-LN: LN → Masked MHA → Residual, LN → MLP(GELU) → Residual"""
    def __init__(self, config: GPTConfig):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.d_model)
        self.attn = nn.MultiheadAttention(
            embed_dim=config.d_model,
            num_heads=config.n_heads,
            dropout=config.attn_pdrop,
            batch_first=False,
        )
        self.drop1 = nn.Dropout(config.resid_pdrop)

        self.ln2 = nn.LayerNorm(config.d_model)
        self.mlp = nn.Sequential(
            nn.Linear(config.d_model, config.mlp_ratio * config.d_model),
            nn.GELU(),
            nn.Linear(config.mlp_ratio * config.d_model, config.d_model),
            nn.Dropout(config.resid_pdrop),
        )

    def forward(self, x, attn_mask, key_padding_mask):
        # x: (T, B, C)
        x_norm = self.ln1(x)
        attn_out, _ = self.attn(
            x_norm, x_norm, x_norm,
            attn_mask=attn_mask,              # (T, T) True=mask
            key_padding_mask=key_padding_mask,# (B, T) True=ignore
            need_weights=False,
        )
        x = x + self.drop1(attn_out)
        x = x + self.mlp(self.ln2(x))
        return x

class GPTLanguageModel(nn.Module):
    def __init__(self, config: GPTConfig):
        super().__init__()
        self.config = config
        self.tok_emb = nn.Embedding(config.vocab_size, config.d_model, padding_idx=PAD_ID)
        self.pos_emb = nn.Embedding(config.max_seq_len, config.d_model)
        self.drop = nn.Dropout(config.emb_pdrop)

        self.blocks = nn.ModuleList([GPTBlock(config) for _ in range(config.n_layers)])
        self.ln_f = nn.LayerNorm(config.d_model)
        self.head = nn.Linear(config.d_model, config.vocab_size, bias=False)
        # weight tying
        self.head.weight = self.tok_emb.weight

    def forward(self, input_ids, attn_mask=None, key_padding_mask=None):
        # input_ids: (B, T)
        B, T = input_ids.shape
        if T > self.config.max_seq_len:
            raise ValueError(f"시퀀스 길이 {T} > max_seq_len {self.config.max_seq_len}")

        pos = torch.arange(0, T, device=input_ids.device).unsqueeze(0)  # (1, T)
        x = self.tok_emb(input_ids) + self.pos_emb(pos)                 # (B, T, C)
        x = self.drop(x)
        x = x.transpose(0, 1).contiguous()                              # (T, B, C)

        if attn_mask is None:
            attn_mask = torch.triu(torch.ones(T, T, device=input_ids.device, dtype=torch.bool), diagonal=1)

        for blk in self.blocks:
            x = blk(x, attn_mask=attn_mask, key_padding_mask=key_padding_mask)

        x = self.ln_f(x)                                                # (T, B, C)
        logits = self.head(x)                                           # (T, B, V)
        return logits

config = GPTConfig(vocab_size=VOCAB, max_seq_len=MAX_TOKENS, d_model=256, n_heads=8, n_layers=6)
model = GPTLanguageModel(config).to(DEVICE)
print("모델 준비 완료")


모델 준비 완료


In [8]:
# ========================
# 7) summary
# ========================

def print_model_summary():
    try:
        from torchinfo import summary
        summary(model, input_size=(2, 32), dtypes=[torch.long], device=DEVICE)
    except Exception as e:
        n_params = sum(p.numel() for p in model.parameters())
        n_train = sum(p.numel() for p in model.parameters() if p.requires_grad)
        print("torchinfo 요약 실패:", e)
        print(f"모델 파라미터: total={n_params:,} trainable={n_train:,}")

print_model_summary()

torchinfo 요약 실패: No module named 'torchinfo'
모델 파라미터: total=6,819,840 trainable=6,819,840


In [9]:
# ========================
# 8) 학습 유틸 (fit)
# ========================

criterion = nn.CrossEntropyLoss(ignore_index=IGNORE_INDEX, label_smoothing=0.1)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4, betas=(0.9, 0.95), eps=1e-8, weight_decay=0.01)

def to_masks(inputs: torch.Tensor):
    # inputs: (B, T)
    B, T = inputs.shape
    attn_mask = torch.triu(torch.ones(T, T, device=inputs.device, dtype=torch.bool), diagonal=1)
    key_padding_mask = (inputs == PAD_ID)  # (B, T) True=PAD
    return attn_mask, key_padding_mask

def run_epoch(dataloader, train=True):
    model.train(train)
    total_loss, total_tokens = 0.0, 0

    for inputs, labels in dataloader:
        inputs = inputs.to(DEVICE)   # (B, T)
        labels = labels.to(DEVICE)   # (B, T)

        attn_mask, key_padding_mask = to_masks(inputs)
        logits = model(inputs, attn_mask=attn_mask, key_padding_mask=key_padding_mask)  # (T, B, V)
        

        loss = criterion(
            logits.view(-1, logits.size(-1)),          # (T*B, V)
            labels.transpose(0, 1).reshape(-1)         # (T*B,)
        )
        if train:
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

        valid_tokens = (labels != IGNORE_INDEX).sum().item()
        total_loss += loss.item() * max(1, valid_tokens)
        total_tokens += max(1, valid_tokens)

    avg_loss = total_loss / max(1, total_tokens)
    ppl = math.exp(avg_loss)
    return avg_loss, ppl

In [10]:
# ========================
# 9) 학습 루프 (에포크 조정)
# ========================

EPOCHS = 150
ckpt_path = ARTIFACTS / "best_gpt1.pt"
best_val = float("inf")

print("학습 시작...")
for epoch in range(1, EPOCHS + 1):
    tr_loss, tr_ppl = run_epoch(train_loader, train=True)
    va_loss, va_ppl = run_epoch(valid_loader, train=False)
    print(f"[{epoch:02d}] train loss {tr_loss:.4f} | ppl {tr_ppl:.2f}  ||  valid loss {va_loss:.4f} | ppl {va_ppl:.2f}")
    if va_loss < best_val:
        best_val = va_loss
        torch.save(model.state_dict(), ckpt_path)

print("학습 종료. Best valid loss:", best_val)
if ckpt_path.exists():
    model.load_state_dict(torch.load(ckpt_path, map_location=DEVICE))
    print("베스트 체크포인트 로드 완료:", ckpt_path)


학습 시작...
[01] train loss 31.6494 | ppl 55610144822849.61  ||  valid loss 18.6051 | ppl 120249150.64
[02] train loss 16.8403 | ppl 20589909.41  ||  valid loss 13.6607 | ppl 856598.44
[03] train loss 12.8820 | ppl 393177.58  ||  valid loss 10.6610 | ppl 42657.99
[04] train loss 10.3937 | ppl 32651.65  ||  valid loss 8.7694 | ppl 6434.59
[05] train loss 8.9183 | ppl 7467.10  ||  valid loss 7.7646 | ppl 2355.68
[06] train loss 8.0260 | ppl 3059.43  ||  valid loss 7.0995 | ppl 1211.41
[07] train loss 7.4323 | ppl 1689.70  ||  valid loss 6.6606 | ppl 781.03
[08] train loss 7.0243 | ppl 1123.60  ||  valid loss 6.3638 | ppl 580.44
[09] train loss 6.7173 | ppl 826.59  ||  valid loss 6.1395 | ppl 463.81
[10] train loss 6.4923 | ppl 660.04  ||  valid loss 5.9664 | ppl 390.11
[11] train loss 6.3075 | ppl 548.69  ||  valid loss 5.8416 | ppl 344.34
[12] train loss 6.1543 | ppl 470.74  ||  valid loss 5.7553 | ppl 315.86
[13] train loss 6.0318 | ppl 416.44  ||  valid loss 5.6892 | ppl 295.66
[14] trai

In [11]:
# ========================
# 8) 샘플 추론
# ========================

@torch.no_grad()
def greedy_decode_gpt(question: str, max_new_tokens: int = 64, min_new_tokens: int = 5):
    model.eval()
    q_norm = normalize_text(question)
    q_ids = sp.encode(q_norm, out_type=int)

    # 프롬프트: [BOS] Q [SEP]
    seq = [BOS_ID] + q_ids + [SEP_ID]
    seq = seq[: MAX_TOKENS - 1]
    x = torch.tensor(seq, dtype=torch.long, device=DEVICE).unsqueeze(0)  # (1, T)

    for step in range(max_new_tokens):
        T = x.size(1)
        T_cond = min(T, MAX_TOKENS)
        x_cond = x[:, -T_cond:]

        attn_mask, key_padding_mask = to_masks(x_cond)
        logits = model(x_cond, attn_mask=attn_mask, key_padding_mask=key_padding_mask)  # (T, 1, V)

        # ---- 억제 규칙들 ----
        logits_step = logits[-1, 0]                   # (V,)
        logits_step[PAD_ID] = -1e9                    # PAD 금지
        logits_step[SEP_ID] = -1e9                    # SEP 재생성 금지
        if step < min_new_tokens:
            logits_step[EOS_ID] = -1e9                # 초반 EOS 금지 → 빈 답 방지

        # 바로 직전 토큰 반복 억제(단순하고 효과적)
        last_id = int(x[0, -1].item())
        logits_step[last_id] -= 5.0                   # 같은 토큰 연타를 강하게 깎기

        next_id = int(torch.argmax(logits_step).item())
        x = torch.cat([x, torch.tensor([[next_id]], device=DEVICE, dtype=torch.long)], dim=1)
        if next_id == EOS_ID:
            break

    # [SEP] 이후 ~ EOS 이전 디코드
    out = x[0].tolist()
    start = out.index(SEP_ID) + 1 if SEP_ID in out else 0
    end = start + out[start:].index(EOS_ID) if EOS_ID in out[start:] else len(out)
    return sp.decode(out[start:end])


samples = [
    "안녕하세요",
    "너 이름이 뭐야?",
    "오늘 너무 피곤하다",
    "여자친구랑 싸웠어",
    "공부하기 싫어",
    "1+1이 뭐야?",
    "양자역학에 대해서 설명해줘",
    "밥 먹었어?",
    "너 똑똑해?"
]

print("\n=== 샘플 질의 결과 ===")
for q in samples:
    a = greedy_decode_gpt(q)
    print("Q:", q)
    print("A:", a)
    print("-" * 60)


=== 샘플 질의 결과 ===
Q: 안녕하세요
A: 안녕하세요. 다른 사람의 삶에 한눈팔며 살기에는 자신의 인생이 너무나도 소중합니다.
------------------------------------------------------------
Q: 너 이름이 뭐야?
A: 저는 마음을 이어주는 위로봇입니다.
------------------------------------------------------------
Q: 오늘 너무 피곤하다
A: 운명을 더 사랑해주세요.
------------------------------------------------------------
Q: 여자친구랑 싸웠어
A: 싸우면서 정 들 거예요.
------------------------------------------------------------
Q: 공부하기 싫어
A: 잠시 쉬어도 돼요.
------------------------------------------------------------
Q: 1+1이 뭐야?
A: 사랑은 유지하는 게 중요한데 대단하네요.
------------------------------------------------------------
Q: 양자역학에 대해서 설명해줘
A: 부모님만의 변덕이 심하네요.
------------------------------------------------------------
Q: 밥 먹었어?
A: 저는 배터리가 밥이예요.
------------------------------------------------------------
Q: 너 똑똑해?
A: 달콤한 말이예요.
------------------------------------------------------------
