# Ray Libararies Quick Start

ref: https://docs.ray.io/en/latest/ray-overview/index.html#ray-libraries-quick-start

## 1. Ray Data: Creating and Transfroming Datasets

In [1]:
import ray

# Create a Dataset of Python objects.
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

2023-01-03 15:41:16,109	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8266 [39m[22m


In [2]:
print(ds)

Dataset(num_blocks=20, num_rows=10000, schema=<class 'int'>)


In [3]:
ds.take(5)

[0, 1, 2, 3, 4]

In [4]:
ds.schema()

int

In [5]:
# Create a Dataset from Python objects, which are held as Arrow records.
ds = ray.data.from_items([
        {"sepal.length": 5.1, "sepal.width": 3.5,
         "petal.length": 1.4, "petal.width": 0.2, "variety": "Setosa"},
        {"sepal.length": 4.9, "sepal.width": 3.0,
         "petal.length": 1.4, "petal.width": 0.2, "variety": "Setosa"},
        {"sepal.length": 4.7, "sepal.width": 3.2,
         "petal.length": 1.3, "petal.width": 0.2, "variety": "Setosa"},
     ])

In [6]:
print(ds)

Dataset(num_blocks=3, num_rows=3, schema={sepal.length: double, sepal.width: double, petal.length: double, petal.width: double, variety: string})


In [7]:
ds.show()

{'sepal.length': 5.1, 'sepal.width': 3.5, 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
{'sepal.length': 4.9, 'sepal.width': 3.0, 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
{'sepal.length': 4.7, 'sepal.width': 3.2, 'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}


In [8]:
ds.schema()

sepal.length: double
sepal.width: double
petal.length: double
petal.width: double
variety: string

In [9]:
# Create from CSV.
# Tip: "example://" is a convenient protocol to access the
# python/ray/data/examples/data directory.
ds = ray.data.read_csv("example://iris.csv")



In [10]:
print(ds)

Dataset(num_blocks=1, num_rows=150, schema={sepal.length: double, sepal.width: double, petal.length: double, petal.width: double, variety: string})


In [12]:
ds.take(3)

[ArrowRow({'sepal.length': 5.1,
           'sepal.width': 3.5,
           'petal.length': 1.4,
           'petal.width': 0.2,
           'variety': 'Setosa'}),
 ArrowRow({'sepal.length': 4.9,
           'sepal.width': 3.0,
           'petal.length': 1.4,
           'petal.width': 0.2,
           'variety': 'Setosa'}),
 ArrowRow({'sepal.length': 4.7,
           'sepal.width': 3.2,
           'petal.length': 1.3,
           'petal.width': 0.2,
           'variety': 'Setosa'})]

In [14]:
# Create from Parquet.
ds = ray.data.read_parquet("example://iris.parquet")
# Dataset(num_blocks=1, num_rows=150,
#         schema={sepal.length: float64, sepal.width: float64,
#                 petal.length: float64, petal.width: float64, variety: object})



In [16]:
print(ds)

Dataset(num_blocks=1, num_rows=150, schema={sepal.length: double, sepal.width: double, petal.length: double, petal.width: double, variety: string})


In [15]:
ds.take(3)

[ArrowRow({'sepal.length': 5.1,
           'sepal.width': 3.5,
           'petal.length': 1.4,
           'petal.width': 0.2,
           'variety': 'Setosa'}),
 ArrowRow({'sepal.length': 4.9,
           'sepal.width': 3.0,
           'petal.length': 1.4,
           'petal.width': 0.2,
           'variety': 'Setosa'}),
 ArrowRow({'sepal.length': 4.7,
           'sepal.width': 3.2,
           'petal.length': 1.3,
           'petal.width': 0.2,
           'variety': 'Setosa'})]

In [17]:
import pandas

# Create 10 blocks for parallelism.
ds = ds.repartition(10)

[dataset]: Run `pip install tqdm` to enable progress reporting.


In [18]:
print(ds)

Dataset(num_blocks=10, num_rows=150, schema={sepal.length: double, sepal.width: double, petal.length: double, petal.width: double, variety: string})


In [19]:
# Find rows with sepal.length < 5.5 and petal.length > 3.5.
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    return df[(df["sepal.length"] < 5.5) & (df["petal.length"] > 3.5)]

In [20]:
transformed_ds = ds.map_batches(transform_batch)

In [21]:
print(transformed_ds)

Dataset(num_blocks=10, num_rows=3, schema={sepal.length: float64, sepal.width: float64, petal.length: float64, petal.width: float64, variety: object})


In [22]:
transformed_ds.show()

{'sepal.length': 5.2, 'sepal.width': 2.7, 'petal.length': 3.9, 'petal.width': 1.4, 'variety': 'Versicolor'}
{'sepal.length': 5.4, 'sepal.width': 3.0, 'petal.length': 4.5, 'petal.width': 1.5, 'variety': 'Versicolor'}
{'sepal.length': 4.9, 'sepal.width': 2.5, 'petal.length': 4.5, 'petal.width': 1.7, 'variety': 'Virginica'}


In [23]:
transformed_ds.take()

[PandasRow({'sepal.length': 5.2,
            'sepal.width': 2.7,
            'petal.length': 3.9,
            'petal.width': 1.4,
            'variety': 'Versicolor'}),
 PandasRow({'sepal.length': 5.4,
            'sepal.width': 3.0,
            'petal.length': 4.5,
            'petal.width': 1.5,
            'variety': 'Versicolor'}),
 PandasRow({'sepal.length': 4.9,
            'sepal.width': 2.5,
            'petal.length': 4.5,
            'petal.width': 1.7,
            'variety': 'Virginica'})]

## 2. Ray Train: Distributed Model Training

In [1]:
import torch
import torch.nn as nn

num_samples = 20
input_size = 10
layer_size = 15
output_size = 5

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

# In this example we use a randomly generated dataset.
input = torch.randn(num_samples, input_size)
labels = torch.randn(num_samples, output_size)

In [2]:
import torch.optim as optim

def train_func():
    num_epochs = 3
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)

    for epoch in range(num_epochs):
        output = model(input)
        loss = loss_fn(output, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")


In [3]:
train_func()

epoch: 0, loss: 0.8631597757339478
epoch: 1, loss: 0.8453680276870728
epoch: 2, loss: 0.829220712184906


In [4]:
from ray import train

def train_func_distributed():
    num_epochs = 3
    model = NeuralNetwork()
    model = train.torch.prepare_model(model)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)

    for epoch in range(num_epochs):
        output = model(input)
        loss = loss_fn(output, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")


In [5]:
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

# For GPU Training, set `use_gpu` to True.
use_gpu = False

trainer = TorchTrainer(
    train_func_distributed,
    scaling_config=ScalingConfig(
        num_workers=4, use_gpu=use_gpu)
)

results = trainer.fit()


2023-01-03 16:04:00,779	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Current time:,2023-01-03 16:04:14
Running for:,00:00:12.40
Memory:,15.1/16.0 GiB

Trial name,status,loc
TorchTrainer_2f8c3_00000,TERMINATED,127.0.0.1:1255


[2m[36m(RayTrainWorker pid=1275)[0m 2023-01-03 16:04:10,885	INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=4]
[2m[36m(RayTrainWorker pid=1275)[0m 2023-01-03 16:04:11,958	INFO train_loop_utils.py:270 -- Moving model to device: cpu
[2m[36m(RayTrainWorker pid=1275)[0m 2023-01-03 16:04:11,959	INFO train_loop_utils.py:330 -- Wrapping provided model in DistributedDataParallel.


[2m[36m(RayTrainWorker pid=1275)[0m epoch: 0, loss: 0.9114844799041748
[2m[36m(RayTrainWorker pid=1278)[0m epoch: 0, loss: 0.9114844799041748
[2m[36m(RayTrainWorker pid=1277)[0m epoch: 0, loss: 0.9114844799041748
[2m[36m(RayTrainWorker pid=1279)[0m epoch: 0, loss: 0.9114844799041748
[2m[36m(RayTrainWorker pid=1279)[0m epoch: 1, loss: 0.8945875763893127
[2m[36m(RayTrainWorker pid=1275)[0m epoch: 1, loss: 0.8945875763893127
[2m[36m(RayTrainWorker pid=1278)[0m epoch: 1, loss: 0.8945875763893127
[2m[36m(RayTrainWorker pid=1277)[0m epoch: 1, loss: 0.8945875763893127
[2m[36m(RayTrainWorker pid=1279)[0m epoch: 2, loss: 0.8796463012695312
[2m[36m(RayTrainWorker pid=1275)[0m epoch: 2, loss: 0.8796463012695312
[2m[36m(RayTrainWorker pid=1278)[0m epoch: 2, loss: 0.8796463012695312
[2m[36m(RayTrainWorker pid=1277)[0m epoch: 2, loss: 0.8796463012695312


2023-01-03 16:04:14,943	ERROR checkpoint_manager.py:327 -- Result dict has no key: training_iteration. checkpoint_score_attr must be set to a key in the result dict. Valid keys are: ['trial_id', 'experiment_id', 'date', 'timestamp', 'pid', 'hostname', 'node_ip', 'done']


Trial TorchTrainer_2f8c3_00000 completed. Last result: 


2023-01-03 16:04:15,063	INFO tune.py:762 -- Total run time: 12.53 seconds (12.40 seconds for the tuning loop).
