## 챗봇 코드를 활용한 GPT-1 모델 구현

### 1. Transformer에서 변경된 부분

인코더-디코더 구조에서 디코더만 사용하는 구조로 사용하기 위해 chatbot코드에서 다음과 같은 부분 변경

1. **Encoder 클래스 전체 삭제**
  - 디코더 블록만 사용하므로, 문맥을 인코딩하는 _Encoder_, _EncoderLayer_ 클래스 삭제

2. **DecoderLayer 내부 구조 변경**
  - 인코더의 출력을 받아 처리하는 부분(인코더-디코더 어텐션 레이어, LayerNorm) 제거 

3. **Decoder 클래스 입력 변경**
  - _DecoderLayer_ 에서 인코더-디코더 어텐션이 없어져, 입력 값으로 받던 _enc_outputs_ 제거

4. **모델 입력 임베딩 방식 변경**
  - 기존 _Transformer_ 에서는 임베딩 값에 `sqrt(d_model)`을 곱해 스케일링했지만, 논문에서는 해당 부분이 없어 스케일링 부분 제거

5. **Transformer 클래스를 GPT1 클래스로 변경**
  - _Transformer 클래스_ 를, _Decoder_ 만 사용하는 _GPT1 클래스_ 로 변경.
  - _forward_ 에서도 인코더 관련 로직 제거하고, 디코더 입력만 처리하도록 변경

6. **데이터셋 구성 변경**
  - **입력: `[BOS] + sequence`**, **타겟: `sequence + [EOS]`** 형태로 변경

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.optim as optim
import sentencepiece as spm

import math
import os
import re
import csv

### 2. 모델 구성요소 정의

In [2]:
# 단어의 순서 정보를 모델에 알려주기 위한 클래스
class PositionalEncoding(nn.Module):
    def __init__(self, position, d_model):
        super(PositionalEncoding, self).__init__()
        self.d_model = d_model
        self.position = position

        self.pos_encoding = self._build_pos_encoding(position, d_model)

    def _get_angles(self, position, i, d_model):
        return 1.0 / (10000.0 ** ((2.0 * (i // 2)) / d_model)) * position

    def _build_pos_encoding(self, position, d_model):
        pos = torch.arange(position, dtype=torch.float32).unsqueeze(1)
        i = torch.arange(d_model, dtype=torch.float32).unsqueeze(0)

        angle_rads = self._get_angles(pos, i, d_model)
        sines = torch.sin(angle_rads[:, 0::2])
        cosines = torch.cos(angle_rads[:, 1::2])

        pos_encoding = torch.zeros(position, d_model)
        pos_encoding[:, 0::2] = sines
        pos_encoding[:, 1::2] = cosines

        pos_encoding = pos_encoding.unsqueeze(0)  # shape: [1, position, d_model]
        return pos_encoding

    def forward(self, x):
        # 입력 텐서(x)에 위치 정보를 더해 반환
        return x + self.pos_encoding[:, :x.size(1), :].to(x.device)

In [3]:
# 어텐션
def scaled_dot_product_attention(query, key, value, mask=None):
    matmul_qk = torch.matmul(query, key.transpose(-1, -2))
    depth = key.size(-1)
    logits = matmul_qk / math.sqrt(depth)
    if mask is not None:
        logits = logits + (mask * -1e9)
    attention_weights = F.softmax(logits, dim=-1)
    output = torch.matmul(attention_weights, value)
    return output, attention_weights

In [4]:
# 멀티 헤드 어텐션: 어텐션을 여러 번 수행하여 다른 관점의 정보를 종합하는 클래스
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads, name="multi_head_attention"):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % num_heads == 0
        self.depth = d_model // num_heads

        # Q, K, V를 위한 Linear 레이어
        self.query_dense = nn.Linear(d_model, d_model)
        self.key_dense = nn.Linear(d_model, d_model)
        self.value_dense = nn.Linear(d_model, d_model)

        # 최종 출력을 위한 Linear 레이어
        self.out_dense = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        # d_model 차원을 num_heads와 depth로 분할
        x = x.view(batch_size, -1, self.num_heads, self.depth)
        return x.permute(0, 2, 1, 3)  # (batch_size, num_heads, seq_len, depth)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)

        # Q, K, V에 각각 Linear 레이어 적용
        query = self.query_dense(query)
        key = self.key_dense(key)
        value = self.value_dense(value)

        # 헤드 분할
        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        # 스케일드 닷 프로덕트 어텐션 수행
        scaled_attention, _ = scaled_dot_product_attention(query, key, value, mask)

        # 헤드 다시 합치기
        scaled_attention = scaled_attention.permute(0, 2, 1, 3).contiguous()
        concat_attention = scaled_attention.view(batch_size, -1, self.d_model)

        # 최종 Linear 레이어 적용
        output = self.out_dense(concat_attention)
        return output

In [5]:
# 마스킹 함수
def create_padding_mask(x):
    # x == 0 위치를 찾아 float형 1로 변환
    mask = (x == 0).float()
    # (batch_size, seq_len) -> (batch_size, 1, 1, seq_len)
    mask = mask.unsqueeze(1).unsqueeze(2)
    return mask

In [6]:
def create_look_ahead_mask(x):
    seq_len = x.size(1)

    # (seq_len, seq_len) 크기의 하삼각 행렬(tril) 생성 후 1에서 빼서
    # 상삼각이 1, 하삼각(자기 자신 포함)이 0이 되도록 설정
    # => 미래 토큰(자신 인덱스보다 큰 위치) 마스킹
    look_ahead_mask = 1 - torch.tril(torch.ones((seq_len, seq_len)))

    # 패딩 마스크 생성 (shape: (batch_size, 1, 1, seq_len))
    padding_mask = create_padding_mask(x)

    # look_ahead_mask: (seq_len, seq_len) -> (1, seq_len, seq_len)
    look_ahead_mask = look_ahead_mask.unsqueeze(0)
    # -> (1, seq_len, seq_len) -> (1, 1, seq_len, seq_len)
    look_ahead_mask = look_ahead_mask.unsqueeze(1)
    look_ahead_mask = look_ahead_mask.to(x.device)

    # look-ahead 마스크와 패딩 마스크를 합성 (둘 중 하나라도 1이면 마스킹)
    # 최종 shape은 브로드캐스팅으로 (batch_size, 1, seq_len, seq_len)
    combined_mask = torch.max(look_ahead_mask, padding_mask)
    return combined_mask

In [7]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super(DecoderLayer, self).__init__()

        # 1. Masked Multi-Head Self-Attention
        self.self_mha = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model, eps=1e-6)

        # 2. Feed-Forward Network
        self.ffn = nn.Sequential(
            nn.Linear(d_model, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, d_model)
        )
        self.norm2 = nn.LayerNorm(d_model, eps=1e-6)
        
        # Dropout
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    # forward 메소드에서 enc_outputs 관련 코드 제거, out 숫자도 변경
    def forward(self, x, look_ahead_mask=None):
        # 1. Masked Multi-Head Self-Attention
        self_attn_out = self.self_mha(x, x, x, mask=look_ahead_mask)
        self_attn_out = self.dropout1(self_attn_out)
        out1 = self.norm1(x + self_attn_out)  # 잔차 연결 + LayerNorm

        # 2. Feed-Forward Network
        ffn_out = self.ffn(out1)
        ffn_out = self.dropout2(ffn_out)
        out2 = self.norm2(out1 + ffn_out)  # 잔차 연결 + LayerNorm

        return out2

In [8]:
class Decoder(nn.Module):
    def __init__(self,
                 vocab_size,
                 num_layers,
                 ff_dim,
                 d_model,
                 num_heads,
                 dropout=0.1):
        super(Decoder, self).__init__()
        self.d_model = d_model

        # (1) 임베딩 레이어
        self.embedding = nn.Embedding(vocab_size, d_model)

        # (2) 포지셔널 인코딩
        # 실제 학습 시에는 최대 시퀀스 길이에 맞추어 쓰기도 함
        self.pos_encoding = PositionalEncoding(position=vocab_size, d_model=d_model)

        self.dropout = nn.Dropout(dropout)

        # (3) DecoderLayer 쌓기
        self.dec_layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, ff_dim, dropout)
            for _ in range(num_layers)
        ])

    def forward(self, x, look_ahead_mask=None):
        # 논문에서 h0 = UWe + Wp로 되어있어 UWe에서 d_model의 제곱근을 곱하지 않아 해당 부분 주석 처리
        x = self.embedding(x) # * math.sqrt(self.d_model)
        x = self.pos_encoding(x)
        # 수식에는 드롭아웃은 안보이지만 논문에
        # and attention dropouts with a rate of 0.1 for regularization 이라고 되어있어 유지
        x = self.dropout(x)

        for layer in self.dec_layers:
            # enc_outputs, padding_mask 제거
            x = layer(x, look_ahead_mask)

        return x

In [9]:
# Transformer를 가져와 GPT로 변경
class GPT1(nn.Module):
    def __init__(self,
                 vocab_size,
                 num_layers,
                 units,
                 d_model,
                 num_heads,
                 dropout=0.1):
        super(GPT1, self).__init__()

        self.decoder = Decoder (
            vocab_size=vocab_size,
            num_layers=num_layers,
            ff_dim=units,
            d_model=d_model,
            num_heads=num_heads,
            dropout=dropout
        )
        self.final_linear = nn.Linear(d_model, vocab_size)

    # forward 메소드에서 인코더 관련 로직(inputs, 마스크) 모두 제거
    def forward(self, dec_inputs):
        # 1) 디코더가 사용할 look-ahead 마스크 생성
        look_ahead_mask = create_look_ahead_mask(dec_inputs)

        # 2) 디코더만으로 출력 계산
        dec_outputs = self.decoder(x=dec_inputs, look_ahead_mask=look_ahead_mask)

        # 3) 최종 출력층 통과
        logits = self.final_linear(dec_outputs)
        return logits

### 3. 데이터 준비 및 토크나이저

In [10]:
# 전처리 함수
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip()
    sentence = re.sub(r'([?.!,])', r' \1 ', sentence)
    sentence = re.sub(r'[" "]+', ' ', sentence)
    sentence = re.sub(r'[^가-힣a-zA-Z0-9?.!,]+', ' ', sentence)
    sentence = sentence.strip()
    return sentence

In [11]:
# SentencePiece 모델 학습
data_dir = 'data'
corpus_file = os.path.join(data_dir, 'chatbot_corpus.txt')
model_prefix = 'spm_chatbot_gpt'
vocab_size = 8000

# SentencePiece 모델 학습
spm.SentencePieceTrainer.Train(
    input=corpus_file,
    model_prefix=model_prefix,
    vocab_size=vocab_size,
    character_coverage=1.0,
    model_type='bpe',
    max_sentence_length=999999,
    bos_id=1,
    eos_id=2,
    pad_id=0,
    unk_id=3,
)

sentencepiece_trainer.cc(78) LOG(INFO) Starts training with : 
trainer_spec {
  input: data/chatbot_corpus.txt
  input_format: 
  model_prefix: spm_chatbot_gpt
  model_type: BPE
  vocab_size: 8000
  self_test_sample_size: 0
  character_coverage: 1
  input_sentence_size: 0
  shuffle_input_sentence: 1
  seed_sentencepiece_size: 1000000
  shrinking_factor: 0.75
  max_sentence_length: 999999
  num_threads: 16
  num_sub_iterations: 2
  max_sentencepiece_length: 16
  split_by_unicode_script: 1
  split_by_number: 1
  split_by_whitespace: 1
  split_digits: 0
  pretokenization_delimiter: 
  treat_whitespace_as_suffix: 0
  allow_whitespace_only_pieces: 0
  required_chars: 
  byte_fallback: 0
  vocabulary_output_piece_score: 1
  train_extremely_large_corpus: 0
  seed_sentencepieces_file: 
  hard_vocab_limit: 1
  use_all_vocab: 0
  unk_id: 3
  bos_id: 1
  eos_id: 2
  pad_id: 0
  unk_piece: <unk>
  bos_piece: <s>
  eos_piece: </s>
  pad_piece: <pad>
  unk_surface:  ⁇ 
  enable_differential_privacy:

In [12]:
sp = spm.SentencePieceProcessor()
sp.Load(f'{model_prefix}.model')

True

In [13]:
# GPT 언어 모델 학습을 위한 데이터셋으로 변경
class GPTDataset(Dataset):
    def __init__(self, csv_file, sp_model, max_length=40):
        super().__init__()
        self.sp = sp_model
        self.max_length = max_length
        self.data = []

        with open(csv_file, 'r', encoding='utf-8') as f:
            reader = csv.reader(f)
            next(reader)  # 헤더 행 건너뛰기
            pairs = [(row[0], row[1]) for row in reader]

        for q_text, a_text in pairs:
            # 질문과 답변 문장을 모두 독립적인 학습 데이터로 사용
            for text in [q_text, a_text]:
                ids = self.sp.EncodeAsIds(text)

                bos_id = self.sp.bos_id()
                eos_id = self.sp.eos_id()
                pad_id = self.sp.pad_id()

                # dec_input: 모델의 입력. [BOS] 토큰으로 시작
                # target: 모델의 출력(정답). [EOS] 토큰으로 끝남
                # dec_input을 보고 target을 예측하도록 학습
                dec_input = [bos_id] + ids
                target = ids + [eos_id]

                if len(dec_input) > max_length or len(target) > max_length:
                    continue

                dec_input += [pad_id] * (max_length - len(dec_input))
                target += [pad_id] * (max_length - len(target))

                self.data.append({
                    'dec_input': dec_input,
                    'target': target
                })

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        dec_input = torch.tensor(sample['dec_input'], dtype=torch.long)
        target = torch.tensor(sample['target'], dtype=torch.long)
        # 인코더 입력 리턴 값에서 삭제
        return dec_input, target

In [14]:
# 데이터셋 및 데이터로더 생성
csv_file_path = 'data/ChatbotData.csv'
BATCH_SIZE = 64
MAX_LENGTH = 20

dataset = GPTDataset(csv_file_path, sp, max_length=MAX_LENGTH)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

print(f'총 샘플 수: {len(dataset)}개')

총 샘플 수: 23625개


### 4. 모델 학습

In [15]:
# 하이퍼파라미터 설정
NUM_LAYERS = 3
D_MODEL = 256
NUM_HEADS = 8
UNITS = 512
DROPOUT = 0.1
VOCAB_SIZE = vocab_size

# 모델 생성
model = GPT1(
    vocab_size=VOCAB_SIZE,
    num_layers=NUM_LAYERS,
    units=UNITS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT
    )

# 모델 구조 출력
print(model)

GPT1(
  (decoder): Decoder(
    (embedding): Embedding(8000, 256)
    (pos_encoding): PositionalEncoding()
    (dropout): Dropout(p=0.1, inplace=False)
    (dec_layers): ModuleList(
      (0-2): 3 x DecoderLayer(
        (self_mha): MultiHeadAttention(
          (query_dense): Linear(in_features=256, out_features=256, bias=True)
          (key_dense): Linear(in_features=256, out_features=256, bias=True)
          (value_dense): Linear(in_features=256, out_features=256, bias=True)
          (out_dense): Linear(in_features=256, out_features=256, bias=True)
        )
        (norm1): LayerNorm((256,), eps=1e-06, elementwise_affine=True)
        (ffn): Sequential(
          (0): Linear(in_features=256, out_features=512, bias=True)
          (1): ReLU()
          (2): Linear(in_features=512, out_features=256, bias=True)
        )
        (norm2): LayerNorm((256,), eps=1e-06, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace

In [None]:
loss_function = nn.CrossEntropyLoss(ignore_index=sp.pad_id())
optimizer = optim.AdamW(model.parameters(), lr=5e-4, betas=(0.9, 0.98), eps=1e-9)

# 스케쥴러는 챗봇 프로젝트 때 학습 진행이 지지부진해서 고정 학습률로 진행
# scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=get_lr_lambda(D_MODEL, warmup_steps=4000))

def accuracy_function(y_pred, y_true, pad_id=0):
    """
    y_pred: (batch_size, seq_len, vocab_size)
    y_true: (batch_size, seq_len)
    """
    preds = y_pred.argmax(dim=-1)  # (batch_size, seq_len)
    mask = (y_true != pad_id)
    correct = (preds == y_true) & mask
    acc = correct.float().sum() / mask.float().sum()
    return acc

In [17]:
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
model = model.to(device)
print(f'Using device: {device}')

Using device: mps


In [18]:
def train_step(model, batch, optimizer, loss_function, device):
    model.train()
    # enc_input 제거
    dec_input, target = [x.to(device) for x in batch]

    optimizer.zero_grad()

    # 모델에 디코더 입력만 전달
    logits = model(dec_input)

    loss = loss_function(logits.permute(0, 2, 1), target)
    loss.backward()
    optimizer.step()

    return loss.item(), accuracy_function(logits, target, pad_id=sp.pad_id())

In [19]:
def train(model, dataloader, optimizer, loss_function, num_epochs, device):
    model.to(device)

    for epoch in range(num_epochs):
        total_loss, total_acc = 0, 0
        for step, batch in enumerate(dataloader):
            loss, acc = train_step(model, batch, optimizer, loss_function, device)
            total_loss += loss
            total_acc += acc

            # 일정 스텝마다 로그 출력
            if step % 100 == 0:
                print(f'[Epoch {epoch+1}, Step {step}] Loss: {loss:.4f}, Acc: {acc:.4f}')

        avg_loss = total_loss / len(dataloader)
        avg_acc = total_acc / len(dataloader)
        print(f'Epoch {epoch+1} Completed - Avg Loss: {avg_loss:.4f}, Avg Acc: {avg_acc:.4f}')

In [20]:
%%time

train(
    model=model,
    dataloader=dataloader,
    optimizer=optimizer,
    loss_function=loss_function,
    num_epochs=25,
    device=device
)

[Epoch 1, Step 0] Loss: 9.1256, Acc: 0.0000
[Epoch 1, Step 100] Loss: 6.3962, Acc: 0.1990
[Epoch 1, Step 200] Loss: 6.1499, Acc: 0.2262
[Epoch 1, Step 300] Loss: 6.0385, Acc: 0.2506
Epoch 1 Completed - Avg Loss: 6.3608, Avg Acc: 0.2118
[Epoch 2, Step 0] Loss: 5.6927, Acc: 0.2306
[Epoch 2, Step 100] Loss: 5.8626, Acc: 0.2272
[Epoch 2, Step 200] Loss: 5.6189, Acc: 0.2424
[Epoch 2, Step 300] Loss: 5.4333, Acc: 0.2697
Epoch 2 Completed - Avg Loss: 5.5858, Avg Acc: 0.2523
[Epoch 3, Step 0] Loss: 5.4292, Acc: 0.2353
[Epoch 3, Step 100] Loss: 5.0997, Acc: 0.2721
[Epoch 3, Step 200] Loss: 5.1872, Acc: 0.2547
[Epoch 3, Step 300] Loss: 5.1327, Acc: 0.2736
Epoch 3 Completed - Avg Loss: 5.0696, Avg Acc: 0.2770
[Epoch 4, Step 0] Loss: 4.7622, Acc: 0.2920
[Epoch 4, Step 100] Loss: 4.6921, Acc: 0.3013
[Epoch 4, Step 200] Loss: 4.6063, Acc: 0.2914
[Epoch 4, Step 300] Loss: 4.6991, Acc: 0.2909
Epoch 4 Completed - Avg Loss: 4.6399, Avg Acc: 0.3031
[Epoch 5, Step 0] Loss: 4.1053, Acc: 0.3357
[Epoch 5, St

### 5. 모델 추론

In [21]:
def gpt_inference(model, prompt, tokenizer, device='cpu', top_k=10):
    START_TOKEN = tokenizer.bos_id()
    END_TOKEN = tokenizer.eos_id()
    MAX_LENGTH = 40

    input_ids = [START_TOKEN] + tokenizer.encode(prompt)
    dec_input = torch.tensor([input_ids], dtype=torch.long, device=device)

    model.eval()
    with torch.no_grad():
        for i in range(MAX_LENGTH - len(input_ids)):
            logits = model(dec_input)
            last_step_logits = logits[:, -1, :]

            probs = F.softmax(last_step_logits, dim=-1)
            top_k_probs, top_k_indices = torch.topk(probs, k=top_k, dim=-1)

            predicted_id = torch.multinomial(top_k_probs, num_samples=1)
            predicted_id = top_k_indices.gather(-1, predicted_id)

            # 첫 번째로 생성된 토큰이 END_TOKEN이라도 무시하고 계속 진행
            if i > 0 and predicted_id.item() == END_TOKEN:
                break

            dec_input = torch.cat([dec_input, predicted_id.view(1,1)], dim=1)

    output_sequence = dec_input.squeeze(0).tolist()
    return output_sequence[len(input_ids):]

In [22]:
def generate_sentence(model, sentence, tokenizer, device='cpu', top_k=10):
    # 문장 전처리
    processed_sentence = preprocess_sentence(sentence)

    # 프롬프트 형식 구성
    prompt = f"Q: {processed_sentence}\\nA:"

    # 추론 수행
    answer_seq = gpt_inference(model, prompt, tokenizer, device=device, top_k=top_k)

    # 결과 디코딩
    predicted_sentence = tokenizer.decode(
        [token for token in answer_seq if token not in [tokenizer.bos_id(), tokenizer.eos_id(), tokenizer.pad_id()]]
    )

    print('입력 문장:', sentence)
    print('생성 문장:', predicted_sentence)
    print()
    return predicted_sentence

In [23]:
sentences = [
    '벌써 12시가 지났네.',
    '요즘 너무 힘들어.',
    '너무 지루해.',
    '안녕하세요.',
    '요즘 날씨가 너무 좋다.',
    '사고가 났어.',
    '다음 주에 뭐 할까?',
    '수업이 재미있었어.',
    '오늘은 뭐 할까?',
    '여기 어때?',
]

for sentence in sentences:
    generate_sentence(model, sentence, sp, device=device, top_k=10)

입력 문장: 벌써 12시가 지났네.
생성 문장: 해서 말을 했어

입력 문장: 요즘 너무 힘들어.
생성 문장: 가한 여자가 좋아.

입력 문장: 너무 지루해.
생성 문장: 하네

입력 문장: 안녕하세요.
생성 문장: 하네.

입력 문장: 요즘 날씨가 너무 좋다.
생성 문장: 다.

입력 문장: 사고가 났어.
생성 문장: 로 여행하고싶어

입력 문장: 다음 주에 뭐 할까?
생성 문장: 후

입력 문장: 수업이 재미있었어.
생성 문장: 가한 썸인가요?

입력 문장: 오늘은 뭐 할까?
생성 문장: 란  ⁇ 

입력 문장: 여기 어때?
생성 문장: 를 못 듣겠어

