# Tutorial: Distributed Training with Ray and PyTorch

## Table of Contents
1. [Introduction](#introduction)
2. [Prerequisites](#prerequisites)
3. [Setup](#setup)
4. [Data Preparation](#data-preparation)
5. [Model Definition](#model-definition)
6. [Distributed Training with Ray](#distributed-training-with-ray)
7. [Monitoring and Logging](#monitoring-and-logging)
8. [Evaluation](#evaluation)
9. [Conclusion](#conclusion)
10. [References](#references)

## Introduction

Welcome to this tutorial on setting up distributed training using **Ray** and **PyTorch**. In this guide, we walk through the steps to fine-tune a pretrained GPT-2 language model across multiple GPUs or nodes, using Ray to launch and coordinate the training workers.

### Objectives:
- Understand how to integrate Ray with PyTorch for distributed training.
- Learn how to prepare data and define models compatible with distributed training.
- Monitor training progress and log metrics using MLflow or Weights & Biases.
- Evaluate the trained model's performance.

## Prerequisites

Before you begin, ensure you have the following installed:

- Python 3.7 or higher
- PyTorch
- Ray
- Transformers (Hugging Face)
- MLflow or Weights & Biases
- Jupyter Notebook

Install the necessary packages:

In [None]:
# The tune extra pulls in the dependencies needed by `from ray import tune`
%pip install torch transformers "ray[default,tune]" mlflow tqdm

## Setup

### 1. Import Libraries

In [None]:
# Import standard libraries
import os
import yaml
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

# Import transformers and tokenizer
from transformers import GPT2Tokenizer, GPT2LMHeadModel

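# Import Ray and related libraries
# Note: DistributedTrainableCreator belongs to older Ray releases. Recent Ray
# versions replace it with ray.train.torch.TorchTrainer, so check that your
# installed version still provides this import.
import ray
from ray import tune
from ray.tune.integration.torch import DistributedTrainableCreator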

# Utilities
from tqdm import tqdm

### 2. Initialise Ray

Start a local Ray runtime. To join an existing cluster instead, pass its address, for example `ray.init(address="auto")`.

In [None]:
ray.init()

## Data Preparation

### 1. Load and Preprocess Data

For this tutorial, we'll use a small dataset for demonstration purposes. Replace this with your dataset as needed.

In [None]:
# Sample data (replace with your own data)
texts = [
    "Once upon a time, there was a brave knight.",
    "The quick brown fox jumps over the lazy dog.",
    "In a galaxy far far away, there was a small planet.",
    "Artificial intelligence is transforming the world.",
]

# Initialise the tokenizer
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
tokenizer.pad_token = tokenizer.eos_token  # Set pad token

# Tokenize and encode the data
tokenized_texts = [tokenizer.encode(t) for t in texts]

### 2. Create a Custom Dataset

In [None]:
class CustomDataset(torch.utils.data.Dataset):
    """Pads or truncates pre-tokenized texts to a fixed length for causal LM training."""

    def __init__(self, tokenized_texts, tokenizer, max_length):
        self.tokenized_texts = tokenized_texts
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.tokenized_texts)

    def __getitem__(self, idx):
        # Truncate, then right-pad to max_length
        input_ids = self.tokenized_texts[idx][:self.max_length]
        num_real = len(input_ids)
        padding_length = self.max_length - num_real
        input_ids = input_ids + [self.tokenizer.pad_token_id] * padding_length
        # 1 marks real tokens, 0 marks padding
        attention_mask = [1] * num_real + [0] * padding_length

        input_ids = torch.tensor(input_ids)
        attention_mask = torch.tensor(attention_mask)
        # Exclude padded positions from the loss (-100 is the label the loss ignores).
        # Masking by attention_mask rather than by token id keeps real EOS tokens,
        # because the pad token here is the EOS token.
        labels = input_ids.clone()
        labels[attention_mask == 0] = -100

        return {'input_ids': input_ids, 'attention_mask': attention_mask, 'labels': labels}
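
The padding logic in `__getitem__` can be checked in isolation on plain Python lists, without loading a tokenizer. The sketch below is illustrative only: `pad_example` and `IGNORE_INDEX` are hypothetical names, and it assumes padded label positions should be set to -100, the value that PyTorch's cross-entropy loss ignores by default.

```python
IGNORE_INDEX = -100  # label value skipped by PyTorch's cross-entropy loss by default


def pad_example(token_ids, pad_token_id, max_length):
    """Truncate or right-pad one sequence and build its attention mask and labels."""
    input_ids = list(token_ids[:max_length])
    num_real = len(input_ids)
    num_pad = max_length - num_real
    input_ids += [pad_token_id] * num_pad
    # 1 marks real tokens, 0 marks padding
    attention_mask = [1] * num_real + [0] * num_pad
    # Real tokens keep their ids as labels; padded positions are ignored by the loss
    labels = input_ids[:num_real] + [IGNORE_INDEX] * num_pad
    return input_ids, attention_mask, labels


# 50256 is GPT-2's end-of-text token id, which this tutorial reuses as the pad token
ids, mask, labels = pad_example([11, 22, 33], pad_token_id=50256, max_length=5)
print(ids)
print(mask)
print(labels)
```

Sequences longer than `max_length` are truncated and receive no padding, so their mask is all ones.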


### 3. Create DataLoader

In [None]:
# Parameters
max_seq_length = 50
batch_size = 2

# Create dataset and dataloader
dataset = CustomDataset(tokenized_texts, tokenizer, max_seq_length)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

## Model Definition

### 1. Define the Model

In [None]:
class GPT2ModelWrapper(nn.Module):
    def __init__(self, model_name='gpt2'):
        super(GPT2ModelWrapper, self).__init__()
        self.model = GPT2LMHeadModel.from_pretrained(model_name)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels
        )
        loss = outputs.loss
        logits = outputs.logits
        return loss, logits

### 2. Initialise the Model

In [None]:
model = GPT2ModelWrapper()

## Distributed Training with Ray

### 1. Define the Training Function

In [None]:
def train_model(config):
    # Select this worker's GPU (LOCAL_RANK defaults to 0 if the launcher does not set it)
    local_rank = int(os.environ.get('LOCAL_RANK', 0))
    torch.cuda.set_device(local_rank)

    # Create the process group only if the launcher has not already done so;
    # calling init_process_group twice raises an error
    owns_process_group = not torch.distributed.is_initialized()
    if owns_process_group:
        torch.distributed.init_process_group(backend='nccl', init_method='env://')
    rank = torch.distributed.get_rank()

    # Setup model and optimizer
    model = GPT2ModelWrapper()
    model.to(local_rank)
    model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
    optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])

    # Prepare data loader; the sampler gives each worker its own shard of the dataset
    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
    tokenizer.pad_token = tokenizer.eos_token
    dataset = CustomDataset(tokenized_texts, tokenizer, max_length=50)
    sampler = torch.utils.data.distributed.DistributedSampler(dataset)
    data_loader = DataLoader(dataset, batch_size=config["batch_size"], sampler=sampler)

    # Training loop
    model.train()
    for epoch in range(config["epochs"]):
        sampler.set_epoch(epoch)  # Reshuffle with a different seed each epoch
        total_loss = 0.0
        for batch in data_loader:
            optimizer.zero_grad()
            input_ids = batch['input_ids'].to(local_rank)
            attention_mask = batch['attention_mask'].to(local_rank)
            labels = batch['labels'].to(local_rank)
            loss, _ = model(input_ids, attention_mask, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Rank {rank}, Epoch {epoch+1}, Loss: {total_loss/len(data_loader):.4f}")

    # Clean up the process group if this function created it
    if owns_process_group:
        torch.distributed.destroy_process_group()
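
To see what `DistributedSampler` and `sampler.set_epoch(epoch)` contribute to the training function above, here is a minimal plain-Python sketch of the sharding idea. It is an approximation for illustration, not PyTorch's implementation: `shard_indices` is a hypothetical helper, and it uses Python's `random` module, so the actual permutations differ from those PyTorch produces. It assumes a non-empty dataset.

```python
import math
import random


def shard_indices(dataset_len, world_size, rank, epoch, seed=0):
    """Return the dataset indices one worker would process in one epoch."""
    # Every worker builds the same permutation because the seed depends only on
    # (seed + epoch); changing the epoch changes the shuffle order
    indices = list(range(dataset_len))
    random.Random(seed + epoch).shuffle(indices)

    # Pad by repeating indices so the total divides evenly across workers
    per_worker = math.ceil(dataset_len / world_size)
    total = per_worker * world_size
    indices = (indices * math.ceil(total / dataset_len))[:total]

    # Worker `rank` takes every world_size-th index, starting at its own offset
    return indices[rank:total:world_size]


# Five examples split across two workers: each worker gets three indices,
# so one index is seen twice in the epoch
for rank in range(2):
    print(rank, shard_indices(dataset_len=5, world_size=2, rank=rank, epoch=0))
```

Because the shards are strided slices of one shared permutation, the workers cover the whole dataset between them, with at most a few repeated examples when the dataset size is not a multiple of the worker count.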

### 2. Configure and Run Training with Ray Tune

In [None]:
# Define the configuration
config = {
    "lr": 5e-5,
    "batch_size": 2,
    "epochs": 3
}

# Create a distributed trainable
DistributedTrainable = DistributedTrainableCreator(
    train_model,
    num_workers=2,  # Number of distributed workers
    use_gpu=True,
    num_cpus_per_worker=1,
    backend='nccl'  # Use 'gloo' if you're training on CPU
)

# Run the training
analysis = tune.run(
    DistributedTrainable,
    config=config,
    num_samples=1
)
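
With data-parallel training, `batch_size` in the configuration is the per-worker batch size, so the number of examples contributing to each optimizer step grows with the number of workers. The sketch below works through that arithmetic for the values used in this tutorial. `training_schedule` is a hypothetical helper, and it assumes the default behaviour in which neither the sampler nor the `DataLoader` drops incomplete shards or batches.

```python
import math


def training_schedule(num_examples, num_workers, batch_size, epochs):
    """Derive per-worker and global step counts for data-parallel training."""
    # The sampler pads the dataset so every worker sees the same number of examples
    examples_per_worker = math.ceil(num_examples / num_workers)
    # The last batch of an epoch may be smaller than batch_size
    steps_per_epoch = math.ceil(examples_per_worker / batch_size)
    return {
        "global_batch_size": batch_size * num_workers,
        "examples_per_worker": examples_per_worker,
        "steps_per_epoch": steps_per_epoch,
        "total_steps": steps_per_epoch * epochs,
    }


# Four sample texts, two workers, and the config above
print(training_schedule(num_examples=4, num_workers=2, batch_size=2, epochs=3))
```

For the four sample texts, each worker processes two examples per epoch in a single step, so the run performs only three optimizer steps in total. A realistic dataset is needed before the loss curve becomes meaningful.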

## Monitoring and Logging

For monitoring and logging, you can integrate MLflow or Weights & Biases into your training function. Here's how you can do it with MLflow.

### 1. Integrate MLflow

In [None]:
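import mlflow

def train_model(config):
    # Only rank 0 talks to MLflow, so each metric is logged once rather than once per worker
    is_main = int(os.environ.get('RANK', 0)) == 0
    if is_main:
        mlflow.start_run()

    # Setup code from the training function above...

    for epoch in range(config["epochs"]):
        # Training steps for one epoch, accumulating total_loss as above...

        # Log the mean loss for this epoch
        if is_main:
            mlflow.log_metric("loss", total_loss/len(data_loader), step=epoch)

    # End MLflow run
    if is_main:
        mlflow.end_run()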
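
The repeated rank check can be factored into a small decorator. The following is a minimal sketch under the same assumption as the code above, namely that the launcher exposes each worker's rank through the `RANK` environment variable. `rank_zero_only`, `logged`, and this stand-in `log_metric` are hypothetical names. In real training code the decorated function body would call `mlflow.log_metric` instead of appending to a list.

```python
import functools
import os


def rank_zero_only(fn):
    """Run fn only on the worker whose RANK is 0; other workers do nothing."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Read RANK at call time so the check reflects the worker's environment
        if int(os.environ.get("RANK", 0)) == 0:
            return fn(*args, **kwargs)
        return None
    return wrapper


logged = []  # Stand-in for a metrics backend, used here for illustration only


@rank_zero_only
def log_metric(name, value, step):
    logged.append((name, value, step))
```

Calling `log_metric("loss", 0.5, step=0)` on every worker then records the value once, on rank 0, and is a no-op elsewhere.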

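By default, MLflow writes runs to a local `mlruns` directory, so no server has to be running during training. To browse the logged runs, launch the MLflow UI from the directory that contains `mlruns` and open http://localhost:5000 in a browser:

In [None]:
# Blocks the cell until interrupted; alternatively, run `mlflow ui` in a separate terminal
!mlflow ui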

## Evaluation

### 1. Define the Evaluation Function

In [None]:
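def evaluate_model(model, data_loader, device):
    """Return the token-weighted mean loss and the perplexity over data_loader."""
    model.eval()
    total_loss = 0.0
    total_tokens = 0
    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            loss, _ = model(input_ids, attention_mask, labels)
            # The model returns the mean loss over target tokens that are not ignored
            # (-100). Targets are the labels shifted left by one position.
            num_tokens = (labels[:, 1:] != -100).sum().item()
            total_loss += loss.item() * num_tokens
            total_tokens += num_tokens
    avg_loss = total_loss / total_tokens
    perplexity = torch.exp(torch.tensor(avg_loss)).item()
    print(f"Evaluation Loss: {avg_loss:.4f}, Perplexity: {perplexity:.2f}")
    return avg_loss, perplexity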
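
Perplexity is the exponential of the mean per-token loss, so batch losses should be weighted by the number of tokens they cover before averaging. The plain-Python sketch below illustrates that calculation with made-up numbers. `perplexity_from_batches` is a hypothetical helper, and the loss values are not results from this tutorial.

```python
import math


def perplexity_from_batches(batch_mean_losses, batch_token_counts):
    """Combine per-batch mean losses into a corpus-level perplexity."""
    # Recover each batch's summed loss, then average over all tokens
    total_loss = sum(loss * n for loss, n in zip(batch_mean_losses, batch_token_counts))
    total_tokens = sum(batch_token_counts)
    return math.exp(total_loss / total_tokens)


# Two batches with different token counts (illustrative values)
weighted = perplexity_from_batches([1.0, 3.0], [30, 10])
# Averaging the batch means directly gives a different, biased answer
unweighted = math.exp((1.0 + 3.0) / 2)
print(weighted, unweighted)
```

The weighted result corresponds to a mean loss of 1.5, while the naive average of batch means uses 2.0, which overstates the perplexity because the smaller batch is counted as heavily as the larger one.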


### 2. Run Evaluation

In [None]:
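# Load the model on a single device for evaluation.
# Note: GPT2ModelWrapper() loads the pretrained GPT-2 weights. The training function
# above does not save a checkpoint, so to evaluate fine-tuned weights, save the
# model's state_dict during training and load it here with load_state_dict.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GPT2ModelWrapper()
model.to(device)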

# Prepare data loader
dataset = CustomDataset(tokenized_texts, tokenizer, max_length=50)
data_loader = DataLoader(dataset, batch_size=2)

# Evaluate the model
evaluate_model(model, data_loader, device)

## Conclusion

In this tutorial, we've covered the basics of setting up distributed training using Ray and PyTorch. We:

- Prepared and tokenized text data.
- Defined a GPT-2 model wrapper.
- Configured distributed training with Ray.
- Integrated MLflow for monitoring.
- Evaluated the trained model.

Next Steps:

- Experiment with larger datasets and more complex models.
- Explore hyperparameter tuning with Ray Tune.
- Integrate Weights & Biases for advanced monitoring.
- Deploy the trained model using Ray Serve.

## References

- [Ray Documentation](https://docs.ray.io/)
- [PyTorch Documentation](https://pytorch.org/)
- [Transformers Documentation](https://huggingface.co/docs/transformers/)
- [MLflow Documentation](https://mlflow.org/docs/latest/)