# Ray with PyTorch Distributed Training

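This notebook demonstrates how to use Ray with PyTorch for distributed training and distributed inference:

- **Training:** Ray Train's `TorchTrainer` runs a `DistributedDataParallel` model across several workers.
- **Inference:** a pool of Ray actors serves predictions.

The inference section builds freshly initialized models, so its outputs illustrate the mechanics rather than trained predictions.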

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import numpy as np

# Install Ray if not already installed
try:
    import ray
except ImportError:
    !pip install ray[train]
    import ray

from ray import train
from ray.train import Trainer, TrainingCallback
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

# Initialize Ray.
# With no explicit address, ray.init() first looks for an existing cluster
# (RAY_ADDRESS or the most recently started local cluster). If none is found,
# it starts a local instance. address="auto" would instead raise
# ConnectionError when no cluster is running, so the notebook could not run
# standalone. ignore_reinit_error lets this cell be re-executed safely.
ray.init(namespace="ray_pytorch_example", ignore_reinit_error=True)
print("Ray initialized. Available resources:", ray.available_resources())

In [None]:
class SimpleModel(nn.Module):
    """Two-layer fully connected network: Linear -> ReLU -> Linear.

    Args:
        input_size: Number of features per input row.
        hidden_size: Width of the hidden layer.
        output_size: Number of values produced per input row.
    """

    def __init__(self, input_size=10, hidden_size=50, output_size=1):
        super().__init__()
        # The attribute names below become the state_dict keys
        # (layer1.*, layer2.*), so they are deliberately kept stable.
        self.layer1 = nn.Linear(input_size, hidden_size)
        self.activation = nn.ReLU()
        self.layer2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map inputs with trailing dimension input_size to output_size."""
        hidden = self.activation(self.layer1(x))
        return self.layer2(hidden)

In [None]:
def train_func(config):
    """Per-worker training loop run by Ray Train's ``TorchTrainer``.

    Each worker generates its own synthetic regression data, wraps a
    ``SimpleModel`` in DistributedDataParallel and trains it with SGD.

    Args:
        config: Dict with keys ``input_size``, ``hidden_size``,
            ``output_size``, ``dataset_size``, ``batch_size``,
            ``learning_rate`` and ``num_epochs``.

    Reports ``{"loss": <mean batch loss>}`` once per epoch from every worker;
    only rank 0 prints progress.
    """
    context = train.get_context()
    rank = context.get_world_rank()
    local_rank = context.get_local_rank()
    use_cuda = torch.cuda.is_available()

    # TorchTrainer sets up the default process group before calling this
    # function, and initializing it a second time raises a RuntimeError.
    # Only create it here if it does not exist yet.
    if not dist.is_initialized():
        dist.init_process_group(backend="nccl" if use_cuda else "gloo")

    device = torch.device(f"cuda:{local_rank}" if use_cuda else "cpu")

    input_size = config["input_size"]
    output_size = config["output_size"]
    dataset_size = config["dataset_size"]
    batch_size = config["batch_size"]
    num_epochs = config["num_epochs"]

    # Synthetic data (a real application would use a proper, sharded
    # DataLoader). Targets match the model's output width so MSELoss does
    # not silently broadcast.
    X = torch.randn(dataset_size, input_size)
    y = torch.randn(dataset_size, output_size)

    model = SimpleModel(
        input_size=input_size,
        hidden_size=config["hidden_size"],
        output_size=output_size,
    ).to(device)
    model = DDP(model, device_ids=[local_rank] if use_cuda else None)

    criterion = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=config["learning_rate"])

    for epoch in range(num_epochs):
        total_loss = 0.0
        num_batches = 0
        for start in range(0, dataset_size, batch_size):
            # Slicing clamps at the end, so the final batch may be partial.
            inputs = X[start:start + batch_size].to(device)
            targets = y[start:start + batch_size].to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        # Average over the batches actually processed (including a partial
        # last batch); max() avoids dividing by zero on an empty dataset.
        avg_loss = total_loss / max(num_batches, 1)

        # Ray Train expects every worker to call report() the same number of
        # times; reporting only from rank 0 makes the run fail.
        train.report({"loss": avg_loss})
        if rank == 0:
            print(f"Epoch {epoch+1}/{config['num_epochs']}, Loss: {avg_loss:.4f}")

In [None]:
# Hyperparameters handed to every worker as the `config` argument of train_func.
train_config = dict(
    input_size=10,
    hidden_size=50,
    output_size=1,
    batch_size=64,
    learning_rate=0.01,
    num_epochs=5,
    dataset_size=1000,
)

# Each worker gets one CPU and, when CUDA is visible on the driver, one GPU.
gpu_available = torch.cuda.is_available()
scaling_config = ScalingConfig(
    num_workers=2,  # Adjust based on available resources
    use_gpu=gpu_available,
    resources_per_worker={"CPU": 1, "GPU": 1 if gpu_available else 0},
)

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config=train_config,
    scaling_config=scaling_config,
)

# Launch the distributed run and show what rank 0 reported last.
result = trainer.fit()
print("Training complete!")
print(f"Final metrics: {result.metrics}")

In [None]:
@ray.remote(num_gpus=1 if torch.cuda.is_available() else 0)
class ModelWorker:
    """Ray actor holding one ``SimpleModel`` in eval mode for inference.

    The model is freshly constructed here, so its weights are the random
    initialization rather than the result of the training cell.

    Args:
        model_config: Dict with ``input_size``, ``hidden_size`` and
            ``output_size`` passed to ``SimpleModel``.
    """

    def __init__(self, model_config):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = SimpleModel(
            input_size=model_config["input_size"],
            hidden_size=model_config["hidden_size"],
            output_size=model_config["output_size"]
        ).to(self.device)
        self.model.eval()

    def predict(self, inputs):
        """Run a forward pass and return the predictions as a NumPy array.

        Args:
            inputs: A tensor, NumPy array or (nested) list of numbers; either
                a single sample of shape (input_size,) or a batch of shape
                (batch, input_size).

        Returns:
            NumPy array of model outputs. A 1-D input gets a leading batch
            dimension of 1 before the forward pass.
        """
        # as_tensor converts lists/arrays and also casts tensors of another
        # dtype (e.g. float64 from NumPy) to the model's float32. The original
        # code skipped conversion for tensors, so float64 tensors crashed.
        inputs = torch.as_tensor(inputs, dtype=torch.float32, device=self.device)
        if inputs.dim() == 1:
            inputs = inputs.unsqueeze(0)  # Add batch dimension

        with torch.no_grad():
            outputs = self.model(inputs)

        return outputs.cpu().numpy()

# Example usage
def run_distributed_inference():
    """Send single-sample predictions round-robin to ModelWorker actors.

    Starts min(2, visible CUDA devices) actors, or a single actor when CUDA is
    unavailable. It draws ten random samples and returns the per-sample
    prediction arrays in input order.
    """
    if torch.cuda.is_available():
        device_slots = torch.cuda.device_count()
    else:
        device_slots = 1
    num_workers = min(2, device_slots)

    model_config = {"input_size": 10, "hidden_size": 50, "output_size": 1}
    workers = [ModelWorker.remote(model_config) for _ in range(num_workers)]

    # Random test samples, each a 1-D tensor of length input_size.
    sample_count = 10
    samples = [torch.randn(model_config["input_size"]) for _ in range(sample_count)]

    # Sample k is handled by actor k mod len(workers).
    pending = [
        workers[idx % len(workers)].predict.remote(sample)
        for idx, sample in enumerate(samples)
    ]

    # Block until every prediction is available.
    return ray.get(pending)

# Run the round-robin inference demo and show the shape of the collected results.
inference_results = run_distributed_inference()
print(f"Inference complete. Results shape: {np.shape(inference_results)}")

In [None]:
# Clean up Ray resources: disconnect this notebook session from Ray once the
# training and inference cells are finished.
ray.shutdown()
print("Ray resources released.")