# Ray Data Test Notebook

The vast majority of this notebook is based off implementing the examples made available by the ray data [getting started](https://docs.ray.io/en/latest/data/getting-started.html#datasets-getting-started) docs. 



### What kinds of things should I use Ray Datasets for? 

According to their docs, Ray is, "designed to load and pre-process data for distributed ML training pipelines...Ray Datasets is not intended as a replacement for more general data processing systems"[[1]]. Its purpose is only to serve as a "last mile" distributed data processing tool. Therefore it is designed with the following 3 use cases in mind. 

* Last Mile Processing
* Parallel Batch Inference
* ML Training Ingest (Distributed training)

Below we will attempt to evaluate Ray for these different types of use cases. 

_Note: current testing / evaluation done on a local PC with 32GB memory. This will need to be scaled down to work on ODH with current pod resource sizes i think._ 

[1]: https://docs.ray.io/en/master/data/faq.html#what-should-i-use-ray-datasets-for

In [1]:
import ray
from ray.data.aggregate import Mean, Std

import os
import gc
import pandas as pd
import numpy as np
import dask.dataframe as dd

from ray.util import connect as ray_connect
from ray.util import disconnect as ray_disconnect
from ray.util.client import ray as rayclient


%load_ext memory_profiler

Connect to our remote ray cluster if we're on an ODH notebook image. 

In [2]:
if os.environ.get('RAY_CLUSTER') is not None:
    if rayclient.is_connected():
        ray_disconnect()

    ray_connect('{ray_head}:10001'.format(ray_head=os.environ['RAY_CLUSTER']))
    print(f"connected to {os.environ.get('RAY_CLUSTER')}") 
else:
    print("local")

local


# Part 1: Ray Dataset 

If we are going to test the capabilities of this Ray data tool, we are going to need a reasonably sized example data set. Let's create a CSV file that's almost 1GB and save it to our current file system.    

In [3]:
if os.path.exists("tmp/output") == False:
    print("creating dataset")
    %memit \
    ds = ray.data.range(100000000)
    print("writing file")
    ds.repartition(1).write_csv("tmp/output")
    del ds
    gc.collect()
else:
    print("file exists")

file exists


Now that we've got our "BIG" dataset, let's read it in with Ray vs vanilla pandas, run some basic data transformations and compare each's memory foot print.   

In [2]:
file = os.listdir("tmp/output/")[0]

## pandas

In [4]:
%%time
%memit ds_line = pd.read_csv(f"tmp/output/{file}")
ds_line.shape

peak memory: 1699.57 MiB, increment: 1499.58 MiB
CPU times: user 8.82 s, sys: 1.2 s, total: 10 s
Wall time: 10.9 s


(100000000, 1)

In [7]:
%%time
%memit ds_line = pd.read_csv(f"tmp/output/{file}")
ds_line.shape

peak memory: 2425.41 MiB, increment: 1461.45 MiB
CPU times: user 8.36 s, sys: 546 ms, total: 8.91 s
Wall time: 9.08 s


(100000000, 1)

In [5]:
%%time
%memit ds_line[:1000000]

peak memory: 964.46 MiB, increment: 0.00 MiB
CPU times: user 104 ms, sys: 49.7 ms, total: 154 ms
Wall time: 287 ms


In [6]:
%%time
%memit ds_line.applymap(lambda x: x *2) 

peak memory: 12588.46 MiB, increment: 11624.00 MiB
CPU times: user 37.2 s, sys: 5.36 s, total: 42.5 s
Wall time: 42.9 s


In [7]:
%%time
%memit \
ds_line = ds_line[ds_line["value"] > 5]
ds_line.head(5)

peak memory: 4170.55 MiB, increment: 3204.82 MiB
CPU times: user 1.81 s, sys: 322 ms, total: 2.13 s
Wall time: 2.27 s


Unnamed: 0,value
6,6
7,7
8,8
9,9
10,10


In [8]:
del ds_line

## Ray Data

In [3]:
### Wokers don't have PVC access...So this won't work like locally 
%%time
%memit ds_dst = ray.data.read_csv(f"tmp/output/{file}")
print(ds_dst)

2022-06-07 16:12:52,333	INFO services.py:1462 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


peak memory: 228.02 MiB, increment: 29.05 MiB
Dataset(num_blocks=1, num_rows=None, schema={value: int64})
CPU times: user 302 ms, sys: 119 ms, total: 421 ms
Wall time: 13.7 s


In [6]:
%%time
%memit ds_dst.take(1000000)

peak memory: 925.93 MiB, increment: 695.38 MiB
CPU times: user 4.47 s, sys: 363 ms, total: 4.83 s
Wall time: 4.94 s


In [7]:
%%time
%memit ds_dst.map_batches(lambda df:  df.applymap(lambda x: x *2), batch_format='pandas') 

Map Progress: 100%|██████████| 1/1 [02:08<00:00, 128.44s/it]

peak memory: 678.05 MiB, increment: 2.24 MiB
CPU times: user 2.11 s, sys: 413 ms, total: 2.52 s
Wall time: 2min 8s





In [4]:
%%time
%memit ds_dst = ds_dst.map_batches(lambda df: df[df["value"] > 5], batch_format="pandas")
ds_dst.take(10)

Map Progress: 100%|██████████| 1/1 [01:26<00:00, 86.60s/it]

peak memory: 229.45 MiB, increment: 0.86 MiB
CPU times: user 1.57 s, sys: 283 ms, total: 1.85 s
Wall time: 1min 26s





[{'value': 6},
 {'value': 7},
 {'value': 8},
 {'value': 9},
 {'value': 10},
 {'value': 11},
 {'value': 12},
 {'value': 13},
 {'value': 14},
 {'value': 15}]

Running all of the above cells looks to leave you with about a 20Gb memory load... May have to reset the kernel to move forward.

What if we have a distributed dataset? not 1 file and partition?

In [14]:
if os.path.exists("tmp/output_dist") == False:
    print("creating dataset")
    %memit \
    ds = ray.data.range(100000000)
    print("writing file")
    ds.write_csv("tmp/output_dist")
    del ds
    gc.collect()
else:
    print("files exists")

creating dataset
peak memory: 683.82 MiB, increment: 0.95 MiB
writing file


Write Progress: 100%|██████████| 200/200 [01:43<00:00,  1.93it/s]


In [15]:
%%time
%memit ds_dst = ray.data.read_csv(f"tmp/output_dist/")
print(ds_dst)

peak memory: 686.88 MiB, increment: 0.17 MiB
Dataset(num_blocks=200, num_rows=None, schema={value: int64})
CPU times: user 137 ms, sys: 35.2 ms, total: 172 ms
Wall time: 476 ms


In [16]:
%%time
%memit ds_dst.take(1000000)

peak memory: 941.44 MiB, increment: 254.50 MiB
CPU times: user 4.53 s, sys: 498 ms, total: 5.03 s
Wall time: 5.18 s


In [17]:
%%time
%memit ds_dst.map_batches(lambda df:  df.applymap(lambda x: x *2), batch_format='pandas') 

Map Progress: 100%|██████████| 200/200 [02:03<00:00,  1.61it/s]

peak memory: 697.15 MiB, increment: 1.21 MiB
CPU times: user 2.94 s, sys: 483 ms, total: 3.42 s
Wall time: 2min 4s





In [18]:
%%time
%memit ds_dst = ds_dst.map_batches(lambda df: df[df["value"] > 5], batch_format="pandas")
ds_dst.take(10)

Map Progress: 100%|██████████| 200/200 [01:26<00:00,  2.32it/s]

peak memory: 697.88 MiB, increment: 0.73 MiB
CPU times: user 2.64 s, sys: 426 ms, total: 3.07 s
Wall time: 1min 26s





[{'value': 6},
 {'value': 7},
 {'value': 8},
 {'value': 9},
 {'value': 10},
 {'value': 11},
 {'value': 12},
 {'value': 13},
 {'value': 14},
 {'value': 15}]

Above we have evaluated 3 scenarios: Using vanilla pandas with a single dataset, using Ray with a single dataset and using Ray with a distributed dataset for a number of different operations. 

Our generated datasets are 100,000,000 rows long and 1 column wide, consisting only of integers. Below we have recorded the timing and memory results for loading the data, retrieving a slice (subset), applying the square function to each element and applying a filter to the dataset along with the total time taken to perform each step and the memory still in use after the entire set of operations ran. 

#### Ray vs Pandas performance results

|                   |  Load      | Slice     |  Square     |  Filter   | Total Change | 
|-------------------|------------|-----------|-------------|-----------|--------------|
|Pandas             | 10s, 800mb | 1s, 0mb   | 43s, 0mb    | 2s, 500mb | 56s, 1600mb  |
|Ray (single block) | 9s, 1600mb | 5s,400mb  | 128s,2400mb | 86s,0mb   | 228s, 4400mb |
|Ray (multi block)  | 1s, 1000mb | 5s, 400mb | 35s, 1100mb | 14s, 0mb  | 55s, 2500mb  |


<br/><br/>
From the table above we can see that using Ray data without dividing our dataset object into a reasonable number of blocks performs quite poorly. Its by far the slowest approach for the operations above and uses the most memory overall. 
For a smallish dataset like we are using here (~1Gb) vanilla pandas still works fairly well, however, it is still running as a single process and is not taking full advantage of the available resources. 
With the Ray Dataset divided into 200 blocks we get (in some cases) faster times than pandas with only about 1Gb more memory required. Furthermore, this approach maximizes use of the available resource on the machine. 

We are also able to convert Ray (single block) to Ray (multi block) and get the same increased performance by running a `ds.repartition(200)` command on our dataset. However, it is a somewhat expensive operation and should be avoided if possible.  


_note: These are the experimental results on an 8 core laptop with 32Gb Memory and should be repeated on an OPF cluster._

_note 2: The memory values recorded above from %memit did not seem to accurately capture the amount of memory used by the multiple Ray processes, so the chart reflects total usage from machine while running above code and not the %memit values._



# Part 2: ML Preprocessing 

In this section we will mostly follow the ["dataset ml preprocessing"](https://docs.ray.io/en/latest/data/examples/big_data_ingestion.html) section of the Ray data docs to evaluate some of the "last mile" type of processing we'd want to use Ray for in a machine learning pipeline. Specifically we will perform the following 3 types of operations:

1. Data Cleaning
2. Aggregation and scaling
3. Random Shuffle

The first thing we need to do is create a slightly more complex data set, one that has 3 columns with proper column names. 

In [3]:
# make a multi-column data set
if os.path.exists("tmp/output_multi_col") == False:
    print("creating dataset")
    %memit \
    ds = ray.data.from_items([{"A":i%3,"B":i * 2,"C":i * 3} for i in range(20000000)])
    print("writing file")
    ds.write_csv("tmp/output_multi_col")
    del ds
    gc.collect()
else:
    print("files exists")

files exists


#### Data Cleaning

Cool, let's encapsulate all the data cleaning steps we want to perform on our data into a single function. This is good practice in general, but will also let us pass this function to ray to be run in parallel on our dataset. 

All the moves below are arbitrary and selected just to show what's possible :)   

In [4]:
# A Pandas DataFrame UDF for transforming the underlying blocks of a Dataset in parallel.
def transform_batch(df: pd.DataFrame):
    # Drop nulls.
    df = df.dropna(subset=["A"])
    # Add new column.
    df["new_col"] = df["A"] - 2 * df["B"] + df["C"] / 3
    # Transform existing column.
    df["A"] = 2 * df["A"] + 1
    # Drop column.
    df.drop(columns="B", inplace=True)
    # Re-add column 
    df["B"] = df["C"]
    return df

Read in our new dataset 

In [5]:
%%time
%memit ds = ray.data.read_csv(f"tmp/output_multi_col/")
print(ds)

2022-06-09 15:53:52,033	INFO services.py:1462 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


peak memory: 230.08 MiB, increment: 32.63 MiB
Dataset(num_blocks=200, num_rows=None, schema={A: int64, B: int64, C: int64})
CPU times: user 297 ms, sys: 86.7 ms, total: 384 ms
Wall time: 4.91 s


Apply the transformations to our dataset in parallel on each block.  

In [6]:
%%time
ds = ds.map_batches(transform_batch, batch_format="pandas")
ds.take(5)

Map Progress: 100%|██████████| 200/200 [00:13<00:00, 14.41it/s]

CPU times: user 1.8 s, sys: 255 ms, total: 2.05 s
Wall time: 14.2 s





[{'A': 1, 'C': 0, 'new_col': 0.0, 'B': 0},
 {'A': 3, 'C': 3, 'new_col': -2.0, 'B': 3},
 {'A': 5, 'C': 6, 'new_col': -4.0, 'B': 6},
 {'A': 1, 'C': 9, 'new_col': -9.0, 'B': 9},
 {'A': 3, 'C': 12, 'new_col': -11.0, 'B': 12}]

AND, for good measure, let's compare timing of loading our dataset and running our cleaning function using regular old pandas.

In [8]:
%%time
files = os.listdir("tmp/output_multi_col")
files = [f"tmp/output_multi_col/{file}" for file in files]
%memit ds_panda = pd.concat(map(pd.read_csv, files))
ds_panda.shape

peak memory: 1251.20 MiB, increment: 1010.46 MiB
CPU times: user 4.85 s, sys: 1.48 s, total: 6.33 s
Wall time: 6.44 s


(20000000, 3)

In [9]:
%%time
ds_panda = transform_batch(ds_panda)
ds_panda.head(5)

CPU times: user 929 ms, sys: 268 ms, total: 1.2 s
Wall time: 1.19 s


Unnamed: 0,A,C,new_col,B
0,1,0,0.0,0
1,3,3,-2.0,3
2,5,6,-4.0,6
3,1,9,-9.0,9
4,3,12,-11.0,12


#### Aggregations and Scaling

Now let's looks at a few operations like getting the mean, std, and scaling our data set that require knowledge of the whole dataset making them a little more difficult to parallelize.  

In [10]:
%time
ds.mean("B")

CPU times: user 6 µs, sys: 1 µs, total: 7 µs
Wall time: 11.4 µs


GroupBy Map: 100%|██████████| 200/200 [01:18<00:00,  2.56it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00,  3.97it/s]


29999998.5

In [11]:
%time
ds.mean(["B", "C"])

CPU times: user 3 µs, sys: 1 µs, total: 4 µs
Wall time: 8.11 µs


GroupBy Map: 100%|██████████| 200/200 [02:01<00:00,  1.65it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00,  4.64it/s]


{'mean(B)': 29999998.5, 'mean(C)': 29999998.5}

As always, we'll run the same operations with pandas so we have something to compare our results to.  

In [12]:
%time
ds_panda[["B","C"]].mean()

CPU times: user 7 µs, sys: 0 ns, total: 7 µs
Wall time: 11 µs


B    29999998.5
C    29999998.5
dtype: float64

In [13]:
%%time
stats = ds.aggregate(Mean("B"), Std("B"), Mean("C"), Std("C"), Mean("new_col"), Std("new_col") )
stats

GroupBy Map: 100%|██████████| 200/200 [04:58<00:00,  1.49s/it]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00,  2.74it/s]

CPU times: user 3.4 s, sys: 459 ms, total: 3.86 s
Wall time: 5min





{'mean(B)': 29999998.5, 'std(B)': 17320508.50870145, 'mean(C)': 29999998.5, 'std(C)': 17320508.50870145, 'mean(new_col)': -29999997.50000005, 'std(new_col)': 17320508.508702107}

In [14]:
def batch_standard_scaler(df: pd.DataFrame):
    def column_standard_scaler(s: pd.Series):
        s_mean = stats[f"mean({s.name})"]
        s_std = stats[f"std({s.name})"]
        return (s - s_mean) / s_std

    cols = df.columns.difference(["A"])
    df.loc[:, cols] = df.loc[:, cols].transform(column_standard_scaler)
    return df

In [15]:
%time
ds = ds.map_batches(batch_standard_scaler, batch_format="pandas")
ds.take(5)

CPU times: user 4 µs, sys: 1 µs, total: 5 µs
Wall time: 10.3 µs


Map Progress: 100%|██████████| 200/200 [00:10<00:00, 19.25it/s]


[{'A': 1, 'C': -1.7320506776650724, 'new_col': 1.732050619929984, 'B': -1.7320506776650724},
 {'A': 3, 'C': -1.7320505044599959, 'new_col': 1.7320505044599332, 'B': -1.7320505044599959},
 {'A': 5, 'C': -1.7320503312549196, 'new_col': 1.7320503889898822, 'B': -1.7320503312549196},
 {'A': 1, 'C': -1.732050158049843, 'new_col': 1.7320501003147548, 'B': -1.732050158049843},
 {'A': 3, 'C': -1.7320499848447666, 'new_col': 1.732049984844704, 'B': -1.7320499848447666}]

#### Shuffle

When running ML training pipelines it is considered good practice to shuffle our training set at the beginning of each epoch. Let's look at a couple different ways we can shuffle our data with Ray.  

* First we will shuffle the whole dataset once
* Then we will shuffle it N times
* Finally, we create a DatasetPipeline object that will shuffle each block when called in an iteration loop (like we would do for training) 


In [17]:
%%time
# Shuffle once
ds = ds.random_shuffle()
print(ds)
ds.take(5)

Shuffle Map: 100%|██████████| 200/200 [00:07<00:00, 26.83it/s]
Shuffle Reduce: 100%|██████████| 200/200 [00:21<00:00,  9.30it/s]

Dataset(num_blocks=200, num_rows=20000000, schema={A: int64, C: float64, new_col: float64, B: float64})
CPU times: user 6.03 s, sys: 2.31 s, total: 8.34 s
Wall time: 29 s





[{'A': 3, 'C': -1.113222541377082, 'new_col': 1.1132225413770425, 'B': -1.113222541377082},
 {'A': 5, 'C': -1.1175845380218135, 'new_col': 1.1175845957567994, 'B': -1.1175845380218135},
 {'A': 5, 'C': -1.1177726387348128, 'new_col': 1.117772696469799, 'B': -1.1177726387348128},
 {'A': 5, 'C': -1.1159144946749067, 'new_col': 1.1159145524098926, 'B': -1.1159144946749067},
 {'A': 3, 'C': -1.111170580836654, 'new_col': 1.1111705808366148, 'B': -1.111170580836654}]

In [18]:
num_epochs = 20

In [19]:
%%time
#Shuffle N times
ds.random_shuffle().repeat(num_epochs)

Shuffle Map: 100%|██████████| 200/200 [00:07<00:00, 28.19it/s]
Shuffle Reduce: 100%|██████████| 200/200 [00:21<00:00,  9.39it/s]

CPU times: user 6.17 s, sys: 2.06 s, total: 8.23 s
Wall time: 28.4 s





DatasetPipeline(num_windows=20, num_stages=1)

In [20]:
%%time
# create a pipeline that trigger a random shuffle before each batch (epoch)
ds = ds.repeat(num_epochs).random_shuffle_each_window()

n = 0
for i in ds.iter_batches():
    n += len(i)
n 

Stage 1: 100%|██████████| 20/20 [08:13<00:00, 24.68s/it]
Stage 0: 100%|██████████| 20/20 [08:13<00:00, 24.68s/it]

CPU times: user 2min 8s, sys: 31.7 s, total: 2min 40s
Wall time: 8min 13s





Unnamed: 0,A,C,new_col,B
0,11960,52.856811,-52.856812,52.856811
1,11972,-174.240306,174.240305,-174.240306
2,11886,-3.698579,3.698575,-3.698579
3,11946,-20.915122,20.915120,-20.915122
4,11970,0.412173,-0.412174,0.412173
...,...,...,...,...
99995,12090,39.352158,-39.352156,39.352158
99996,11960,-49.184004,49.184003,-49.184004
99997,12136,26.850452,-26.850448,26.850452
99998,12084,71.344830,-71.344828,71.344830


Great, so from the above, we can see how to use Ray to apply some common "last mile" data processing types of transformations to our dataset in a parallel fashion.  

# Part 3: Data Pipelines 

In this section we will mostly follow the examples from ["pipelining-compute"](https://docs.ray.io/en/latest/data/pipelining-compute.html) and ["advanced-pipelines"](https://docs.ray.io/en/latest/data/advanced-pipelines.html) from the Ray docs to demonstrate how and when to use "DatasetPipelines". 

According to the docs, "Unlike Datasets, which execute all transformations synchronously, DatasetPipelines implement pipelined execution. This allows for the overlapped execution of data input (e.g., reading files), computation (e.g. feature preprocessing), and output (e.g., distributed ML training)."

We saw DatasetPipelines a bit in the earlier section for shuffling our data. Here will look into constructing slightly more complex pipelines. 

First things first; Let's build a small dataset we can convert into a DatasetPipeline.  

In [21]:
base = ray.data.range(100000)
print(base)

Dataset(num_blocks=200, num_rows=100000, schema=<class 'int'>)


Now we use `.window()` to convert our Dataset into a DatasetPipeline with 10 blocks per window (20 windows for 200 blocks) 

In [32]:
pipe = base.window(blocks_per_window=10)
print(pipe)

2022-06-09 16:22:31,729	INFO dataset.py:2649 -- Created DatasetPipeline with 20 windows: 0.04MiB min, 0.04MiB max, 0.04MiB mean


DatasetPipeline(num_windows=20, num_stages=2)


Next we want to define some functions we want to apply to our Data through the DatasetPipeline approach. We then use `pipe.map(func_N)` to add them to our pipeline.  

In [33]:
def func1(i):
    return i+1

def func2(i):
    return i *2

def func3(i):
    return i%3

In [34]:
pipe = pipe.map(func1)
pipe = pipe.map(func2)
pipe = pipe.map(func3)
print(pipe)

DatasetPipeline(num_windows=20, num_stages=5)


Once the DatasetPipeline is defined, we have to iterate over it for it to trigger the computations we've defined on it. To do that let's just run a quick for loop over data batches.

In [35]:
num_rows = 0
for row in pipe.iter_batches():
    num_rows += len(row) 
print(num_rows)

Stage 1: 100%|██████████| 20/20 [00:18<00:00,  1.11it/s]
Stage 0: 100%|██████████| 20/20 [00:18<00:00,  1.11it/s]

100000





Great! now we know how to create, define and run DatasetPipelines with Ray!

# Part 4: Large Scale ML Ingest Example

Here we will go ahead an follow the ["Big Data Ingestion"](https://docs.ray.io/en/latest/data/examples/big_data_ingestion.html) example from the Ray docs. 

The goal here is to tie together everything above into a single demo that reflects a more _realistic_ scenario on how we would apply the Ray Data toolkit to a parallel and distributed machine learning use case.  

First thing we will do is define a function called `create_shuffle_pipeline` that will turn our Dataset into a DatasetPipeline that will read in our data for each epoch, shuffle it and split it into equally sized shards for distributed training on multiple workers. 

In [36]:
def create_shuffle_pipeline(training_data_dir: str, num_epochs: int, num_shards: int):

    return (
        ray.data.read_csv(training_data_dir)
        .repeat(num_epochs)
        .random_shuffle_each_window()
        .split(num_shards, equal=True)
    )

Then we will define our own remote `TrainingWorker` class that iterates over our shards during training. For simplicity we will simple `pass` our training step as we are focused on just the distributed data processing steps here (keep things simple).   

In [37]:
@ray.remote
class TrainingWorker:
    def __init__(self, rank, shard):
        self.rank = rank
        self.shard = shard

    def train(self):
        for epoch, training_dataset in enumerate(self.shard.iter_epochs()):
            # Following code emulates epoch based SGD training.
            print(f"Training... worker: {self.rank}, epoch: {epoch}")
            for i, batch in enumerate(training_dataset.iter_batches()):
                # TODO: replace the code for real training.
                pass

Here we will define the two key variables for this example, the number of Ray workers we'll use and the number of epochs to run. With the appropriate cluster resources, we can scale up our data ingest here by increasing the number of workers.

According to the docs this whole process can be linearly scaled to arbitrarily large data sets (example is 500gb) by adding more nodes to our cluster and increasing our `NUM_TRAINING_WORKERS`.   

In [38]:
    NUM_TRAINING_WORKERS = 4
    NUM_EPOCHS = 5

Now we create our DatasetPipeline called `splits` and instantiate our list of `TrainingWorkers`. 

In [39]:
%%time
splits = create_shuffle_pipeline(f"tmp/output_multi_col/", NUM_EPOCHS, NUM_TRAINING_WORKERS)

CPU times: user 278 ms, sys: 38.9 ms, total: 317 ms
Wall time: 369 ms


Stage 0:   0%|          | 0/5 [00:00<?, ?it/s]=4106916)[0m 
  0%|          | 0/5 [00:00<?, ?it/s][Aor pid=4106916)[0m 
Stage 1:   0%|          | 0/5 [00:00<?, ?it/s][A06916)[0m 


In [40]:
%%time
training_workers = [TrainingWorker.remote(rank, shard) for rank, shard in enumerate(splits)]

CPU times: user 20.6 ms, sys: 5.82 ms, total: 26.4 ms
Wall time: 19.6 ms


Finally we use, `ray.get` to train our remote training_workers in parallel! 

In [41]:
%%time
ray.get([worker.train.remote() for worker in training_workers])

Stage 0:  20%|██        | 1/5 [00:17<01:08, 17.17s/it])[0m 
[2m[36m(PipelineSplitExecutorCoordinator pid=4106916)[0m 
Stage 0:  40%|████      | 2/5 [00:26<00:37, 12.64s/it][A0m 
[2m[36m(PipelineSplitExecutorCoordinator pid=4106916)[0m 
Stage 0:  60%|██████    | 3/5 [00:34<00:20, 10.38s/it][A0m 
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]


[2m[36m(TrainingWorker pid=4106999)[0m Training... worker: 0, epoch: 0
[2m[36m(TrainingWorker pid=4107001)[0m Training... worker: 1, epoch: 0


Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]


[2m[36m(TrainingWorker pid=4107002)[0m Training... worker: 2, epoch: 0
[2m[36m(TrainingWorker pid=4107003)[0m Training... worker: 3, epoch: 0


[2m[36m(PipelineSplitExecutorCoordinator pid=4106916)[0m 
Stage 0:  80%|████████  | 4/5 [00:42<00:09,  9.55s/it][A0m 
Stage 0: 100%|██████████| 1/1 [00:10<00:00, 10.38s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]


[2m[36m(TrainingWorker pid=4106999)[0m Training... worker: 0, epoch: 1
[2m[36m(TrainingWorker pid=4107001)[0m Training... worker: 1, epoch: 1
[2m[36m(TrainingWorker pid=4107003)[0m Training... worker: 3, epoch: 1


Stage 0: 100%|██████████| 1/1 [00:10<00:00, 10.53s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:10<00:00, 10.42s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:10<00:00, 10.64s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]


[2m[36m(TrainingWorker pid=4107002)[0m Training... worker: 2, epoch: 1


[2m[36m(PipelineSplitExecutorCoordinator pid=4106916)[0m 
Stage 0: 100%|██████████| 5/5 [00:50<00:00,  9.13s/it][A0m 
Stage 0: 100%|██████████| 1/1 [00:07<00:00,  7.37s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]


[2m[36m(TrainingWorker pid=4106999)[0m Training... worker: 0, epoch: 2
[2m[36m(TrainingWorker pid=4107003)[0m Training... worker: 3, epoch: 2


Stage 0: 100%|██████████| 1/1 [00:07<00:00,  7.25s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]


[2m[36m(TrainingWorker pid=4107001)[0m Training... worker: 1, epoch: 2
[2m[36m(TrainingWorker pid=4107002)[0m Training... worker: 2, epoch: 2


Stage 0: 100%|██████████| 1/1 [00:07<00:00,  7.50s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:07<00:00,  7.38s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
[2m[36m(PipelineSplitExecutorCoordinator pid=4106916)[0m 
Stage 1: 100%|██████████| 5/5 [00:59<00:00,  9.81s/it][A0m 
Stage 0: 100%|██████████| 1/1 [00:07<00:00,  7.67s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:07<00:00,  7.31s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:07<00:00,  7.44s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:07<00:00,  7.57s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]


[2m[36m(TrainingWorker pid=4106999)[0m Training... worker: 0, epoch: 3
[2m[36m(TrainingWorker pid=4107001)[0m Training... worker: 1, epoch: 3
[2m[36m(TrainingWorker pid=4107002)[0m Training... worker: 2, epoch: 3
[2m[36m(TrainingWorker pid=4107003)[0m Training... worker: 3, epoch: 3


Stage 0: 100%|██████████| 1/1 [00:00<00:00,  1.38it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]


[2m[36m(TrainingWorker pid=4106999)[0m Training... worker: 0, epoch: 4
[2m[36m(TrainingWorker pid=4107001)[0m Training... worker: 1, epoch: 4
[2m[36m(TrainingWorker pid=4107003)[0m Training... worker: 3, epoch: 4
[2m[36m(TrainingWorker pid=4107002)[0m Training... worker: 2, epoch: 4


Stage 0: 100%|██████████| 1/1 [00:00<00:00,  1.06it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00,  1.00it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:01<00:00,  1.12s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00,  1.01it/s]


CPU times: user 1.17 s, sys: 347 ms, total: 1.52 s
Wall time: 44.8 s


Stage 0: 100%|██████████| 1/1 [00:00<00:00,  1.23it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00,  1.27it/s]


[None, None, None, None]

Stage 0: 100%|██████████| 1/1 [00:00<00:00,  1.44it/s]


### Congrats!

If you are looking at this cell and there are no error above, you know that Ray Data is working! 