# 전처리

In [1]:
import re
def normalize_text(s):
  if not isinstance(s,str):
    s=str(s)
  s=s.lower()
  s=re.sub(r"[^가-힣a-z0-9.,?!\s]", " ", s)
  s=re.sub(r"\s+", " ", s).strip()
  return s

In [2]:
txt = "안녕하세요!!! 😀 테스트입니다~ 123 🚗 Hello!!"
print(normalize_text(txt))

안녕하세요!!! 테스트입니다 123 hello!!


In [3]:
import pandas as pd
data=pd.read_csv('/content/ChatbotData.csv')
data.head()

Unnamed: 0,Q,A,label
0,12시 땡!,하루가 또 가네요.,0
1,1지망 학교 떨어졌어,위로해 드립니다.,0
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.,0
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.,0
4,PPL 심하네,눈살이 찌푸려지죠.,0


In [4]:
data['Q']=data['Q'].apply(normalize_text)
data['A']=data['A'].apply(normalize_text)

data.head()

Unnamed: 0,Q,A,label
0,12시 땡!,하루가 또 가네요.,0
1,1지망 학교 떨어졌어,위로해 드립니다.,0
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.,0
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.,0
4,ppl 심하네,눈살이 찌푸려지죠.,0


In [5]:
import os
import sentencepiece as spm

CORPUS_TXT = "/content/corpus_ko.txt"
MODEL_PREFIX = "/content/spm_ko"
VOCAB_SIZE = 8000
MODEL_TYPE = "unigram"
PAD_ID, BOS_ID, EOS_ID, UNK_ID = 0, 1, 2, 3

In [6]:
df_c=data[['A','Q']].fillna('').copy()
df_c=df_c[(df_c['Q'].str.len()>0)&(df_c['A'].str.len()>0)].reset_index(drop=True)

In [7]:
with open(CORPUS_TXT, 'w') as f:
  for col in ['Q', 'A']:
    for s in df_c[col].astype(str):
      if s.strip():
        f. write(s.strip()+"\n")

In [8]:
spm.SentencePieceTrainer.Train(input=CORPUS_TXT,
    model_prefix=MODEL_PREFIX,
    vocab_size=VOCAB_SIZE,
    character_coverage=1.0,
    model_type=MODEL_TYPE,
    pad_id=0, bos_id=1, eos_id=2, unk_id=3,
    max_sentence_length=999999)

In [9]:
sp = spm.SentencePieceProcessor()
sp.load(f"{MODEL_PREFIX}.model")
print("ids: pad/bos/eos/unk =", sp.pad_id(), sp.bos_id(), sp.eos_id(), sp.unk_id())

sample = df_c["Q"].iloc[0]
print("sample:", sample)
print("pieces:", sp.encode(sample, out_type=str))
print("ids   :", sp.encode(sample, out_type=int))

ids: pad/bos/eos/unk = 0 1 2 3
sample: 12시 땡!
pieces: ['▁12', '시', '▁', '땡', '!']
ids   : [4275, 549, 5, 7825, 64]


In [10]:
import torch
from torch.utils.data import Dataset, DataLoader
PAD, BOS, EOS= sp.pad_id(), sp.bos_id(), sp.eos_id()
MAX_LEN=40

def encode_with_bos_eos(text):
  ids=sp.encode(str(text), out_type=int)
  return [BOS]+ids+[EOS]

class KoChatDataset(Dataset):
  def __init__(self,df,max_len=MAX_LEN):
    self.items=[]
    for q,a in zip(df_c['Q'], df_c['A']):
      q_ids=encode_with_bos_eos(q)
      a_ids=encode_with_bos_eos(a)
      if len(q_ids)>max_len or len(a_ids)>max_len:
        continue

      q_ids=q_ids+[PAD]*(max_len-len(q_ids))
      a_ids=a_ids+[PAD]*(max_len-len(a_ids))

      dec_in=a_ids[:-1]
      target=a_ids[1:]
      self.items.append((
          torch.tensor(q_ids,dtype=torch.long),
          torch.tensor(dec_in,dtype=torch.long),
          torch.tensor(target, dtype=torch.long)
      ))

  def __len__(self):
    return len(self.items)

  def __getitem__(self,i):
    return self.items[i]


In [11]:
from torch.utils.data import random_split
full_ds=KoChatDataset(df_c)
n_total=len(full_ds)
n_val=int(0.1*n_total)
n_train=n_total-n_val

train_ds,val_ds=random_split(full_ds,[n_train,n_val])
train_dl=DataLoader(train_ds,batch_size=64,shuffle=True, drop_last=True)
val_dl=DataLoader(val_ds,batch_size=64,shuffle=False)

# 모델 설계

In [12]:
import math
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional

def create_padding_mask(x,pad_id):
  mask=(x==pad_id).float()
  return mask.unsqueeze(1).unsqueeze(2)

def create_look_ahead_mask(tgt_len,device=None):
  m=torch.triu(torch.ones(tgt_len, tgt_len, device=device), diagonal=1)
  m=m.unsqueeze(0).unsqueeze(0)
  return m

def combine_masks(causal,pad):
  if causal is None:
    return pad
  if pad is None:
    return causal
  return torch.max(causal,pad)


class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(pos * div_term)
        pe[:, 1::2] = torch.cos(pos * div_term)
        self.register_buffer("pe", pe.unsqueeze(0))  # (1, max_len, d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: (B,L,D)
        return x + self.pe[:, : x.size(1), :]


def scaled_dot_product_attention(Q,K,V,mask=None):
  dk=K.size(-1)
  scores=torch.matmul(Q,K.transpose(-1,-2))/math.sqrt(dk)
  if mask is not None:
    scores=scores+(mask*(-1e9))
  attn=F.softmax(scores,dim=-1)
  out=torch.matmul(attn,V)
  return out,attn


class MultiHeadAttention(nn.Module):
  def __init__(self,d_model,num_heads,dropout=0.1):
    super().__init__()
    self.num_heads=num_heads
    self.d_model=d_model
    self.depth=d_model//num_heads

    self.Wq=nn.Linear(d_model,d_model)
    self.Wk=nn.Linear(d_model,d_model)
    self.Wv=nn.Linear(d_model,d_model)
    self.Wo=nn.Linear(d_model,d_model)
    self.drop=nn.Dropout(dropout)

  def split_heads(self,x):
    B,L,D=x.shape
    x=x.view(B,L,self.num_heads, self.depth)
    return x.permute(0,2,1,3)

  def combine_heads(self,x):
    B,H,L,d=x.shape
    x=x.permute(0,2,1,3).contiguous()
    return x.view(B,L,H*d)

  def forward(self,q,k,v,mask=None):
    Q=self.split_heads(self.Wq(q))
    K=self.split_heads(self.Wk(k))
    V=self.split_heads(self.Wv(v))
    if mask is not None and mask.dim() == 4:
            pass
    out, attn = scaled_dot_product_attention(Q, K, V, mask)
    out = self.combine_heads(self.drop(out))
    return self.Wo(out)


class PositionwiseFFN(nn.Module):
    def __init__(self, d_model, ff_dim, dropout= 0.1):
        super().__init__()
        self.lin1 = nn.Linear(d_model, ff_dim)
        self.lin2 = nn.Linear(ff_dim, d_model)
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        return self.lin2(self.drop(F.relu(self.lin1(x))))


class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads, dropout)
        self.ffn = PositionwiseFFN(d_model, ff_dim, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.drop = nn.Dropout(dropout)

    def forward(self, x, src_mask= None):
        attn_out = self.mha(x, x, x, src_mask)
        x = self.norm1(x + self.drop(attn_out))
        ffn_out = self.ffn(x)
        x = self.norm2(x + self.drop(ffn_out))
        return x

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.self_mha = MultiHeadAttention(d_model, num_heads, dropout)
        self.cross_mha = MultiHeadAttention(d_model, num_heads, dropout)
        self.ffn = PositionwiseFFN(d_model, ff_dim, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.drop = nn.Dropout(dropout)

    def forward(self, x, enc_out, tgt_mask= None, cross_mask = None):
        _x = self.self_mha(x, x, x, tgt_mask)
        x = self.norm1(x + self.drop(_x))
        _x = self.cross_mha(x, enc_out, enc_out, cross_mask)
        x = self.norm2(x + self.drop(_x))
        _x = self.ffn(x)
        x = self.norm3(x + self.drop(_x))
        return x

class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, ff_dim, pad_id, dropout=0.1, max_len=512):
        super().__init__()
        self.pad = pad_id
        self.emb = nn.Embedding(vocab_size, d_model, padding_idx=pad_id)
        self.pos = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, ff_dim, dropout) for _ in range(num_layers)
        ])

    def forward(self, tgt, enc_out, src_mask):
        x = self.pos(self.emb(tgt))
        B, Lt = tgt.size()
        causal = create_look_ahead_mask(Lt, device=tgt.device)
        padmask_tgt = create_padding_mask(tgt, self.pad)
        tgt_mask = combine_masks(causal, padmask_tgt)
        cross_mask = src_mask
        for layer in self.layers:
            x = layer(x, enc_out, tgt_mask, cross_mask)
        return x

class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, ff_dim, pad_id, dropout=0.1, max_len=512):
        super().__init__()
        self.pad = pad_id
        self.emb = nn.Embedding(vocab_size, d_model, padding_idx=pad_id)
        self.pos = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, ff_dim, dropout) for _ in range(num_layers)
        ])

    def forward(self, src):
        x = self.pos(self.emb(src))
        src_mask = create_padding_mask(src, self.pad)
        for layer in self.layers:
            x = layer(x, src_mask)
        return x, src_mask

class TransformerSeq2Seq(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, ff_dim, pad_id, dropout=0.1, max_len=512):
        super().__init__()
        self.encoder = Encoder(vocab_size, d_model, num_layers, num_heads, ff_dim, pad_id, dropout, max_len)
        self.decoder = Decoder(vocab_size, d_model, num_layers, num_heads, ff_dim, pad_id, dropout, max_len)
        self.out = nn.Linear(d_model, vocab_size)
        self.pad = pad_id

    def forward(self, src, tgt):
        enc_out, src_mask = self.encoder(src)
        dec_out = self.decoder(tgt, enc_out, src_mask)
        logits = self.out(dec_out)
        return logits

class TransformerWarmupLR(torch.optim.lr_scheduler.LambdaLR):
  def __init__(self, optimizer, d_model, warmup_steps=4000):
    def lr_lambda(step):
      step=step+1
      return (d_model ** -0.5) * min(step ** -0.5, step * (warmup_steps ** -1.5))
    super().__init__(optimizer, lr_lambda=lr_lambda)

def train_step(model,batch,optimizer,loss_fn,device):
  model.train()
  src,tgt_in,tgt_out=[x.to(device) for x in batch]
  optimizer.zero_grad()
  logits=model(src,tgt_in)
  B,L,V=logits.shape
  loss=loss_fn(logits.reshape(B*L,V), tgt_out.reshape(B*L))
  loss.backward()
  optimizer.step()
  with torch.no_grad():
    preds=logits.argmax(-1)
    mask=(tgt_out!=model.pad)
    acc=(preds.eq(tgt_out) & mask).float().sum()/mask.float().sum()
  return loss.item(), acc.item()

In [13]:
vocab_size = 8000
d_model    = 192
n_layers   = 3
n_heads    = 6
ff_dim     = 768
pad_id     = 0
max_len    = 40


In [14]:
device='cuda' if torch.cuda.is_available() else 'cpu'
model=TransformerSeq2Seq(vocab_size, d_model, n_layers, n_heads, ff_dim, pad_id, 0.1, max_len).to(device)
optimizer=torch.optim.Adam(model.parameters(), lr=1.0, betas=(0.9, 0.98), eps=1e-9)
scheduler=TransformerWarmupLR(optimizer, d_model, warmup_steps=4000)
loss_fn=nn.CrossEntropyLoss(ignore_index=pad_id).to(device)

In [15]:
BATCH_SIZE   = 64
EPOCHS       = 5
MAX_LEN      = 40
LR           = 1.0
CLIP_NORM    = 1.0
VAL_FRACTION = 0.1
VOCAB_SIZE = 8000

@torch.no_grad()
def validate():
  model.eval()
  total_loss, total_acc, n_batches=0,0,0
  for batch in val_dl:
    src,dec_in,tgt=[x.to(device) for x in batch]
    logits=model(src,dec_in)
    B,L,V=logits.shape
    loss=loss_fn(logits.reshape(B*L,V), tgt.reshape(B*L))
    preds=logits.argmax(-1)
    mask=(tgt!=PAD)
    acc = (preds.eq(tgt) & mask).float().sum() / mask.float().sum()
    total_loss += loss.item()
    total_acc  += acc.item()
    n_batches  += 1
  return total_loss / max(1,n_batches), total_acc / max(1,n_batches)


# 학습

In [17]:
import time
for epoch in range(1, 20):
  model.train()
  epoch_loss, epoch_acc, n_batches=0,0,0
  t0=time.time()
  for batch in train_dl:
    loss, acc=train_step(model,batch,optimizer,loss_fn, device)
    torch.nn.utils.clip_grad_norm_(model.parameters(), CLIP_NORM)
    scheduler.step()
    epoch_loss += loss
    epoch_acc  += acc
    n_batches  += 1
  train_loss = epoch_loss / max(1,n_batches)
  train_acc  = epoch_acc  / max(1,n_batches)

  val_loss, val_acc = validate()
  dt = time.time() - t0
  print(f"[{epoch:02d}] {dt:.1f}s  train loss {train_loss:.4f} acc {train_acc:.4f} | "
          f"val loss {val_loss:.4f} acc {val_acc:.4f}")

[01] 6.4s  train loss 4.2883 acc 0.3625 | val loss 4.4016 acc 0.3704
[02] 6.3s  train loss 3.9754 acc 0.3874 | val loss 4.2126 acc 0.3901
[03] 6.4s  train loss 3.6749 acc 0.4141 | val loss 4.0478 acc 0.4033
[04] 6.3s  train loss 3.3698 acc 0.4425 | val loss 3.9035 acc 0.4209
[05] 6.6s  train loss 3.0609 acc 0.4768 | val loss 3.7798 acc 0.4329
[06] 6.3s  train loss 2.7283 acc 0.5168 | val loss 3.6582 acc 0.4524
[07] 6.8s  train loss 2.3887 acc 0.5627 | val loss 3.6086 acc 0.4629
[08] 6.3s  train loss 2.0515 acc 0.6135 | val loss 3.5134 acc 0.4790
[09] 6.4s  train loss 1.7197 acc 0.6657 | val loss 3.4615 acc 0.4924
[10] 6.4s  train loss 1.4046 acc 0.7187 | val loss 3.4696 acc 0.5055
[11] 6.4s  train loss 1.1300 acc 0.7674 | val loss 3.4795 acc 0.5172
[12] 6.5s  train loss 0.8928 acc 0.8116 | val loss 3.5505 acc 0.5253
[13] 6.4s  train loss 0.7133 acc 0.8428 | val loss 3.6209 acc 0.5317
[14] 6.5s  train loss 0.5772 acc 0.8702 | val loss 3.6973 acc 0.5403
[15] 6.4s  train loss 0.4840 acc 0

# 테스트

In [18]:
def decoder_inference(model, sentence, tokenizer, device='cpu', max_length=40,start_token=None, end_token=None):
  START_TOKEN=start_token if start_token is not None else tokenizer.bos_id()
  END_TOKEN=end_token if end_token is not None else tokenizer.eos_id()
  sentence=normalize_text(sentence)

  enc_input_ids=[START_TOKEN]+tokenizer.encode(sentence)+[END_TOKEN]
  enc_input=torch.tensor([enc_input_ids], dtype=torch.long, device=device)

  dec_input=torch.tensor([[START_TOKEN]], dtype=torch.long, device=device)

  model.eval()
  generated_tokens=[]

  with torch.no_grad():
    for _ in range(max_length):
      logits=model(enc_input, dec_input)
      last_step_logits=logits[:,-1,:]
      predicted_id=torch.argmax(last_step_logits, dim=-1)
      token_id=predicted_id.item()
      if token_id==END_TOKEN:
        break
      generated_tokens.append(token_id)
      dec_input=torch.cat([dec_input,predicted_id.unsqueeze(0)], dim=1)
  output_sequence=[START_TOKEN]+generated_tokens
  return output_sequence, tokenizer.decode(generated_tokens)

In [19]:
def test_inference(example):
  tokens, decoded=decoder_inference(model, example, sp, device=device)
  print(f'question: {example}')
  print(f'answer: {decoded}')


In [20]:
test_inference('안녕!')

question: 안녕!
answer: 안녕하세요.


In [26]:
test_inference('뭐 하고 있어요?')

question: 뭐 하고 있어요?
answer: 저는 위로봇이요.


In [22]:
test_inference('오늘 날씨가 어때요?')

question: 오늘 날씨가 어때요?
answer: 오늘은 예능이요.


In [23]:
test_inference('너 누구예요?')

question: 너 누구예요?
answer: 저도 궁금하네요.


In [27]:
test_inference('사람들 왜 살아요?')

question: 사람들 왜 살아요?
answer: 그렇게 만들었나요.


# 결과:

**한국어 전처리.**

normalize_text:
소문자화, 정규식으로 한글/영문/숫자/기본 문장부호(.,?!) 만 유지, 공백 정리.

정제된 Q/A로 코퍼스 파일 생성.

SentencePiece(UNigram, vocab=8000, coverage=1.0) 학습, 스페셜 토큰 ID 고정:
PAD=0, BOS=1, EOS=2, UNK=3.

Teacher Forcing 데이터 구성:

인코더 입력: [BOS] + encode(Q) + [EOS] (패딩),

디코더: dec_input=a[:-1], target=a[1:].

**모델.**

사인/코사인 Positional Encoding,

Multi-Head Attention(Q/K/V/Out = Linear(d_model, d_model)), 헤드 분할/결합,

EncoderLayer/DecoderLayer + Residual/LayerNorm, FFN(ReLU, Dropout),

마스크: 패딩 마스크 (B,1,1,L), look-ahead 상삼각 마스크 (1,1,L,L).

**학습.**

손실: CrossEntropyLoss(ignore_index=PAD),

지표: PAD 제외 토큰 정확도,

검증: random_split(검증 10%),

Warmup LR 스케줄러(Transformer 방식, warmup_steps=4000),

Gradient Clipping(clip_grad_norm_(…, 1.0)),

하이퍼파라미터 예시: d_model=192, heads=6, ff=768, layers=3, MAX_LEN=40, batch=64.

**훈련 과정에서 학습 데이터의 손실은 꾸준히 감소하고 정확도는 0.36에서 0.90 이상까지 상승했습니다. 검증 데이터에서도 성능이 개선되었지만, 약 12~13 에포크 이후부터는 과적합 현상이 나타나면서 train loss는 계속 줄어드는 반면 val loss는 점차 증가했습니다. 그럼에도 불구하고 모델은 간단한 질문에는 대답할 수 있는 수준에 도달하여 기본적인 대화형 챗봇으로 활용 가능합니다**.

Greedy decoding: [BOS]로 시작, 마지막 로짓 argmax 반복, [EOS] 또는 최대 길이에서 중단.
