<a href="https://colab.research.google.com/github/Tiabet/BaekJoon/blob/main/KoBART_Baseline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%capture
# Cell magic above: capture (hide) everything this cell prints, i.e. the pip logs.
import locale
# Force locale.getpreferredencoding() to always report UTF-8.
# NOTE(review): presumably a workaround for `!` shell commands failing under a
# non-UTF-8 locale in Colab -- confirm it is still required.
locale.getpreferredencoding = lambda: "UTF-8"

# Install the third-party packages into the current runtime.
!pip install transformers
!pip install accelerate
!pip install datasets
!pip install evaluate
!pip install rouge
!pip install konlpy
!pip install pytorch-lightning

In [None]:
import json
import pandas as pd

import logging
import sys

from datasets import Dataset
from torch.utils.data import DataLoader
from tokenizers.processors import TemplateProcessing
import pytorch_lightning as pl
from pytorch_lightning.callbacks import LearningRateMonitor
from pytorch_lightning.loggers import TensorBoardLogger, WandbLogger
import torch
import torch.nn.functional as F
import torchmetrics
from torch.optim.lr_scheduler import CyclicLR
from transformers import BartForConditionalGeneration, AutoTokenizer

DATA

In [None]:
def jsonlload(fname):
    """Load a JSON Lines file into a list of Python objects.

    Blank (or whitespace-only) lines are skipped, so an empty file yields an
    empty list and stray empty lines no longer raise ``JSONDecodeError``.

    Args:
        fname: Path of the UTF-8 encoded ``.jsonl`` file.

    Returns:
        A list with one decoded JSON value per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(fname, "r", encoding="utf-8") as f:
        # Iterate the file lazily instead of reading it into one big string.
        return [json.loads(line) for line in f if line.strip()]


def jsonldump(j_list, fname):
    """Write each item of ``j_list`` to ``fname`` as one JSON document per line.

    Args:
        j_list: Iterable of JSON-serialisable objects.
        fname: Destination path; the file is created or overwritten as UTF-8.
    """
    with open(fname, "w", encoding='utf-8') as out:
        # ensure_ascii=False keeps non-ASCII text readable in the output file.
        out.writelines(
            json.dumps(record, ensure_ascii=False) + '\n' for record in j_list
        )

In [None]:
def StoryDataLoader(fname, tokenizer, batch_size, max_length, mode="train"):
    """Build a torch ``DataLoader`` over a tokenized JSON(L) story dataset.

    Every example is expected to provide ``example["input"]["sentence1"]`` and
    ``example["input"]["sentence3"]`` (encoded together as a sentence pair) and,
    when ``mode == "train"``, ``example["output"]`` (encoded as the target).

    Side effects: ``tokenizer`` is mutated -- missing ``cls_token`` /
    ``sep_token`` are filled from ``bos_token`` / ``eos_token`` and the backend
    post-processor is replaced so the special tokens wrap every encoded sequence.

    Args:
        fname: Path of the JSON/JSONL file to load.
        tokenizer: Fast tokenizer (must expose ``_tokenizer``).
        batch_size: Number of examples per batch.
        max_length: Length every sequence is padded / truncated to.
        mode: Forwarded to ``Dataset.from_json`` as the split name. ``"train"``
            additionally tokenizes the targets and enables shuffling.

    Returns:
        A ``DataLoader`` yielding dicts with ``input_ids`` and
        ``attention_mask`` and, in train mode, ``decoder_input_ids`` and
        ``decoder_attention_mask``.
    """
    is_train = mode == "train"

    dataset = Dataset.from_json(fname, split=mode)

    if not tokenizer.cls_token:
        tokenizer.cls_token = tokenizer.bos_token
    if not tokenizer.sep_token:
        tokenizer.sep_token = tokenizer.eos_token

    tokenizer._tokenizer.post_processor = TemplateProcessing(
        single=f"{tokenizer.cls_token} $0 {tokenizer.sep_token}",
        pair=f"{tokenizer.cls_token} $A {tokenizer.sep_token} $B:1 {tokenizer.sep_token}:1",
        special_tokens=[(tokenizer.cls_token, tokenizer.cls_token_id), (tokenizer.sep_token, tokenizer.sep_token_id)],
    )

    def preprocess_function(examples):
        """Tokenize one example (source pair and, in train mode, the target)."""
        tokenizer_input = tokenizer(
            examples["input"]["sentence1"],
            examples["input"]["sentence3"],
            padding="max_length",
            max_length=max_length,
            truncation=True,
        )
        # NOTE: the original had a trailing comma after the input_ids
        # assignment, which wrapped the ids in a 1-tuple and produced tensors
        # with an extra leading dimension.
        processed = {
            "input_ids": tokenizer_input["input_ids"],
            "attention_mask": tokenizer_input["attention_mask"],
        }

        if is_train:
            tokenizer_output = tokenizer(
                examples["output"],
                padding="max_length",
                max_length=max_length,
                truncation=True,
            )
            processed["decoder_input_ids"] = tokenizer_output["input_ids"]
            processed["decoder_attention_mask"] = tokenizer_output["attention_mask"]

        return processed

    dataset = dataset.map(
        preprocess_function,
        remove_columns=dataset.column_names,
    ).with_format("torch")

    # Only training data is shuffled; other modes keep file order.
    return DataLoader(dataset, shuffle=is_train, batch_size=batch_size)


MODULE

UTILS

In [None]:
def get_logger(name: str) -> logging.Logger:
    """Fetch the named logger, configured to print DEBUG+ records to stdout.

    The logger does not propagate to its ancestors. A stdout handler with a
    ``[timestamp] message`` format is attached only if the logger has no
    handlers yet, so calling this repeatedly does not duplicate output.

    Args:
        name: logger name

    Returns:
        The configured ``logging.Logger``.
    """
    log = logging.getLogger(name)
    log.setLevel(logging.DEBUG)
    log.propagate = False

    # Already has a handler: reuse the logger as it is.
    if log.handlers:
        return log

    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter(fmt="[%(asctime)s] %(message)s"))
    log.addHandler(stdout_handler)
    return log


Train

In [None]:
import os
import pytorch_lightning as pl
from pytorch_lightning.callbacks import LearningRateMonitor
from pytorch_lightning.loggers import TensorBoardLogger, WandbLogger
from transformers import BartForConditionalGeneration, AutoTokenizer


# Define your configuration parameters here
output_dir = "/content/drive/MyDrive"
model_path = "gogamza/kobart-base-v2"
tokenizer_path = "gogamza/kobart-base-v2"
batch_size = 4
valid_batch_size = 4
max_seq_len = 512
accumulate_grad_batches = 1
epochs = 10
max_learning_rate = 2e-4
min_learning_rate = 1e-5
warmup_rate = 0.1
gpus = 1  # Set this to 0 if you want to run on CPU
logging_interval = 100
evaluate_interval = 500
seed = 42

# Actually apply the seed (it was defined but never used) so that weight
# initialisation and data shuffling are reproducible.
pl.seed_everything(seed)

tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
# Define your BART model and its configuration
model = BartForConditionalGeneration.from_pretrained(model_path)
model.config.max_length = max_seq_len

# Train on the training split; the original pointed the training loader at the
# "-test" file, i.e. it fine-tuned on the held-out test set.
train_dataloader = StoryDataLoader("/content/drive/MyDrive/nikluge-sc-2023-train.jsonl", tokenizer, batch_size, max_seq_len)
# The dev loader keeps the default mode="train" because StoryDataLoader only
# tokenizes the targets in that mode (note that this also shuffles the dev set).
valid_dataloader = StoryDataLoader("/content/drive/MyDrive/nikluge-sc-2023-dev.jsonl", tokenizer, valid_batch_size, max_seq_len)
# Number of training batches over the whole run (one optimizer step per batch
# while accumulate_grad_batches == 1).
total_steps = len(train_dataloader) * epochs


INFO:lightning_fabric.utilities.seed:Global seed set to 42


[+] Save output to "/content/drive/MyDrive"
[+] Set Random Seed to 42
[+] GPU: 1
[+] Load Tokenizer"


You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.
You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.
Using cls_token, but it is not set yet.
Using sep_token, but it is not set yet.


[+] Load Dataset


In [None]:
class BARTFinetuner(pl.LightningModule):
    """Lightning module that fine-tunes a seq2seq BART model.

    Args:
        model: Model called as ``model(input_ids=..., attention_mask=...,
            labels=...)`` whose output exposes ``.loss`` (here a
            ``BartForConditionalGeneration``).
        tokenizer: Tokenizer paired with the model; stored for convenience, not
            used by the training logic.
        learning_rate: Peak learning rate handed to AdamW.
        warmup_steps: Number of optimizer steps over which the learning rate
            ramps up linearly. Floats are truncated; values below 1 count as 1.
    """

    def __init__(self, model, tokenizer, learning_rate, warmup_steps):
        super().__init__()
        self.model = model
        self.tokenizer = tokenizer
        self.learning_rate = learning_rate
        self.warmup_steps = warmup_steps

    def forward(self, input_ids, attention_mask, decoder_input_ids, decoder_attention_mask):
        """Run the model and return its output (including ``.loss``).

        ``decoder_input_ids`` holds the tokenized target. It is passed to the
        model as ``labels`` only, so the model derives the right-shifted decoder
        inputs itself. Previously the same tensor was given as both decoder
        input and label, which lets the decoder copy its current input token
        instead of predicting the next one. Padding positions (where
        ``decoder_attention_mask`` is 0) are set to -100 so they do not
        contribute to the loss.
        """
        labels = decoder_input_ids.masked_fill(decoder_attention_mask == 0, -100)
        # No decoder_attention_mask is forwarded: the original mask is aligned
        # with the unshifted targets, and padding is already excluded via -100.
        return self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels,
        )

    def _compute_loss(self, batch):
        """Return the seq2seq loss for one batch produced by the data loader."""
        outputs = self(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            decoder_input_ids=batch["decoder_input_ids"],
            decoder_attention_mask=batch["decoder_attention_mask"],
        )
        return outputs.loss

    def training_step(self, batch, batch_idx):
        """Compute, log and return the training loss for ``batch``."""
        loss = self._compute_loss(batch)
        self.log("train_loss", loss, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss.

        Without this hook Lightning skips the validation loop even when a
        validation dataloader is passed to ``Trainer.fit``.
        """
        loss = self._compute_loss(batch)
        self.log("val_loss", loss, prog_bar=True)
        return loss

    def configure_optimizers(self):
        """Create AdamW with a linear learning-rate warmup.

        The learning rate grows linearly to ``learning_rate`` during the first
        ``warmup_steps`` optimizer steps and stays constant afterwards. The
        scheduler is stepped every optimizer step (``interval="step"``); the
        previous ``StepLR`` treated ``warmup_steps`` as a decay period and was
        stepped once per epoch, so it never performed any warmup.
        """
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.learning_rate)
        warmup_steps = max(1, int(self.warmup_steps))

        def warmup_factor(step):
            # step is 0-based; reach factor 1.0 on the last warmup step.
            return min(1.0, (step + 1) / warmup_steps)

        scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=warmup_factor)
        return [optimizer], [{"scheduler": scheduler, "interval": "step"}]

# Create the Lightning module.
# total_steps * warmup_rate is a float; a step count has to be a whole number.
bart_finetuner = BARTFinetuner(
    model,
    tokenizer,
    learning_rate=max_learning_rate,
    warmup_steps=int(total_steps * warmup_rate),
)


In [None]:
# Define callbacks and loggers
lr_monitor = LearningRateMonitor(logging_interval='step')
tensorboard_logger = TensorBoardLogger(output_dir, name='logs')
wandb_logger = WandbLogger(name='wandb_logs', save_dir=output_dir)

# Initialize the Trainer.
# The `gpus=` argument no longer exists in Lightning 2.x (the version installed
# above is unpinned), so the device selection is expressed with
# accelerator/devices instead: gpus == 0 means "run on CPU".
trainer = pl.Trainer(
    accelerator="gpu" if gpus else "cpu",
    devices=gpus if gpus else 1,
    max_epochs=epochs,
    accumulate_grad_batches=accumulate_grad_batches,
    logger=[tensorboard_logger, wandb_logger],  # You can choose one or both
    callbacks=[lr_monitor],
    log_every_n_steps=logging_interval,
    # An integer interval means "validate every N training batches".
    val_check_interval=evaluate_interval,
)

# Train the model
trainer.fit(bart_finetuner, train_dataloader, valid_dataloader)


In [None]:
# Save the trained model together with its tokenizer, so the checkpoint in
# output_dir can be reloaded on its own with from_pretrained().
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)