# Jokes Generation with Text-To-Text Transfer Transformer 
Author: Agnieszka Mikołajczyk

> *What's the best part of a pregnancy joke? The delivery.*

~ Kaggle joke dataset

Let's train the model to tell jokes!

![](https://i.pinimg.com/originals/a5/cd/55/a5cd552a3aff2fc86fb99815bf970580.jpg)

# T5: Text-To-Text Transfer Transformer

T5 is an encoder-decoder model pre-trained on a multi-task mixture of unsupervised and supervised tasks, in which each task is converted into a text-to-text format. T5 works well on a variety of tasks out of the box by prepending a task-specific prefix to the input, e.g., `translate English to German: …` for translation or `summarize: …` for summarization.

T5 uses relative scalar embeddings. Encoder input padding can be done on the left and on the right.

![](https://miro.medium.com/max/1400/1*oPH8tAGqu3aUp6qjMtqcHg.png)

More: https://huggingface.co/docs/transformers/model_doc/t5


In [None]:
# Install the libraries used below (SentencePiece is required by the T5 tokenizer).
!pip install torch
!pip install transformers
!pip install pytorch_lightning
!pip install SentencePiece

Download the Kaggle Short Jokes dataset: https://www.kaggle.com/datasets/abhinavmoudgil95/short-jokes/download

Upload it below.

In [None]:
# Colab-only: prompt for a file upload (the next cell expects "archive.zip").
from google.colab import files
uploaded = files.upload()

In [None]:
!unzip -q "archive.zip"  # unzip the uploaded dataset archive

In [None]:
# Preview the first lines of the extracted CSV.
!head shortjokes.csv


In [None]:

from torch.utils.data import Dataset
from tqdm import tqdm
import csv

class JokesDataset(Dataset):
    """Prompt/target pairs built from the first rows of a jokes CSV file.

    The first line of the file is treated as a header and skipped. Column 1
    of each remaining row is taken as the joke text. Each item is a dict with:

    * ``"input"``  -- ``append_prefix`` followed by the first four
      space-separated words of the joke (the prompt the model continues);
    * ``"target"`` -- the full joke text.

    Args:
        data_path: Path to the CSV file.
        max_len: Maximum number of rows to keep; ``None`` keeps every row.
        append_prefix: Task prefix placed in front of every prompt.
    """

    def __init__(self, data_path, max_len=1000, append_prefix="Generate joke: "):
        self.append_prefix = append_prefix

        # newline="" is what the csv module asks for so that line breaks
        # inside quoted fields are parsed correctly.
        with open(data_path, "r", encoding="utf-8", newline="") as f:
            csv_reader = csv.reader(f)
            # Skip the header; the default keeps an empty file from raising
            # StopIteration.
            next(csv_reader, None)
            self.samples = self._take_rows(tqdm(csv_reader), max_len)

    @staticmethod
    def _take_rows(rows, max_len):
        """Return at most ``max_len`` rows that have a joke column.

        Rows with fewer than two columns (e.g. blank lines) are dropped
        because ``__getitem__`` reads column 1.
        """
        from itertools import islice

        limit = None if max_len is None else max(int(max_len), 0)
        usable_rows = (row for row in rows if len(row) > 1)
        return list(islice(usable_rows, limit))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        joke = self.samples[idx][1]
        # The prompt is the prefix plus the first four words of the joke.
        prompt_words = str(joke).split(" ")[:4]
        return {
            "input": self.append_prefix + " ".join(prompt_words),
            "target": joke,
        }


In [None]:
import os
from typing import Tuple, List
from torch.utils.data import DataLoader, random_split
import torch

class DataloaderCreator:
    """
    Build the jokes dataset and split it into train and validation loaders.

    Args:
        data_path: Path to the jokes CSV passed to ``JokesDataset``.
        ratio: Fraction of samples assigned to the training subset
            (must lie in ``[0, 1]``); the rest goes to validation.
        batch_size: Batch size used by both loaders.
        workers: Number of ``DataLoader`` worker processes.
    """

    def __init__(self, data_path, ratio, batch_size, workers):
        self.data_path = data_path
        self.ratio = ratio
        self.batch_size = batch_size
        self.workers = workers

    def _get_split_length(
        self, dataset: torch.utils.data.Dataset
    ) -> Tuple[int, int]:
        """Return ``(train_len, val_len)`` for ``dataset``.

        Raises:
            ValueError: If ``ratio`` is outside ``[0, 1]``. Such a ratio
                yields a negative validation length that still sums to
                ``len(dataset)``, so ``random_split`` would not reject it.
        """
        if not 0.0 <= self.ratio <= 1.0:
            raise ValueError(f"ratio must be within [0, 1], got {self.ratio!r}")
        train_len = round(len(dataset) * self.ratio)
        return train_len, len(dataset) - train_len

    def _make_loader(self, subset, shuffle: bool) -> DataLoader:
        """Wrap ``subset`` in a ``DataLoader`` with the shared settings."""
        return DataLoader(
            subset,
            shuffle=shuffle,
            batch_size=self.batch_size,
            num_workers=self.workers,
            drop_last=False,
        )

    def get_dataloaders(self) -> Tuple[DataLoader, DataLoader]:
        """Return ``(dataloader_train, dataloader_val)``.

        The split uses a generator seeded with 0, so it is the same on every
        call. Only the training loader shuffles.
        """
        dataset = JokesDataset(self.data_path)
        train_len, val_len = self._get_split_length(dataset)

        train, val = random_split(
            dataset,
            [train_len, val_len],
            generator=torch.Generator().manual_seed(0),
        )
        return self._make_loader(train, True), self._make_loader(val, False)


Now we will seed everything so the results are reproducible

In [None]:
import random
import numpy as np
import torch
import os
# Runtime environment: enumerate GPUs by PCI bus id, expose only device 0,
# and set the tokenizers parallelism flag.
os.environ.update(
    {
        "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
        "CUDA_VISIBLE_DEVICES": "0",
        "TOKENIZERS_PARALLELISM": "true",
    }
)

def seed_everything(seed):
    """Seed Python, NumPy and PyTorch RNGs and request deterministic cuDNN.

    Args:
        seed: Integer seed applied to every random number generator.
    """
    random.seed(seed)
    # NOTE: hash randomization is fixed at interpreter start-up, so this only
    # affects subprocesses launched from here, not the running process.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible CUDA device, not just the current one.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may pick different kernels from run to run.
    torch.backends.cudnn.benchmark = False


seed_everything(2021)

## Loading data
Now we will preprocess and load the training set.

In [None]:
# Build the train/validation loaders from the jokes CSV (90% train, 10% val).
loader = DataloaderCreator("shortjokes.csv", ratio=0.9, batch_size=8, workers=2)
dataloader_train, dataloader_val = loader.get_dataloaders()



In [None]:
# Print the prompts and targets of one training batch as a sanity check.
for batch in dataloader_train:
    for field in ("input", "target"):
        print(batch[field])
    break

## Define the model
We wrap `T5ForConditionalGeneration` from Transformers in a PyTorch Lightning module.

In [None]:
import torch
from transformers import T5Tokenizer, T5ForConditionalGeneration
import pytorch_lightning as pl
from torch.optim.lr_scheduler import MultiplicativeLR


class JokeT5(pl.LightningModule):
    """JokeT5 model for joke generation.

    Wraps ``T5ForConditionalGeneration`` and its tokenizer. Batches arrive as
    raw strings and are tokenized inside ``forward``.

    Args:
        lr: Peak learning rate reached once warm-up has finished.
        multiply_lr_step: Factor the learning rate is multiplied by on every
            scheduler step.
        warmup_steps: Number of optimizer steps over which the learning rate
            is ramped up linearly.
        model_path: Name or path of the pretrained T5 checkpoint.
        model_save_dir: File the T5 ``state_dict`` is written to.
        model_load_dir: Optional file with a previously saved ``state_dict``
            to restore on top of the pretrained weights.

    NOTE(review): ``training_epoch_end``, ``validation_epoch_end`` and the
    ``optimizer_step`` signature look like the PyTorch Lightning 1.x hook
    API -- confirm against the installed version.
    """

    def __init__(
        self,
        lr=1e-5,
        multiply_lr_step=0.9,
        warmup_steps=100.0,
        model_path="t5-small",
        model_save_dir="joke-t5.pkl",
        model_load_dir=None,
    ):
        super().__init__()

        self.lr = lr
        self.model_save_dir = model_save_dir
        self.model = T5ForConditionalGeneration.from_pretrained(model_path)
        self.tokenizer = T5Tokenizer.from_pretrained(model_path)
        self.warmup_steps = warmup_steps
        self.multiply_lr_step = multiply_lr_step

        # Restore fine-tuned weights in the same format the epoch-end hooks
        # save them (a bare state_dict of the wrapped T5 model).
        if model_load_dir is not None:
            state_dict = torch.load(model_load_dir, map_location="cpu")
            self.model.load_state_dict(state_dict)

    def forward(self, input_sequences, output_sequences, **kwargs):
        """Tokenize a batch of prompt/target strings and return the LM loss."""
        input_tokens = self.tokenizer(
            list(input_sequences),
            padding=True,
            truncation=False,
            return_tensors="pt",
        )
        labels = self.tokenizer(
            list(output_sequences),
            padding=True,
            truncation=True,
            return_tensors="pt",
        ).input_ids
        # Replace padding token ids in the labels by -100 so the loss
        # ignores those positions.
        labels[labels == self.tokenizer.pad_token_id] = -100

        return self.model(
            input_ids=input_tokens.input_ids.to(self.device),
            attention_mask=input_tokens.attention_mask.to(self.device),
            labels=labels.to(self.device),
        ).loss

    def training_step(self, batch, batch_idx):
        input_sequences, output_sequences = batch["input"], batch["target"]
        loss = self(input_sequences, output_sequences)
        # Report the real batch size so epoch-level averages are weighted
        # correctly (the last batch may be smaller).
        self.log("loss", loss, batch_size=len(input_sequences))
        return {"loss": loss}

    def _save_weights(self):
        """Write the T5 weights to ``model_save_dir`` once training has started."""
        # global_step is still 0 before the first optimizer step, so nothing
        # is written until some training has happened.
        if self.trainer.global_step > 0:
            print("Saving model...")
            torch.save(self.model.state_dict(), self.model_save_dir)

    def training_epoch_end(self, outputs):
        self._save_weights()

    def validation_step(self, batch, batch_idx):
        input_sequences, output_sequences = batch["input"], batch["target"]
        loss = self(input_sequences, output_sequences)
        self.log("validation_loss", loss, batch_size=len(input_sequences))

    def validation_epoch_end(self, out):
        self._save_weights()

    def configure_optimizers(self):
        """Return Adam plus a scheduler that multiplies the LR each step."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)

        def lambd(epoch):
            # Constant factor: the LR shrinks geometrically.
            return self.multiply_lr_step

        scheduler = MultiplicativeLR(optimizer, lr_lambda=lambd)
        return [optimizer], [scheduler]

    def optimizer_step(
        self,
        epoch,
        batch_idx,
        optimizer,
        optimizer_idx,
        optimizer_closure,
        on_tpu=False,
        using_native_amp=False,
        using_lbfgs=False,
    ):
        """Apply linear learning-rate warm-up, then take the optimizer step."""
        # During warm-up the LR is overwritten with a linearly growing
        # fraction of self.lr.
        if self.trainer.global_step < self.warmup_steps:
            lr_scale = min(1.0, float(self.trainer.global_step + 1) / self.warmup_steps)
            for pg in optimizer.param_groups:
                pg["lr"] = lr_scale * self.lr

        optimizer.step(closure=optimizer_closure)


## Training

In [None]:
model = JokeT5()

# Build exactly one trainer. Previously a GPU trainer was created and then
# immediately overwritten by a default one, so its settings were never used.
if torch.cuda.is_available():
    trainer = pl.Trainer(
        max_epochs=5,
        gpus=[0],
        progress_bar_refresh_rate=50,
        accumulate_grad_batches=4,
    )
else:
    # No CUDA device is available: fall back to the default-device settings.
    trainer = pl.Trainer(max_epochs=10, log_every_n_steps=10)

In [None]:
# Fine-tune on the training loader, validating on the held-out loader.
trainer.fit(
    model,
    dataloader_train,
    dataloader_val,
)