
# **Korean GPT (Decoder-only) Pretraining** *(PyTorch)*



## [1] Transformer와 비교해 변경이 필요한 부분
- (기존) Encoder–Decoder → (본 과제) **Decoder-only GPT**  
  - **Cross-Attention 제거**, **Causal Self-Attention**(미래 토큰 마스킹) 적용  ← `# [CHG]` 주석으로 코드에 표시  
  - 목적 함수: `L1 = ∑ log P(u_i | u_<i)` (다음 토큰 예측)  
- 입력/출력: **BOS** + 토큰[0..T-2] → 라벨: 토큰[1..T-1] + **EOS**  
- 위치 정보: **학습형 Positional Embedding** 추가 (GPT-1 스타일)  
- 헤드: **LM Head with Weight Tying** (임베딩 가중치 공유)  
- 학습: **AdamW + Cosine LR with Warmup + AMP + Gradient Clipping + Early Stopping**

---

## [2] 모델 입력 형태에 맞게 전처리 수행
- **SentencePiece(Unigram)** 사용 (첨부 노트북 참조).  
  - 특수 토큰 예약: `[PAD]=0, [BOS]=1, [EOS]=2, [UNK]=3, [USR]=4, [SYS]=5`  
  - 대화 데이터는 `<usr> ... </s> <sys> ... </s>` 형식으로 하나의 시퀀스로 결합 → **사전학습(언어모델)**에 바로 사용.
- 오프라인 환경 또는 라이브러리 미설치 시 **Fallback 토크나이저**(공백/문자 수준) 제공.

---

## [3] 입력 블록을 GPT 논문에 기반하여 수정
- 입력 텐서 = **TokenEmbedding + PositionalEmbedding(learned)**  
- 샘플 배치를 프린트하여 **입력 정상 확인** + **길이/마스크 검증**

---

## [4] GPT 모델을 정상적으로 구성 (요약/학습 로그 첨부)
- PyTorch **Decoder-only Transformer** 구현 (Causal mask)  
- `print(#params)`, 학습 로그(loss/perplexity), 체크포인트 저장

---

## [5] 입력에 따른 출력 생성
- **Top-k / Top-p / Temperature / Repetition Penalty** 지원  
- 프롬프트에 대한 **생성 예시 출력**



In [12]:

# Imports & Config
import os, re, math, random, time, json, numpy as np, pandas as pd
import torch, torch.nn as nn, torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

try:
    import sentencepiece as spm
    HAS_SPM = True
except Exception:
    HAS_SPM = False

SEED=42
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)

SEQ_LEN=128; BATCH=64; EPOCHS=5
EMB_DIM=512; NUM_HEADS=8; FF_DIM=EMB_DIM*4; NUM_LAYERS=8; DROPOUT=0.1
LR=3e-4; WARMUP_STEPS=200; WEIGHT_DECAY=0.1; GRAD_CLIP=1.0; PATIENCE=3
NGRAM_BLOCK=0  # 0=off, 2/3 for no-repeat n-gram

SPECIAL_TOKENS={"PAD":0,"BOS":1,"EOS":2,"UNK":3}
DEVICE="cuda" if torch.cuda.is_available() else "cpu"
print("Device:", DEVICE)


Device: cuda


In [13]:

# Data: build <usr>/<sys>/<eot> sequences
DATA_CSV="data/ChatbotData.csv"
def build_dialog_rows_from_csv(path):
    df=pd.read_csv(path)[["Q","A"]].dropna()
    rows=[]
    for q,a in zip(df["Q"], df["A"]):
        q=str(q).strip(); a=str(a).strip()
        if not q or not a: continue
        rows.append(f"<usr> {q} <eot> <sys> {a} <eot>")
    return rows

if os.path.exists(DATA_CSV):
    rows=build_dialog_rows_from_csv(DATA_CSV)
    text_corpus="\n".join(rows)
    print("Loaded ChatbotData.csv:", len(rows))
else:
    text_corpus=(
        "<usr> 한국어 GPT 모델을 만들고 싶어 <eot> <sys> 사전학습 후 미세조정을 하면 좋아요 <eot>\n"
        "<usr> 어떤 토크나이저가 좋아? <eot> <sys> sentencepiece 권장! 없으면 공백 기반도 가능 <eot>\n"
        "<usr> 학습을 안정적으로 하려면? <eot> <sys> 워밍업, 코사인 스케줄, AMP, weight decay, 반복 패널티 <eot>"
    )
    print("Using tiny demo corpus.")


Loaded ChatbotData.csv: 11823


In [14]:

# Tokenizer (SentencePiece -> fallback)
MODEL_PREFIX="spm_ko_chat"; VOCAB_SIZE=16000
if HAS_SPM:
    if not os.path.exists(MODEL_PREFIX+".model"):
        with open("temp_corpus.txt","w",encoding="utf-8") as f:
            f.write(text_corpus)
        spm.SentencePieceTrainer.Train(
            input="temp_corpus.txt",
            model_prefix=MODEL_PREFIX,
            vocab_size=VOCAB_SIZE,
            model_type="unigram",
            pad_id=SPECIAL_TOKENS["PAD"],
            unk_id=SPECIAL_TOKENS["UNK"],
            bos_id=SPECIAL_TOKENS["BOS"],
            eos_id=SPECIAL_TOKENS["EOS"],
            user_defined_symbols=["<usr>","<sys>","<eot>"],
            character_coverage=0.9995
        )
    sp = spm.SentencePieceProcessor(model_file=MODEL_PREFIX+".model")
    PAD_ID=sp.pad_id(); BOS_ID=sp.bos_id(); EOS_ID=sp.eos_id(); UNK_ID=sp.unk_id()
    USR_ID=sp.piece_to_id("<usr>"); SYS_ID=sp.piece_to_id("<sys>"); EOT_ID=sp.piece_to_id("<eot>")
    VOCAB_SIZE=sp.vocab_size()
else:
    def basic_tokenize(t): return re.sub(r"\s+"," ",t.strip()).split(" ")
    vocab=["[PAD]","[BOS]","[EOS]","[UNK]","<usr>","<sys>","<eot>"]
    for tok in basic_tokenize(text_corpus):
        if tok not in vocab: vocab.append(tok)
    word2id={w:i for i,w in enumerate(vocab)}; id2word={i:w for w,i in word2id.items()}
    PAD_ID=0; BOS_ID=1; EOS_ID=2; UNK_ID=3; USR_ID=word2id["<usr>"]; SYS_ID=word2id["<sys>"]; EOT_ID=word2id["<eot>"]
    VOCAB_SIZE=len(vocab)

def encode(text):
    if HAS_SPM: return sp.encode(text, out_type=int)
    else: return [word2id.get(t, UNK_ID) for t in basic_tokenize(text)]

def decode(ids):
    if HAS_SPM: return sp.decode(ids)
    else: return " ".join([id2word.get(int(i), "[UNK]") for i in ids])

print("Vocab size:", VOCAB_SIZE)


Vocab size: 21807


In [24]:

# Build sequences (X,Y)
def build_sequences(corpus_text, seq_len=SEQ_LEN, stride=SEQ_LEN//2):
    ids=[BOS_ID]+encode(corpus_text)+[EOS_ID]
    X,Y=[],[]
    for start in range(0, max(1,len(ids)-1-seq_len), stride):
        x=ids[start:start+seq_len]; y=ids[start+1:start+1+seq_len]
        if len(x)<seq_len or len(y)<seq_len: break
        X.append(x); Y.append(y)
    import torch
    return torch.tensor(X,dtype=torch.long), torch.tensor(Y,dtype=torch.long)

X,Y=build_sequences(text_corpus, SEQ_LEN)
from torch.utils.data import Dataset, DataLoader
class LMDataset(Dataset):
    def __init__(self,X,Y): self.X=X; self.Y=Y
    def __len__(self): return self.X.shape[0]
    def __getitem__(self,idx): return self.X[idx], self.Y[idx]
train_loader=DataLoader(LMDataset(X,Y), batch_size=BATCH, shuffle=True, drop_last=True)

xb,yb=next(iter(train_loader))
print("Dataset:", X.shape, Y.shape)
print("X[0]->text:", decode(xb[0].tolist()[:80]))
print("Y[0]->text:", decode(yb[0].tolist()[:80]))

Dataset: torch.Size([2083, 128]) torch.Size([2083, 128])
X[0]->text: <usr> 오늘 마지막 인사하러 갑니다. <eot> <sys> 미련없이 정리했긴 바랍니다. <eot> <usr> 오늘 메일을 한통 보냈어 <eot> <sys> 후회가 남지 않길 바랍니다. <eot> <usr> 오늘 무너졌어. <eot> <sys> 힘내지 않아도 돼요. 조금 쉬어가세요. <eot> <usr> 오늘 무작정 찾으러가고싶은데 갈까? <eot> <sys> 전 추천하고 싶지 않아요. <eot> <usr> 오늘 보기로 했던 그녀, 못 보았네 <eot> <sys> 보지 않는 게 더 나았을 수도 있겠네요. <eot> <usr> 오늘 얼굴보고 확실히 헤어졌어 <eot> <sys> 쉽지 않을 결정이었을텐데 맘고생 많았어요. <eot> <usr> 오늘 연락왔네 그래도
Y[0]->text: 오늘 마지막 인사하러 갑니다. <eot> <sys> 미련없이 정리했긴 바랍니다. <eot> <usr> 오늘 메일을 한통 보냈어 <eot> <sys> 후회가 남지 않길 바랍니다. <eot> <usr> 오늘 무너졌어. <eot> <sys> 힘내지 않아도 돼요. 조금 쉬어가세요. <eot> <usr> 오늘 무작정 찾으러가고싶은데 갈까? <eot> <sys> 전 추천하고 싶지 않아요. <eot> <usr> 오늘 보기로 했던 그녀, 못 보았네 <eot> <sys> 보지 않는 게 더 나았을 수도 있겠네요. <eot> <usr> 오늘 얼굴보고 확실히 헤어졌어 <eot> <sys> 쉽지 않을 결정이었을텐데 맘고생 많았어요. <eot> <usr> 오늘 연락왔네 그래도 이젠


In [25]:

# Model — Decoder-only GPT
class TokenPositionalEmbedding(nn.Module):
    def __init__(self, vocab_size, emb_dim, max_len):
        super().__init__()
        self.token_emb=nn.Embedding(vocab_size, emb_dim)
        self.pos_emb=nn.Embedding(max_len, emb_dim)
    def forward(self,x):
        B,T=x.shape
        pos=torch.arange(0,T, device=x.device).unsqueeze(0)
        return self.token_emb(x)+self.pos_emb(pos)  # [CHG] 위치 정보 추가

class GPTBlock(nn.Module):
    def __init__(self, emb_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.ln1=nn.LayerNorm(emb_dim)
        self.attn=nn.MultiheadAttention(embed_dim=emb_dim, num_heads=num_heads, dropout=dropout, batch_first=True)
        self.drop1=nn.Dropout(dropout)
        self.ln2=nn.LayerNorm(emb_dim)
        self.ff=nn.Sequential(nn.Linear(emb_dim,ff_dim), nn.GELU(), nn.Linear(ff_dim,emb_dim), nn.Dropout(dropout))
    def forward(self,x,attn_mask=None):
        h=self.ln1(x)
        if attn_mask is not None: attn_mask=attn_mask.to(x.device)  # [CHG]
        attn_out,_=self.attn(h,h,h, attn_mask=attn_mask)            # [CHG] Causal Self-Attention
        x=x+self.drop1(attn_out)
        x=x+self.ff(self.ln2(x))
        return x

class GPTModel(nn.Module):
    def __init__(self, vocab_size, seq_len, emb_dim, num_heads, ff_dim, num_layers, dropout=0.1):
        super().__init__()
        self.emb=TokenPositionalEmbedding(vocab_size, emb_dim, seq_len)
        self.drop=nn.Dropout(dropout)
        self.blocks=nn.ModuleList([GPTBlock(emb_dim,num_heads,ff_dim,dropout) for _ in range(num_layers)])
        self.ln_f=nn.LayerNorm(emb_dim)
        self.token_emb=self.emb.token_emb
        mask=torch.triu(torch.ones(seq_len,seq_len), diagonal=1)
        mask=mask.masked_fill(mask==1, float("-inf"))
        self.register_buffer("causal_mask", mask)
    def forward(self,x):
        h=self.drop(self.emb(x))
        for blk in self.blocks:
            h=blk(h, attn_mask=self.causal_mask)
        h=self.ln_f(h)
        logits=torch.matmul(h, self.token_emb.weight.T)  # [CHG] LM head (tied)
        return logits

def count_params(m): return sum(p.numel() for p in m.parameters())

model=GPTModel(VOCAB_SIZE, SEQ_LEN, EMB_DIM, NUM_HEADS, FF_DIM, NUM_LAYERS, DROPOUT).to(DEVICE)
print(model)
print("Total params:", count_params(model))


GPTModel(
  (emb): TokenPositionalEmbedding(
    (token_emb): Embedding(21807, 512)
    (pos_emb): Embedding(128, 512)
  )
  (drop): Dropout(p=0.1, inplace=False)
  (blocks): ModuleList(
    (0-7): 8 x GPTBlock(
      (ln1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
      )
      (drop1): Dropout(p=0.1, inplace=False)
      (ln2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (ff): Sequential(
        (0): Linear(in_features=512, out_features=2048, bias=True)
        (1): GELU(approximate='none')
        (2): Linear(in_features=2048, out_features=512, bias=True)
        (3): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (ln_f): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
  (token_emb): Embedding(21807, 512)
)
Total params: 36450816


In [26]:

# Optimizer/Scheduler/AMP/Early stopping
from torch.optim import AdamW
optimizer=AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
scaler=torch.cuda.amp.GradScaler(enabled=(DEVICE=='cuda'))

def cosine_schedule(step, warmup, total, base_lr):
    if step < warmup: return base_lr*(step+1)/warmup
    progress=(step-warmup)/max(1,total-warmup)
    return 0.5*base_lr*(1+math.cos(math.pi*progress))

def set_lr(opt, lr):
    for pg in opt.param_groups: pg['lr']=lr

total_steps=EPOCHS*len(train_loader)
best_val=float('inf'); patience=PATIENCE; global_step=0

@torch.no_grad()
def evaluate_ppl(loader):
    model.eval()
    total_loss=0.0; total_tok=0
    for xb,yb in loader:
        xb=xb.to(DEVICE); yb=yb.to(DEVICE)
        logits=model(xb); B,T,V=logits.shape
        loss=F.cross_entropy(logits.view(B*T,V), yb.view(B*T))
        total_loss+=loss.item()*(B*T); total_tok+=B*T
    avg=total_loss/max(1,total_tok)
    ppl=math.exp(min(20,avg))
    return avg,ppl


  scaler=torch.cuda.amp.GradScaler(enabled=(DEVICE=='cuda'))


In [27]:

# Train
for epoch in range(1, EPOCHS+1):
    model.train()
    epoch_loss=0.0; epoch_tok=0; t0=time.time()
    for xb,yb in train_loader:
        xb=xb.to(DEVICE); yb=yb.to(DEVICE)
        lr_now=cosine_schedule(global_step, WARMUP_STEPS, total_steps, LR)
        set_lr(optimizer, lr_now)
        with torch.cuda.amp.autocast(enabled=(DEVICE=='cuda')):
            logits=model(xb); B,T,V=logits.shape
            loss=F.cross_entropy(logits.view(B*T,V), yb.view(B*T))
        optimizer.zero_grad(set_to_none=True)
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        scaler.step(optimizer); scaler.update()
        epoch_loss+=loss.item()*(B*T); epoch_tok+=B*T; global_step+=1
    avg=epoch_loss/max(1,epoch_tok); ppl=math.exp(min(20,avg))
    val_loss,val_ppl=evaluate_ppl(train_loader)
    print(f"[Epoch {epoch}] train loss/token={avg:.4f} (ppl={ppl:.2f}) | val loss={val_loss:.4f} (ppl={val_ppl:.2f}) | {time.time()-t0:.1f}s")
    if val_loss < best_val-1e-4:
        best_val=val_loss; patience=PATIENCE
        torch.save(model.state_dict(), "best_gpt.pt"); print("  ↳ Saved checkpoint: best_gpt.pt")
    else:
        patience-=1
        if patience==0:
            print("Early stopping triggered."); break


  with torch.cuda.amp.autocast(enabled=(DEVICE=='cuda')):


[Epoch 1] train loss/token=218.3830 (ppl=485165195.41) | val loss=72.8744 (ppl=485165195.41) | 17.7s
  ↳ Saved checkpoint: best_gpt.pt
[Epoch 2] train loss/token=47.1861 (ppl=485165195.41) | val loss=34.8932 (ppl=485165195.41) | 13.5s
  ↳ Saved checkpoint: best_gpt.pt
[Epoch 3] train loss/token=33.1291 (ppl=485165195.41) | val loss=28.8556 (ppl=485165195.41) | 13.1s
  ↳ Saved checkpoint: best_gpt.pt
[Epoch 4] train loss/token=28.2932 (ppl=485165195.41) | val loss=26.2836 (ppl=485165195.41) | 13.0s
  ↳ Saved checkpoint: best_gpt.pt
[Epoch 5] train loss/token=25.0784 (ppl=485165195.41) | val loss=22.4719 (ppl=485165195.41) | 13.1s
  ↳ Saved checkpoint: best_gpt.pt


In [39]:

# Generation — return only the <sys> turn
@torch.no_grad()
def generate(user_text, max_new_tokens=80, temperature=0.8, top_k=50, top_p=0.9, rep_penalty=1.15):
    model.eval()
    prompt=f"<usr> {user_text.strip()} <eot> <sys>"
    ids=[BOS_ID]+encode(prompt)
    ids=ids[-SEQ_LEN:]
    x=torch.zeros((1,SEQ_LEN),dtype=torch.long, device=DEVICE)
    cur=len(ids); x[0,:cur]=torch.tensor(ids, device=DEVICE)

    recent=[]
    for _ in range(max_new_tokens):
        logits=model(x)[0,cur-1,:]
        for t in recent[-32:]: logits[t]/=rep_penalty
        # 금지 토큰(시스템 턴에서 <usr>/<sys> 출력 억제)
        logits[USR_ID]=-1e10; logits[SYS_ID]=-1e10
        logits=logits/max(1e-6,temperature)
        probs=F.softmax(logits,dim=-1)
        if top_k>0:
            topv,topi=torch.topk(probs, k=min(top_k, probs.numel()))
            mask=torch.zeros_like(probs); mask[topi]=probs[topi]; probs=mask
        if top_p<1.0:
            sort_probs,sort_idx=torch.sort(probs, descending=True)
            cumsum=torch.cumsum(sort_probs,dim=-1)
            cutoff=(cumsum>top_p).nonzero(as_tuple=False)
            if cutoff.numel()>0:
                k=cutoff[0,0].item()
                keep=sort_idx[:k+1]; mask=torch.zeros_like(probs); mask[keep]=probs[keep]; probs=mask
        probs=probs/probs.sum()
        next_id=torch.multinomial(probs,1).item()
        recent.append(next_id)
        if cur>=SEQ_LEN: x=torch.roll(x,shifts=-1,dims=1); x[0,-1]=next_id
        else: x[0,cur]=next_id; cur+=1
        if next_id in {EOS_ID, EOT_ID}: break

    # <sys> 구간만 추출
    out_ids=x[0,:cur].tolist()
    try: sys_start=out_ids.index(SYS_ID)+1
    except ValueError: sys_start=0
    try: sys_end=out_ids.index(EOT_ID, sys_start)
    except ValueError: sys_end=cur
    return decode(out_ids[sys_start:sys_end])

print("=== Generation Demo (system-only) ===")
print(generate("안녕! 오늘 기분 어때?"))


=== Generation Demo (system-only) ===
만나보세요. 만나보세요. 돼라 돼라 너무 마세요. 마세요.
