In [4]:
import argparse
import os

import ray
import ray.train as train
import torch
from ray.air import Checkpoint
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

In [2]:
# Define model
class NeuralNetwork(nn.Module):
    """Three-layer fully connected network producing 10 raw logits per sample.

    The input is flattened from dimension 1 onwards and must contain
    ``28 * 28`` values per sample (e.g. a ``(N, 1, 28, 28)`` or ``(N, 28, 28)``
    tensor).
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        # No activation after the last Linear layer: ``forward`` has to return
        # raw logits because nn.CrossEntropyLoss applies log-softmax itself.
        # A trailing ReLU would clamp every negative logit to zero. ReLU holds
        # no parameters, so the state_dict keys (0, 2, 4) are unaffected.
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x):
        """Return unnormalised class scores of shape ``(N, 10)`` for batch ``x``."""
        flat = self.flatten(x)
        return self.linear_relu_stack(flat)

def train_epoch(dataloader, model, loss_fn, optimizer, device=None):
    """Run one optimisation pass over every batch in ``dataloader``.

    Args:
        dataloader: Iterable yielding ``(X, y)`` pairs of tensors.
        model: Module to train; it is switched to training mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device each batch is moved to before the forward pass.
            ``None`` (default) uses the device of the model's first
            parameter, so the same code runs on CPU and GPU workers.
            Explicit values such as ``'cuda'`` are still honoured.

    Returns:
        None. The model is updated in place.
    """
    if device is None:
        # Follow the model instead of assuming CUDA is present.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else None

    model.train()
    for X, y in dataloader:
        if device is not None:
            # No-op when the loader already delivered the batch on ``device``.
            X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def test_epoch(dataloader, model, loss_fn, device='cuda'):
    size = len(dataloader.dataset) // session.get_world_size()  # Divide by word size
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    # print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return test_loss

def load_data():
    """Fetch FashionMNIST and return ``(training_data, test_data)``.

    Both splits are stored under the ``data`` directory, downloaded when
    missing, and converted to tensors with ``ToTensor``.
    """
    # Build the training split first (train=True), then the test split.
    training_data, test_data = (
        datasets.FashionMNIST(
            root="data",
            train=is_train_split,
            download=True,
            transform=ToTensor(),
        )
        for is_train_split in (True, False)
    )
    return training_data, test_data


def train_func(config: dict):
    """Training loop that ``TorchTrainer`` runs on every worker.

    Args:
        config: Must contain ``"batch_size"`` (global batch size, split evenly
            across workers), ``"lr"`` (SGD learning rate) and ``"epochs"``.

    After each epoch the test loss is reported through ``session.report``
    together with a checkpoint holding the epoch index and model state dict.
    """
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]

    # The configured batch size is global; each worker takes an equal share.
    batch_size_per_worker = batch_size // session.get_world_size()

    training_data, test_data = load_data()

    # Create data loaders and let Ray Train adapt them for distributed use.
    train_dataloader = train.torch.prepare_data_loader(
        DataLoader(training_data, batch_size=batch_size_per_worker)
    )
    test_dataloader = train.torch.prepare_data_loader(
        DataLoader(test_data, batch_size=batch_size_per_worker)
    )

    model = train.torch.prepare_model(NeuralNetwork())

    # Ask Ray Train which device this worker owns instead of relying on the
    # helpers' default, so the loop also works with use_gpu=False.
    device = train.torch.get_device()

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for t in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer, device=device)
        test_loss = test_epoch(test_dataloader, model, loss_fn, device=device)
        checkpoint = Checkpoint.from_dict(
            dict(epoch=t, model=model.state_dict())
        )
        session.report(dict(loss=test_loss), checkpoint=checkpoint)

    print("Done!")

def train_fashion_mnist(num_workers=3, use_gpu=True):
    """Launch distributed FashionMNIST training with Ray Train.

    Args:
        num_workers: Number of training workers requested in the scaling config.
        use_gpu: Whether the scaling config requests GPU workers.

    Returns:
        The object returned by ``TorchTrainer.fit()``. Its ``metrics`` are also
        printed. Returning it lets later notebook cells inspect the run
        instead of losing it after the print.
    """
    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": 4},
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
    )
    result = trainer.fit()
    print(f"Last result: {result.metrics}")
    return result

In [3]:
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=3,
        help="Sets number of workers for training.",
    )
    # BooleanOptionalAction also registers --no-use-gpu. With plain
    # store_true and default=True the flag could never be switched off.
    parser.add_argument(
        "--use-gpu",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Enables GPU training",
    )
    # parse_known_args tolerates extra argv entries this parser does not define.
    args, _ = parser.parse_known_args()

    if not ray.is_initialized():
        print("Connecting to Ray cluster...")
        # Head-node address comes from the environment; a missing variable
        # raises KeyError rather than silently connecting somewhere else.
        service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
        service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
        ray.util.connect(f"{service_host}:{service_port}")

    # Honour the parsed options (defaults: 3 workers, GPU enabled).
    train_fashion_mnist(num_workers=args.num_workers, use_gpu=args.use_gpu)


Connecting to Ray cluster...


Error in data channel:
Queue filler thread failed to join before timeout: 10
2023-04-05 22:41:16,473	ERROR dataclient.py:323 -- Unrecoverable error in data channel.


ConnectionError: Failed during this or a previous request. Exception that broke the connection: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.FAILED_PRECONDITION
	details = "No module named 'IPython'"
	debug_error_string = "{"created":"@1680734476.473590501","description":"Error received from peer ipv4:172.20.64.229:10001","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"No module named 'IPython'","grpc_status":9}"
>

In [5]:
# Initialise Ray with default arguments for the debugging cells that follow.
ray.init()

2023-04-05 22:47:31,603	ERROR services.py:1169 -- Failed to start the dashboard 
2023-04-05 22:47:31,605	ERROR services.py:1194 -- Error should be written to 'dashboard.log' or 'dashboard.err'. We are printing the last 20 lines for you. See 'https://docs.ray.io/en/master/ray-observability/ray-logging.html#logging-directory-structure' to find where the log file is.
2023-04-05 22:47:31,606	ERROR services.py:1238 -- 
The last 20 lines of /tmp/ray/session_2023-04-05_22-47-10_034990_451/logs/dashboard.log (it contains the error message from the dashboard): 
2023-04-05 22:47:12,088	INFO http_server_head.py:210 -- <ResourceRoute [GET] <PlainResource  /api/v0/cluster_events> -> <function RateLimitedModule.enforce_max_concurrent_calls.<locals>.async_wrapper at 0x7fe27a657700>
2023-04-05 22:47:12,088	INFO http_server_head.py:210 -- <ResourceRoute [GET] <PlainResource  /api/v0/logs> -> <function RateLimitedModule.enforce_max_concurrent_calls.<locals>.async_wrapper at 0x7fe27a6578b0>
2023-04-05 22

0,1
Python version:,3.9.5
Ray version:,2.3.1


In [4]:
@ray.remote
def read_file(path="/home/ray/ray_results/TorchTrainer_2023-04-05_14-32-44/TorchTrainer_672ea_00000_0_2023-04-05_14-32-45/error.txt"):
    """Ray task: read a text file where the task runs and return its lines.

    Args:
        path: File to read. Defaults to the ``error.txt`` of the failed
            TorchTrainer trial this notebook was debugging.

    Returns:
        list[str]: The file's lines, newline characters included. The list is
        also printed from the task.
    """
    # The context manager closes the handle even if reading raises.
    with open(path, "r") as log_file:
        logs = log_file.readlines()
    print(logs)
    return logs

In [2]:
import pkg_resources

In [3]:
@ray.remote
def ray_version():
    """Ray task: print and return the version of the installed ``ray`` distribution.

    Returns:
        str: Version string of the ``ray`` package in the environment where
        the task executes.
    """
    # Standard-library replacement for the deprecated pkg_resources API.
    # Imported inside the task so it does not depend on notebook-level imports.
    from importlib.metadata import version as distribution_version

    version = distribution_version("ray")
    print(version)
    return version

In [4]:
# Submit ray_version as a Ray task and wait for its return value.
ray.get(ray_version.remote())

2023-04-05 22:24:48,701	INFO worker.py:1544 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


[2m[36m(ray_version pid=418)[0m 2.3.1


'2.3.1'

In [5]:
# Submit read_file as a Ray task and wait for the list of lines it returns.
ray.get(read_file.remote())

['Failure # 1 (occurred at 2023-04-05_14-32-49)\n',
 '\x1b[36mray::TrainTrainable.train()\x1b[39m (pid=1066, ip=10.0.57.80, repr=TorchTrainer)\n',
 '  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/trainable.py", line 347, in train\n',
 '    result = self.step()\n',
 '  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/function_trainable.py", line 417, in step\n',
 '    self._report_thread_runner_error(block=True)\n',
 '  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/function_trainable.py", line 589, in _report_thread_runner_error\n',
 '    raise e\n',
 '  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/function_trainable.py", line 289, in run\n',
 '    self._entrypoint()\n',
 '  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/function_trainable.py", line 362, in entrypoint\n',
 '    return self._trainable_func(\n',
 '  File "/opt/conda/lib/python3.9/site-packag

[2m[36m(read_file pid=6316)[0m ['Failure # 1 (occurred at 2023-04-05_14-32-49)\n', '\x1b[36mray::TrainTrainable.train()\x1b[39m (pid=1066, ip=10.0.57.80, repr=TorchTrainer)\n', '  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/trainable.py", line 347, in train\n', '    result = self.step()\n', '  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/function_trainable.py", line 417, in step\n', '    self._report_thread_runner_error(block=True)\n', '  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/function_trainable.py", line 589, in _report_thread_runner_error\n', '    raise e\n', '  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/function_trainable.py", line 289, in run\n', '    self._entrypoint()\n', '  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/function_trainable.py", line 362, in entrypoint\n', '    return self._trainable_func(\n', '  File "/opt/conda/lib

In [None]:
# A bare ``key: value,`` pair is not a valid statement, so it is wrapped in a
# dict literal to make the cell runnable.
# NOTE(review): presumably copied out of a larger versions dict - confirm intent.
{'scikit-image': pkg_resources.get_distribution("scikit-image").version}

In [1]:
# Display the version of the ray package importable from this notebook kernel.
import ray
ray.__version__

'2.3.1'