## Training a model using Ray inside a Jupyter notebook
This example runs distributed training of a PyTorch model on Fashion MNIST with Ray Train.

In [1]:
import ray
import os
from typing import Dict

import torch
from filelock import FileLock
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.transforms import Normalize, ToTensor
from tqdm import tqdm

Initializing ray session...
Creating ray cluster...


2025-01-21 07:38:26,451	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: log_to_driver


Ray cluster created successfully
Connecting to ray cluster at r034-9e0da5b9-3e9a-4b5b-93ec-a33eba028b98-head-svc.g1.svc.cluster.local:10001
Ray session successfully created


In [11]:
# Get the cluster resources
resources = ray.cluster_resources()

if "memory" in resources:
    resources["memory_gb"] = resources["memory"] / (1024 ** 3)
if "object_store_memory" in resources:
    resources["object_store_memory_gb"] = resources["object_store_memory"] / (1024 ** 3)

# Print cluster resources
print("Cluster resources (including memory in GB):")
for resource, value in resources.items():
    if resource in {"memory", "object_store_memory"}:
        continue  # Skip raw memory in bytes
    print(f"{resource}: {value:.2f}" if isinstance(value, float) else f"{resource}: {value}")

Cluster resources (including memory in GB):
node:__internal_head__: 1.00
node:10.2.1.235: 1.00
node:10.2.0.105: 1.00
CPU: 4.00
node:10.2.0.104: 1.00
memory_gb: 12.00
object_store_memory_gb: 3.45


In [2]:
def get_dataloaders(batch_size):
    # Transform to normalize the input images
    transform = transforms.Compose([ToTensor(), Normalize((0.5,), (0.5,))])

    with FileLock(os.path.expanduser("~/data.lock")):
        # Download training data from open datasets
        training_data = datasets.FashionMNIST(
            root="~/data",
            train=True,
            download=True,
            transform=transform,
        )

        # Download test data from open datasets
        test_data = datasets.FashionMNIST(
            root="~/data",
            train=False,
            download=True,
            transform=transform,
        )

    # Create data loaders
    train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_data, batch_size=batch_size)

    return train_dataloader, test_dataloader

In [3]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

In [4]:
# Function to save the model to hopsworks and register it using the hopsworks model registry
def register_model(model_name, model_file_path, metrics={}, description="A PyTorch model trained with Ray"):
    import hopsworks
    project = hopsworks.login()
    mr = project.get_model_registry()

    # Register the model
    model = mr.python.create_model(
        name=model_name,
        metrics=metrics,
        description=description
    )
    model.save(model_file_path)

In [6]:
# The train_func_per_worker function trains a neural network on a given dataset. 
# This function is executed independently by each worker
def train_func_per_worker(config: Dict):
    lr = config["lr"]
    epochs = config["epochs"]
    batch_size = config["batch_size_per_worker"]

    # Get dataloaders inside the worker training function
    train_dataloader, test_dataloader = get_dataloaders(batch_size=batch_size)

    # [1] Prepare Dataloader for distributed training
    # Shard the datasets among workers and move batches to the correct device
    # =======================================================================
    train_dataloader = ray.train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = ray.train.torch.prepare_data_loader(test_dataloader)

    model = NeuralNetwork()

    # [2] Prepare and wrap your model with DistributedDataParallel
    # Move the model to the correct GPU/CPU device
    # ============================================================
    model = ray.train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    # Model training loop
    for epoch in range(epochs):
        if ray.train.get_context().get_world_size() > 1:
            # Required for the distributed sampler to shuffle properly across epochs.
            train_dataloader.sampler.set_epoch(epoch)

        model.train()
        for X, y in tqdm(train_dataloader, desc=f"Train Epoch {epoch}"):
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        test_loss, num_correct, num_total = 0, 0, 0
        with torch.no_grad():
            for X, y in tqdm(test_dataloader, desc=f"Test Epoch {epoch}"):
                pred = model(X)
                loss = loss_fn(pred, y)

                test_loss += loss.item()
                num_total += y.shape[0]
                num_correct += (pred.argmax(1) == y).sum().item()

        test_loss /= len(test_dataloader)
        accuracy = num_correct / num_total

        # [3] Report metrics to Ray Train
        # ===============================
        ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy})

    # Only save from one worker. The final model is stored at a worker with rank 0.
    if ray.train.get_context().get_world_rank() == 0:  
        save_path = "/tmp/model.pth"
        torch.save(model.state_dict(), save_path)
        print(f"Model saved to {save_path}")
        # Register the final model using hopsworks model registry
        # Alternatively the final model can be saved directly in hopsfs and register the model later
        register_model("ray_train_mnist", save_path, metrics={"loss": test_loss, "accuracy": accuracy}, description="A PyTorch model trained with Ray")

In [7]:
import ray.train
from ray.train import ScalingConfig, RunConfig
from ray.train.torch import TorchTrainer

num_workers = 2
use_gpu=False
cpus_per_worker=1
global_batch_size = 32

train_config = {
    "lr": 1e-3,
    "epochs": 10,
    "batch_size_per_worker": global_batch_size // num_workers,
}

storage_path = "/home/yarnapp/hopsfs/Resources/mnist_pytorch_train"

run_config = RunConfig(storage_path=storage_path)

# Configure computation resources
scaling_config = ScalingConfig(
    num_workers=num_workers,
    use_gpu=use_gpu,
    resources_per_worker={"CPU": cpus_per_worker}
)

# Initialize a Ray TorchTrainer
trainer = TorchTrainer(
    run_config=run_config,
    train_loop_per_worker=train_func_per_worker,
    train_loop_config=train_config,
    scaling_config=scaling_config,
)

# [4] Start distributed training
# Run `train_func_per_worker` on all workers
# =============================================
result = trainer.fit()
print(f"Training result: {result}")

Training result: Result(
  metrics={'loss': 0.4588888406027525, 'accuracy': 0.8262},
  path='/home/yarnapp/hopsfs/Resources/mnist_pytorch_train/TorchTrainer_2025-01-21_07-29-58/TorchTrainer_94dd5_00000_0_2025-01-21_07-30-02',
  filesystem='local',
  checkpoint=None
)
