In [6]:
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import ray
from ray.util.sgd.torch import TrainingOperator
from ray.util.sgd.torch.examples.train_example import LinearDataset

In [7]:
if ray.is_initialized() == False:
        print("Connecting to Ray cluster...")
        service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
        service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
        ray.util.connect(f"{service_host}:{service_port}")

Connecting to Ray cluster...


In [8]:
class MyTrainingOperator(TrainingOperator):
    def setup(self, config):
        # Setup all components needed for training here. This could include
        # data, models, optimizers, loss & schedulers.

        # Setup data loaders.
        train_dataset, val_dataset = LinearDataset(2, 5), LinearDataset(2,
                                                                        5)
        train_loader = DataLoader(train_dataset,
                                  batch_size=config["batch_size"])
        val_loader = DataLoader(val_dataset,
                                batch_size=config["batch_size"])

        # Setup model.
        model = nn.Linear(1, 1)

        # Setup optimizer.
        optimizer = torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-4))

        # Setup loss.
        criterion = torch.nn.BCELoss()

        # Setup scheduler.
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.9)

        # Register all of these components with Ray SGD.
        # This allows Ray SGD to do framework level setup like Cuda, DDP,
        # Distributed Sampling, FP16.
        # We also assign the return values of self.register to instance
        # attributes so we can access it in our custom training/validation
        # methods.
        self.model, self.optimizer, self.criterion, self.scheduler = \
            self.register(models=model, optimizers=optimizer,
                          criterion=criterion,
                          schedulers=scheduler)
        self.register_data(train_loader=train_loader, validation_loader=val_loader)

In [13]:
from ray.util.sgd import TorchTrainer

trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    scheduler_step_freq="epoch",  # if scheduler is used
    config={"lr": 0.001, "batch_size": 64})

In [14]:
trainer.train()

RayTaskError(RuntimeError): [36mray::TorchRunner.train_epoch()[39m (pid=46, ip=100.96.3.66)
  File "python/ray/_raylet.pyx", line 505, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 449, in ray._raylet.execute_task.function_executor
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/function_manager.py", line 556, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/util/sgd/torch/torch_runner.py", line 140, in train_epoch
    train_stats = self.training_operator.train_epoch(iterator, info)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/util/sgd/torch/training_operator.py", line 510, in train_epoch
    metrics = self.train_batch(batch, batch_info=batch_info)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/util/sgd/torch/training_operator.py", line 588, in train_batch
    loss = criterion(output, target)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/torch/nn/modules/module.py", line 550, in __call__
    result = self.forward(*input, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/torch/nn/modules/loss.py", line 516, in forward
    return F.binary_cross_entropy(input, target, weight=self.weight, reduction=self.reduction)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/torch/nn/functional.py", line 2378, in binary_cross_entropy
    return torch._C._nn.binary_cross_entropy(
RuntimeError: all elements of input should be between 0 and 1

In [10]:
for i in range(10):
    metrics = trainer.train()
    val_metrics = trainer.validate()

RayTaskError(RuntimeError): [36mray::TorchRunner.train_epoch()[39m (pid=345, ip=100.96.3.67)
  File "python/ray/_raylet.pyx", line 505, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 449, in ray._raylet.execute_task.function_executor
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/function_manager.py", line 556, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/util/sgd/torch/torch_runner.py", line 140, in train_epoch
    train_stats = self.training_operator.train_epoch(iterator, info)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/util/sgd/torch/training_operator.py", line 510, in train_epoch
    metrics = self.train_batch(batch, batch_info=batch_info)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/util/sgd/torch/training_operator.py", line 588, in train_batch
    loss = criterion(output, target)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/torch/nn/modules/module.py", line 550, in __call__
    result = self.forward(*input, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/torch/nn/modules/loss.py", line 516, in forward
    return F.binary_cross_entropy(input, target, weight=self.weight, reduction=self.reduction)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/torch/nn/functional.py", line 2378, in binary_cross_entropy
    return torch._C._nn.binary_cross_entropy(
RuntimeError: all elements of input should be between 0 and 1

In [15]:
from ray.util.sgd.torch import TorchTrainer

In [16]:
help(TorchTrainer)

Help on class TorchTrainer in module ray.util.sgd.torch.torch_trainer:

class TorchTrainer(builtins.object)
 |  TorchTrainer(*, training_operator_cls, initialization_hook=None, config=None, num_workers=1, num_cpus_per_worker=1, use_gpu='auto', backend='auto', wrap_ddp=True, timeout_s=1800, use_fp16=False, use_tqdm=False, add_dist_sampler=True, scheduler_step_freq=None, use_local=False, num_replicas=None, batch_size=None, model_creator=None, data_creator=None, optimizer_creator=None, scheduler_creator=None, loss_creator=None, serialize_data_creation=None, data_loader_args=None, apex_args=None)
 |  
 |  Train a PyTorch model using distributed PyTorch.
 |  
 |  Launches a set of actors which connect via distributed PyTorch and
 |  coordinate gradient updates to train the provided model. If Ray is not
 |  initialized, TorchTrainer will automatically initialize a local Ray
 |  cluster for you. Be sure to run `ray.init(address="auto")` to leverage
 |  multi-node training.
 |  
 |  .. code-bl