# Poem Summarization Code

In [None]:
!pip show pytorch-lightning

In [None]:
!pip install --quiet transformers
!pip install --quiet pytorch-lightning

In [None]:
import seaborn as sns
from transformers import AdamW, T5ForConditionalGeneration, T5TokenizerFast as T5Tokenizer
import pytorch_lightning as pl
import torch
from torch.utils.data import Dataset, DataLoader
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
import textwrap
from pathlib import Path

import seaborn as sns
from tqdm.auto import tqdm
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
import pandas as pd
from sklearn.model_selection import train_test_split

In [None]:
# Fill in the location of each poemsum CSV split before running this cell.
TRAIN_CSV_PATH = ''  # location of the file poemsum_train.csv
VALID_CSV_PATH = ''  # location of the file poemsum_valid.csv
TEST_CSV_PATH = ''   # location of the file poemsum_test.csv

train_df = pd.read_csv(TRAIN_CSV_PATH, encoding='utf8')
valid_df = pd.read_csv(VALID_CSV_PATH, encoding='utf8')
test_df = pd.read_csv(TEST_CSV_PATH, encoding='utf8')

In [None]:
import re
import string
# Compiled once so that DataFrame.apply does not look the patterns up again
# for every row. Raw strings keep '\s' from being parsed as a (invalid)
# string escape.
_WHITESPACE_RE = re.compile(r'\s+')
_DISALLOWED_RE = re.compile(
    r'[^a-zA-Z0-9\s{}]+'.format(re.escape(string.punctuation))
)


def clean_text(text):
    """Normalise a raw poem, summary or title for tokenization.

    The value is converted with ``str()``, every character that is not an
    ASCII letter, a digit, whitespace or ASCII punctuation is removed, each
    run of whitespace (including newlines) is collapsed to a single space,
    and leading/trailing whitespace is stripped.

    Args:
        text: Value to clean. Non-string values are stringified first, so
            ``None`` becomes ``"None"`` and a float NaN becomes ``"nan"``.

    Returns:
        The cleaned string.
    """
    text = str(text)
    # Strip disallowed characters *before* collapsing whitespace; doing it the
    # other way round leaves a double space wherever a removed character was
    # surrounded by spaces.
    text = _DISALLOWED_RE.sub('', text)
    text = _WHITESPACE_RE.sub(' ', text)
    return text.strip()

In [None]:
# Run clean_text over the poem body, the reference summary and the title of
# every split. The cleaned poem body is written to a new 'cleaned_text'
# column; 'text' and 'Title' are overwritten in place.
for split_df in (train_df, valid_df, test_df):
    split_df['cleaned_text'] = split_df['ctext'].apply(clean_text)
    for column in ('text', 'Title'):
        split_df[column] = split_df[column].apply(clean_text)

In [None]:
def get_all(title, ctext):
    """Return the model input string: the title, " - ", then the poem body."""
    return " - ".join((title, ctext))

# Add a 'poem' column to every split, combining each row's title and cleaned
# body via get_all.
for split_df in (train_df, valid_df, test_df):
    split_df['poem'] = split_df.apply(
        lambda row: get_all(row.Title, row.cleaned_text),
        axis=1,
    )

In [None]:
# Preview the cleaned poem body of the first training row.
train_df['cleaned_text'].iloc[0]

In [None]:
# Preview the combined "title - body" string of the first training row.
train_df['poem'].iloc[0]

In [None]:
# Keep only the reference summary ('text') and the model input ('poem'),
# rename them to the "summary" / "text" columns that NewsSummaryDataset reads,
# and drop rows with missing values.
# NOTE(review): clean_text() calls str() on every cell, so missing values may
# already have been turned into strings before dropna() runs — confirm that
# this is intended.
_COLUMN_RENAMES = {'text': 'summary', 'poem': 'text'}

train_df = train_df[['text', 'poem']].rename(columns=_COLUMN_RENAMES).dropna()
valid_df = valid_df[['text', 'poem']].rename(columns=_COLUMN_RENAMES).dropna()
test_df = test_df[['text', 'poem']].rename(columns=_COLUMN_RENAMES).dropna()

In [None]:
class NewsSummaryDataset(Dataset):
    """Map-style dataset that tokenizes (text, summary) rows on access.

    Each row of ``data`` must provide a ``"text"`` column (the model input)
    and a ``"summary"`` column (the target).

    Args:
        data: Frame holding the ``"text"`` and ``"summary"`` columns.
        tokenizer: Callable tokenizer; it is invoked with ``max_length``,
            ``padding``, ``truncation``, ``return_attention_mask``,
            ``add_special_tokens`` and ``return_tensors="pt"`` and must return
            a mapping with ``"input_ids"`` and ``"attention_mask"`` tensors.
        text_max_token_len: Padded/truncated length of the encoded input.
        summary_max_token_len: Padded/truncated length of the encoded target.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        tokenizer: "T5Tokenizer",
        text_max_token_len: int = 512,
        summary_max_token_len: int = 256):

        self.tokenizer = tokenizer
        self.data = data
        self.text_max_token_len = text_max_token_len
        self.summary_max_token_len = summary_max_token_len

    def __len__(self) -> int:
        return len(self.data)

    def _encode(self, text, max_length: int):
        """Tokenize ``text`` to exactly ``max_length`` tokens (pad/truncate)."""
        # Use the tokenizer handed to this dataset, not a module-level global,
        # so the dataset works with whichever tokenizer it was built with.
        return self.tokenizer(text,
                              max_length=max_length,
                              padding="max_length",
                              truncation=True,
                              return_attention_mask=True,
                              add_special_tokens=True,
                              return_tensors="pt")

    def __getitem__(self, index: int) -> dict:
        """Return the raw strings and encoded tensors for row ``index``.

        Returns:
            A dict with the raw ``text`` and ``summary`` plus the flattened
            ``text_input_ids``, ``text_attention_mask``, ``labels`` and
            ``labels_attention_mask`` tensors.
        """
        data_row = self.data.iloc[index]
        text = data_row["text"]
        summary = data_row["summary"]

        text_encoding = self._encode(text, self.text_max_token_len)
        summary_encoding = self._encode(summary, self.summary_max_token_len)

        # Replace padded target positions with -100 (the default ignore_index
        # of torch.nn.CrossEntropyLoss). Padding is identified through the
        # attention mask rather than a hard-coded token id 0, so tokenizers
        # whose pad id is not 0 are handled as well.
        labels = summary_encoding["input_ids"].masked_fill(
            summary_encoding["attention_mask"] == 0, -100
        )

        return dict(
            text=text,
            summary=summary,
            text_input_ids=text_encoding["input_ids"].flatten(),
            text_attention_mask=text_encoding["attention_mask"].flatten(),
            labels=labels.flatten(),
            labels_attention_mask=summary_encoding["attention_mask"].flatten())


In [None]:
class NewsSummaryDataModule(pl.LightningDataModule):
    """Lightning data module wrapping two frames in NewsSummaryDataset loaders.

    ``train_df`` feeds the (shuffled) training loader. ``test_df`` feeds both
    the validation loader and the test loader, neither of which is shuffled.
    All loaders use ``batch_size`` and two worker processes.
    """

    def __init__(self,
                train_df:pd.DataFrame,
                test_df:pd.DataFrame,
                tokenizer:T5Tokenizer,
                batch_size: int = 8,
                text_max_token_len: int = 512,
                summary_max_token_len: int = 256):
        super().__init__()
        self.train_df = train_df
        self.test_df = test_df

        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.text_max_token_len = text_max_token_len
        self.summary_max_token_len = summary_max_token_len

    def _build_dataset(self, frame):
        """Wrap ``frame`` in a dataset using this module's tokenizer settings."""
        return NewsSummaryDataset(
            frame,
            self.tokenizer,
            self.text_max_token_len,
            self.summary_max_token_len,
        )

    def _build_loader(self, dataset, shuffle):
        """Create a DataLoader over ``dataset`` with the shared batch settings."""
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=2,
        )

    def setup(self, stage=None):
        """Instantiate the train and test datasets; ``stage`` is not used."""
        self.train_dataset = self._build_dataset(self.train_df)
        self.test_dataset = self._build_dataset(self.test_df)

    def train_dataloader(self):
        """Shuffled loader over the training dataset."""
        return self._build_loader(self.train_dataset, shuffle=True)

    def val_dataloader(self):
        """Unshuffled loader over the test dataset (shared with test_dataloader)."""
        return self._build_loader(self.test_dataset, shuffle=False)

    def test_dataloader(self):
        """Unshuffled loader over the test dataset."""
        return self._build_loader(self.test_dataset, shuffle=False)



In [None]:
from transformers import AdamW, T5ForConditionalGeneration, T5TokenizerFast as T5Tokenizer
from transformers import BartTokenizerFast as BartTokenizer, BartForConditionalGeneration
from transformers import ProphetNetForConditionalGeneration, ProphetNetTokenizer
from transformers import PegasusForConditionalGeneration, PegasusTokenizerFast as PegasusTokenizer
from transformers import AutoTokenizer

# Name of the pretrained checkpoint. It selects the tokenizer below and is
# read again by NewsSummaryModel when it loads the pretrained weights.
MODEL_NAME = 't5-base'

# Tokenizer matching MODEL_NAME. This module-level object is also used
# directly by summarize().
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

In [None]:
# Training hyper-parameters: N_EPOCHS is passed to the Trainer as max_epochs,
# BATCH_SIZE to the data module.
N_EPOCHS = 3
BATCH_SIZE = 10

# NOTE: valid_df is passed as the data module's `test_df` argument, so the
# validation split backs both val_dataloader() and test_dataloader().
data_module = NewsSummaryDataModule(train_df,valid_df,tokenizer,batch_size=BATCH_SIZE)

# Model

In [None]:
class NewsSummaryModel(pl.LightningModule):
    """LightningModule that fine-tunes the MODEL_NAME checkpoint as a
    T5ForConditionalGeneration summarizer.

    Batches are expected to be dicts with the keys ``text_input_ids``,
    ``text_attention_mask``, ``labels`` and ``labels_attention_mask``.
    """

    def __init__(self):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME, return_dict=True)

    def forward(self, inputs_ids, attention_mask, decoder_attention_mask, labels=None):
        """Run the wrapped model and return ``(loss, logits)`` from its output."""
        output = self.model(inputs_ids,
                            attention_mask=attention_mask,
                            labels=labels,
                            decoder_attention_mask=decoder_attention_mask)
        return output.loss, output.logits

    def step(self, batch, batch_idx):
        """Unpack ``batch`` and return ``(loss, logits)``; ``batch_idx`` is unused."""
        return self.forward(inputs_ids=batch["text_input_ids"],
                            attention_mask=batch["text_attention_mask"],
                            decoder_attention_mask=batch["labels_attention_mask"],
                            labels=batch["labels"])

    def _run_and_log(self, metric_name, batch, batch_idx):
        """Compute the loss for ``batch``, log it as ``metric_name``, return it."""
        loss, _ = self.step(batch, batch_idx)
        self.log(metric_name, loss, prog_bar=True, logger=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Return the training loss, logged as ``train_loss``."""
        return self._run_and_log("train_loss", batch, batch_idx)

    def validation_step(self, batch, batch_idx):
        """Return the validation loss, logged as ``val_loss``."""
        return self._run_and_log("val_loss", batch, batch_idx)

    def test_step(self, batch, batch_idx):
        """Return the test loss, logged as ``test_loss``."""
        return self._run_and_log("test_loss", batch, batch_idx)

    def configure_optimizers(self):
        """Return an AdamW optimizer over all parameters with lr=1e-4."""
        # transformers.AdamW is deprecated in favour of torch.optim.AdamW.
        # eps and weight_decay are spelled out with the defaults of the
        # transformers implementation, because torch's own defaults differ
        # (eps=1e-8, weight_decay=0.01) and would change the optimisation.
        return torch.optim.AdamW(self.parameters(),
                                 lr=0.0001,
                                 eps=1e-6,
                                 weight_decay=0.0)


In [None]:
# Build the Lightning module; its constructor loads the pretrained MODEL_NAME
# weights.
model = NewsSummaryModel()

In [None]:
# Keep only the single checkpoint with the lowest logged "val_loss", written
# to checkpoints/best-checkpoint.
checkpoint_callback = ModelCheckpoint(
    dirpath="checkpoints",
    filename="best-checkpoint",
    save_top_k=1,
    verbose=True,
    monitor="val_loss",
    mode="min")
logger = TensorBoardLogger("lightning_logs", name="news-summary")

# TQDMProgressBar is the tqdm-based progress bar callback in the Lightning
# releases that also provide the `enable_checkpointing` Trainer flag.
from pytorch_lightning.callbacks import TQDMProgressBar


class LitProgressBar(TQDMProgressBar):
    """Progress bar whose validation bar is labelled 'running validation ...'."""

    def init_validation_tqdm(self):
        bar = super().init_validation_tqdm()
        bar.set_description('running validation ...')
        return bar


# The refresh rate is configured on the callback itself; assigning
# `refresh_rate` to the tqdm object returned by init_validation_tqdm did not
# configure the callback.
bar = LitProgressBar(refresh_rate=30)

# Callback instances must be registered through `callbacks`:
# `enable_checkpointing` is a boolean switch, so handing it the
# ModelCheckpoint never registered the configuration above, and the custom
# progress bar was not attached at all.
# `accelerator`/`devices` replace the legacy `gpus=1` flag and fall back to
# the CPU when no CUDA device is present.
trainer = pl.Trainer(logger=logger,
                     callbacks=[checkpoint_callback, bar],
                     max_epochs=N_EPOCHS,
                     accelerator="gpu" if torch.cuda.is_available() else "cpu",
                     devices=1)

In [None]:
# Fine-tune the model on the loaders served by data_module.
trainer.fit(model,data_module)

In [None]:
# Rebuild the model from the best checkpoint path recorded by the trainer's
# checkpoint callback.
# NOTE(review): this is whichever checkpoint callback the trainer registered —
# confirm it is the one configured with monitor="val_loss".
trained_model = NewsSummaryModel.load_from_checkpoint(
    trainer.checkpoint_callback.best_model_path
    )

In [None]:
import torch

# Report whether PyTorch can see a CUDA device
cuda_message = (
    "CUDA is available" if torch.cuda.is_available() else "CUDA is not available"
)
print(cuda_message)

In [None]:
def summarize(text):
    """Generate a summary of ``text`` with the module-level ``trained_model``.

    The text is encoded with the module-level ``tokenizer`` (padded/truncated
    to 512 tokens), summarised with 2-beam search capped at 150 tokens, and
    the decoded sequences are concatenated into one string.

    Args:
        text: The input string to summarise.

    Returns:
        The decoded summary, with special tokens removed.
    """
    # Run on the GPU when one is available, otherwise on the CPU.
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    trained_model.to(device)

    encoded = tokenizer(
        text,
        max_length=512,
        padding="max_length",
        truncation=True,
        return_attention_mask=True,
        add_special_tokens=True,
        return_tensors='pt',
    )
    encoded = encoded.to(device)

    generation_options = dict(
        max_length=150,
        num_beams=2,
        repetition_penalty=2.5,
        length_penalty=1.0,
        early_stopping=True,
    )
    output_ids = trained_model.model.generate(
        input_ids=encoded["input_ids"],
        attention_mask=encoded["attention_mask"],
        **generation_options,
    )

    # Decode on the CPU, one generated sequence at a time.
    output_ids = output_ids.cpu()
    return "".join(
        tokenizer.decode(
            sequence,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True,
        )
        for sequence in output_ids
    )


In [None]:
# Summarise every test row, one summarize() call per row. At this point
# test_df['text'] holds the "title - body" model input (the frame's columns
# were renamed to summary/text earlier in the notebook).
test_df['pred'] = test_df['text'].apply(summarize)

In [None]:
# Write the test split, including the generated 'pred' column, to CSV without
# the index column.
test_df.to_csv('', index=False) #Enter name of the newly created predictions file in the quotes