<a href="https://colab.research.google.com/github/AnggitaGayatri/Copywriting-Otomatis-Produk-Fashion-Menggunakan-T5/blob/Dyah-dev/T5-Indonesia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Required Libraries

In [None]:
!pip install --quiet pytorch-lightning transformers seaborn wget nltk rouge-score

  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m815.2/815.2 kB[0m [31m21.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m926.4/926.4 kB[0m [31m49.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Building wheel for rouge-score (setup.py) ... [?25l[?25hdone


# Import Libraries

In [None]:
# Import Libraries
import pytorch_lightning as pl
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.callbacks import Callback
from sklearn.model_selection import train_test_split
from transformers import AdamW, T5ForConditionalGeneration, T5TokenizerFast as T5Tokenizer
from nltk.translate.bleu_score import sentence_bleu
from rouge_score import rouge_scorer
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")
import os

# Global settings: seed the RNGs through Lightning, then apply one plot theme
# and a default figure size for every chart drawn later in the notebook.
pl.seed_everything(seed=42)
sns.set(font_scale=1.2, palette="muted", style="whitegrid")
rcParams["figure.figsize"] = (16, 6)

INFO:lightning_fabric.utilities.seed:Seed set to 42


# Load Dataset

In [None]:
# Load Dataset
def load_and_preprocess_data(filepath, test_size=0.1, random_state=None):
    """Load the copywriting CSV, clean it, and split it into train/test frames.

    Args:
        filepath: Path of a CSV file that contains the columns ``keywords``
            (model input) and ``deskripsi`` (target text).
        test_size: Fraction (or absolute number) of rows held out for the
            test split; forwarded to ``train_test_split``.
        random_state: Optional seed forwarded to ``train_test_split``. With
            the default ``None`` the split draws from NumPy's global RNG,
            exactly as before this parameter existed.

    Returns:
        A ``(train_df, test_df)`` tuple of DataFrames.

    Raises:
        KeyError: If one of the required columns is missing from the CSV.

    Side effects:
        Writes ``train_data.csv`` and ``test_data.csv`` to the working
        directory so the exact split can be downloaded afterwards.
    """
    required_columns = ["keywords", "deskripsi"]

    df = pd.read_csv(filepath)

    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise KeyError(f"Missing required column(s) in {filepath}: {missing_columns}")

    # Drop incomplete rows *before* casting: astype(str) would otherwise turn
    # missing cells into the literal string "nan" and feed it to the model.
    df = df.dropna(subset=required_columns)
    df['deskripsi'] = df['deskripsi'].astype(str)
    df['keywords'] = df['keywords'].astype(str)

    train_df, test_df = train_test_split(df, test_size=test_size, random_state=random_state)

    # Save train and test data for download
    train_df.to_csv("train_data.csv", index=False)
    test_df.to_csv("test_data.csv", index=False)

    return train_df, test_df

# Initialize Tokenizer

In [None]:
# Name of the pretrained checkpoint; the tokenizer is loaded from it here and
# the model class below loads its weights from the same name.
MODEL_NAME = "t5-base"
tokenizer = T5Tokenizer.from_pretrained(pretrained_model_name_or_path=MODEL_NAME)

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

# Dataset Class

In [None]:
# Dataset Class
class CopywritingDataset(Dataset):
    """Map-style dataset that turns (keywords, deskripsi) rows into T5 inputs.

    Args:
        data: DataFrame with the string columns ``keywords`` (source text)
            and ``deskripsi`` (target copywriting).
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors of shape ``(1, max_length)``.
        text_max_token_len: Fixed token length of the encoded keywords.
        copywriting_max_token_len: Fixed token length of the encoded target.
    """
    def __init__(self, data, tokenizer, text_max_token_len=128, copywriting_max_token_len=512):
        self.data = data
        self.tokenizer = tokenizer
        self.text_max_token_len = text_max_token_len
        self.copywriting_max_token_len = copywriting_max_token_len

    def __len__(self):
        return len(self.data)

    def _encode(self, text, max_length):
        """Tokenize ``text`` to exactly ``max_length`` tokens (pad or truncate)."""
        return self.tokenizer(
            text, max_length=max_length, padding="max_length",
            truncation=True, return_attention_mask=True, return_tensors="pt"
        )

    def __getitem__(self, index):
        """Return the raw strings plus flattened encoder/decoder tensors for one row."""
        data_row = self.data.iloc[index]
        text, copywriting = data_row["keywords"], data_row["deskripsi"]

        text_encoding = self._encode(text, self.text_max_token_len)
        copywriting_encoding = self._encode(copywriting, self.copywriting_max_token_len)

        # Mask padded positions with -100 so the loss ignores them. The
        # attention mask is used instead of comparing against a hard-coded
        # token id, so a real token that shares the pad id is never masked.
        # Cloning keeps the tokenizer's own ``input_ids`` tensor untouched.
        labels = copywriting_encoding["input_ids"].clone()
        labels[copywriting_encoding["attention_mask"] == 0] = -100

        return {
            "text": text,
            "copywriting": copywriting,
            "text_input_ids": text_encoding["input_ids"].flatten(),
            "text_attention_mask": text_encoding["attention_mask"].flatten(),
            "labels": labels.flatten(),
            "labels_attention_mask": copywriting_encoding["attention_mask"].flatten()
        }


# Data Module

In [None]:
# DataModule
class CopywritingDataModule(pl.LightningDataModule):
    """Lightning data module serving the train and test frames as DataLoaders.

    The test frame doubles as the validation set: ``val_dataloader`` iterates
    over the dataset built from ``test_df``.
    """
    def __init__(self, train_df, test_df, tokenizer, batch_size=4, text_max_token_len=128, copywriting_max_token_len=512):
        super().__init__()
        self.train_df, self.test_df = train_df, test_df
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.text_max_token_len = text_max_token_len
        self.copywriting_max_token_len = copywriting_max_token_len

    def _make_dataset(self, frame):
        """Wrap ``frame`` in a CopywritingDataset using this module's settings."""
        return CopywritingDataset(
            frame,
            self.tokenizer,
            self.text_max_token_len,
            self.copywriting_max_token_len,
        )

    def _make_loader(self, dataset, shuffle):
        """Build a single-process DataLoader with this module's batch size."""
        return DataLoader(dataset, batch_size=self.batch_size, shuffle=shuffle, num_workers=0)

    def setup(self, stage=None):
        """Create both datasets; ``stage`` is accepted but not used."""
        self.train_dataset = self._make_dataset(self.train_df)
        self.test_dataset = self._make_dataset(self.test_df)

    def train_dataloader(self):
        """Shuffled loader over the training dataset."""
        return self._make_loader(self.train_dataset, shuffle=True)

    def val_dataloader(self):
        """Ordered loader over the test dataset, used for validation."""
        return self._make_loader(self.test_dataset, shuffle=False)

# Model

In [None]:
# Model
class CopywritingModel(pl.LightningModule):
    """LightningModule that fine-tunes T5 to generate copywriting from keywords."""
    def __init__(self):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME)
        # from_pretrained() hands the network back in eval mode, and recent
        # Lightning versions keep whatever mode the submodules are in (the
        # model summary reported every module in eval mode during fit).
        # Switch to train mode explicitly so dropout is active while training.
        self.model.train()

    def forward(self, input_ids, attention_mask, decoder_attention_mask, labels=None):
        """Run T5; the returned output carries ``.loss`` when ``labels`` is given."""
        return self.model(input_ids, attention_mask=attention_mask, labels=labels, decoder_attention_mask=decoder_attention_mask)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one batch."""
        loss = self._shared_step(batch)
        # on_epoch=True also aggregates the loss over the epoch, so the value
        # read at epoch end is the epoch mean (comparable to val_loss) rather
        # than the loss of the last batch. Loggers receive the suffixed names
        # train_loss_step / train_loss_epoch; callback_metrics["train_loss"]
        # stays available for callbacks.
        self.log("train_loss", loss, prog_bar=True, on_step=True, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss for one batch."""
        loss = self._shared_step(batch)
        self.log("val_loss", loss, prog_bar=True)
        return loss

    def configure_optimizers(self):
        """Return the AdamW optimizer used for fine-tuning.

        Uses ``torch.optim.AdamW`` in place of the deprecated
        ``transformers.AdamW``; ``eps`` and ``weight_decay`` are pinned to
        that implementation's defaults so the update rule stays comparable.
        """
        return torch.optim.AdamW(self.parameters(), lr=1e-4, eps=1e-6, weight_decay=0.0)

    def _shared_step(self, batch):
        """Forward one batch from CopywritingDataset and return its loss."""
        output = self(
            input_ids=batch["text_input_ids"],
            attention_mask=batch["text_attention_mask"],
            decoder_attention_mask=batch["labels_attention_mask"],
            labels=batch["labels"]
        )
        return output.loss

In [None]:
class LogEpochMetricsCallback(Callback):
    """Print and record one summary line of train/val loss after every epoch.

    Each line is appended to ``self.epoch_logs`` and to the file
    ``epoch_metrics_log.txt`` in the working directory.
    """
    def __init__(self):
        super().__init__()
        self.epoch_logs = []

    @staticmethod
    def _format_metric(value):
        """Format a scalar metric with 4 decimals, or 'n/a' when it was not logged."""
        if value is None:
            return "n/a"
        return f"{float(value):.4f}"

    def on_train_epoch_end(self, trainer, pl_module):
        """Read the latest losses from the trainer and log them for this epoch."""
        train_loss = trainer.callback_metrics.get("train_loss", None)
        val_loss = trainer.callback_metrics.get("val_loss", None)

        # A metric can be absent (e.g. no validation loop ran yet); formatting
        # None with ':.4f' would raise TypeError and abort training.
        log = (
            f"Epoch {trainer.current_epoch + 1}: "
            f"Train Loss: {self._format_metric(train_loss)}, "
            f"Val Loss: {self._format_metric(val_loss)}"
        )
        print(log)
        self.epoch_logs.append(log)

        # Optionally save to a file (append mode: earlier runs are kept)
        with open("epoch_metrics_log.txt", "a") as f:
            f.write(log + "\n")

# Training Loop

In [None]:
# Training Loop
if __name__ == "__main__":
    FILEPATH = "data_matahari_tfidf.csv"
    train_df, test_df = load_and_preprocess_data(FILEPATH)

    # files.download() only exists inside Google Colab. Fall back to a no-op
    # elsewhere so the same cell also runs in a local Jupyter kernel.
    try:
        from google.colab import files
    except ImportError:
        files = None

    def download_artifact(path):
        """Offer `path` as a browser download in Colab; do nothing elsewhere."""
        if files is not None:
            files.download(path)

    # Download train and test data files
    download_artifact("train_data.csv")
    download_artifact("test_data.csv")

    # Initialize data module and model
    data_module = CopywritingDataModule(
        train_df, test_df, tokenizer, batch_size=4
    )

    model = CopywritingModel()

    # ModelCheckpoint callback to save the best model
    checkpoint_callback = ModelCheckpoint(
        dirpath="checkpoints", filename="best-checkpoint", save_top_k=1, verbose=True, monitor="val_loss", mode="min"
    )

    # TensorBoardLogger for logging
    logger = TensorBoardLogger("lightning_logs", name="copywriting-t5")

    # Initialize the LogEpochMetricsCallback
    callback_metrics_logger = LogEpochMetricsCallback()

    # Initialize the Trainer.
    # devices=1 is valid for both accelerators; the previous devices=None on
    # the CPU branch is rejected by Lightning 2.x.
    trainer = pl.Trainer(
        logger=logger,
        callbacks=[checkpoint_callback, callback_metrics_logger],  # Add both callbacks here
        max_epochs=5,
        accelerator="gpu" if torch.cuda.is_available() else "cpu",
        devices=1
    )

    # Train the model
    trainer.fit(model, data_module)

    # Save training logs.
    # trainer.logged_metrics only holds the most recent values, so looping
    # over epochs and reading it would repeat the final losses on every line.
    # The callback captured one line per epoch while training ran; use those.
    with open("training_logs.txt", "w") as f:
        for epoch_log in callback_metrics_logger.epoch_logs:
            f.write(epoch_log + "\n")

    # Download the logs
    download_artifact("training_logs.txt")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

model.safetensors:   0%|          | 0.00/892M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name  | Type                       | Params | Mode
------------------------------------------------------------
0 | model | T5ForConditionalGeneration | 222 M  | eval
------------------------------------------------------------
222 M     Trainable params
0         Non-trainable params
222 M     Total params
891.614   Total estimated model params size (MB)
0         Modules in train mode
541       Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 0, global step 1424: 'val_loss' reached 0.30267 (best 0.30267), saving model to '/content/checkpoints/best-checkpoint.ckpt' as top 1


Epoch 1: Train Loss: 0.3743, Val Loss: 0.3027


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 1, global step 2848: 'val_loss' reached 0.20866 (best 0.20866), saving model to '/content/checkpoints/best-checkpoint.ckpt' as top 1


Epoch 2: Train Loss: 0.0792, Val Loss: 0.2087


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 2, global step 4272: 'val_loss' reached 0.17332 (best 0.17332), saving model to '/content/checkpoints/best-checkpoint.ckpt' as top 1


Epoch 3: Train Loss: 0.0049, Val Loss: 0.1733


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 3, global step 5696: 'val_loss' reached 0.16540 (best 0.16540), saving model to '/content/checkpoints/best-checkpoint.ckpt' as top 1


Epoch 4: Train Loss: 0.0083, Val Loss: 0.1654


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:Epoch 4, global step 7120: 'val_loss' reached 0.16284 (best 0.16284), saving model to '/content/checkpoints/best-checkpoint.ckpt' as top 1


Epoch 5: Train Loss: 0.0062, Val Loss: 0.1628


INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=5` reached.


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Load Best Model

In [None]:
# Restore the checkpoint with the lowest validation loss and freeze it for inference
best_checkpoint_path = checkpoint_callback.best_model_path
trained_model = CopywritingModel.load_from_checkpoint(best_checkpoint_path)
trained_model.freeze()

# Export the fine-tuned weights and the tokenizer side by side in one folder
EXPORT_DIR = "t5_copywriting_model"
trained_model.model.save_pretrained(EXPORT_DIR)
tokenizer.save_pretrained(EXPORT_DIR)

('t5_copywriting_model/tokenizer_config.json',
 't5_copywriting_model/special_tokens_map.json',
 't5_copywriting_model/spiece.model',
 't5_copywriting_model/added_tokens.json',
 't5_copywriting_model/tokenizer.json')

# Evaluation Metrics

In [None]:
# Evaluation Metrics
def evaluate_model(test_df, trained_model, tokenizer):
    """Generate copywriting for every test row and score it with BLEU and ROUGE.

    Args:
        test_df: DataFrame with ``keywords`` (input) and ``deskripsi``
            (reference text) columns.
        trained_model: Object exposing the fine-tuned network as ``.model``.
        tokenizer: Tokenizer used both to encode the keywords and to decode
            the generated ids.

    Returns:
        ``(predictions, avg_bleu, avg_rouge)`` where ``predictions`` is the
        list of generated strings in row order, ``avg_bleu`` is the mean
        sentence-level BLEU, and ``avg_rouge`` maps ``rouge1`` / ``rouge2`` /
        ``rougeL`` to their mean F-measure.
    """
    rouge_keys = ["rouge1", "rouge2", "rougeL"]
    bleu_scores = []
    rouge_scores = {key: [] for key in rouge_keys}
    # NOTE(review): use_stemmer applies rouge-score's built-in stemmer; confirm
    # it is appropriate for the language of the descriptions.
    scorer = rouge_scorer.RougeScorer(rouge_keys, use_stemmer=True)

    # The model does not move between rows, so resolve its device once.
    generator = trained_model.model
    device = next(generator.parameters()).device

    predictions = []
    for _, row in test_df.iterrows():
        text = row["keywords"]
        reference = row["deskripsi"]

        # Tokenize input text
        text_encoding = tokenizer(
            text, max_length=128, padding="max_length", truncation=True, return_tensors="pt"
        )
        input_ids = text_encoding["input_ids"].to(device)
        attention_mask = text_encoding["attention_mask"].to(device)

        # Beam-search decoding. temperature / top_k / top_p were dropped:
        # they only take effect with do_sample=True, which is not set here,
        # so the generated text is unchanged.
        generated_ids = generator.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_length=150,
            num_beams=3,
            repetition_penalty=1.2,
            early_stopping=True
        )

        # Decode the generated text
        generated_copywriting = tokenizer.decode(generated_ids[0], skip_special_tokens=True, clean_up_tokenization_spaces=True)
        predictions.append(generated_copywriting)

        # Sentence-level BLEU on whitespace tokens
        bleu_scores.append(sentence_bleu([reference.split()], generated_copywriting.split()))

        # ROUGE F-measures for this pair
        rouge_result = scorer.score(reference, generated_copywriting)
        for key in rouge_scores:
            rouge_scores[key].append(rouge_result[key].fmeasure)

    # Calculate average scores
    avg_bleu = np.mean(bleu_scores)
    avg_rouge = {key: np.mean(values) for key, values in rouge_scores.items()}

    return predictions, avg_bleu, avg_rouge

# Score the fine-tuned model on the held-out split
evaluation_result = evaluate_model(test_df, trained_model, tokenizer)
predictions, avg_bleu, avg_rouge = evaluation_result

# Report the averaged metrics
print("Average BLEU Score:", avg_bleu)
print("Average ROUGE Scores:", avg_rouge)

Average BLEU Score: 0.7057614553131678
Average ROUGE Scores: {'rouge1': 0.8354144583768445, 'rouge2': 0.7574962142693866, 'rougeL': 0.811511276509577}


# Save Predictions

In [None]:
# Put each input, its reference text and the generated text side by side,
# then write the table to disk without the index column.
predictions_df = (
    test_df[['keywords', 'deskripsi']]
    .rename(columns={'keywords': 'Keywords', 'deskripsi': 'Reference'})
    .assign(**{'Generated Copywriting': predictions})
)
predictions_df.to_csv('predictions.csv', index=False)
print("Predictions saved to predictions.csv")


Predictions saved to predictions.csv


# Prediction

In [None]:
# Prediction Function
def copywriting(text, max_input_length=128):
    """Generate a product description from a keyword string.

    Uses the module-level ``tokenizer`` and ``trained_model``.

    Args:
        text: Keywords describing the product, e.g. a comma-separated list.
        max_input_length: Token length the input is padded/truncated to.
            Defaults to 128, the input length used by the dataset class and
            by ``evaluate_model`` (previously 512 was hard-coded here).

    Returns:
        The decoded generated text with special tokens removed.
    """
    text_encoding = tokenizer(
        text,
        max_length=max_input_length,
        padding="max_length",
        truncation=True,
        return_tensors="pt"
    )

    # Move tensors to model's device
    device = next(trained_model.model.parameters()).device
    text_encoding = {k: v.to(device) for k, v in text_encoding.items()}

    # Beam-search decoding. temperature / top_k / top_p were dropped: they
    # only take effect with do_sample=True, which is not set here.
    generated_ids = trained_model.model.generate(
        input_ids=text_encoding["input_ids"],
        attention_mask=text_encoding["attention_mask"],
        max_length=150,
        num_beams=3,
        repetition_penalty=1.2,
        early_stopping=True
    )

    # Decode the generated text
    return tokenizer.decode(generated_ids[0], skip_special_tokens=True, clean_up_tokenization_spaces=True)

In [None]:
# Example prediction: a keyword string goes in, generated copy comes out
example_text = "baju, koko, katun, nyaman, pria"
generated_example = copywriting(example_text)
print("Generated Copywriting:", generated_example)

Generated Copywriting: bahan katun baju koko pria lengan pendek dilengkapi kancing pada bagian depan motif baju koko nyaman saat digunakan fashionable baju koko pria dengan design yang casual dan timeless, cocok untuk dijadikan outfit keseharian saat beraktifitas. bahan yang berkualitas akan nyaman saat digunakan sepanjang hari.
