# PyTorch Training Script with Hugging Face, Accelerate, and DeepSpeed

This notebook provides a parameterized training loop using PyTorch. It demonstrates how to load models and datasets from Hugging Face, how to distribute the model across multiple GPUs with Accelerate and DeepSpeed, and how to tokenize examples on the fly, as the training loop requests them, with custom dataset classes.

In [None]:
# We'll need to install and setup accelerate and deepspeed configs for this notebook.
# I'll also be installing one of my other packages for a model that I have private.
# This package contains nice utilities that I'd rather not code again.

!pip install --upgrade deepspeed accelerate sentia evaluate datasets transformers rouge_score
!pip install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/cu121
!python -m pip install -i https://pypi.anaconda.org/mpi4py/simple mpi4py
!sudo apt install -y openmpi-bin
!sudo apt install -y mpich
!pip install --upgrade deepspeed accelerate sentia evaluate datasets transformers rouge_score
!pip install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/cu121
!python -m pip install -i https://pypi.anaconda.org/mpi4py/simple mpi4py
!accelerate config

In [None]:
# Import necessary modules

import torch
from torch.utils.data import DataLoader, Dataset
import os
from torch.optim import AdamW
import torch.nn.functional as F
from sentia import SENTIAForCausalLM # This the model I was talking about,
# it includes methods for accuracy calculation
from transformers import AutoTokenizer, AutoModelForCausalLM
from accelerate import Accelerator
import accelerate
import wandb
from tqdm import tqdm
import sacrebleu
from datasets import load_dataset
from dataclasses import dataclass, field
from typing import Optional, Tuple
from evaluate import load as load_metric
import warnings

## Custom Dataset Classes

These classes are designed to tokenize data on-the-fly, which can be more memory-efficient for large datasets.

In [None]:
class ConversationDataset(Dataset):
    """Map-style dataset that renders rows as chat prompts and tokenizes them lazily.

    Each row of ``data`` must follow one of two schemas:

    * instruction tuning: ``instruction``, ``output`` and an optional ``input``
    * multiple choice (MMLU-like): ``question``, ``choices`` and an integer
      ``answer`` index into ``choices``

    Args:
        tokenizer: Object exposing ``encode``, ``pad_token_id`` and (as a
            fallback for padding) ``eos_token_id``.
        max_length: Every example is truncated or padded to this many tokens.
        data: Indexable collection of row mappings.
        device: Device on which the returned tensors are created.
    """

    def __init__(self, tokenizer, max_length=512, data=None, device="cuda"):
        self.data = data
        self.device = device
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def _pad_token_id(self):
        """Return the id used for padding, falling back to EOS when no pad token exists."""
        pad_id = self.tokenizer.pad_token_id
        if pad_id is None:
            pad_id = self.tokenizer.eos_token_id
        if pad_id is None:
            raise ValueError("The tokenizer defines neither a pad token nor an EOS token to pad with.")
        return pad_id

    def __getitem__(self, idx):
        """Return ``{"input_ids", "labels"}`` tensors of length ``max_length`` for row ``idx``."""
        # Fetch the row once; indexing the underlying dataset repeatedly is wasted work.
        row = self.data[idx]
        if "instruction" in row:
            # Instruction-tuning rows (e.g. InstructMix / Alpaca); "input" may be missing or None.
            user = row["instruction"]
            context = row.get("input") or ""
            assistant = row["output"]
        else:
            # Multiple-choice rows (e.g. MMLU): there is no extra context, so it must be
            # an explicit empty string rather than whatever `input` happens to resolve to.
            user = row["question"]
            context = ""
            assistant = row["choices"][row["answer"]]

        text = f"<|USER|> {user} {context} <|ASSISTANT|> {assistant} <|endoftext|>"
        # Inputs and labels share the same text, so a single tokenizer pass is enough.
        token_ids = self.tokenizer.encode(
            text, add_special_tokens=True, max_length=self.max_length, truncation=True
        )
        token_ids = token_ids + [self._pad_token_id()] * (self.max_length - len(token_ids))

        # NOTE(review): labels keep the padding id, so padded positions still contribute
        # to the loss; masking them with -100 would also require the decode calls in the
        # training/evaluation loops to skip -100.
        return {
            "input_ids": torch.tensor(token_ids, dtype=torch.int64, device=self.device),
            "labels": torch.tensor(token_ids, dtype=torch.int64, device=self.device),
        }
class CompletionDataset(Dataset):
    """Map-style dataset for plain-text completion that tokenizes rows lazily.

    Every row of ``data`` must provide a ``text`` entry. The tokenizer's EOS
    token is appended to the text, and the result is truncated or padded to
    ``max_length`` tokens.

    Args:
        tokenizer: Object exposing ``encode``, ``eos_token``, ``pad_token_id``
            and (as a fallback for padding) ``eos_token_id``.
        data: Indexable collection of row mappings with a ``text`` key.
        max_length: Fixed token length of every returned example.
        device: Device on which the returned tensors are created.
    """

    def __init__(self, tokenizer, data, max_length=256, device="cuda"):
        self.data = data
        self.device = device
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def _pad_token_id(self):
        """Return the id used for padding, falling back to EOS when no pad token exists."""
        pad_id = self.tokenizer.pad_token_id
        if pad_id is None:
            pad_id = self.tokenizer.eos_token_id
        if pad_id is None:
            raise ValueError("The tokenizer defines neither a pad token nor an EOS token to pad with.")
        return pad_id

    def __getitem__(self, idx):
        """Return ``{"input_ids", "labels"}`` tensors of length ``max_length`` for row ``idx``."""
        text = self.data[idx]["text"]
        full_text = f"{text} {self.tokenizer.eos_token}"
        # Inputs and labels are the same sequence, so the text is tokenized only once.
        token_ids = self.tokenizer.encode(
            full_text, add_special_tokens=True, max_length=self.max_length, truncation=True
        )
        token_ids = token_ids + [self._pad_token_id()] * (self.max_length - len(token_ids))
        return {
            "input_ids": torch.tensor(token_ids, dtype=torch.int64, device=self.device),
            "labels": torch.tensor(token_ids, dtype=torch.int64, device=self.device),
        }

# Training args
We'll define a dataclass of training arguments that controls the hyperparameters and the overall configuration of a training run.


In [None]:
@dataclass
class TrainingArguments:
    """Configuration for one training run.

    Besides the declared fields, ``__post_init__`` always sets two derived
    boolean attributes, ``fp16`` and ``bf16``. They are plain attributes (not
    dataclass fields) and are only ever true for non-distributed runs
    (``local_rank == -1``).

    Raises:
        ValueError: If ``use_wandb`` is true but ``wandb_project`` is not set.
    """

    # Model and tokenizer arguments
    model_name_or_path: str
    tokenizer_name: Optional[str] = None  # falls back to model_name_or_path

    # Training data arguments
    train_data_file: Optional[str] = None
    eval_data_file: Optional[str] = None
    train_data_config: Optional[str] = None
    eval_data_config: Optional[str] = None
    max_seq_length: int = 512

    # Training procedure arguments
    num_train_epochs: int = 3
    train_batch_size: int = 8
    eval_batch_size: int = 8
    learning_rate: float = 5e-5
    weight_decay: float = 0.01
    adam_epsilon: float = 1e-8
    max_grad_norm: float = 1.0
    gradient_accumulation_steps: int = 1

    # Logging, saving, and evaluation arguments
    print_predictions: bool = False
    logging_steps: int = 50
    output_dir: str = "./output"

    # DeepSpeed configuration: a config dict, a path to a config file, or an
    # already-built plugin object (which is what this notebook passes).
    deepspeed_config_file: Optional[object] = None

    # Accelerate configuration
    mixed_precision: str = "no"  # Options: "no", "fp16", "bf16"

    # WandB configuration
    use_wandb: bool = False
    wandb_project: Optional[str] = None
    wandb_entity: Optional[str] = None

    # Other arguments
    seed: int = 42
    device: str = "cuda"  # Options: "cuda", "cpu"
    local_rank: int = -1  # Local rank for distributed training on GPUs; -1 means not distributed

    def __post_init__(self):
        """Fill in derived values and validate the Weights & Biases settings."""
        if self.tokenizer_name is None:
            self.tokenizer_name = self.model_name_or_path

        # Both flags are always assigned so that reading either one never raises
        # AttributeError. For distributed runs they stay off and precision is
        # left to DeepSpeed.
        single_process = self.local_rank == -1
        self.fp16 = single_process and self.mixed_precision == "fp16"
        self.bf16 = single_process and self.mixed_precision == "bf16"

        if self.use_wandb and (self.wandb_project is None):
            raise ValueError("wandb project name must be defined if use_wandb = True")

## Training and Evaluation Functions

These functions define the training and evaluation loops for the model. The training loop uses Accelerate's `Accelerator` class to handle gradient accumulation and the backward pass.

In [None]:
def train(model, dataloader, optimizer, tokenizer, args: TrainingArguments, accelerator: Accelerator):
    """Run one pass over ``dataloader`` and update ``model``.

    Args:
        model: Causal language model whose forward pass accepts ``input_ids``
            and ``labels`` and returns an object with ``loss`` and ``logits``.
        dataloader: Yields dicts of tensors containing ``input_ids`` and ``labels``.
        optimizer: Optimizer that updates ``model``.
        tokenizer: Used to decode predictions and labels for the BLEU score.
        args: Run configuration; ``max_grad_norm``, ``logging_steps`` and
            ``use_wandb`` are read here.
        accelerator: Handles gradient accumulation, the backward pass,
            gradient clipping and device placement.

    Returns:
        A ``(mean_loss, mean_perplexity)`` tuple averaged over the batches seen
        in this pass (both ``0.0`` when the dataloader is empty).
    """
    model.train()
    total_loss = 0.0
    total_perplexity = 0.0
    num_batches = 0

    # `step` counts batches within this pass; it drives the periodic console log.
    for step, batch in enumerate(tqdm(dataloader, total=len(dataloader))):
        # Passing the model lets the accelerator skip gradient synchronisation
        # on the intermediate accumulation steps.
        with accelerator.accumulate(model):
            # Move batch to the device chosen by the accelerator
            batch = {k: v.to(accelerator.device) for k, v in batch.items()}
            input_ids = batch["input_ids"]
            labels = batch["labels"]

            # Generate the output and calculate the loss
            outputs = model(input_ids=input_ids, labels=labels)
            loss = outputs.loss

            # Backward pass
            accelerator.backward(loss)
            # Clip through the accelerator (it knows about gradient scaling), and only
            # on steps where the accumulated gradients are actually applied.
            if args.max_grad_norm is not None and accelerator.sync_gradients:
                accelerator.clip_grad_norm_(model.parameters(), args.max_grad_norm)

            # Update model parameters
            optimizer.step()
            optimizer.zero_grad()

        # Read the scalars once; the exponential is taken in float32 so a
        # half-precision loss does not overflow to inf prematurely.
        loss_value = loss.item()
        perplexity = torch.exp(loss.detach().float()).item()

        # The logits at position t score the token at position t + 1 (assuming the
        # usual causal-LM convention), so predictions are compared to labels shifted by one.
        predicted_ids = torch.argmax(outputs.logits, dim=-1)[:, :-1]
        predictions_str = tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)
        targets_str = tokenizer.batch_decode(labels[:, 1:], skip_special_tokens=True)
        bleu_scores = [
            sacrebleu.sentence_bleu(pred_str, [target_str]).score
            for pred_str, target_str in zip(predictions_str, targets_str)
        ]
        bleu = sum(bleu_scores) / len(bleu_scores) if bleu_scores else 0.0

        # Logging is best-effort: a W&B failure must never interrupt training.
        if args.use_wandb:
            try:
                wandb.log({
                        "loss": loss_value,
                        "bleu": bleu,
                        "perplexity": perplexity,
                    })
            except Exception as e:
                warnings.warn(f"An error occurred while logging to Weights & Biases: {e}")

        # Print training information every `logging_steps` batches
        if args.logging_steps and step % args.logging_steps == 0:
            print(f"Step {step}: Loss: {loss_value:.4f}, BLEU: {bleu:.4f}, Perplexity: {perplexity:.4f}")

        # Update the running totals
        total_loss += loss_value
        total_perplexity += perplexity
        num_batches += 1

    if num_batches == 0:
        return 0.0, 0.0
    return total_loss / num_batches, total_perplexity / num_batches

def _token_f1(prediction, reference):
    """Return the bag-of-words F1 overlap between two whitespace-tokenized strings."""
    from collections import Counter

    pred_tokens = prediction.split()
    ref_tokens = reference.split()
    if not pred_tokens or not ref_tokens:
        # Two empty strings match perfectly; one empty side is a complete miss.
        return float(pred_tokens == ref_tokens)
    overlap = sum((Counter(pred_tokens) & Counter(ref_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(ref_tokens)
    return 2 * precision * recall / (precision + recall)


def evaluate(model, val_loader, tokenizer, use_cuda=True):
    """Evaluate ``model`` on ``val_loader`` and return text-generation metrics.

    Args:
        model: Model whose forward pass accepts the batch as keyword arguments
            and returns an object with ``loss`` and ``logits``.
        val_loader: Yields dict batches that include a ``labels`` tensor.
        tokenizer: Used to decode predicted and reference token ids.
        use_cuda: Evaluate on CUDA when it is available, otherwise on CPU.

    Returns:
        Dict with ``val_loss``, ``val_perplexity``, ``val_f1`` (mean token-overlap
        F1 between decoded prediction and reference), ``val_bleu`` and
        ``val_rouge`` (the full ROUGE result).

    Raises:
        ValueError: If ``val_loader`` yields no batches.
    """
    model.eval()
    device = torch.device('cuda' if use_cuda and torch.cuda.is_available() else 'cpu')
    model.to(device)

    # Load metrics. The `f1` metric is intentionally not used: it scores integer
    # class labels and rejects decoded text, so a token-overlap F1 is computed instead.
    bleu_metric = load_metric('bleu')
    rouge_metric = load_metric('rouge')

    # Accumulators
    total_loss = 0.0
    num_batches = 0
    all_predictions = []
    all_references = []

    with torch.no_grad():
        for batch in tqdm(val_loader, desc="Evaluating"):
            # Move batch to the correct device
            batch = {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in batch.items()}

            # Forward pass
            outputs = model(**batch)
            total_loss += outputs.loss.item()
            num_batches += 1

            # The logits at position t score the token at position t + 1 (assuming the
            # usual causal-LM convention), so predictions are compared to labels shifted by one.
            predicted_ids = outputs.logits.argmax(dim=-1)[:, :-1]
            reference_ids = batch['labels'][:, 1:]
            all_predictions.extend(tokenizer.batch_decode(predicted_ids, skip_special_tokens=True))
            all_references.extend(tokenizer.batch_decode(reference_ids, skip_special_tokens=True))

    if num_batches == 0:
        raise ValueError("val_loader produced no batches; there is nothing to evaluate.")

    # Each metric sees every example exactly once: nothing is pre-added batch by batch.
    # BLEU takes one list of references per prediction.
    bleu_score = bleu_metric.compute(
        predictions=all_predictions,
        references=[[reference] for reference in all_references],
    )
    rouge_scores = rouge_metric.compute(predictions=all_predictions, references=all_references)
    f1_values = [_token_f1(pred, ref) for pred, ref in zip(all_predictions, all_references)]
    mean_f1 = sum(f1_values) / len(f1_values) if f1_values else 0.0

    # Perplexity is exp(mean loss); this assumes the model's loss is a mean
    # negative log-likelihood.
    mean_loss = total_loss / num_batches
    perplexity = torch.exp(torch.tensor(mean_loss))

    metrics = {
        'val_loss': mean_loss,
        'val_perplexity': perplexity.item(),
        'val_f1': mean_f1,
        'val_bleu': bleu_score['bleu'],
        'val_rouge': rouge_scores,
    }

    # Log only when a W&B run is active, and stay best-effort if logging fails.
    if wandb.run is not None:
        try:
            wandb.log(metrics)
        except Exception as e:
            warnings.warn(f"An error occurred while logging to Weights & Biases: {e}")

    return metrics

        

# Training Loop
Here, we'll load the model, datasets, and tokenizer and start the training loop.

In [None]:
def main():
    """Load the tokenizer, data and model, then train and evaluate for each epoch.

    After every epoch the model is saved to ``args.output_dir`` (when set) and
    evaluated on the validation split.
    """
    # A shell `export` only affects a throw-away subshell, so the variable is set on
    # this process instead. An existing value (e.g. from a launcher) takes precedence.
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0,1")

    # Shared by the DeepSpeed config and the training arguments so they cannot drift apart.
    micro_batch_size = 4
    grad_accum_steps = 32

    deepspeed_config_dict = {
        "train_micro_batch_size_per_gpu": micro_batch_size,
        "gradient_accumulation_steps": grad_accum_steps,
        "fp16": {
            "enabled": True,
            "loss_scale": 0,
            "loss_scale_window": 1000,
            "min_loss_scale": 1,
            "hysteresis": 2
        },
        "zero_optimization": {
            "stage": 2,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": True
            },
            "allgather_partitions": True,
            "allgather_bucket_size": 2e8,
            "overlap_comm": True,
            "reduce_scatter": True,
            "reduce_bucket_size": 2e8,
            "contiguous_gradients": True
        },
        "activation_checkpointing": {
            "partition_activations": True,
            "cpu_checkpointing": False,
            "contiguous_memory_optimization": False,
            "synchronize_checkpoint_boundary": False
        },
        "steps_per_print": 1,
        "wall_clock_breakdown": False
    }
    deepspeed_plugin = accelerate.utils.DeepSpeedPlugin(hf_ds_config=deepspeed_config_dict)
    args = TrainingArguments(
        model_name_or_path="Locutusque/TinyMistral-248M",
        train_data_file="tatsu-lab/alpaca",
        eval_data_file="cais/mmlu",
        eval_data_config="all",
        max_seq_length=256,
        max_grad_norm=2.0,
        train_batch_size=micro_batch_size,
        eval_batch_size=4,
        gradient_accumulation_steps=grad_accum_steps,
        adam_epsilon=1e-4,
        use_wandb=False,
        print_predictions=False,
        deepspeed_config_file=deepspeed_plugin,

    )
    # Seed every RNG so that runs are reproducible.
    accelerate.utils.set_seed(args.seed)

    use_wandb = args.use_wandb
    # Initialize Weights & Biases if you're using it
    if use_wandb:
        wandb.init(project=args.wandb_project, entity=args.wandb_entity, settings=wandb.Settings(start_method="fork"))

    # Initialize the Accelerator and tokenizer
    print("Installing the tokenizer")
    tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_name if args.tokenizer_name is not None else args.model_name_or_path)
    if tokenizer.pad_token is None:
        # Fixed-length padding needs a pad token; reuse EOS when the tokenizer has none.
        tokenizer.pad_token = tokenizer.eos_token
    print("Initializing the accelerator")
    accelerator = Accelerator(gradient_accumulation_steps=args.gradient_accumulation_steps, deepspeed_plugin=args.deepspeed_config_file)
    print(accelerator.state.num_processes)

    # Prepare the dataset and dataloader
    print("Installing the datasets")
    train_data = load_dataset(args.train_data_file, args.train_data_config, split="train[:100]")
    val_data = load_dataset(args.eval_data_file, args.eval_data_config, split="validation")
    # Honour the configured sequence length and create tensors on this process's device.
    train_dataset = ConversationDataset(
        tokenizer=tokenizer, max_length=args.max_seq_length, data=train_data, device=accelerator.device
    )
    val_dataset = ConversationDataset(
        tokenizer=tokenizer, max_length=args.max_seq_length, data=val_data, device=accelerator.device
    )
    train_dataloader = DataLoader(train_dataset, batch_size=args.train_batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=args.eval_batch_size)

    # Prepare the model and optimizer
    print("Installing the model")
    # Each process places the model on its own device rather than all of them on cuda:0.
    # (The fused optimizer below needs the parameters on a CUDA device when it is created.)
    model = AutoModelForCausalLM.from_pretrained(args.model_name_or_path, torch_dtype=torch.float16).to(accelerator.device)
    optimizer = AdamW(
        model.parameters(),
        lr=args.learning_rate,
        weight_decay=args.weight_decay,
        eps=args.adam_epsilon,
        fused=True,
    )

    # Prepare the model, optimizer, and dataloaders for distributed training
    model, optimizer, train_dataloader, val_dataloader = accelerator.prepare(
        model, optimizer, train_dataloader, val_dataloader
    )

    # Training loop
    num_epochs = args.num_train_epochs
    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        # train() returns (mean loss, mean perplexity) for the epoch.
        train_loss, train_perplexity = train(model, train_dataloader, optimizer, tokenizer, args=args, accelerator=accelerator)
        if args.output_dir is not None:
            # Distributed-safe save: every process takes part in gathering the state
            # dict, and only the main process writes the files.
            accelerator.wait_for_everyone()
            unwrapped_model = accelerator.unwrap_model(model)
            unwrapped_model.save_pretrained(
                args.output_dir,
                is_main_process=accelerator.is_main_process,
                save_function=accelerator.save,
                state_dict=accelerator.get_state_dict(model),
            )
        metrics = evaluate(model, val_dataloader, tokenizer, use_cuda=True)
        print(f"Training Loss: {train_loss}")
        print(f"Training Perplexity: {train_perplexity}")
        print(f"Validation Metrics: {metrics}")

    # Finalize Weights & Biases run
    if use_wandb:
        wandb.finish(quiet=True)

    print("Training complete!")


if __name__ == "__main__":
    main()