In [1]:
import torch
print(torch.__version__)

2.7.1+cu118


In [7]:
# Step 1. 데이터 수집하기


In [None]:
# Step 2. 데이터 전처리하기

In [2]:
# 4. 트랜스포머의 입력 이해하기
# 먼저 앞으로 진행하면서 필요한 패키지를 임포트하겠습니다!
!pip install sentencepiece
# sentencepiece 는 구글이 개발한 오픈소스 토크나이저로, 텍스트를 단어, 형태소 또는 문맥에 따라 의미 있는 단위(subword)로 분리해줍

Collecting sentencepiece
  Downloading sentencepiece-0.2.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (10 kB)
Downloading sentencepiece-0.2.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sentencepiece
Successfully installed sentencepiece-0.2.1


In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
import sentencepiece as spm

import math
import os
import re
import urllib.request
import zipfile
import numpy as np
import matplotlib.pyplot as plt

In [4]:
import pandas as pd
# 현재 작업 디렉토리 확인
current_directory = os.getcwd()
print(f"현재 작업 디렉토리: {current_directory}")
# CSV 파일 로드
file_path_attempt = '/home/jovyan/work//transformer_chatbot/data/ChatbotData.csv' # 이전에 입력했던 경로에 문제가 없다면 이 경로를 다시 사용해 보세요.

try:
    df = pd.read_csv(file_path_attempt)
    print(f"파일이 성공적으로 로드되었습니다. 데이터 크기: {df.shape}")
except FileNotFoundError:
    print(f"오류: {file_path_attempt} 경로에서 파일을 찾을 수 없습니다. 경로를 다시 확인해 주세요.")
    print("팁: 파일의 절대 경로를 사용하거나, os.getcwd()로 현재 작업 디렉토리를 확인하여 상대 경로를 정확히 맞춰주세요.")

현재 작업 디렉토리: /home/jovyan/work/AIFFEL_quest_rs/Exploration/Ex07
파일이 성공적으로 로드되었습니다. 데이터 크기: (11823, 3)


In [5]:
# 질문(Q)과 답변(A)을 합쳐서 텍스트 파일로 저장
with open('songys_chatbot.txt', 'w', encoding='utf-8') as f:
    for text in df['Q']:
        f.write(text + '\n')
    for text in df['A']:
        f.write(text + '\n')

print("데이터가 'songys_chatbot.txt' 파일로 성공적으로 저장되었습니다.")

데이터가 'songys_chatbot.txt' 파일로 성공적으로 저장되었습니다.


In [6]:
# 2. SentencePiece 모델 학습


# SentencePiece 모델 학습
spm.SentencePieceTrainer.train(
    '--input=songys_chatbot.txt '  # 위에서 생성한 입력 텍스트 파일
    '--model_prefix=chatbot_sp '  # 생성될 모델 파일의 접두사
    '--vocab_size=8000 '          # 어휘 크기 설정
    '--model_type=bpe '           # BPE(Byte-Pair Encoding) 모델 사용
    '--unk_id=0 '
    '--pad_id=1 '
    '--bos_id=2 '                 # [START] 토큰 ID
    '--eos_id=3'                  # [END] 토큰 ID
)

print("SentencePiece 모델 학습이 완료되었습니다. 'chatbot_sp.model', 'chatbot_sp.vocab' 파일이 생성되었습니다.")

SentencePiece 모델 학습이 완료되었습니다. 'chatbot_sp.model', 'chatbot_sp.vocab' 파일이 생성되었습니다.


sentencepiece_trainer.cc(178) LOG(INFO) Running command: --input=songys_chatbot.txt --model_prefix=chatbot_sp --vocab_size=8000 --model_type=bpe --unk_id=0 --pad_id=1 --bos_id=2 --eos_id=3
sentencepiece_trainer.cc(78) LOG(INFO) Starts training with : 
trainer_spec {
  input: songys_chatbot.txt
  input_format: 
  model_prefix: chatbot_sp
  model_type: BPE
  vocab_size: 8000
  self_test_sample_size: 0
  character_coverage: 0.9995
  input_sentence_size: 0
  shuffle_input_sentence: 1
  seed_sentencepiece_size: 1000000
  shrinking_factor: 0.75
  max_sentence_length: 4192
  num_threads: 16
  num_sub_iterations: 2
  max_sentencepiece_length: 16
  split_by_unicode_script: 1
  split_by_number: 1
  split_by_whitespace: 1
  split_digits: 0
  pretokenization_delimiter: 
  treat_whitespace_as_suffix: 0
  allow_whitespace_only_pieces: 0
  required_chars: 
  byte_fallback: 0
  vocabulary_output_piece_score: 1
  train_extremely_large_corpus: 0
  seed_sentencepieces_file: 
  hard_vocab_limit: 1
  use_a

In [None]:
# Step 3. SentencePiece 사용하기

In [9]:
# SentencePiece 모델 로드
sp = spm.SentencePieceProcessor()
sp.load('chatbot_sp.model')

class ChatbotDataset(Dataset):
    def __init__(self, df, sp, max_seq_length):
        self.df = df
        self.sp = sp
        self.max_seq_length = max_seq_length
        self.q_tokens = [self.sp.encode_as_ids(q) for q in df['Q']]
        self.a_tokens = [self.sp.encode_as_ids(a) for a in df['A']]

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # 질문 토큰화 및 특수 토큰 추가 (입력)
        q_ids = [self.sp.bos_id()] + self.q_tokens[idx] + [self.sp.eos_id()]
        
        # 답변 토큰화 및 특수 토큰 추가 (출력)
        a_ids = [self.sp.bos_id()] + self.a_tokens[idx] + [self.sp.eos_id()]
        
        # 패딩
        q_padded = self.pad_sequence(q_ids)
        a_padded = self.pad_sequence(a_ids)

        return torch.tensor(q_padded, dtype=torch.long), torch.tensor(a_padded, dtype=torch.long)
    
    def pad_sequence(self, sequence):
        if len(sequence) >= self.max_seq_length:
            return sequence[:self.max_seq_length]
        else:
            padding = [self.sp.pad_id()] * (self.max_seq_length - len(sequence))
            return sequence + padding


In [11]:
# 5. 전처리 함수 호출 한국어 전처리를 통해 학습 데이터셋을 구축하였다.
def preprocess_data(dataframe, sp_processor, max_len):
    """
    데이터프레임과 SentencePiece 프로세서를 사용하여
    전처리된 PyTorch Dataset을 생성합니다.
    """
    return ChatbotDataset(dataframe, sp_processor, max_len)

In [12]:
# 에러가 난다 다시 코딩을 바꾼다
# 클래스 외부에서 토큰 ID를 디코딩하려면, self 대신에 위에서 생성한 sp 객체와 dataset 객체를 직접 사용해야 합니다.
# Dataset 클래스 인스턴스 생성
max_len = 50 
dataset = ChatbotDataset(df, sp, max_len)

# 데이터셋의 첫 번째 항목 확인
q_tensor, a_tensor = dataset[0]
print("질문 텐서 (첫 번째 샘플):", q_tensor.shape)
print("답변 텐서 (첫 번째 샘플):", a_tensor.shape)
print("질문 토큰 ID:", q_tensor)
print("답변 토큰 ID:", a_tensor)

# 토큰 ID를 다시 문장으로 변환하여 확인
# 'self' 대신에 위에서 생성한 'sp' 객체를 직접 사용합니다.
q_sentence = sp.decode_ids(q_tensor.tolist())
a_sentence = sp.decode_ids(a_tensor.tolist())

print("질문 원문:", q_sentence)
print("답변 원문:", a_sentence)

질문 텐서 (첫 번째 샘플): torch.Size([50])
답변 텐서 (첫 번째 샘플): torch.Size([50])
질문 토큰 ID: tensor([   2, 5566, 6957, 3207, 7063,    3,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1])
답변 토큰 ID: tensor([   2, 4489,  211, 5936, 6916,    3,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1])
질문 원문: 12시 땡!
답변 원문: 하루가 또 가네요.


In [13]:
# 데이터셋의 첫 번째 샘플을 가져와 테스트합니다.
q_tensor, a_tensor = dataset[0]

# 토큰 ID 텐서 출력
print("### 토큰 ID 출력 ###")
print(f"질문 텐서 (토큰 ID): {q_tensor}")
print(f"답변 텐서 (토큰 ID): {a_tensor}")

# 토큰 ID를 다시 문장으로 디코딩하여 확인
decoded_q = sp.decode_ids(q_tensor.tolist())
decoded_a = sp.decode_ids(a_tensor.tolist())
print("\n### 디코딩된 문장 출력 ###")
print(f"질문 (디코딩): {decoded_q}")
print(f"답변 (디코딩): {decoded_a}")

# 원본 문장과 비교하여 전처리가 잘 되었는지 확인
print("\n### 원본 문장과 비교 ###")
print(f"원본 질문: {df['Q'].iloc[0]}")
print(f"원본 답변: {df['A'].iloc[0]}")

### 토큰 ID 출력 ###
질문 텐서 (토큰 ID): tensor([   2, 5566, 6957, 3207, 7063,    3,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1])
답변 텐서 (토큰 ID): tensor([   2, 4489,  211, 5936, 6916,    3,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
           1,    1])

### 디코딩된 문장 출력 ###
질문 (디코딩): 12시 땡!
답변 (디코딩): 하루가 또 가네요.

### 원본 문장과 비교 ###
원본 질문: 12시 땡!
원본 답변: 하루가 또 가네요.


In [14]:
# 배치 크기 설정 (예: 64)
BATCH_SIZE = 64

# DataLoader 생성
train_dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

# 첫 번째 배치 확인
# DataLoader를 사용하면 데이터가 텐서 묶음으로 반환됩니다.
for batch in train_dataloader:
    q_batch, a_batch = batch
    
    print("첫 번째 배치의 질문 텐서 크기:", q_batch.shape) # (BATCH_SIZE, max_len)
    print("첫 번째 배치의 답변 텐서 크기:", a_batch.shape) # (BATCH_SIZE, max_len)
    
    # 첫 번째 배치만 확인하고 루프 종료
    break

첫 번째 배치의 질문 텐서 크기: torch.Size([64, 50])
첫 번째 배치의 답변 텐서 크기: torch.Size([64, 50])


In [27]:
# 트랜스포머 모델 정의 및 학습하기 트랜스포머 모델을 구현하여 한국어 챗봇 모델 학습을 정상적으로 진행하였다.

class TransformerModel(nn.Module): # 완성된 모델을 구현하기위해 변경
    def __init__(self, 
                 vocab_size, 
                 d_model=512,          # 임베딩 및 내부 표현 차원
                 nhead=8,              # 멀티헤드 어텐션의 헤드 수
                 num_encoder_layers=6, # 인코더 층 수
                 num_decoder_layers=6, # 디코더 층 수
                 dim_feedforward=2048, 
                 dropout=0.1):
        """
        초기화 함수. 모델의 구성 요소를 정의합니다.
        
        Args:
            vocab_size (int): 단어 집합의 크기. 모델이 예측할 수 있는 총 단어의 개수.
            d_model (int): 모델의 임베딩 차원.
            nhead (int): 어텐션 헤드의 개수.
            num_encoder_layers (int): 인코더 스택의 레이어 수.
            num_decoder_layers (int): 디코더 스택의 레이어 수.
            dim_feedforward (int): 피드포워드 신경망의 은닉층 차원.
            dropout (float): 드롭아웃 비율.
        """
        super(TransformerModel, self).__init__()
        
        # 임베딩 레이어와 위치 인코딩
        # 1. 단어를 벡터로 변환하는 임베딩 레이어
        # (vocab_size, d_model) 크기의 임베딩 테이블을 만듭니다.
        self.embedding = nn.Embedding(vocab_size, d_model)
        
        # 2. 단어의 위치 정보를 추가하는 위치 인코딩 레이어
        # 트랜스포머는 순서 정보를 직접 처리하지 못하므로, 이 레이어를 통해 위치 정보를 주입합니다.
        self.positional_encoding = PositionalEncoding(d_model)

        # 3. PyTorch의 내장된 핵심 트랜스포머 모듈 (인코더와 디코더)
        # 트랜스포머 인코더와 디코더
        # 이 모듈이 멀티-헤드 어텐션, 피드포워드 신경망 등 핵심 연산을 처리합니다.
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        
        # 4. 최종 출력 레이어
        # 디코더의 출력을 받아, 단어 집합 크기(vocab_size)에 맞는 로짓(logit)을 생성합니다.
        self.fc = nn.Linear(d_model, vocab_size)
    
    def forward(self, src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask):
        """
        순전파 함수. 입력(src)과 타겟(tgt)을 받아 출력을 계산합니다.
        
        Args:
            src (Tensor): 인코더에 들어갈 소스(질문) 시퀀스 텐서.
            tgt (Tensor): 디코더에 들어갈 타겟(답변) 시퀀스 텐서.
            src_mask (Tensor): 소스 마스크 (패딩 토큰을 가리기 위함).
            tgt_mask (Tensor): 타겟 마스크 (미래 토큰을 가리기 위함).
            src_padding_mask (Tensor): 소스 패딩 마스크.
            tgt_padding_mask (Tensor): 타겟 패딩 마스크.
        """
        # 1. 입력 시퀀스에 임베딩과 위치 인코딩 적용
        # 임베딩 값에 루트(d_model)을 곱하여 스케일링을 합니다.
        src = self.embedding(src) * torch.sqrt(torch.tensor(self.transformer.d_model, dtype=torch.float32))
        src = self.positional_encoding(src)

        # 2. 타겟 시퀀스에 임베딩과 위치 인코딩 적용
        tgt = self.embedding(tgt) * torch.sqrt(torch.tensor(self.transformer.d_model, dtype=torch.float32))
        tgt = self.positional_encoding(tgt)
        
        # 3. 트랜스포머 모델에 입력
        # 인코더와 디코더를 통과하여 출력을 생성합니다.
        output = self.transformer(src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)
        # 4. 최종 출력 레이어를 통과시켜 다음 단어의 확률을 예측
        output = self.fc(output)
        
        return output
        
# PositionalEncoding 클래스: 단어의 위치 정보를 인코딩합니다.
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # 입력 텐서에 미리 계산된 위치 인코딩 값을 더해줍니다.
        x = x + self.pe[:x.size(0), :]
        return x

In [29]:
# 모델 하이퍼파라미터
VOCAB_SIZE = 8000 # SentencePiece 학습 시 설정한 어휘 크기
D_MODEL = 512           # 모델의 임베딩 차원
NHEADS = 8              # 멀티-헤드 어텐션의 헤드 수
NUM_ENCODER_LAYERS = 6  # 인코더 레이어 수
NUM_DECODER_LAYERS = 6  # 디코더 레이어 수
DROPOUT = 0.1           # 드롭아웃 비율

# GPU 사용 설정
# CUDA를 사용할 수 있으면 'cuda'를, 아니면 'cpu'를 사용합니다.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"현재 사용 가능한 장치: {device}")
"""
# 수정 전
# 모델 인스턴스 생성 및 장치로 이동
model = TransformerModel(
    vocab_size=VOCAB_SIZE, 
    d_model=D_MODEL,
    nhead=NHEADS,
    num_encoder_layers=NUM_ENCODER_LAYERS,
    num_decoder_layers=NUM_DECODER_LAYERS,
    dropout=DROPOUT
).to(device)
"""
# 수정 후
model = TransformerModel(
    vocab_size=VOCAB_SIZE, 
    d_model=D_MODEL,
    nhead=NHEADS,
    num_encoder_layers=NUM_ENCODER_LAYERS,
    num_decoder_layers=NUM_DECODER_LAYERS,
    dropout=DROPOUT,
)

# 옵티마이저 정의: 모델의 파라미터(가중치)를 최적화(업데이트)하는 역할을 합니다.
# Adam 옵티마이저는 학습률(lr)을 효율적으로 조절하여 성능을 높입니다.
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

# 손실 함수 정의: 모델의 예측값과 실제 정답 간의 오차를 계산합니다.
# CrossEntropyLoss는 분류 문제에 사용되며, ignore_index를 통해 패딩 토큰은 손실 계산에서 제외합니다.
criterion = nn.CrossEntropyLoss(ignore_index=sp.pad_id())

현재 사용 가능한 장치: cuda


In [36]:
#어텐션 마스크 및 훈련/평가 함수 정의하기
"""
트랜스포머 모델은 다음 두 종류의 마스크를 사용합니다.

소스 마스크 (src_mask): 인코더가 패딩(Padding) 토큰을 무시하도록 합니다.

타겟 마스크 (tgt_mask): 디코더가 아직 생성되지 않은 미래의 토큰을 보지 못하도록 합니다.

이러한 마스크는 학습 중 모델이 "부정행위"를 하지 않도록 방지하는 중요한 역할을 합니다. 아래 코드는 이 두 가지 마스크를 생성하는 함수를 정의합니다.
"""
def create_masks(src, tgt, sp):
    # 인코더의 입력(src)에 대한 패딩 마스크 생성
    # 패딩 토큰(sp.pad_id())의 위치를 True로 표시하여 어텐션 계산에서 제외합니다.
    src_padding_mask = (src == sp.pad_id())

    # 디코더의 입력(tgt)에 대한 미래 토큰 마스크 생성
    # 디코더가 현재 단어 이후의 단어를 보지 못하도록 가립니다.
    tgt_len = tgt.shape[1]
    tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len).to(src.device)

    # 디코더의 입력(tgt)에 대한 패딩 마스크 생성
    tgt_padding_mask = (tgt == sp.pad_id())

    return tgt_mask, src_padding_mask, tgt_padding_mask

# generate_masks 함수는 Ex07.ipynb 코드에서 다르게 구현될 수 있습니다.
# 위 코드는 일반적인 PyTorch 구현 방식을 따르며,
# Ex07.ipynb에서는 src_mask가 사용되지 않을 수 있습니다.
# (torch.nn.Transformer에 기본적으로 내장되어 있기 때문)

In [None]:
"""
2. 훈련 함수 정의
이제 DataLoader에서 데이터를 가져와 모델을 학습시킬 함수를 정의합니다. 
이 함수는 한 에폭(epoch) 동안의 훈련 과정을 담고 있습니다.
"""

In [39]:
def train_epoch(model, optimizer, criterion, dataloader, sp, device):
    model.train()  # 모델을 훈련 모드로 설정
    total_loss = 0.0

    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)

        # 타겟 시퀀스를 모델의 입력과 정답으로 분리합니다.
        # 예: [START, 문장1, 문장2, END] -> 입력: [START, 문장1, 문장2], 정답: [문장1, 문장2, END]
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]

        # 마스크 생성
        tgt_mask, src_padding_mask, tgt_padding_mask = create_masks(src, tgt_input, sp)

        # 옵티마이저 초기화
        optimizer.zero_grad()

        # 순전파 (Forward Pass): 모델에 입력과 마스크를 전달
        # 주의: 여기서는 src_mask를 사용하지 않습니다. PyTorch의 nn.Transformer가 src_padding_mask로 대체할 수 있습니다.
        predictions = model(src, tgt_input, src_mask=None, tgt_mask=tgt_mask,
                            src_key_padding_mask=src_padding_mask,
                            tgt_key_padding_mask=tgt_padding_mask)

        # 손실 계산: 예측값과 정답 간의 오차를 계산
        # view()를 사용하여 텐서를 1차원으로 펼친 후 손실 함수에 전달합니다.
        loss = criterion(predictions.view(-1, predictions.shape[-1]), tgt_output.reshape(-1))

        # 역전파 (Backward Pass)
        loss.backward()
        
        # 옵티마이저로 파라미터 업데이트
        optimizer.step()
        
        total_loss += loss.item()

    return total_loss / len(dataloader)

In [40]:
# 전체 훈련 루프 실행
# 마지막으로, 위에서 정의한 함수들을 사용하여 모델을 여러 에폭 동안 훈련시키는 루프를 실행합니다.
# 하이퍼파라미터 정의 (이전 단계에서 이미 정의한 경우 생략 가능)
NUM_EPOCHS = 10 
BATCH_SIZE = 64

# (이전 코드에서 정의된) model, optimizer, criterion, train_dataloader, sp, device 변수들이 필요합니다.

# 훈련 시작
print("모델 훈련 시작...")
for epoch in range(1, NUM_EPOCHS + 1):
    train_loss = train_epoch(model, optimizer, criterion, train_dataloader, sp, device)
    print(f"Epoch [{epoch}/{NUM_EPOCHS}], Loss: {train_loss:.4f}")

print("모델 훈련 완료.")

모델 훈련 시작...


TypeError: TransformerModel.forward() got an unexpected keyword argument 'src_key_padding_mask'

In [41]:
# 수정된 train_epoch 함수
def train_epoch(model, optimizer, criterion, dataloader, sp, device):
    model.train()  # 모델을 훈련 모드로 설정
    total_loss = 0.0

    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)

        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]

        # 마스크 생성
        tgt_mask, src_padding_mask, tgt_padding_mask = create_masks(src, tgt_input, sp)

        # 옵티마이저 초기화
        optimizer.zero_grad()

        # 순전파 (Forward Pass)
        # model.forward()의 인자 이름에 맞게 수정
        predictions = model(src, tgt_input,
                            src_mask=None,
                            tgt_mask=tgt_mask,
                            src_padding_mask=src_padding_mask,
                            tgt_padding_mask=tgt_padding_mask)

        # 손실 계산
        loss = criterion(predictions.view(-1, predictions.shape[-1]), tgt_output.reshape(-1))

        # 역전파 (Backward Pass)
        loss.backward()
        
        # 옵티마이저로 파라미터 업데이트
        optimizer.step()
        
        total_loss += loss.item()

    return total_loss / len(dataloader)

# 전체 훈련 루프
NUM_EPOCHS = 10 

print("모델 훈련 시작...")
for epoch in range(1, NUM_EPOCHS + 1):
    train_loss = train_epoch(model, optimizer, criterion, train_dataloader, sp, device)
    print(f"Epoch [{epoch}/{NUM_EPOCHS}], Loss: {train_loss:.4f}")

print("모델 훈련 완료.")

모델 훈련 시작...


RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu and cuda:0! (when checking argument for argument index in method wrapper_CUDA__index_select)

In [47]:
import torch
import torch.nn as nn
import math

class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
        super(TransformerModel, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        
        self.fc = nn.Linear(d_model, vocab_size)
    
    def forward(self, src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask):
        # 1. 입력 시퀀스에 임베딩과 위치 인코딩 적용
        # **수정된 부분:** torch.tensor()로 생성된 텐서를 .to(src.device)로 이동
        src = self.embedding(src) * torch.sqrt(torch.tensor(self.transformer.d_model, dtype=torch.float32).to(src.device))
        src = self.positional_encoding(src)

        # 2. 타겟 시퀀스에 임베딩과 위치 인코딩 적용
        # **수정된 부분:** torch.tensor()로 생성된 텐서를 .to(tgt.device)로 이동
        tgt = self.embedding(tgt) * torch.sqrt(torch.tensor(self.transformer.d_model, dtype=torch.float32).to(tgt.device))
        tgt = self.positional_encoding(tgt)
        
        # 3. 트랜스포머 모델에 입력
        output = self.transformer(src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)
        
        # 4. 최종 출력 레이어를 통과시켜 다음 단어의 확률을 예측
        output = self.fc(output)
        
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x

In [48]:
# 수정된 create_masks 함수: 마스크를 입력 텐서와 동일한 장치로 이동시킵니다.
def create_masks(src, tgt, sp):
    # 인코더의 입력(src)에 대한 패딩 마스크 생성
    # 패딩 토큰(sp.pad_id())의 위치를 True로 표시하여 어텐션 계산에서 제외합니다.
    src_padding_mask = (src == sp.pad_id())

    # 디코더의 입력(tgt)에 대한 미래 토큰 마스크 생성
    # 디코더가 현재 단어 이후의 단어를 보지 못하도록 가립니다.
    tgt_len = tgt.shape[1]
    tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len).to(src.device)

    # 디코더의 입력(tgt)에 대한 패딩 마스크 생성
    tgt_padding_mask = (tgt == sp.pad_id())

    # 반환하는 마스크는 3개입니다.
    return tgt_mask, src_padding_mask, tgt_padding_mask

# 수정된 train_epoch 함수
def train_epoch(model, optimizer, criterion, dataloader, sp, device):
    model.train()
    total_loss = 0.0

    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)

        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]

        # 수정된 create_masks 함수를 호출하여 3개의 마스크를 받습니다.
        tgt_mask, src_padding_mask, tgt_padding_mask = create_masks(src, tgt_input, sp)

        optimizer.zero_grad()

        # 순전파 (Forward Pass)
        # model의 forward() 메서드 인자명과 일치시켜야 합니다.
        predictions = model(src, tgt_input,
                            src_mask=None,
                            tgt_mask=tgt_mask,
                            src_padding_mask=src_padding_mask,
                            tgt_padding_mask=tgt_padding_mask)

        # 손실 계산
        loss = criterion(predictions.view(-1, predictions.shape[-1]), tgt_output.reshape(-1))

        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()

    return total_loss / len(dataloader)

# 전체 훈련 루프
NUM_EPOCHS = 10 

print("모델 훈련 시작...")
for epoch in range(1, NUM_EPOCHS + 1):
    train_loss = train_epoch(model, optimizer, criterion, train_dataloader, sp, device)
    print(f"Epoch [{epoch}/{NUM_EPOCHS}], Loss: {train_loss:.4f}")

print("모델 훈련 완료.")

모델 훈련 시작...


RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu and cuda:0! (when checking argument for argument index in method wrapper_CUDA__index_select)

In [None]:
# GPU로 해서 자꾸 에러가 발생한다.

In [51]:

# TransformerModel 클래스 (수정 없음)
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
        super(TransformerModel, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        
        self.fc = nn.Linear(d_model, vocab_size)
    
    def forward(self, src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask):
        src = self.embedding(src) * math.sqrt(self.transformer.d_model)
        src = self.positional_encoding(src)

        tgt = self.embedding(tgt) * math.sqrt(self.transformer.d_model)
        tgt = self.positional_encoding(tgt)
        
        output = self.transformer(src, tgt, 
                                 src_mask=src_mask, 
                                 tgt_mask=tgt_mask, 
                                 src_key_padding_mask=src_padding_mask, 
                                 tgt_key_padding_mask=tgt_padding_mask)
        
        output = self.fc(output)
        
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x

# 마스크 생성 함수 (수정 없음)
def create_masks(src, tgt, sp):
    src_padding_mask = (src == sp.pad_id()).float()
    tgt_len = tgt.shape[1]
    tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len).to(src.device)
    tgt_padding_mask = (tgt == sp.pad_id()).float()
    return tgt_mask, src_padding_mask, tgt_padding_mask

# 훈련 함수 (수정 없음)
def train_epoch(model, optimizer, criterion, dataloader, sp, device):
    model.train()
    total_loss = 0.0
    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]
        tgt_mask, src_padding_mask, tgt_padding_mask = create_masks(src, tgt_input, sp)
        optimizer.zero_grad()
        predictions = model(src, tgt_input,
                            src_mask=None,
                            tgt_mask=tgt_mask,
                            src_padding_mask=src_padding_mask,
                            tgt_padding_mask=tgt_padding_mask)
        loss = criterion(predictions.view(-1, predictions.shape[-1]), tgt_output.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)


# CPU 사용으로 변경: torch.cuda.is_available() 검사를 제거하고 'cpu'로 고정
device = torch.device('cpu')
print(f"현재 사용 가능한 장치: {device}")

# 모델 인스턴스 생성 및 장치로 이동
model = TransformerModel(
    vocab_size=8000, 
    d_model=512,
    nhead=8,
    num_encoder_layers=6,
    num_decoder_layers=6,
    dropout=0.1
).to(device)

# 옵티마이저와 손실 함수 정의
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
criterion = nn.CrossEntropyLoss(ignore_index=sp.pad_id())

# 훈련 루프 실행
NUM_EPOCHS = 10 

print("모델 훈련 시작...")
for epoch in range(1, NUM_EPOCHS + 1):
    train_loss = train_epoch(model, optimizer, criterion, train_dataloader, sp, device)
    print(f"Epoch [{epoch}/{NUM_EPOCHS}], Loss: {train_loss:.4f}")

print("모델 훈련 완료.")

현재 사용 가능한 장치: cpu
모델 훈련 시작...


KeyboardInterrupt: 

In [None]:
# 에러가 결려서 에러를 수정

In [53]:
# TransformerModel 클래스 (이전 수정사항 모두 반영)
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
        super(TransformerModel, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        
        self.fc = nn.Linear(d_model, vocab_size)
    
    def forward(self, src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask):
        src = self.embedding(src) * torch.sqrt(torch.tensor(self.transformer.d_model, dtype=torch.float32).to(src.device))
        src = self.positional_encoding(src)

        tgt = self.embedding(tgt) * torch.sqrt(torch.tensor(self.transformer.d_model, dtype=torch.float32).to(tgt.device))
        tgt = self.positional_encoding(tgt)
        
        output = self.transformer(src, tgt, 
                                 src_mask=src_mask, 
                                 tgt_mask=tgt_mask, 
                                 src_key_padding_mask=src_padding_mask, 
                                 tgt_key_padding_mask=tgt_padding_mask)
        
        output = self.fc(output)
        
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x

def create_masks(src, tgt, sp):
    src_padding_mask = (src == sp.pad_id()).float()
    tgt_len = tgt.shape[1]
    tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len).to(src.device)
    tgt_padding_mask = (tgt == sp.pad_id()).float()
    return tgt_mask, src_padding_mask, tgt_padding_mask

def train_epoch(model, optimizer, criterion, dataloader, sp, device, log_interval=100):
    model.train()
    total_loss = 0.0
    
    # enumerate를 사용하여 배치 인덱스를 가져옵니다.
    for i, (src, tgt) in enumerate(dataloader):
        src, tgt = src.to(device), tgt.to(device)
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]

        tgt_mask, src_padding_mask, tgt_padding_mask = create_masks(src, tgt_input, sp)

        optimizer.zero_grad()
        predictions = model(src, tgt_input,
                            src_mask=None,
                            tgt_mask=tgt_mask,
                            src_padding_mask=src_padding_mask,
                            tgt_padding_mask=tgt_padding_mask)

        loss = criterion(predictions.view(-1, predictions.shape[-1]), tgt_output.reshape(-1))
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()

        # [수정] Step별 Loss와 Acc를 계산하고 출력하는 코드
        if i % log_interval == 0 and i != 0:
            # Loss 계산
            current_loss = loss.item()

            # Accuracy (정확도) 계산
            # topk를 사용하여 예측이 가장 높은 토큰을 찾습니다.
            _, predicted_tokens = torch.max(predictions, dim=-1)
            correct_predictions = (predicted_tokens == tgt_output)
            
            # 패딩 토큰은 정확도 계산에서 제외합니다.
            # ignore_index가 -100이므로, 0을 제외하도록 수정
            # (SentencePiece의 pad_id()는 0입니다)
            non_pad_elements = (tgt_output != sp.pad_id())
            correct_predictions_non_pad = correct_predictions[non_pad_elements]
            
            accuracy = correct_predictions_non_pad.sum().item() / non_pad_elements.sum().item()
            
            print(f"[Epoch {epoch}, Step {i}] Loss: {current_loss:.4f}, Acc: {accuracy:.4f}")

    return total_loss / len(dataloader)


# 데이터셋 크기 제한
MAX_SAMPLES = 50000

# 기존 데이터셋 로드
# 여기서는 예시로 DataLoader를 직접 생성합니다. 실제 코드에 맞게 수정해주세요.
# 예시: train_data, test_data = get_data() 또는 TensorDataset()
# (아래 두 줄은 사용자님의 실제 데이터셋 로딩 코드로 대체해야 합니다.)
# train_data = ...
# test_data = ...

# 데이터셋의 앞부분만 잘라내기
# len() 함수를 사용하여 전체 데이터셋의 길이를 확인합니다.
if 'train_data' in locals() and len(train_data) > MAX_SAMPLES:
    train_data = train_data[:MAX_SAMPLES]

# DataLoader 재정의 (이 부분은 사용자의 기존 코드에 맞게 수정해주세요)
# 예시: train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
# 예시: test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

# CPU 사용으로 변경
device = torch.device('cpu')
print(f"현재 사용 가능한 장치: {device}")

# 모델 인스턴스 생성 및 장치로 이동
model = TransformerModel(
    vocab_size=8000, 
    d_model=512,
    nhead=8,
    num_encoder_layers=6,
    num_decoder_layers=6,
    dropout=0.1
).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
criterion = nn.CrossEntropyLoss(ignore_index=sp.pad_id())

# 훈련 루프 실행
NUM_EPOCHS = 10 

print("모델 훈련 시작...")
for epoch in range(1, NUM_EPOCHS + 1):
    train_loss = train_epoch(model, optimizer, criterion, train_dataloader, sp, device)
    print(f"Epoch [{epoch}/{NUM_EPOCHS}], Loss: {train_loss:.4f}")

print("모델 훈련 완료.")


현재 사용 가능한 장치: cpu
모델 훈련 시작...


KeyboardInterrupt: 

In [56]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import math

# TransformerModel 클래스
# forward 메서드 내부에 새로 생성되는 텐서를 .to(src.device)로 GPU에 옮기는 코드가 포함되어 있습니다.
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
        super(TransformerModel, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        
        self.fc = nn.Linear(d_model, vocab_size)
    
    def forward(self, src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask):
        src = self.embedding(src) * torch.sqrt(torch.tensor(self.transformer.d_model, dtype=torch.float32).to(src.device))
        src = self.positional_encoding(src)

        tgt = self.embedding(tgt) * torch.sqrt(torch.tensor(self.transformer.d_model, dtype=torch.float32).to(tgt.device))
        tgt = self.positional_encoding(tgt)
        
        output = self.transformer(src, tgt, 
                                 src_mask=src_mask, 
                                 tgt_mask=tgt_mask, 
                                 src_key_padding_mask=src_padding_mask, 
                                 tgt_key_padding_mask=tgt_padding_mask)
        
        output = self.fc(output)
        
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x

def create_masks(src, tgt, sp):
    src_padding_mask = (src == sp.pad_id()).float()
    tgt_len = tgt.shape[1]
    tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len).to(src.device)
    tgt_padding_mask = (tgt == sp.pad_id()).float()
    return tgt_mask, src_padding_mask, tgt_padding_mask

def train_epoch(model, optimizer, criterion, dataloader, sp, device):
    model.train()
    total_loss = 0.0
    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]
        tgt_mask, src_padding_mask, tgt_padding_mask = create_masks(src, tgt_input, sp)
        optimizer.zero_grad()
        
        # 수정된 부분: model의 forward() 메서드 인자명에 맞게 변경
        predictions = model(src, tgt_input,
                            src_mask=None,
                            tgt_mask=tgt_mask,
                            src_padding_mask=src_padding_mask,
                            tgt_padding_mask=tgt_padding_mask)
                            
        loss = criterion(predictions.view(-1, predictions.shape[-1]), tgt_output.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)


# 데이터셋 크기 제한 (이전 단계와 동일)
MAX_SAMPLES = 50000
# 여기서는 예시로 DataLoader를 직접 생성합니다. 실제 코드에 맞게 수정해주세요.
# 예시: train_data, test_data = get_data() 또는 TensorDataset()
# (아래 두 줄은 사용자님의 실제 데이터셋 로딩 코드로 대체해야 합니다.)
# train_data = ...
# test_data = ...

if 'train_data' in locals() and len(train_data) > MAX_SAMPLES:
    train_data = train_data[:MAX_SAMPLES]

# DataLoader 재정의 (이 부분은 사용자의 기존 코드에 맞게 수정해주세요)
# 예시: train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
# 예시: test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)


# 1. CUDA 사용 가능 여부 확인 후 'device' 변수 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"현재 사용 가능한 장치: {device}")

# 2. 모델 인스턴스 생성 및 장치로 이동
model = TransformerModel(
    vocab_size=8000, 
    d_model=512,
    nhead=8,
    num_encoder_layers=6,
    num_decoder_layers=6,
    dropout=0.1
).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
criterion = nn.CrossEntropyLoss(ignore_index=sp.pad_id())

# 훈련 루프 실행
NUM_EPOCHS = 20 

print("모델 훈련 시작...")
for epoch in range(1, NUM_EPOCHS + 1):
    train_loss = train_epoch(model, optimizer, criterion, train_dataloader, sp, device)
    print(f"Epoch [{epoch}/{NUM_EPOCHS}], Loss: {train_loss:.4f}")

print("모델 훈련 완료.")

현재 사용 가능한 장치: cuda
모델 훈련 시작...
Epoch [1/20], Loss: 6.1058
Epoch [2/20], Loss: 5.5136
Epoch [3/20], Loss: 5.2292
Epoch [4/20], Loss: 4.9403
Epoch [5/20], Loss: 4.6626
Epoch [6/20], Loss: 4.4011
Epoch [7/20], Loss: 4.1543
Epoch [8/20], Loss: 3.9167
Epoch [9/20], Loss: 3.6896
Epoch [10/20], Loss: 3.4721
Epoch [11/20], Loss: 3.2600
Epoch [12/20], Loss: 3.0719
Epoch [13/20], Loss: 2.8944
Epoch [14/20], Loss: 2.7250
Epoch [15/20], Loss: 2.5715
Epoch [16/20], Loss: 2.4247
Epoch [17/20], Loss: 2.2991
Epoch [18/20], Loss: 2.1858
Epoch [19/20], Loss: 2.0794
Epoch [20/20], Loss: 1.9870
모델 훈련 완료.


In [60]:
# 1. 챗봇 추론 함수 정의하기

def sentence_generation(model, sentence, sp, device, max_len=50):
    # 훈련 모드를 비활성화하고 평가 모드로 전환
    model.eval()
    
    # 입력 문장을 토큰 ID로 변환
    tokens = sp.encode_as_ids(sentence)
    
    # [START] 토큰을 문장 앞에 추가
    input_ids = [sp.bos_id()] + tokens
    
    # 텐서로 변환하고 디바이스로 이동
    src_tensor = torch.tensor(input_ids).unsqueeze(0).to(device)
    
    # 타겟 시퀀스 초기화: [START] 토큰으로 시작
    tgt_tensor = torch.tensor([sp.bos_id()]).unsqueeze(0).to(device)
    
    # 최대 길이까지 반복하며 단어 생성
    for _ in range(max_len):
        # 마스크 생성 (디코더는 미래의 토큰을 보지 못함)
        tgt_len = tgt_tensor.shape[1]
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len).to(device)
        
        # 모델 예측
        # 수정된 부분: model의 forward() 메서드 인자명에 맞게 변경
        predictions = model(src_tensor, tgt_tensor, src_mask=None, tgt_mask=tgt_mask,
                            src_padding_mask=None,
                            tgt_padding_mask=None)
                            
        # 마지막 예측 토큰의 인덱스를 가져옴
        last_prediction = predictions[:, -1, :]
        
        # 가장 높은 확률의 다음 토큰을 선택
        _, next_token = torch.max(last_prediction, dim=-1)
        next_token = next_token.item()
        
        # 예측된 토큰을 타겟 시퀀스에 추가
        tgt_tensor = torch.cat([tgt_tensor, torch.tensor([[next_token]]).to(device)], dim=1)
        
        # [END] 토큰이 예측되면 생성을 중단
        if next_token == sp.eos_id():
            break
            
    # 생성된 타겟 시퀀스(ID)를 다시 문장으로 변환
    generated_sentence_ids = tgt_tensor.squeeze().tolist()
    
    # 불필요한 토큰([START], [END], [PAD]) 제거 후 문장으로 디코딩
    generated_sentence = sp.decode(generated_sentence_ids)
    
    return generated_sentence

In [61]:
# 챗봇 테스트 실행하기 한국어 입력문장에 대해 한국어로 답변하는 함수를 구현하였다.
# 기존에 정의한 model, sp, device 변수를 사용합니다.
# model = ...
# sp = ...
# device = ...

# 테스트할 문장
sentence = "오늘 날씨 어때?"
print(f"입력 : {sentence}")

# 챗봇에게 질문하고 응답 받기
response = sentence_generation(model, sentence, sp, device)
print(f"출력 : {response}")

# 다른 문장으로 테스트
sentence = "어디에 있었어?"
print(f"입력 : {sentence}")
response = sentence_generation(model, sentence, sp, device)
print(f"출력 : {response}")

입력 : 오늘 날씨 어때?
출력 : 좋은 곳으로 데려다 줄 거예요.
입력 : 어디에 있었어?
출력 : 마음이 허전하겠어요.


In [None]:
# 역시 동문 서답...
# 데이터를 좀더 늘려서 진행해 보자 그리고 에폭도 늘려 보자

In [63]:

# TransformerModel 클래스
# forward 메서드 내부에 새로 생성되는 텐서를 .to(src.device)로 GPU에 옮기는 코드가 포함되어 있습니다.
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
        super(TransformerModel, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        
        self.fc = nn.Linear(d_model, vocab_size)
    
    def forward(self, src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask):
        src = self.embedding(src) * torch.sqrt(torch.tensor(self.transformer.d_model, dtype=torch.float32).to(src.device))
        src = self.positional_encoding(src)

        tgt = self.embedding(tgt) * torch.sqrt(torch.tensor(self.transformer.d_model, dtype=torch.float32).to(tgt.device))
        tgt = self.positional_encoding(tgt)
        
        output = self.transformer(src, tgt, 
                                 src_mask=src_mask, 
                                 tgt_mask=tgt_mask, 
                                 src_key_padding_mask=src_padding_mask, 
                                 tgt_key_padding_mask=tgt_padding_mask)
        
        output = self.fc(output)
        
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x

def create_masks(src, tgt, sp):
    src_padding_mask = (src == sp.pad_id()).float()
    tgt_len = tgt.shape[1]
    tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len).to(src.device)
    tgt_padding_mask = (tgt == sp.pad_id()).float()
    return tgt_mask, src_padding_mask, tgt_padding_mask

def train_epoch(model, optimizer, criterion, dataloader, sp, device):
    model.train()
    total_loss = 0.0
    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]
        tgt_mask, src_padding_mask, tgt_padding_mask = create_masks(src, tgt_input, sp)
        optimizer.zero_grad()
        
        # 수정된 부분: model의 forward() 메서드 인자명에 맞게 변경
        predictions = model(src, tgt_input,
                            src_mask=None,
                            tgt_mask=tgt_mask,
                            src_padding_mask=src_padding_mask,
                            tgt_padding_mask=tgt_padding_mask)
                            
        loss = criterion(predictions.view(-1, predictions.shape[-1]), tgt_output.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)


# 데이터셋 크기 제한 (이전 단계와 동일)
MAX_SAMPLES = 100000
# 여기서는 예시로 DataLoader를 직접 생성합니다. 실제 코드에 맞게 수정해주세요.
# 예시: train_data, test_data = get_data() 또는 TensorDataset()
# (아래 두 줄은 사용자님의 실제 데이터셋 로딩 코드로 대체해야 합니다.)
# train_data = ...
# test_data = ...

if 'train_data' in locals() and len(train_data) > MAX_SAMPLES:
    train_data = train_data[:MAX_SAMPLES]

# DataLoader 재정의 (이 부분은 사용자의 기존 코드에 맞게 수정해주세요)
# 예시: train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
# 예시: test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)


# 1. CUDA 사용 가능 여부 확인 후 'device' 변수 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"현재 사용 가능한 장치: {device}")

# 2. 모델 인스턴스 생성 및 장치로 이동
model = TransformerModel(
    vocab_size=8000, 
    d_model=512,
    nhead=8,
    num_encoder_layers=6,
    num_decoder_layers=6,
    dropout=0.1
).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
criterion = nn.CrossEntropyLoss(ignore_index=sp.pad_id())

# 훈련 루프 실행
NUM_EPOCHS = 30 

print("모델 훈련 시작...")
for epoch in range(1, NUM_EPOCHS + 1):
    train_loss = train_epoch(model, optimizer, criterion, train_dataloader, sp, device)
    print(f"Epoch [{epoch}/{NUM_EPOCHS}], Loss: {train_loss:.4f}")

print("모델 훈련 완료.")

현재 사용 가능한 장치: cuda
모델 훈련 시작...
Epoch [1/30], Loss: 6.1001
Epoch [2/30], Loss: 5.5243
Epoch [3/30], Loss: 5.2421
Epoch [4/30], Loss: 4.9662
Epoch [5/30], Loss: 4.6924
Epoch [6/30], Loss: 4.4299
Epoch [7/30], Loss: 4.1767
Epoch [8/30], Loss: 3.9381
Epoch [9/30], Loss: 3.7099
Epoch [10/30], Loss: 3.4950
Epoch [11/30], Loss: 3.2885
Epoch [12/30], Loss: 3.0925
Epoch [13/30], Loss: 2.9141
Epoch [14/30], Loss: 2.7436
Epoch [15/30], Loss: 2.5872
Epoch [16/30], Loss: 2.4379
Epoch [17/30], Loss: 2.3113
Epoch [18/30], Loss: 2.1884
Epoch [19/30], Loss: 2.0808
Epoch [20/30], Loss: 1.9847
Epoch [21/30], Loss: 1.8994
Epoch [22/30], Loss: 1.8199
Epoch [23/30], Loss: 1.7475
Epoch [24/30], Loss: 1.6860
Epoch [25/30], Loss: 1.6290
Epoch [26/30], Loss: 1.5849
Epoch [27/30], Loss: 1.5389
Epoch [28/30], Loss: 1.5014
Epoch [29/30], Loss: 1.4696
Epoch [30/30], Loss: 1.4389
모델 훈련 완료.


In [64]:
# 1. 챗봇 추론 함수 정의하기

def sentence_generation(model, sentence, sp, device, max_len=50):
    # 훈련 모드를 비활성화하고 평가 모드로 전환
    model.eval()
    
    # 입력 문장을 토큰 ID로 변환
    tokens = sp.encode_as_ids(sentence)
    
    # [START] 토큰을 문장 앞에 추가
    input_ids = [sp.bos_id()] + tokens
    
    # 텐서로 변환하고 디바이스로 이동
    src_tensor = torch.tensor(input_ids).unsqueeze(0).to(device)
    
    # 타겟 시퀀스 초기화: [START] 토큰으로 시작
    tgt_tensor = torch.tensor([sp.bos_id()]).unsqueeze(0).to(device)
    
    # 최대 길이까지 반복하며 단어 생성
    for _ in range(max_len):
        # 마스크 생성 (디코더는 미래의 토큰을 보지 못함)
        tgt_len = tgt_tensor.shape[1]
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len).to(device)
        
        # 모델 예측
        # 수정된 부분: model의 forward() 메서드 인자명에 맞게 변경
        predictions = model(src_tensor, tgt_tensor, src_mask=None, tgt_mask=tgt_mask,
                            src_padding_mask=None,
                            tgt_padding_mask=None)
                            
        # 마지막 예측 토큰의 인덱스를 가져옴
        last_prediction = predictions[:, -1, :]
        
        # 가장 높은 확률의 다음 토큰을 선택
        _, next_token = torch.max(last_prediction, dim=-1)
        next_token = next_token.item()
        
        # 예측된 토큰을 타겟 시퀀스에 추가
        tgt_tensor = torch.cat([tgt_tensor, torch.tensor([[next_token]]).to(device)], dim=1)
        
        # [END] 토큰이 예측되면 생성을 중단
        if next_token == sp.eos_id():
            break
            
    # 생성된 타겟 시퀀스(ID)를 다시 문장으로 변환
    generated_sentence_ids = tgt_tensor.squeeze().tolist()
    
    # 불필요한 토큰([START], [END], [PAD]) 제거 후 문장으로 디코딩
    generated_sentence = sp.decode(generated_sentence_ids)
    
    return generated_sentence

In [65]:
# 챗봇 테스트 실행하기 한국어 입력문장에 대해 한국어로 답변하는 함수를 구현하였다.
# 기존에 정의한 model, sp, device 변수를 사용합니다.
# model = ...
# sp = ...
# device = ...

# 테스트할 문장
sentence = "오늘 날씨 어때?"
print(f"입력 : {sentence}")

# 챗봇에게 질문하고 응답 받기
response = sentence_generation(model, sentence, sp, device)
print(f"출력 : {response}")

# 다른 문장으로 테스트
sentence = "어디에 있었어?"
print(f"입력 : {sentence}")
response = sentence_generation(model, sentence, sp, device)
print(f"출력 : {response}")

입력 : 오늘 날씨 어때?
출력 : 저는 위로해드리는 로봇이에요.
입력 : 어디에 있었어?
출력 : 직접 물어보세요.


In [None]:
# 여전히 동문 서답...
# 뭐가 문제인가?