In [1]:
!pip install pytorch-lightning torchmetrics  datasets transformers

Collecting pytorch-lightning
  Downloading pytorch_lightning-2.5.1-py3-none-any.whl.metadata (20 kB)
Collecting torchmetrics
  Downloading torchmetrics-1.7.0-py3-none-any.whl.metadata (21 kB)
Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting lightning-utilities>=0.10.0 (from pytorch-lightning)
  Downloading lightning_utilities-0.14.2-py3-none-any.whl.metadata (5.6 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec>=2022.5.0 (from fsspec[http]>=2022.5.0->pytorch-lightning)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=2.1.0->pytorch-lightning

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer
import pytorch_lightning as pl
import random
import pandas as pd
from sklearn.model_selection import train_test_split

In [3]:
# 1. 데이터셋 정의
class KoEnTranslationDataset(Dataset):
    """Loads and tokenizes the AI Hub Korean-English translation dataset.

    Each item is a dict of four 1-D tensors of length ``max_length``:
    ``src_input_ids``, ``src_attention_mask``, ``tgt_input_ids`` and
    ``tgt_attention_mask``.

    Args:
        data: pandas DataFrame with one sentence pair per row; it must contain a
            column named ``src_lang`` and a column named ``tgt_lang``.
        src_lang: name of the source-sentence column (Korean by default).
        tgt_lang: name of the target-sentence column (English by default).
        max_length: every sequence is padded/truncated to this many tokens.
        src_tokenizer: optional ready-made source tokenizer. When ``None`` the
            pretrained ``bert-base-multilingual-cased`` tokenizer is loaded.
        tgt_tokenizer: optional ready-made target tokenizer. When ``None`` the
            pretrained ``bert-base-uncased`` tokenizer is loaded.

    Passing tokenizers in lets several splits share one pair of tokenizer
    objects instead of loading them again for every dataset instance.
    """
    SRC_TOKENIZER_NAME = 'bert-base-multilingual-cased'
    TGT_TOKENIZER_NAME = 'bert-base-uncased'

    def __init__(self, data, src_lang='ko', tgt_lang='en', max_length=128,
                 src_tokenizer=None, tgt_tokenizer=None):
        self.src_lang = src_lang  # source language column (Korean)
        self.tgt_lang = tgt_lang  # target language column (English)
        self.max_length = max_length  # maximum sequence length
        # Compare against None explicitly: tokenizers define __len__, so relying on
        # truthiness would be fragile.
        if src_tokenizer is None:
            src_tokenizer = BertTokenizer.from_pretrained(self.SRC_TOKENIZER_NAME)
        if tgt_tokenizer is None:
            tgt_tokenizer = BertTokenizer.from_pretrained(self.TGT_TOKENIZER_NAME)
        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer
        self.data = data  # pandas DataFrame (contains the ko / en columns)

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.data)

    def _encode(self, tokenizer, text):
        """Tokenize one sentence to exactly ``max_length`` tokens as PyTorch tensors."""
        return tokenizer(text, max_length=self.max_length, padding='max_length',
                         truncation=True, return_tensors='pt')

    def __getitem__(self, idx):
        """Return the tokenized pair at positional index ``idx``."""
        # Look the row up once (positional, so a shuffled DataFrame index is fine).
        row = self.data.iloc[idx]
        src_encoding = self._encode(self.src_tokenizer, row[self.src_lang])
        tgt_encoding = self._encode(self.tgt_tokenizer, row[self.tgt_lang])
        # The tokenizer returns [1, max_length]; drop the leading dimension so the
        # DataLoader can stack items into [batch, max_length].
        return {
            'src_input_ids': src_encoding['input_ids'].squeeze(0),  # [max_length]
            'src_attention_mask': src_encoding['attention_mask'].squeeze(0),
            'tgt_input_ids': tgt_encoding['input_ids'].squeeze(0),
            'tgt_attention_mask': tgt_encoding['attention_mask'].squeeze(0)
        }

In [4]:
# 2. 데이터 모듈
class KoEnTranslationDataModule(pl.LightningDataModule):
    """PyTorch Lightning data module providing train / validation / test loaders.

    Args:
        file_path: path or URL of the Korean-English CSV file.
        batch_size: batch size used by all three loaders.
        max_length: token length every sentence is padded/truncated to; forwarded
            to each ``KoEnTranslationDataset``.
        num_workers: number of DataLoader worker processes per loader.
    """
    def __init__(self, file_path, batch_size=32, max_length=128, num_workers=2):
        super().__init__()
        self.file_path = file_path  # path of the Korean-English CSV file
        self.batch_size = batch_size
        self.max_length = max_length
        self.num_workers = num_workers
        # Filled in by setup().
        self.train_dataset = None
        self.val_dataset = None
        self.test_dataset = None

    def setup(self, stage=None):
        """Load the CSV and split it 70 / 15 / 15 into train / val / test.

        The trainer invokes this hook once per stage (e.g. for fit and again for
        test). The split is deterministic, so the work is done only on the first
        call and later calls reuse the existing datasets instead of reading the
        file again.
        """
        if self.train_dataset is not None:
            return
        data = csv_reader(self.file_path)
        train_data, temp_data = train_test_split(data, test_size=0.3, random_state=42)
        val_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)
        # Forward max_length explicitly; otherwise the constructor argument of this
        # module would be ignored and the dataset default would always be used.
        self.train_dataset = KoEnTranslationDataset(train_data, max_length=self.max_length)
        self.val_dataset = KoEnTranslationDataset(val_data, max_length=self.max_length)
        self.test_dataset = KoEnTranslationDataset(test_data, max_length=self.max_length)

    def train_dataloader(self):
        """Return the shuffled training loader."""
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True,
                          num_workers=self.num_workers)

    def val_dataloader(self):
        """Return the validation loader (fixed order)."""
        return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False,
                          num_workers=self.num_workers)

    def test_dataloader(self):
        """Return the test loader (fixed order)."""
        return DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False,
                          num_workers=self.num_workers)

In [5]:
# 3. Scaled Dot Product Attention을 포함한 Seq2Seq 번역 모델
class Seq2SeqTranslator(pl.LightningModule):
    """LSTM encoder-decoder translator with scaled dot-product attention.

    At every decoding step the top-layer decoder hidden state is projected to a
    query, attended over the projected encoder outputs, and the resulting context
    vector is concatenated with the embedded previous token as decoder input.

    Token id 0 is treated as padding throughout (embedding ``padding_idx`` and the
    loss ``ignore_index``).

    Args:
        src_vocab_size: size of the source vocabulary.
        tgt_vocab_size: size of the target vocabulary.
        tgt_tokenizer: target tokenizer; ``translate`` reads its ``cls_token_id``
            and ``sep_token_id`` and calls its ``decode``.
        embedding_dim: embedding width for both languages.
        hidden_dim: LSTM hidden width (also the attention width).
        num_layers: number of stacked LSTM layers in encoder and decoder.
        dropout: dropout probability for embeddings and between LSTM layers.
        learning_rate: Adam learning rate.
    """
    def __init__(self, src_vocab_size, tgt_vocab_size, tgt_tokenizer, embedding_dim=256, hidden_dim=512,
                 num_layers=2, dropout=0.5, learning_rate=1e-3):
        super().__init__()
        # The tokenizer is a helper object, not a hyperparameter: exclude it from hparams.
        self.save_hyperparameters(ignore=['tgt_tokenizer'])
        self.tgt_tokenizer = tgt_tokenizer

        # nn.LSTM only applies inter-layer dropout, so it is disabled for a single layer.
        lstm_dropout = dropout if num_layers > 1 else 0

        self.src_embedding = nn.Embedding(src_vocab_size, embedding_dim, padding_idx=0)
        self.encoder = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers,
                               batch_first=True, dropout=lstm_dropout)

        self.tgt_embedding = nn.Embedding(tgt_vocab_size, embedding_dim, padding_idx=0)
        # Decoder input = embedded previous token concatenated with the attention context.
        self.decoder = nn.LSTM(embedding_dim + hidden_dim, hidden_dim, num_layers=num_layers,
                               batch_first=True, dropout=lstm_dropout)
        self.fc = nn.Linear(hidden_dim, tgt_vocab_size)

        self.attention_Wq = nn.Linear(hidden_dim, hidden_dim)
        self.attention_Wk = nn.Linear(hidden_dim, hidden_dim)
        self.attention_Wv = nn.Linear(hidden_dim, hidden_dim)
        # Plain float: an unregistered tensor attribute would not follow the module
        # across devices.
        self.scale_factor = hidden_dim ** 0.5

        self.dropout = nn.Dropout(dropout)
        self.criterion = nn.CrossEntropyLoss(ignore_index=0)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Compute softmax(Q K^T / sqrt(hidden_dim)) V.

        Args:
            Q: queries, ``[batch, hidden_dim]`` (expanded to one query per batch
                item) or ``[batch, q_len, hidden_dim]``.
            K: keys, ``[batch, src_len, hidden_dim]``.
            V: values, ``[batch, src_len, hidden_dim]``.
            mask: optional tensor broadcastable to ``[batch, q_len, src_len]``;
                positions equal to 0 are excluded from the softmax.

        Returns:
            Context tensor ``[batch, q_len, hidden_dim]``.

        Raises:
            ValueError: if ``K`` is not a 3-D tensor.
        """
        if Q.dim() == 2:  # [batch_size, hidden_dim] -> [batch_size, 1, hidden_dim]
            Q = Q.unsqueeze(1)
        if K.dim() != 3:
            raise ValueError("K must be a 3D tensor")

        attention_scores = torch.bmm(Q, K.transpose(1, 2)) / self.scale_factor  # [batch_size, 1, src_len]
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, -1e9)
        attention_weights = torch.softmax(attention_scores, dim=-1)
        return torch.bmm(attention_weights, V)

    def _encode(self, src_input_ids, src_attention_mask=None):
        """Run the encoder once and precompute everything the decoder loop reuses.

        Returns ``(keys, values, mask, hidden, cell)``. Keys and values depend only
        on the encoder outputs, so they are projected here a single time instead of
        on every decoding step.
        """
        # NOTE(review): the encoder LSTM also steps over padded positions, so the
        # returned (hidden, cell) include those steps; packing the sequence would
        # avoid that but changes training behaviour, so it is left as is.
        src_embedded = self.dropout(self.src_embedding(src_input_ids))
        encoder_outputs, (hidden, cell) = self.encoder(src_embedded)
        keys = self.attention_Wk(encoder_outputs)
        values = self.attention_Wv(encoder_outputs)
        # [batch, src_len] -> [batch, 1, src_len] so it lines up with the scores.
        mask = None if src_attention_mask is None else src_attention_mask.unsqueeze(1)
        return keys, values, mask, hidden, cell

    def _decode_step(self, decoder_input, hidden, cell, keys, values, mask=None):
        """Advance the decoder by one token; returns ``(logits, hidden, cell)``."""
        decoder_embedded = self.dropout(self.tgt_embedding(decoder_input))
        # The query is the top-layer hidden state from the previous step.
        query = self.attention_Wq(hidden[-1])  # [batch_size, hidden_dim]
        context = self.scaled_dot_product_attention(query, keys, values, mask)
        decoder_input_combined = torch.cat((decoder_embedded, context), dim=2)
        decoder_output, (hidden, cell) = self.decoder(decoder_input_combined, (hidden, cell))
        logits = self.fc(decoder_output.squeeze(1))
        return logits, hidden, cell

    def forward(self, src_input_ids, tgt_input_ids, src_attention_mask=None, teacher_forcing_ratio=0.5):
        """Decode ``tgt_input_ids.size(1) - 1`` steps and return the logits.

        Args:
            src_input_ids: source token ids, ``[batch, src_len]``.
            tgt_input_ids: target token ids, ``[batch, tgt_len]``; column 0 is the
                first decoder input.
            src_attention_mask: optional ``[batch, src_len]`` mask, 0 at positions
                the attention must ignore.
            teacher_forcing_ratio: per-step probability of feeding the reference
                token instead of the model's own prediction (one draw per step
                for the whole batch).

        Returns:
            Logits ``[batch, tgt_len, tgt_vocab_size]``; position 0 is left as zeros.
        """
        keys, values, mask, hidden, cell = self._encode(src_input_ids, src_attention_mask)

        batch_size = src_input_ids.size(0)
        max_len = tgt_input_ids.size(1)
        outputs = torch.zeros(batch_size, max_len, self.hparams.tgt_vocab_size,
                              device=src_input_ids.device)

        decoder_input = tgt_input_ids[:, 0].unsqueeze(1)
        for t in range(1, max_len):
            logits, hidden, cell = self._decode_step(decoder_input, hidden, cell, keys, values, mask)
            outputs[:, t, :] = logits

            teacher_force = random.random() < teacher_forcing_ratio
            next_token = tgt_input_ids[:, t] if teacher_force else logits.argmax(1)
            decoder_input = next_token.unsqueeze(1)

        return outputs

    def _shared_step(self, batch, teacher_forcing_ratio):
        """Compute the cross-entropy loss for one batch (shared by train/val/test)."""
        tgt_input_ids = batch['tgt_input_ids']
        outputs = self(batch['src_input_ids'], tgt_input_ids, batch['src_attention_mask'],
                       teacher_forcing_ratio=teacher_forcing_ratio)
        # Position 0 is the start token and is never predicted, so drop it.
        outputs = outputs[:, 1:, :].reshape(-1, self.hparams.tgt_vocab_size)
        targets = tgt_input_ids[:, 1:].reshape(-1)
        return self.criterion(outputs, targets)

    def training_step(self, batch, batch_idx):
        """Training loss with 50% teacher forcing; logged as ``train_loss``."""
        loss = self._shared_step(batch, teacher_forcing_ratio=0.5)
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Validation loss without teacher forcing; logged as ``val_loss``."""
        loss = self._shared_step(batch, teacher_forcing_ratio=0.0)
        self.log('val_loss', loss, on_epoch=True, prog_bar=True)

    def test_step(self, batch, batch_idx):
        """Test loss without teacher forcing; logged as ``test_loss``."""
        loss = self._shared_step(batch, teacher_forcing_ratio=0.0)
        self.log('test_loss', loss, on_epoch=True)

    def configure_optimizers(self):
        """Use Adam over all parameters with the configured learning rate."""
        return torch.optim.Adam(self.parameters(), lr=self.hparams.learning_rate)

    def translate(self, src_input_ids, max_len=128, src_attention_mask=None):
        """Greedily translate a single source sentence.

        Args:
            src_input_ids: source token ids of shape ``[1, src_len]`` (the loop
                reads each prediction with ``.item()``, so only one sentence at a
                time is supported).
            max_len: maximum number of tokens to generate.
            src_attention_mask: optional ``[1, src_len]`` mask, 0 at padding. When
                omitted it is derived from the source embedding's padding index,
                so padded inputs are masked the same way as during training.

        Returns:
            The decoded target string; generation stops at the tokenizer's
            separator token or after ``max_len`` tokens.
        """
        if src_attention_mask is None:
            src_attention_mask = (src_input_ids != self.src_embedding.padding_idx).long()

        was_training = self.training
        self.eval()
        token_ids = []
        try:
            with torch.no_grad():
                keys, values, mask, hidden, cell = self._encode(src_input_ids, src_attention_mask)

                device = src_input_ids.device
                decoder_input = torch.tensor([[self.tgt_tokenizer.cls_token_id]], device=device)
                for _ in range(max_len):
                    logits, hidden, cell = self._decode_step(decoder_input, hidden, cell,
                                                             keys, values, mask)
                    pred_token = logits.argmax(1).item()
                    if pred_token == self.tgt_tokenizer.sep_token_id:
                        break
                    token_ids.append(pred_token)
                    decoder_input = torch.tensor([[pred_token]], device=device)
        finally:
            # Put the module back into whatever mode the caller had it in.
            self.train(was_training)

        return self.tgt_tokenizer.decode(token_ids, skip_special_tokens=True)

In [6]:
# 4. 학습 실행
def csv_reader(file_path, encoding='cp949'):
    """Read the parallel-corpus CSV into a pandas DataFrame.

    Args:
        file_path: local path or URL understood by ``pandas.read_csv``.
        encoding: text encoding of the file. Defaults to ``'cp949'``, the
            encoding this notebook's corpus is read with.

    Returns:
        The parsed DataFrame.
    """
    return pd.read_csv(file_path, encoding=encoding)

def main(file_path='https://drive.google.com/uc?id=1X3OhxmD6huuChSjIovKlawXUItnXK-El',
         batch_size=32, max_epochs=30, seed=42):
    """Train, test, and run one sample translation.

    Args:
        file_path: path or URL of the Korean-English CSV (Google Drive by default).
        batch_size: batch size for all data loaders.
        max_epochs: number of training epochs.
        seed: seed applied to Python, NumPy and PyTorch RNGs before anything
            random happens (weight init, shuffling, teacher forcing draws).
    """
    # Teacher forcing uses the `random` module and weights are randomly initialised,
    # so seed everything up front to make runs repeatable.
    pl.seed_everything(seed, workers=True)

    data_module = KoEnTranslationDataModule(file_path=file_path, batch_size=batch_size)

    # Only the tokenizers are needed here. The dataset constructor just stores the
    # frame, so an empty one avoids downloading the whole corpus an extra time.
    tokenizer_holder = KoEnTranslationDataset(pd.DataFrame(columns=['ko', 'en']))
    src_vocab_size = tokenizer_holder.src_tokenizer.vocab_size
    tgt_vocab_size = tokenizer_holder.tgt_tokenizer.vocab_size
    tgt_tokenizer = tokenizer_holder.tgt_tokenizer
    model = Seq2SeqTranslator(
        src_vocab_size=src_vocab_size,
        tgt_vocab_size=tgt_vocab_size,
        tgt_tokenizer=tgt_tokenizer
    )

    trainer = pl.Trainer(
        max_epochs=max_epochs,
        accelerator='gpu' if torch.cuda.is_available() else 'cpu',
        devices=1,
        log_every_n_steps=10,
        enable_progress_bar=True
    )

    trainer.fit(model, data_module)
    trainer.test(model, datamodule=data_module)

    # Translate the first test sentence; add a batch dimension and move it to
    # whichever device the model currently lives on.
    sample = data_module.test_dataset[0]['src_input_ids'].unsqueeze(0).to(model.device)
    translated = model.translate(sample)
    print(f"Translated: {translated}")

# Entry point: run the full train / test / translate pipeline when this cell or
# script is executed directly (not when the code is imported as a module).
if __name__ == "__main__":
    main()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/49.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/996k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.96M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/625 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

INFO:pytorch_lightning.utilities.rank_zero:You are using the plain ModelCheckpoint callback. Consider using LitModelCheckpoint which with seamless uploading to Model registry.
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.utilities.rank_zero:You are using a CUDA device ('NVIDIA A100-SXM4-40GB') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name          | Type             | Para

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=30` reached.
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: |          | 0/? [00:00<?, ?it/s]

Translated: i'm going to the a lot.
