# Google Pegasus
### Tachygraphy: Generative Transformer Model
### Training with PyTorch DistributedDataParallel (DDP)
### Date: 25.09.2024

In [1]:
%%writefile ddp.py



# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" (i.e. /kaggle/input) directory.
# The `__main__` block at the bottom of this script walks that directory and prints every file in it.



# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session





import torch
from torch.utils.data import DataLoader
from transformers import BartTokenizer, BartForConditionalGeneration, AdamW, BertModel, BertTokenizer
# from torch.cuda.amp import autocast, GradScaler
from datasets import load_dataset
from sklearn.metrics.pairwise import cosine_similarity
import ray
from ray import tune
import torch.nn as nn
import torch.distributed as dist


import os
import gc
import re
import ast
import sys
import copy
import json
import time
import math
import string
import pickle
import random
import joblib
import itertools
import warnings


import argparse # CPMP

import scipy as sp

from tqdm.auto import tqdm

import torch
import torch.nn as nn
from torch.nn import Parameter
import torch.nn.functional as F
from torch.optim import Adam, SGD, AdamW
from torch.utils.data import DataLoader, Dataset

# CPMP imports for DDP
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data.distributed import DistributedSampler

from torch.optim import AdamW
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer, RobertaModel
from sklearn.model_selection import train_test_split
import optuna
# from optuna.integration import PyTorchLightningPruner
from ray import tune
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from ray.tune.schedulers import ASHAScheduler
# from ray.tune.integration.pytorch import TuneReportCallback
from torch.amp import GradScaler, autocast
# from ray.tune.integration.optuna import OptunaSearch
from ray.tune.search.optuna import OptunaSearch
from ray import tune
from ray.tune.search.hyperopt import HyperOptSearch
from torch import autocast
# from ray import tune
# from ray.tune.integration.tensorboard import TensorBoardReporter
from ray.tune.logger import TBXLogger
from torch.utils.tensorboard import SummaryWriter
from ray.train import report
# from ray.tune.integration.jupyter import JupyterNotebookReporter
from ray.tune import JupyterNotebookReporter
# from torch.cuda.amp import GradScaler, autocast
from torch.optim.lr_scheduler import StepLR, CosineAnnealingLR
from ray.tune.schedulers import HyperBandScheduler, AsyncHyperBandScheduler
from transformers import AutoTokenizer, AutoModel
from transformers import PegasusTokenizer, PegasusForConditionalGeneration
from accelerate import notebook_launcher
from accelerate import Accelerator
# import evaluate
from torch.utils.data import Dataset
import torch


def setup(rank, world_size):
    """Join this process to a single-host NCCL process group.

    Args:
        rank: index of this process; it is also used as the CUDA device index.
        world_size: total number of processes in the group.
    """
    # Rendezvous over localhost on a fixed port (it must be free on this host).
    os.environ.update({
        'MASTER_ADDR': 'localhost',
        'MASTER_PORT': '12355',
        'RANK': str(rank),
        'WORLD_SIZE': str(world_size),
    })
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    # Bind this process to its own GPU.
    torch.cuda.set_device(rank)

def cleanup():
    """Destroy the default process group if one is currently initialized.

    It is a no-op when distributed support is unavailable or no group exists,
    so it is safe to call from a ``finally`` block or more than once.
    """
    if dist.is_available() and dist.is_initialized():
        dist.destroy_process_group()


class TextDataset(Dataset):
    """Map-style dataset yielding tokenized (input, target) pairs.

    ``dataset`` must support ``dataset['input']`` and ``dataset['target']``
    lookups that return sequences of strings indexable by ``[idx]`` (e.g. the
    columns of a pandas DataFrame; pandas indexing is label-based, so the
    index should be 0..n-1). Both sides are tokenized with truncation and
    ``'max_length'`` padding at ``max_length``.
    """

    def __init__(self, tokenizer, dataset, max_length=256):
        self.tokenizer = tokenizer
        self.texts, self.targets = dataset['input'], dataset['target']
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def _encode(self, text):
        """Tokenize ``text``, truncating/padding to ``self.max_length``."""
        return self.tokenizer(text, truncation=True, padding='max_length', max_length=self.max_length)

    @staticmethod
    def _as_long(ids):
        """Convert a list of token ids into an int64 tensor."""
        return torch.tensor(ids, dtype=torch.long)

    def __getitem__(self, idx):
        source, reference = self.texts[idx], self.targets[idx]
        src_enc = self._encode(source)
        tgt_enc = self._encode(reference)

        sample = {}
        sample['input_ids'] = self._as_long(src_enc['input_ids'])
        sample['attention_mask'] = self._as_long(src_enc['attention_mask'])
        # Raw target ids (padding included) are used as labels.
        sample['labels'] = self._as_long(tgt_enc['input_ids'])
        return sample

# %% [code] {"jupyter":{"outputs_hidden":false}}
def test_train_split(dataset, test_ratio=0.1, seed=None):
    """Randomly split ``dataset`` into ``(train, test)`` parts.

    Each row independently goes to the test part with probability
    ``test_ratio``, so the test size is only approximately
    ``test_ratio * len(dataset)``.

    Args:
        dataset: anything supporting ``len()`` and boolean-mask indexing
            (e.g. a pandas DataFrame or a NumPy array).
        test_ratio: probability that a row is assigned to the test part.
        seed: optional seed for a private ``numpy.random.Generator`` so the
            split is reproducible; when None the global NumPy RNG is used,
            exactly as before.

    Returns:
        ``(train, test)``: complementary masked subsets of ``dataset``.
    """
    n_rows = len(dataset)
    if seed is None:
        draws = np.random.rand(n_rows)
    else:
        draws = np.random.default_rng(seed).random(n_rows)
    test_mask = draws < test_ratio
    return dataset[~test_mask], dataset[test_mask]


# The name starts with "test_", so pytest would otherwise collect this helper
# as a test (and fail on the missing "dataset" fixture) wherever it is imported.
test_train_split.__test__ = False


def calculate_cosine_similarity(pred_text, target_text, model, tokenizer, device, model_type="pegasus"):
    """Return the cosine similarity of mean-pooled embeddings of two texts.

    Args:
        pred_text: generated text (a single string).
        target_text: reference text (a single string).
        model: Hugging Face model used as the embedder; it is moved to ``device``.
        tokenizer: tokenizer matching ``model``.
        device: torch device (or CUDA index) to run on.
        model_type: ``"bart"``, ``"pegasus"`` or ``"t5"`` embed with the
            seq2seq encoder (``model.get_encoder()``); ``"llama"`` uses the
            last hidden layer of the full model; any other value expects the
            model output to expose ``last_hidden_state`` (BERT/RoBERTa style).

    Returns:
        A Python float in [-1, 1] comparing the first pred/target embeddings.
    """
    model.to(device)

    pred_inputs = tokenizer(pred_text, return_tensors="pt", truncation=True, padding=True).to(device)
    target_inputs = tokenizer(target_text, return_tensors="pt", truncation=True, padding=True).to(device)

    def _embed(inputs):
        """Mean-pool the last hidden layer over non-padding tokens."""
        if model_type in ("bart", "pegasus", "t5"):
            # get_encoder() works for BART/Pegasus (model.model.encoder) and
            # for T5, which has no `.model` attribute.
            encoder = model.get_encoder()
            hidden = encoder(input_ids=inputs["input_ids"],
                             attention_mask=inputs["attention_mask"]).last_hidden_state
        elif model_type == "llama":
            # hidden_states is None unless it is explicitly requested.
            hidden = model(**inputs, output_hidden_states=True).hidden_states[-1]
        else:
            hidden = model(**inputs).last_hidden_state
        # Exclude padding positions so they do not dilute the average.
        mask = inputs["attention_mask"].unsqueeze(-1).to(hidden.dtype)
        return (hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1)

    with torch.no_grad():
        pred_embeddings = _embed(pred_inputs)
        target_embeddings = _embed(target_inputs)

    # Compute on-device in float32; .item() returns a Python float.
    similarity = F.cosine_similarity(pred_embeddings.float(), target_embeddings.float(), dim=-1)
    return similarity[0].item()



class EarlyStopping:
    """Watch a validation loss and raise a flag once it stops improving.

    A call is an improvement only when ``val_loss`` is below the best loss seen
    so far minus ``delta``; any other value (including NaN) adds a strike.
    When ``patience`` consecutive strikes accumulate, ``early_stop`` becomes
    True. It is never reset afterwards, even if the loss later improves.

    Args:
        patience (int): number of consecutive non-improving calls tolerated.
        verbose (bool): stored on the instance; no messages are printed.
        delta (float): minimum decrease that counts as an improvement.
    """

    def __init__(self, patience=5, verbose=False, delta=0):
        # Configuration
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        # Running state
        self.best_loss, self.counter, self.early_stop = np.inf, 0, False

    def __call__(self, val_loss, model):
        """Record one epoch's validation loss (``model`` is accepted but unused)."""
        improved = val_loss < self.best_loss - self.delta
        if not improved:
            self.counter += 1
        else:
            self.best_loss, self.counter = val_loss, 0

        # Latching flag: only ever switched on.
        if self.counter >= self.patience:
            self.early_stop = True
            
            
            
            
            
def _mask_label_padding(labels, pad_token_id):
    """Return ``labels`` with padding ids replaced by -100.

    Hugging Face seq2seq models ignore label positions equal to -100 in the
    cross-entropy loss, so padded target positions no longer contribute.
    """
    return labels.masked_fill(labels == pad_token_id, -100)


def _sum_across_ranks(values, device):
    """All-reduce (SUM) a list of Python numbers over the default process group.

    Every rank gets identical totals, so decisions derived from them (LR
    scheduling, early stopping) are the same on all ranks.
    """
    buf = torch.tensor(values, dtype=torch.float64, device=device)
    dist.all_reduce(buf, op=dist.ReduceOp.SUM)
    return buf.tolist()


def _safe_mean(total, count):
    """Return ``total / count``, or NaN when ``count`` is 0 (empty split)."""
    return total / count if count else float('nan')


def _train_one_epoch(model, loader, optimizer, scaler, rank, pad_token_id):
    """Run one training pass over this rank's shard.

    Returns:
        ``(sum of per-batch losses, number of batches)`` for this rank.
    """
    model.train()
    loss_sum, n_batches = 0.0, 0
    for batch in loader:
        input_ids = batch['input_ids'].to(rank)
        attention_mask = batch['attention_mask'].to(rank)
        labels = _mask_label_padding(batch['labels'].to(rank), pad_token_id)

        optimizer.zero_grad()
        # autocast is not enabled, so the forward pass runs in the model's
        # default precision; GradScaler still scales the loss and skips steps
        # whose gradients contain inf/NaN.
        loss = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels).loss
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        loss_sum += loss.item()
        n_batches += 1
    return loss_sum, n_batches


def _evaluate(model, loader, tokenizer, rank):
    """Compute validation loss and generation similarity on this rank's shard.

    Returns:
        ``(sum of per-batch losses, number of batches,
        sum of per-sample cosine similarities, number of samples)``.
    """
    model.eval()
    pad_id = tokenizer.pad_token_id
    loss_sum = sim_sum = 0.0
    n_batches = n_samples = 0
    with torch.no_grad():
        for batch in loader:
            input_ids = batch['input_ids'].to(rank)
            attention_mask = batch['attention_mask'].to(rank)
            labels = batch['labels'].to(rank)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask,
                            labels=_mask_label_padding(labels, pad_id))
            loss_sum += outputs.loss.item()
            n_batches += 1

            # DistributedDataParallel does not expose generate(); use the wrapped module.
            preds = model.module.generate(input_ids=input_ids, attention_mask=attention_mask)
            pred_texts = tokenizer.batch_decode(preds, skip_special_tokens=True)
            # The tokenizer cannot decode -100, so map any such ids back to padding first.
            target_texts = tokenizer.batch_decode(labels.masked_fill(labels == -100, pad_id),
                                                  skip_special_tokens=True)

            for pred_text, target_text in zip(pred_texts, target_texts):
                sim_sum += calculate_cosine_similarity(pred_text, target_text, model.module, tokenizer, device=rank)
                n_samples += 1
    return loss_sum, n_batches, sim_sum, n_samples


def train_model(rank, config, train_dataset, val_dataset, world_size):
    """Fine-tune google/pegasus-large with DDP in one worker process.

    Intended as the ``torch.multiprocessing.spawn`` target: ``rank`` is the
    process index and also the CUDA device it trains on.

    Args:
        rank: process / GPU index in ``[0, world_size)``.
        config: dict with ``'lr'``, ``'batch_size'`` and ``'epochs'``.
        train_dataset: map-style dataset yielding dicts with ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'`` tensors.
        val_dataset: same format as ``train_dataset``.
        world_size: total number of ranks.

    Returns:
        The DDP-wrapped model. Rank 0 also saves the unwrapped weights to
        ``ddp_model.pth``.
    """
    setup(rank, world_size)
    try:
        tokenizer = PegasusTokenizer.from_pretrained('google/pegasus-large')
        model = PegasusForConditionalGeneration.from_pretrained('google/pegasus-large').to(rank)
        model = DistributedDataParallel(model, device_ids=[rank])
        pad_id = tokenizer.pad_token_id

        batch_size = int(config['batch_size'])
        train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank, shuffle=False)
        valid_sampler = DistributedSampler(val_dataset, num_replicas=world_size, rank=rank, shuffle=False)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, sampler=train_sampler)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, sampler=valid_sampler)

        optimizer = AdamW(model.parameters(), lr=config['lr'])
        scaler = GradScaler("cuda")
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=2)
        early_stopping = EarlyStopping(patience=3, verbose=True)

        for epoch in range(int(config['epochs'])):
            train_stats = _train_one_epoch(model, train_loader, optimizer, scaler, rank, pad_id)
            val_stats = _evaluate(model, val_loader, tokenizer, rank)

            # Combine every rank's shard so all ranks see the same global metrics
            # and take the same scheduler / early-stopping decision. Diverging
            # decisions would leave some ranks waiting forever in collectives.
            (train_loss_sum, train_batches,
             val_loss_sum, val_batches,
             sim_sum, n_samples) = _sum_across_ranks([*train_stats, *val_stats], rank)

            avg_train_loss = _safe_mean(train_loss_sum, train_batches)
            avg_val_loss = _safe_mean(val_loss_sum, val_batches)
            avg_val_similarity = _safe_mean(sim_sum, n_samples)

            if rank == 0:
                print(f"""
             loss: {avg_val_loss},
             similarity: {avg_val_similarity},
             train_loss: {avg_train_loss},
             early_stopping_epoch: {epoch + 1},
            """)

            scheduler.step(avg_val_loss)
            early_stopping(avg_val_loss, model)
            if early_stopping.early_stop:
                break

        if rank == 0:  # Only save from the main process
            torch.save(model.module.state_dict(), "ddp_model.pth")
    finally:
        # Always release the process group, even if training raised.
        cleanup()

    return model








def run_ddp(demo_fn, world_size,
            data_path='/kaggle/input/dataset-tachygraphy/Tachygraphy_MicroText-AIO-V2.xlsx',
            config=None):
    """Load the Tachygraphy spreadsheet, split it, and spawn one worker per rank.

    Args:
        demo_fn: worker entry point. ``torch.multiprocessing.spawn`` calls it as
            ``demo_fn(rank, config, train_dataset, val_dataset, world_size)``.
        world_size: number of worker processes to spawn.
        data_path: Excel file containing ``'Informal Text'`` and
            ``'Expanded Meaning'`` columns (defaults to the Kaggle dataset).
        config: hyper-parameters forwarded to ``demo_fn``; defaults to
            ``{"lr": 5e-6, "batch_size": 2, "epochs": 7}``.
    """
    if config is None:
        config = {
            "lr": 5e-6,
            "batch_size": 2,
            "epochs": 7
        }

    df = pd.read_excel(data_path)
    df = df.rename(columns={'Informal Text': 'input', 'Expanded Meaning': 'target'})
    # Make every cell a string (numbers and NaN included) before tokenization.
    df['input'] = df['input'].astype(str)
    df['target'] = df['target'].astype(str)

    train_ds_pd, validation_ds_pd = test_train_split(df)
    print("{} examples in training, {} examples in testing.".format(
        len(train_ds_pd), len(validation_ds_pd)))

    # TextDataset indexes the columns by label, so labels must be 0..n-1.
    train_ds_pd = train_ds_pd.reset_index(drop=True)
    validation_ds_pd = validation_ds_pd.reset_index(drop=True)

    tokenizer = PegasusTokenizer.from_pretrained('google/pegasus-large')
    train_dataset = TextDataset(tokenizer, train_ds_pd)
    val_dataset = TextDataset(tokenizer, validation_ds_pd)

    torch.multiprocessing.spawn(demo_fn,
                                args=(config, train_dataset, val_dataset, world_size),
                                nprocs=world_size,
                                join=True)
    
    

    
if __name__ == "__main__":
    # List the mounted Kaggle input files so the log shows which datasets exist.
    for dirname, _, filenames in os.walk('/kaggle/input'):
        for filename in filenames:
            print(os.path.join(dirname, filename))

    n_gpus = torch.cuda.device_count()
    print(f"total GPUs: {n_gpus}")
    # One worker process is spawned per GPU. Use an explicit check rather than
    # `assert`, because asserts are stripped under `python -O`.
    if n_gpus < 2:
        raise SystemExit(f"Requires at least 2 GPUs to run, but got {n_gpus}")
    world_size = n_gpus

    run_ddp(train_model, world_size)

Writing ddp.py


In [2]:
!python /kaggle/working/ddp.py

/kaggle/input/dataset-tachygraphy/Tachygraphy_EmotionMoodtags_Dataset.csv
/kaggle/input/dataset-tachygraphy/Tachygraphy_dataset_main.csv
/kaggle/input/dataset-tachygraphy/Tachygraphy_MicroText-AIO-V2.xlsx
total GPUs: 2
9244 examples in training, 1036 examples in testing.
tokenizer_config.json: 100%|██████████████████| 88.0/88.0 [00:00<00:00, 568kB/s]
spiece.model: 100%|████████████████████████| 1.91M/1.91M [00:00<00:00, 21.7MB/s]
special_tokens_map.json: 100%|████████████████| 65.0/65.0 [00:00<00:00, 479kB/s]
config.json: 100%|█████████████████████████| 3.09k/3.09k [00:00<00:00, 18.6MB/s]
pytorch_model.bin: 100%|████████████████████| 2.28G/2.28G [00:08<00:00, 262MB/s]
Some weights of PegasusForConditionalGeneration were not initialized from the model checkpoint at google/pegasus-large and are newly initialized: ['model.decoder.embed_positions.weight', 'model.encoder.embed_positions.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for p