Introduction

In [1]:
import ray

# Create a Dataset of Python objects.
ds = ray.data.range(200)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

ds.take(5)
# -> [0, 1, 2, 3, 4]

ds.count()
# -> 10000

# Create a Dataset of Arrow records.
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(200)])
# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})

ds.show(5)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
# -> {'col1': 3, 'col2': '3'}
# -> {'col1': 4, 'col2': '4'}

ds.schema()

2021-12-05 19:57:51,183	INFO services.py:1338 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
{'col1': 3, 'col2': '3'}
{'col1': 4, 'col2': '4'}


col1: int64
col2: string

In [2]:
import pandas as pd
import dask.dataframe as dd

# Create a Dataset from a list of Pandas DataFrame objects.
pdf = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.data.from_pandas([pdf])

# Create a Dataset from a Dask-on-Ray DataFrame.
dask_df = dd.from_pandas(pdf, npartitions=10)
ds = ray.data.from_dask(dask_df)

In [4]:
ds = ray.data.range(10000)
ds = ds.map(lambda x: x * 2)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
ds.take(5)
# -> [0, 2, 4, 6, 8]

ds.filter(lambda x: x > 5).take(5)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1859.63it/s]
# -> [6, 8, 10, 12, 14]

ds.flat_map(lambda x: [x, -x]).take(5)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1568.10it/s]
# -> [0, 0, 2, -2, 4]

Map Progress: 100%|██████████| 200/200 [00:00<00:00, 324.52it/s]
Map Progress: 100%|██████████| 200/200 [00:00<00:00, 481.75it/s]
Map Progress: 100%|██████████| 200/200 [00:00<00:00, 425.79it/s]


[0, 0, 2, -2, 4]

In [6]:
ds = ray.data.range_arrow(10000)
ds = ds.map_batches(
    lambda df: df.applymap(lambda x: x * 2), batch_format="pandas")
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
ds.take(5)
# -> [{'value': 0}, {'value': 2}, ...]

Map Progress: 100%|██████████| 200/200 [00:01<00:00, 131.55it/s]


[{'value': 0}, {'value': 2}, {'value': 4}, {'value': 6}, {'value': 8}]

In [8]:
@ray.remote
def consume(data) -> int:
    num_batches = 0
    for batch in data.iter_batches():
        num_batches += 1
    return num_batches

ds = ray.data.range(10000)
ray.get(consume.remote(ds))

200

In [3]:
@ray.remote(num_gpus=1)
class Worker:
    def __init__(self, rank: int):
        pass

    def train(self, shard: ray.data.Dataset[int]) -> int:
        for batch in shard.iter_batches(batch_size=256):
            pass
        return shard.count()

workers = [Worker.remote(i) for i in range(16)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]

ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

shards = ds.split(n=16, locality_hints=workers)
# -> [Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>),
#     Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>), ...]

ray.get([w.train.remote(s) for s in shards])

NameError: name 'w' is not defined

[2m[1m[36m(scheduler +40s)[0m Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.


Required resources for this actor or task: {CPU: 1.000000}, {GPU: 1.000000}
Available resources on this node: {3.000000/4.000000 CPU, 77736960.058594 GiB/77736960.058594 GiB memory, 0.000000/1.000000 GPU, 38868479.980469 GiB/38868479.980469 GiB object_store_memory, 1.000000/1.000000 node:127.0.0.1}
 In total there are 0 pending tasks and 15 pending actors on this node.


Pipelines

In [10]:
def func1(i: int) -> int:
    return i + 1

def func2(i: int) -> int:
    return i * 2

def func3(i: int) -> int:
    return i % 3

# Create a dataset and then create a pipeline from it.
base = ray.data.range(1000000)
print(base)
# -> Dataset(num_blocks=200, num_rows=1000000, schema=<class 'int'>)
pipe = base.window(blocks_per_window=10)
print(pipe)
# -> DatasetPipeline(num_windows=20, num_stages=1)

# Applying transforms to pipelines adds more pipeline stages.
pipe = pipe.map(func1)
pipe = pipe.map(func2)
pipe = pipe.map(func3)
print(pipe)
# -> DatasetPipeline(num_windows=20, num_stages=4)

# Output can be pulled from the pipeline concurrently with its execution.
num_rows = 0
for row in pipe.iter_rows():
    num_rows += 1

print("Total num rows", num_rows)

Stage 0:   0%|          | 0/20 [00:00<?, ?it/s]
  0%|          | 0/20 [00:00<?, ?it/s][A
Stage 1:   0%|          | 0/20 [00:00<?, ?it/s][A

  0%|          | 0/20 [00:00<?, ?it/s][A[A

Stage 2:   0%|          | 0/20 [00:00<?, ?it/s][A[A


  0%|          | 0/20 [00:00<?, ?it/s][A[A[A


Stage 3:   0%|          | 0/20 [00:00<?, ?it/s][A[A[A

Dataset(num_blocks=200, num_rows=1000000, schema=<class 'int'>)
DatasetPipeline(num_windows=20, num_stages=1)
DatasetPipeline(num_windows=20, num_stages=4)



Stage 0:  10%|█         | 2/20 [00:00<00:02,  6.11it/s][A

Stage 2:   5%|▌         | 1/20 [00:00<00:10,  1.85it/s][A[A
Stage 0:  15%|█▌        | 3/20 [00:00<00:04,  3.44it/s][A


Stage 3:   5%|▌         | 1/20 [00:00<00:16,  1.13it/s][A[A[A

Stage 2:  10%|█         | 2/20 [00:01<00:09,  1.96it/s][A[A


Stage 3:  10%|█         | 2/20 [00:01<00:11,  1.56it/s][A[A[A
Stage 0:  20%|██        | 4/20 [00:01<00:07,  2.22it/s][A

Stage 2:  15%|█▌        | 3/20 [00:01<00:10,  1.66it/s][A[A


Stage 3:  15%|█▌        | 3/20 [00:01<00:10,  1.62it/s][A[A[A
Stage 0:  25%|██▌       | 5/20 [00:02<00:07,  1.97it/s][A

Stage 2:  20%|██        | 4/20 [00:02<00:09,  1.66it/s][A[A


Stage 3:  20%|██        | 4/20 [00:02<00:10,  1.53it/s][A[A[A
Stage 0:  30%|███       | 6/20 [00:02<00:07,  1.76it/s][A

Stage 2:  25%|██▌       | 5/20 [00:03<00:10,  1.45it/s][A[A
Stage 0:  35%|███▌      | 7/20 [00:03<00:07,  1.70it/s][A


Stage 3:  25%|██▌       | 5/20 [00:03<00:11,  1.29it/s][A[

Total num rows 1000000







Required resources for this actor or task: {CPU: 1.000000}, {GPU: 1.000000}
Available resources on this node: {3.000000/4.000000 CPU, 86470680.029297 GiB/86470680.029297 GiB memory, 0.000000/1.000000 GPU, 43235339.990234 GiB/43235339.990234 GiB object_store_memory, 1.000000/1.000000 node:127.0.0.1}
 In total there are 0 pending tasks and 15 pending actors on this node.


In [11]:
import ray
from ray.data.dataset_pipeline import DatasetPipeline

# Equivalent to ray.data.range(1000).repeat(times=4)
source = ray.data.range(1000)
pipe = DatasetPipeline.from_iterable(
    [lambda: source, lambda: source, lambda: source, lambda: source])

# Equivalent to ray.data.range(1000).window(blocks_per_window=10)
splits = ray.data.range(1000, parallelism=200).split(20)
pipe = DatasetPipeline.from_iterable([lambda s=s: s for s in splits])

In [12]:
pipe = ray.data.from_items([0, 1, 2, 3, 4]) \
    .repeat(3) \
    .random_shuffle_each_window()
for i, epoch in enumerate(pipe.iter_epochs()):
    print("Epoch {}", i)
    for row in epoch.iter_rows():
        print(row)

Stage 0:   0%|          | 0/3 [00:00<?, ?it/s]
  0%|          | 0/3 [00:00<?, ?it/s][A
Stage 1:   0%|          | 0/3 [00:00<?, ?it/s][A
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 12.60it/s][A
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 16.07it/s][A
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 72.91it/s]

Epoch {} 0
4
3
0
1
2
Epoch {} 1
0
1
3
2
4
Epoch {} 2
0
2
3
4
1







Stage 0: 100%|██████████| 3/3 [00:10<00:00, 14.58it/s]
Stage 1: 100%|██████████| 3/3 [00:10<00:00, 10.97it/s][A

In [13]:
ray.data.from_items([0, 1, 2, 3, 4]) \
    .repeat(2) \
    .random_shuffle_each_window() \
    .show_windows()

Stage 0:   0%|          | 0/2 [00:00<?, ?it/s]
  0%|          | 0/2 [00:00<?, ?it/s][A
Stage 1:   0%|          | 0/2 [00:00<?, ?it/s][A
Stage 1: 100%|██████████| 2/2 [00:00<00:00, 10.01it/s][A
Stage 0: 100%|██████████| 2/2 [00:00<00:00,  9.60it/s]

------ Epoch 0 ------
=== Window 0 ===
3
2
0
1
4
------ Epoch 1 ------
=== Window 1 ===
3
0
4
1
2





In [14]:
ray.data.from_items([0, 1, 2, 3, 4]) \
    .repeat(2) \
    .foreach_window(lambda w: w.random_shuffle()) \
    .show_windows()

Stage 0:   0%|          | 0/2 [00:00<?, ?it/s]
  0%|          | 0/2 [00:00<?, ?it/s][A
Stage 1:   0%|          | 0/2 [00:00<?, ?it/s][A
Stage 1: 100%|██████████| 2/2 [00:00<00:00, 14.08it/s][A
Stage 0: 100%|██████████| 2/2 [00:00<00:00, 13.61it/s]

------ Epoch 0 ------
=== Window 0 ===
0
1
4
3
2
------ Epoch 1 ------
=== Window 1 ===
2
3
0
1
4





In [17]:
def preprocess(image: bytes) -> bytes:
    return image

class BatchInferModel:
    def __init__(self):
        self.model = ImageNetModel()
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)

# Load data from storage.
ds: Dataset = ray.data.read_binary_files("s3://bucket/image-dir")

# Preprocess the data.
ds = ds.map(preprocess)

# Apply GPU batch inference to the data.
ds = ds.map_batches(BatchInferModel, compute="actors", batch_size=256, num_gpus=1)
ds
# Save the output.
#ds.write_json("/tmp/results")

OSError: When getting information for key 'image-dir' in bucket 'bucket': AWS Error [code 15]: No response body.

In [18]:
# Create a pipeline that loops over its source dataset indefinitely.
pipe: DatasetPipeline = ray.data \
    .read_datasource(...) \
    .repeat() \
    .random_shuffle_each_window()

@ray.remote(num_gpus=1)
def train_func(pipe: DatasetPipeline):
    model = MyModel()
    for batch in pipe.to_torch():
        model.fit(batch)

# Read from the pipeline in a remote training function.
ray.get(train_func.remote(pipe))

AttributeError: 'ellipsis' object has no attribute 'prepare_read'

In [5]:
def train_func():
    # This is a dummy train function just iterating over the dataset shard.
    # You should replace this with your training logic.
    shard = ray.train.get_dataset_shard()
    for row in shard.iter_rows():
        print(row)

# Create a pipeline that loops over its source dataset indefinitely.
pipe: DatasetPipeline = ray.data \
    .read_parquet(...) \
    .repeat() \
    .random_shuffle_each_window()


# Pass in the pipeline to the Trainer.
# The Trainer will automatically split the DatasetPipeline for you.
trainer = Trainer(num_workers=8, backend="torch")
result = trainer.run(
    train_func,
    config={"worker_batch_size": 64, "num_epochs": 2},
    dataset=pipe)

ValueError: paths must be a path string or a list of path strings.

2021-12-05 19:47:05,375	ERROR worker.py:84 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): [36mray::_map_block()[39m (pid=1360, ip=127.0.0.1)
  File "python\ray\_raylet.pyx", line 625, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 629, in ray._raylet.execute_task
  File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\impl\compute.py", line 23, in _map_block
    new_block = fn(block)
  File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\dataset.py", line 220, in transform
    applied = fn(view)
  File "C:\Users\James\AppData\Local\Temp/ipykernel_13152/2366480439.py", line 4, in transform_batch
  File "C:\Users\James\AppData\Roaming\Python\Python38\site-packages\pandas\util\_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\James\AppData\Roaming\Python\Python38\site-packages\pandas\core\frame.py", line 5953, in dropna
    raise KeyError(list(np.compress(check, subset)))
KeyError: ['feature_1

In [7]:
# A Pandas DataFrame UDF for transforming the underlying blocks of a Dataset in parallel.
def transform_batch(df: pd.DataFrame):
    # Drop nulls.
    df = df.dropna(subset=["feature_1"])
    # Add new column.
    df["new_col"] = df["feature_1"] - 2 * df["feature_2"] + df["feature_3"] / 3
    # Transform existing column.
    df["feature_1"] = 2 * df["feature_1"] + 1
    # Drop column.
    df.drop(columns="feature_2", inplace=True)
    # One-hot encoding.
    categories = ["cat_1", "cat_2", "cat_3"]
    for category in categories:
        df[f"category_{category}"] = df["category"].map(
            collections.defaultdict(int, **{category: 1}))
    return df

# batch_format="pandas" tells Datasets to provide the transformer with blocks
# represented as Pandas DataFrames.
ds = ds.map_batches(transform_batch, batch_format="pandas")

 pid=13176)[0m 2021-12-05 19:48:05,087	INFO worker.py:431 -- Task failed with retryable exception: TaskID(ef25d802e8c24317ffffffffffffffffffffffff01000000).
 pid=13176)[0m Traceback (most recent call last):
 pid=13176)[0m   File "python\ray\_raylet.pyx", line 625, in ray._raylet.execute_task
 pid=13176)[0m   File "python\ray\_raylet.pyx", line 629, in ray._raylet.execute_task
 pid=13176)[0m   File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\impl\compute.py", line 23, in _map_block
 pid=13176)[0m     new_block = fn(block)
 pid=13176)[0m   File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\dataset.py", line 220, in transform
 pid=13176)[0m     applied = fn(view)
 pid=13176)[0m   File "C:\Users\James\AppData\Local\Temp/ipykernel_13152/2366480439.py", line 4, in transform_batch
 pid=13176)[0m   File "C:\Users\James\AppData\Roaming\Python\Python38\site-packages\pandas\util\_decorators.py", line 311, in wrapper
 pid=13176)[0m     return func(*args, **kwar

[2m[36m(_map_block pid=12328)[0m 2021-12-05 19:48:10,471	INFO worker.py:431 -- Task failed with retryable exception: TaskID(ef25d802e8c24317ffffffffffffffffffffffff01000000).
[2m[36m(_map_block pid=12328)[0m Traceback (most recent call last):
[2m[36m(_map_block pid=12328)[0m   File "python\ray\_raylet.pyx", line 625, in ray._raylet.execute_task
[2m[36m(_map_block pid=12328)[0m   File "python\ray\_raylet.pyx", line 629, in ray._raylet.execute_task
[2m[36m(_map_block pid=12328)[0m   File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\impl\compute.py", line 23, in _map_block
[2m[36m(_map_block pid=12328)[0m     new_block = fn(block)
[2m[36m(_map_block pid=12328)[0m   File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\dataset.py", line 220, in transform
[2m[36m(_map_block pid=12328)[0m     applied = fn(view)
[2m[36m(_map_block pid=12328)[0m   File "C:\Users\James\AppData\Local\Temp/ipykernel_13152/2366480439.py", line 4, in

[2m[36m(_map_block pid=13176)[0m 2021-12-05 19:48:16,489	INFO worker.py:431 -- Task failed with retryable exception: TaskID(02bed209b72c305affffffffffffffffffffffff01000000).
[2m[36m(_map_block pid=13176)[0m Traceback (most recent call last):
[2m[36m(_map_block pid=13176)[0m   File "python\ray\_raylet.pyx", line 625, in ray._raylet.execute_task
[2m[36m(_map_block pid=13176)[0m   File "python\ray\_raylet.pyx", line 629, in ray._raylet.execute_task
[2m[36m(_map_block pid=13176)[0m   File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\impl\compute.py", line 23, in _map_block
[2m[36m(_map_block pid=13176)[0m     new_block = fn(block)
[2m[36m(_map_block pid=13176)[0m   File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\dataset.py", line 220, in transform
[2m[36m(_map_block pid=13176)[0m     applied = fn(view)
[2m[36m(_map_block pid=13176)[0m   File "C:\Users\James\AppData\Local\Temp/ipykernel_13152/2366480439.py", line 4, in

[2m[36m(_map_block pid=12328)[0m 2021-12-05 19:48:22,519	INFO worker.py:431 -- Task failed with retryable exception: TaskID(faf123d8b1b35293ffffffffffffffffffffffff01000000).
[2m[36m(_map_block pid=12328)[0m Traceback (most recent call last):
[2m[36m(_map_block pid=12328)[0m   File "python\ray\_raylet.pyx", line 625, in ray._raylet.execute_task
[2m[36m(_map_block pid=12328)[0m   File "python\ray\_raylet.pyx", line 629, in ray._raylet.execute_task
[2m[36m(_map_block pid=12328)[0m   File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\impl\compute.py", line 23, in _map_block
[2m[36m(_map_block pid=12328)[0m     new_block = fn(block)
[2m[36m(_map_block pid=12328)[0m   File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\dataset.py", line 220, in transform
[2m[36m(_map_block pid=12328)[0m     applied = fn(view)
[2m[36m(_map_block pid=12328)[0m   File "C:\Users\James\AppData\Local\Temp/ipykernel_13152/2366480439.py", line 4, in

RayTaskError(KeyError): [36mray::_map_block()[39m (pid=13176, ip=127.0.0.1)
  File "python\ray\_raylet.pyx", line 625, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 629, in ray._raylet.execute_task
  File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\impl\compute.py", line 23, in _map_block
    new_block = fn(block)
  File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\dataset.py", line 220, in transform
    applied = fn(view)
  File "C:\Users\James\AppData\Local\Temp/ipykernel_13152/2366480439.py", line 4, in transform_batch
  File "C:\Users\James\AppData\Roaming\Python\Python38\site-packages\pandas\util\_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\James\AppData\Roaming\Python\Python38\site-packages\pandas\core\frame.py", line 5953, in dropna
    raise KeyError(list(np.compress(check, subset)))
KeyError: ['feature_1']

2021-12-05 19:48:28,495	ERROR worker.py:84 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): [36mray::_map_block()[39m (pid=10020, ip=127.0.0.1)
  File "python\ray\_raylet.pyx", line 625, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 629, in ray._raylet.execute_task
  File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\impl\compute.py", line 23, in _map_block
    new_block = fn(block)
  File "D:\anaconda3\envs\yourenvname\lib\site-packages\ray\data\dataset.py", line 220, in transform
    applied = fn(view)
  File "C:\Users\James\AppData\Local\Temp/ipykernel_13152/2366480439.py", line 4, in transform_batch
  File "C:\Users\James\AppData\Roaming\Python\Python38\site-packages\pandas\util\_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\James\AppData\Roaming\Python\Python38\site-packages\pandas\core\frame.py", line 5953, in dropna
    raise KeyError(list(np.compress(check, subset)))
KeyError: ['feature_

In [6]:
ds: ray.data.Dataset = ray.data.from_items([
    {"A": x % 3, "B": 2 * x, "C": 3 * x}
    for x in range(10)])

# Group by the A column and calculate the per-group mean for B and C columns.
agg_ds: ray.data.Dataset = ds.groupby("A").mean(["B", "C"])
 
agg_ds.to_pandas()

# Global mean on B column.
ds.mean("B")

# Global mean on multiple columns.
ds.mean(["B", "C"])

# Multiple global aggregations on multiple columns.
from ray.data.aggregate import Mean, Std
ds.aggregate(Mean("B"), Std("B", ddof=0), Mean("C"), Std("C", ddof=0))

Sort Sample: 100%|██████████| 10/10 [00:00<00:00, 49.75it/s]
GroupBy Map: 100%|██████████| 10/10 [00:00<00:00, 31.72it/s]
GroupBy Reduce: 100%|██████████| 10/10 [00:00<00:00, 183.13it/s]
GroupBy Map: 100%|██████████| 10/10 [00:00<00:00, 271.91it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 89.91it/s]
GroupBy Map: 100%|██████████| 10/10 [00:00<00:00, 97.95it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 85.34it/s]
GroupBy Map: 100%|██████████| 10/10 [00:00<00:00, 1009.90it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 73.55it/s]


{'mean(B)': 9.0, 'std(B)': 5.744562646538029, 'mean(C)': 13.5, 'std(C)': 8.616843969807043}

In [9]:
# Impute missing values with the column mean.
b_mean = ds.mean("B")

def impute_b(df: pd.DataFrame):
    df["B"].fillna(b_mean)
    return df

ds = ds.map_batches(impute_b, batch_format="pandas")

# Standard scaling of all feature columns.
stats = ds.aggregate(Mean("B"), Std("B"), Mean("C"), Std("C"))

def batch_standard_scaler(df: pd.DataFrame):
    def column_standard_scaler(s: pd.Series):
        s_mean = stats[f"mean({s.name})"]
        s_std = stats[f"std({s.name})"]
        return (s - s_mean) / s_std

    cols = df.columns.difference(["A"])
    df.loc[:, cols] = df.loc[:, cols].transform(column_standard_scaler)
    return df

ds = ds.map_batches(batch_standard_scaler, batch_format="pandas")

GroupBy Map: 100%|██████████| 10/10 [00:00<00:00, 140.28it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 60.09it/s]
Map Progress: 100%|██████████| 10/10 [00:00<00:00, 95.72it/s]
GroupBy Map: 100%|██████████| 10/10 [00:00<00:00, 507.13it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 37.30it/s]
Map Progress: 100%|██████████| 10/10 [00:00<00:00, 67.85it/s]


In [10]:
ds = ray.data.range(10)
# -> [0, 1, ..., 9]

# Global random shuffle.
ds = ds.random_shuffle()

# Scales to terabytes of data with the same simple API.
ds = ray.data.read_parquet("s3://ursa-labs-taxi-data")  # open, tabular, NYC taxi dataset

# Don't run this next one on your laptop; it will probably crash since it will
# try to read and shuffle ~99 GB of data!
ds = ds.random_shuffle()

# Per-epoch shuffling is as simple as changing where we invoke the shuffle:
#   - Before repeating => dataset is shuffled once.
#   - After repeating  => dataset is shuffled on every epoch.
num_epochs = 20

# Shuffle once, then repeat this once-shuffled dataset for num_epochs epochs.
ds.random_shuffle().repeat(num_epochs)

# Shuffle repeatedly, where the original dataset is shuffled into a different
# order at the beginning of each epoch.
ds.repeat(num_epochs).random_shuffle_each_window()
# -> DatasetPipeline(num_windows=10, num_stages=2)

Shuffle Map: 100%|██████████| 10/10 [00:00<00:00, 166.12it/s]
Shuffle Reduce: 100%|██████████| 10/10 [00:00<00:00, 193.22it/s]
Map Progress:   0%|          | 0/2 [02:32<?, ?it/s]


KeyboardInterrupt: 

Dask

In [12]:
import ray
from ray.util.dask import ray_dask_get
import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd

# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init(ignore_reinit_error=True)

d_arr = da.from_array(np.random.randint(0, 1000, size=(256, 256)))

# The Dask scheduler submits the underlying task graph to Ray.
d_arr.mean().compute(scheduler=ray_dask_get)

# Set the scheduler to ray_dask_get in your config so you don't have to
# specify it on each compute call.
dask.config.set(scheduler=ray_dask_get)

df = dd.from_pandas(
    pd.DataFrame(
        np.random.randint(0, 100, size=(1024, 2)), columns=["age", "grade"]),
    npartitions=2)
df.groupby(["age"]).mean().compute()

ray.shutdown()

2021-12-05 19:50:18,098	INFO worker.py:852 -- Calling ray.init() again after it has already been called.


In [15]:
df 

Unnamed: 0_level_0,age,grade
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1
0,int32,int32
512,...,...
1023,...,...
