In [1]:
import ray
ray.init()
from ray import tune


2021-08-30 14:18:17,294	INFO worker.py:801 -- Connecting to existing Ray cluster at address: 172.31.107.75:6379


In [2]:
ray.__version__

'1.5.2'

In [7]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

from ray.util.sgd.torch import TrainingOperator
from ray.util.sgd.torch.examples.train_example import LinearDataset

In [18]:
class MyTrainingOperator(TrainingOperator):
    def setup(self, config):
        # Setup all components needed for training here. This could include
        # data, models, optimizers, loss & schedulers.

        # Setup data loaders.
        train_dataset, val_dataset = LinearDataset(2, 5), LinearDataset(2,
                                                                        5)
        train_loader = DataLoader(train_dataset,
                                  batch_size=config["batch_size"])
        val_loader = DataLoader(val_dataset,
                                batch_size=config["batch_size"])

        # Setup model.
        model = nn.Linear(1, 1)

        # Setup optimizer.
        optimizer = torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-4))

        # Setup loss.
        criterion = torch.nn.MSELoss()

        # Setup scheduler.
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.9)

        # Register all of these components with Ray SGD.
        # This allows Ray SGD to do framework level setup like Cuda, DDP,
        # Distributed Sampling, FP16.
        # We also assign the return values of self.register to instance
        # attributes so we can access it in our custom training/validation
        # methods.
        self.model, self.optimizer, self.criterion, self.scheduler = \
            self.register(models=model, optimizers=optimizer,
                          criterion=criterion,
                          schedulers=scheduler)
        self.register_data(train_loader=train_loader, validation_loader=val_loader)

In [22]:
from ray.util.sgd import TorchTrainer

trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    scheduler_step_freq="epoch",  # if scheduler is used
    config={"lr": 0.001, "batch_size": 64*64*64*64},
    num_workers=2,
    use_gpu=False)


[2m[36m(pid=21973)[0m 2021-08-30 14:39:00,098	INFO distributed_torch_runner.py:58 -- Setting up process group for: tcp://172.31.107.75:57193 [rank=0]
[2m[36m(pid=22279)[0m 2021-08-30 14:39:00,699	INFO distributed_torch_runner.py:58 -- Setting up process group for: tcp://172.31.107.75:57193 [rank=1]


In [24]:
for i in range(10):
    metrics = trainer.train()
    print(metrics)
    val_metrics = trainer.validate()
    print(val_metrics)
print("success!")

{'num_samples': 1000, 'epoch': 11.0, 'batch_count': 1.0, 'train_loss': 81.68974304199219, 'last_train_loss': 81.68974304199219}
{'num_samples': 1000, 'batch_count': 1.0, 'val_loss': 73.56553649902344, 'last_val_loss': 73.56553649902344, 'val_accuracy': 0.0, 'last_val_accuracy': 0.0}
{'num_samples': 1000, 'epoch': 12.0, 'batch_count': 1.0, 'train_loss': 73.56553649902344, 'last_train_loss': 73.56553649902344}
{'num_samples': 1000, 'batch_count': 1.0, 'val_loss': 66.31209182739258, 'last_val_loss': 66.31209182739258, 'val_accuracy': 0.0, 'last_val_accuracy': 0.0}
{'num_samples': 1000, 'epoch': 13.0, 'batch_count': 1.0, 'train_loss': 66.31209182739258, 'last_train_loss': 66.31209182739258}
{'num_samples': 1000, 'batch_count': 1.0, 'val_loss': 59.83603286743164, 'last_val_loss': 59.83603286743164, 'val_accuracy': 0.0, 'last_val_accuracy': 0.0}
{'num_samples': 1000, 'epoch': 14.0, 'batch_count': 1.0, 'train_loss': 59.83603096008301, 'last_train_loss': 59.83603096008301}
{'num_samples': 1000