# Gentle introduction to Ray datasets APIs

© 2019-2022, Anyscale. All Rights Reserved

### Learning objectives

In this introductory tutorial you will learn to:
 * create, transform, read, and save Ray datasets
 * use shards for parallel processing of large datasets
 * understand dataset pipelines and their merits
 * use `DatasetPipeline` for last-mile ML ingestion for distributed training
 * explain why and when to use Ray datasets

### Overview

This is a brief and gentle introduction to Ray Datasets (`ray.data`), one of Ray's native libraries. Built atop Ray, it allows you to exchange data among Ray tasks, actors, libraries, and applications.

Ray Datasets, built on distributed Apache Arrow, are designed to load and preprocess data for distributed ML training pipelines. Compared to other loading solutions, Datasets are more flexible (e.g., you can express higher-quality per-epoch global shuffles) and provide higher overall performance.

Additionally, Ray Datasets provide standard transformations such as `map`, `filter`, and `repartition`. Ray Datasets are *not* a replacement for a full-fledged data processing library for EDA or ETL, nor a substitute for Apache Spark, Dask, or pandas DataFrames. Their primary objective is rudimentary last-mile distributed data preprocessing and data ingestion for ML training.

Ray Datasets support myriad [file formats and data sources](https://docs.ray.io/en/latest/data/dataset.html#datasource-compatibility), so you can read from and write to the local file system and cloud storage.

<img src="images/dataset.png" width="70%" height="35%">


### Ray Datasets

Ray Datasets implement distributed [Apache Arrow](https://arrow.apache.org/). A Dataset consists of a list of Ray object references to blocks. Each block holds a set of items in either an [Arrow table](https://arrow.apache.org/docs/python/data.html#tables) or a Python list (for Arrow-incompatible objects).

<img src="images/dataset-arch.png" width="70%" height="35%">
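To make the block layout concrete, here is a minimal plain-Python sketch (no Ray involved) of how a collection of rows might be chunked into blocks. The helper `split_into_blocks` and its even-split rule are assumptions for illustration, not Ray's actual implementation; in Ray each block would live in the object store and the Dataset would hold only references to the blocks.

```python
from typing import List, Sequence


def split_into_blocks(rows: Sequence, num_blocks: int) -> List[list]:
    """Chunk rows into num_blocks contiguous blocks of near-equal size."""
    base, extra = divmod(len(rows), num_blocks)
    blocks, start = [], 0
    for i in range(num_blocks):
        # The first `extra` blocks take one additional row each
        size = base + (1 if i < extra else 0)
        blocks.append(list(rows[start:start + size]))
        start += size
    return blocks


rows = list(range(10))
blocks = split_into_blocks(rows, num_blocks=3)
print(blocks)
```

Operations such as `map` can then work on one block at a time, which is what makes per-block parallelism possible.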

### Creating datasets

In [1]:
import logging, random
import ray

In [2]:
if ray.is_initialized():
    ray.shutdown()
ctx = ray.init(logging_level=logging.ERROR)
print(ctx)

RayContext(dashboard_url='127.0.0.1:8265', python_version='3.8.13', ray_version='1.12.1', ray_commit='4863e33856b54ccf8add5cbe75e41558850a1b75', address_info={'node_ip_address': '127.0.0.1', 'raylet_ip_address': '127.0.0.1', 'redis_address': None, 'object_store_address': '/tmp/ray/session_2022-06-13_13-52-22_433238_66760/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2022-06-13_13-52-22_433238_66760/sockets/raylet', 'webui_url': '127.0.0.1:8265', 'session_dir': '/tmp/ray/session_2022-06-13_13-52-22_433238_66760', 'metrics_export_port': 63271, 'gcs_address': '127.0.0.1:65094', 'address': '127.0.0.1:65094', 'node_id': '1a56f25a72880e3d81d1a9f6671ac41e2ed901438109ed839be8ba3c'})


In [3]:
print(f"Dashboard url: http://{ctx.address_info['webui_url']}")

Dashboard url: http://127.0.0.1:8265


Let's create a generic dataset of 100K integers and look at its schema and underlying datatype. The difference between `show` and `take` is that the former prints items one at a time, while the latter iterates over rows of the dataset, appends them to a list, and returns the list. Underneath, `ds.show()` calls `ds.take()`.

In [5]:
ds = ray.data.range(100_000)
ds.count()

100000

In [6]:
ds.schema()

int

In [7]:
ds.show(5)

0
1
2
3
4


In [8]:
ds.take(5)

[0, 1, 2, 3, 4]
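The relationship between `show` and `take` described above can be sketched in plain Python. The two functions below are hypothetical stand-ins that work on any iterable; they illustrate the "show is built on take" idea and are not Ray's source code.

```python
from typing import Iterable, List


def take(items: Iterable, limit: int = 20) -> List:
    """Collect up to `limit` items into a list and return it."""
    out = []
    for item in items:
        if len(out) >= limit:
            break
        out.append(item)
    return out


def show(items: Iterable, limit: int = 20) -> None:
    """Print up to `limit` items, one per line, by reusing take()."""
    for item in take(items, limit):
        print(item)


first_five = take(range(100_000), 5)
show(range(100_000), 5)
```

Use the `take` style when you need the values in a variable, and the `show` style when you only want to inspect them.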

Let's create a synthetic dataset of Arrow records (850K) with several columns and data associated with them.

In [12]:
STATES = ["CA", "AZ", "OR", "WA", "TX", "UT"]
M_STATUS = ["married", "single", "domestic", "divorced", "undeclared"]
GENDER = ["F", "M", "U"]
HOME_OWNER = ["condo", "house", "rental"]

items = [{"id": i,
          "ssn": None,
          "name": None,
          "amount": i * 1.5, 
          "interest": random.randint(1,5) * .1,
          "state": random.choice(STATES),
          "marital_status": random.choice(M_STATUS),
          "property": random.choice(HOME_OWNER),
          "dependents": random.randint(1, 5),
          "defaulted": random.randint(0,1),
          "gender":random.choice(GENDER) } for i in range(1,850_001)]
items[:1]

[{'id': 1,
  'ssn': None,
  'name': None,
  'amount': 1.5,
  'interest': 0.2,
  'state': 'WA',
  'marital_status': 'divorced',
  'property': 'house',
  'dependents': 2,
  'defaulted': 1,
  'gender': 'F'}]

### Creating a dataset from list of dictionary items

In [13]:
arrow_ds = ray.data.from_items(items)
arrow_ds

Dataset(num_blocks=200, num_rows=850000, schema={id: int64, ssn: null, name: null, amount: double, interest: double, state: string, marital_status: string, property: string, dependents: int64, defaulted: int64, gender: string})

In [15]:
arrow_ds.count()

850000

In [18]:
arrow_ds.take(1)

[{'id': 1, 'ssn': None, 'name': None, 'amount': 1.5, 'interest': 0.2, 'state': 'WA', 'marital_status': 'divorced', 'property': 'house', 'dependents': 2, 'defaulted': 1, 'gender': 'F'}]

In [19]:
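arrow_ds.schema  # no parentheses, so the bound method is displayed; call arrow_ds.schema() to get the schema itself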

<bound method Dataset.schema of Dataset(num_blocks=200, num_rows=850000, schema={id: int64, ssn: null, name: null, amount: double, interest: double, state: string, marital_status: string, property: string, dependents: int64, defaulted: int64, gender: string})>

### Saving datasets as a parquet file
Ray datasets support myriad data formats and storage systems. Let's repartition this dataset into five blocks and save it in Parquet format; each block is written as a separate file.

In [20]:
arrow_ds.repartition(5).write_parquet("data/interest.parquet")

Repartition: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 11.22it/s]
Write Progress: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 11.04it/s]


In [21]:
!ls -l data/interest.parquet

total 43904
-rw-r--r--  1 jules  staff  2139612 May 26 16:08 23b1cf0e40f24eafa473e43ee2ae774e_000000.parquet
-rw-r--r--  1 jules  staff  2121423 May 26 16:08 23b1cf0e40f24eafa473e43ee2ae774e_000001.parquet
-rw-r--r--  1 jules  staff  2119621 May 26 16:08 23b1cf0e40f24eafa473e43ee2ae774e_000002.parquet
-rw-r--r--  1 jules  staff  2119252 May 26 16:08 23b1cf0e40f24eafa473e43ee2ae774e_000003.parquet
-rw-r--r--  1 jules  staff  2169571 May 26 16:08 23b1cf0e40f24eafa473e43ee2ae774e_000004.parquet
-rw-r--r--  1 jules  staff  2344964 Jun 13 14:00 a8ee48d8bc24428695d0eac5b8f6fbaa_000000.parquet
-rw-r--r--  1 jules  staff  2325837 Jun 13 14:00 a8ee48d8bc24428695d0eac5b8f6fbaa_000001.parquet
-rw-r--r--  1 jules  staff  2323989 Jun 13 14:00 a8ee48d8bc24428695d0eac5b8f6fbaa_000002.parquet
-rw-r--r--  1 jules  staff  2323692 Jun 13 14:00 a8ee48d8bc24428695d0eac5b8f6fbaa_000003.parquet
-rw-r--r--  1 jules  staff  2473384 Jun 13 14:00 a8ee48d8bc24428695d0eac5b8f6fbaa_000004.parquet
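The listing above contains ten files with two different prefixes and two different dates, which suggests that five files from an earlier run were left in place when the cell was run again. The plain-Python sketch below reproduces that "one file per partition, fresh prefix per write" behavior with CSV files in a temporary directory. The function `write_partitions`, the round-robin split, and the file naming are assumptions for illustration only and do not describe how Ray writes Parquet.

```python
import csv
import os
import tempfile
import uuid


def write_partitions(rows, num_partitions, out_dir):
    """Write rows as num_partitions CSV files under out_dir; return the paths."""
    os.makedirs(out_dir, exist_ok=True)
    prefix = uuid.uuid4().hex  # fresh prefix per call, so earlier files survive
    paths = []
    for i in range(num_partitions):
        part = rows[i::num_partitions]  # simple round-robin split
        path = os.path.join(out_dir, f"{prefix}_{i:06d}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["id", "amount"])
            writer.writeheader()
            writer.writerows(part)
        paths.append(path)
    return paths


rows = [{"id": i, "amount": i * 1.5} for i in range(1, 101)]
out_dir = tempfile.mkdtemp()
first_run = write_partitions(rows, 5, out_dir)
second_run = write_partitions(rows, 5, out_dir)
print(len(os.listdir(out_dir)))  # counts the files from both runs
```

If you rerun a write cell and want only the latest output, clear the target directory first.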


### Transforming data with simple methods

Ray datasets transform data in parallel using `map`, which runs as Ray tasks and executes eagerly and synchronously. Among other [transformations](https://docs.ray.io/en/latest/data/package-ref.html#dataset-api), they support `filter`, `flat_map`, and `groupby`.

Let's try `.map()`, `.filter()`, and `.groupby()` on our dataset.

_Explain what's happening behind the scenes here_

Execute a lambda function for each row

In [22]:
arrow_ds.map(lambda x: x['amount'] * 2.5).take(5)

Map Progress: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:03<00:00, 63.79it/s]


[3.75, 7.5, 11.25, 15.0, 18.75]

Filter by amount and state

In [23]:
arrow_ds.filter(lambda x: x['amount'] > 10000.00 and x['state'] == 'CA').take(2)

Map Progress: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:04<00:00, 46.16it/s]


[{'id': 6669, 'ssn': None, 'name': None, 'amount': 10003.5, 'interest': 0.30000000000000004, 'state': 'CA', 'marital_status': 'divorced', 'property': 'rental', 'dependents': 2, 'defaulted': 1, 'gender': 'U'},
 {'id': 6672, 'ssn': None, 'name': None, 'amount': 10008.0, 'interest': 0.4, 'state': 'CA', 'marital_status': 'single', 'property': 'house', 'dependents': 2, 'defaulted': 0, 'gender': 'U'}]

Use `groupby` on state and compute the count

In [27]:
results = arrow_ds.groupby("state").count()

Sort Sample: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:01<00:00, 160.70it/s]
GroupBy Map: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:03<00:00, 57.27it/s]
GroupBy Reduce: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:00<00:00, 6359.10it/s]


In [28]:
results.show()

{'state': 'AZ', 'count()': 141813}
{'state': 'CA', 'count()': 141835}
{'state': 'OR', 'count()': 141713}
{'state': 'TX', 'count()': 141258}
{'state': 'UT', 'count()': 141625}
{'state': 'WA', 'count()': 141756}


Get the max of these columns

In [29]:
results=arrow_ds.max(["amount", "interest", "dependents"])
results

GroupBy Map: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:04<00:00, 44.32it/s]
GroupBy Reduce: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 30.20it/s]


{'max(amount)': 1275000.0, 'max(interest)': 0.5, 'max(dependents)': 5}
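To check what each of the calls above computes, it can help to reproduce them on a handful of rows in plain Python. The sketch below uses a made-up four-row `sample` with a subset of the columns; it mirrors the semantics of map, filter, group-and-count, and multi-column max, but runs in a single process with none of Ray's parallelism.

```python
from collections import Counter

sample = [
    {"id": 1, "amount": 1.5, "state": "WA", "interest": 0.2, "dependents": 2},
    {"id": 2, "amount": 3.0, "state": "CA", "interest": 0.5, "dependents": 1},
    {"id": 3, "amount": 4.5, "state": "CA", "interest": 0.1, "dependents": 5},
    {"id": 4, "amount": 6.0, "state": "TX", "interest": 0.3, "dependents": 3},
]

# map: one output value per input row
scaled = [row["amount"] * 2.5 for row in sample]

# filter: keep only the rows for which the predicate is true
ca_rows = [row for row in sample if row["amount"] > 2.0 and row["state"] == "CA"]

# group and count: number of rows per distinct key, sorted by key
counts = sorted(Counter(row["state"] for row in sample).items())

# max over several columns at once
maxima = {f"max({col})": max(row[col] for row in sample)
          for col in ("amount", "interest", "dependents")}

print(scaled, [r["id"] for r in ca_rows], counts, maxima)
```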

### Accessing datasets using batches or iterating by rows

Datasets can be passed to Ray tasks or actors and read with `.iter_batches()` or `.iter_rows()`. This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects. Splitting data into shards and passing each shard to its own Ray actor for processing is a common Ray pattern in distributed training.

Let's examine how.

*Questions for clarification*:
 * _why is shard a list now instead of Dataset_?

In [30]:
@ray.remote
class BatchWorker:
    def __init__(self, rank):
        self.rank = rank
        self.processed = 0

    @ray.method(num_returns=2)
    def process_shard_list(self, shard: ray.data.Dataset) -> tuple:
        for batch in shard.iter_batches(batch_size=1024):
            # Do something with the batch, such as feature processing
            # or transformation, then save the result as Parquet files
            self.processed += len(batch)
        # Return (rows processed so far, worker rank)
        return (self.processed, self.rank)

#### Create batch workers as Ray actors
Each actor gets one shard to work on; a shard is itself a `Dataset`. We split
our dataset `arrow_ds` into five shards, one per `BatchWorker`.
`.split()` uses `locality_hints` to try to co-locate each shard's blocks with the worker that will process it.

In [31]:
batch_workers = [BatchWorker.remote(i) for i in range(1, 6)]

shards = arrow_ds.split(len(batch_workers), locality_hints=batch_workers)

print(f"Shard row: {shards[0]}")
print(f"Number of shards:{len(shards)}")
print(f"Number of shard workers:{len(batch_workers)}")

Shard row: Dataset(num_blocks=40, num_rows=170000, schema={id: int64, ssn: null, name: null, amount: double, interest: double, state: string, marital_status: string, property: string, dependents: int64, defaulted: int64, gender: string})
Number of shards:5
Number of shard workers:5


### Launch `BatchWorker` actors

Process each shard. Because `process_shard_list` is declared with `num_returns=2`, each `.remote()` call returns two ObjectRefs, one per element of the returned tuple. The comprehension below therefore yields a list of ObjectRef pairs.

In [32]:
object_refs = [w.process_shard_list.remote(s) for w, s in zip(batch_workers, shards)]
object_refs, len(object_refs)

([[ObjectRef(d55f44605ed60eca524127effe1a504898b7ee7b0100000001000000),
   ObjectRef(d55f44605ed60eca524127effe1a504898b7ee7b0100000002000000)],
  [ObjectRef(cb926fc120e2294b967e852eb6601c2c385f12610100000001000000),
   ObjectRef(cb926fc120e2294b967e852eb6601c2c385f12610100000002000000)],
  [ObjectRef(941c76826abe64fd85b38342e5fc14b78101c6c90100000001000000),
   ObjectRef(941c76826abe64fd85b38342e5fc14b78101c6c90100000002000000)],
  [ObjectRef(2301943ae99f0a54a6a043d58b6aec2f8c102c5c0100000001000000),
   ObjectRef(2301943ae99f0a54a6a043d58b6aec2f8c102c5c0100000002000000)],
  [ObjectRef(e4131e9dab52e0600a5446e09564682ded9398b30100000001000000),
   ObjectRef(e4131e9dab52e0600a5446e09564682ded9398b30100000002000000)]],
 5)

Fetch the values behind the returned ObjectRefs. Each pair resolves to (rows processed, worker rank).

In [33]:
values = [ray.get(ref) for ref in object_refs]
values

[[170000, 1], [170000, 2], [170000, 3], [170000, 4], [170000, 5]]
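The numbers above can be reproduced without Ray. The sketch below is a single-process stand-in, assuming an even contiguous split: `split_even`, `iter_batches`, and `PlainBatchWorker` are hypothetical helpers named after their Ray counterparts, and they show why each worker reports 170,000 rows when 850,000 rows are split five ways and consumed in batches of 1,024.

```python
import math


def split_even(rows, n):
    """Split rows into n contiguous shards (assumes len(rows) is divisible by n)."""
    size = len(rows) // n
    return [rows[i * size:(i + 1) * size] for i in range(n)]


def iter_batches(shard, batch_size):
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(shard), batch_size):
        yield shard[start:start + batch_size]


class PlainBatchWorker:
    """Hypothetical single-process stand-in for the BatchWorker actor."""

    def __init__(self, rank):
        self.rank = rank
        self.processed = 0

    def process_shard(self, shard):
        for batch in iter_batches(shard, batch_size=1024):
            self.processed += len(batch)
        return self.processed, self.rank


rows = range(850_000)
shards = split_even(rows, 5)
workers = [PlainBatchWorker(rank) for rank in range(1, 6)]
values = [w.process_shard(s) for w, s in zip(workers, shards)]
batches_per_shard = math.ceil(len(shards[0]) / 1024)
print(values, batches_per_shard)
```

The last batch of each shard is smaller than 1,024 rows, which is why the worker sums `len(batch)` instead of multiplying the batch count by the batch size.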

### Creating and using dataset pipelines

What are dataset pipelines and how are they different from Ray datasets? 

Datasets execute transformations eagerly and synchronously, whereas [DatasetPipelines](https://docs.ray.io/en/latest/data/package-ref.html#datasetpipeline-api) overlap the execution of successive stages. For example, if a workload reads from files, transforms the data, and then does some minor feature engineering, a pipeline can run these steps concurrently on different windows of the data instead of finishing each step on the whole dataset first. This allows for the overlapped execution of data input (e.g., reading files), computation (e.g., feature preprocessing), and training (e.g., distributed ML training).

A `DatasetPipeline` can be constructed in two ways: either by pipelining the execution of an existing Dataset (via `Dataset.window`) or generating repeats of an existing Dataset (via `Dataset.repeat`). 
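The two construction styles can be pictured with plain Python generators. In the sketch below, `window`, `repeat`, and `run_pipeline` are hypothetical helpers that only model the data flow (groups of blocks passing lazily through a list of stages); they run sequentially and do not reproduce the overlapped execution that Ray provides.

```python
from typing import Callable, Iterator, List


def window(blocks: List[list], blocks_per_window: int) -> Iterator[List[list]]:
    """Yield successive groups of blocks; each group is one window."""
    for start in range(0, len(blocks), blocks_per_window):
        yield blocks[start:start + blocks_per_window]


def repeat(blocks: List[list], times: int) -> Iterator[List[list]]:
    """Yield the whole block list `times` times, e.g. once per epoch."""
    for _ in range(times):
        yield blocks


def run_pipeline(windows: Iterator[List[list]],
                 stages: List[Callable[[int], int]]) -> Iterator[int]:
    """Lazily apply every stage to each row, one window at a time."""
    for win in windows:
        for block in win:
            for row in block:
                for stage in stages:
                    row = stage(row)
                yield row


# 200 blocks of 5 rows each, mirroring the block count seen earlier
blocks = [list(range(i * 5, (i + 1) * 5)) for i in range(200)]
num_windows = sum(1 for _ in window(blocks, blocks_per_window=50))
total = sum(run_pipeline(window(blocks, 50), [lambda x: x * 2, lambda x: x + 1]))
epochs = sum(1 for _ in repeat(blocks, times=3))
print(num_windows, total, epochs)
```

With 200 blocks and 50 blocks per window, the sketch produces four windows, the same window count the pipelines in this tutorial report.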

Let's have a go at it and see what we can do with our synthetic data from above.


### Using Dataset.window

Create some functions or operations to be executed in an overlapped manner in the pipeline. These functions
are deliberately simple to illustrate the point, but they can be as complex as a particular use case requires.

 _Questions for clarification_:
 * _how can we send arguments to these pipeline functions?_

In [34]:
# Rows of `ds` are plain ints (see ds.schema() above), so annotate them as int
def divide_row_value(row: int) -> int:
    return round(row / 2)

In [35]:
def double_row_value(row: int) -> int:
    return row * 2

In [36]:
def modulo_row_value(row: int) -> int:
    return row % random.randint(1, 42)

#### Create a window based pipeline
Each window holds 50 blocks.

_Questions for clarification_:
 * _why is the number of stages 2?_

In [43]:
ds_pipe = ds.window(blocks_per_window=50)
ds_pipe

DatasetPipeline(num_windows=4, num_stages=2)

### Applying transforms to pipelines adds more pipeline stages.
_Questions for clarification_:
 * _how can we send arguments to these pipeline functions?_

In [44]:
ds_pipe = ds_pipe.map(divide_row_value)
ds_pipe = ds_pipe.map(double_row_value)
ds_pipe = ds_pipe.map(modulo_row_value)
print(ds_pipe)

DatasetPipeline(num_windows=4, num_stages=5)
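One of the questions above asks how to pass arguments to pipeline functions. A common Python answer is to bind the extra arguments before handing the function over, with either `functools.partial` or a closure. The sketch below shows both on plain integers; `scale_row_value` and `make_modulo` are hypothetical stages, and it is an assumption here that a pipeline's `.map` accepts such callables the same way it accepts the plain functions used above.

```python
from functools import partial


def scale_row_value(row: int, factor: int) -> int:
    """Hypothetical stage that needs an extra argument besides the row."""
    return row * factor


def make_modulo(divisor: int):
    """Closure variant: capture the argument when the stage is created."""
    def modulo_row(row: int) -> int:
        return row % divisor
    return modulo_row


triple = partial(scale_row_value, factor=3)  # bind factor up front
mod_seven = make_modulo(7)

# A list comprehension stands in for the pipeline's per-row map here
rows = [1, 2, 3, 10]
result = [mod_seven(triple(r)) for r in rows]
print(result)
```

Both variants yield a one-argument callable, which is the shape a per-row `map` expects.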


#### Iterate our pipeline

_Questions for clarification_:
 * _how is this executed?_
 * _why are we iterating over rows?_
 * _what is a row comprised of? Blocks?_
 * _is the value of the row an already computed value?_
 * _if `num_stages=5`, why do only stages 0 and 1 appear in the progress output?_

In [45]:
results=[]
for row in ds_pipe.iter_rows():
    results.append(row)
print(f"Total value: {sum(results)}")

Stage 0:   0%|          | 0/4 [00:00<?, ?it/s]
Stage 1:   0%|          | 0/4 [00:00<?, ?it/s]
(_map_block_nosplit pid=88622) E0613 14:55:37.417788000 123145559744512 chttp2_transport.cc:1103]     Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings"
(_map_block_nosplit pid=88621) E0613 14:55:37.397263000 123145471377408 chttp2_transport.cc:1103]     Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "t

Total value: 985328





Let's try a `DatasetPipeline` with our synthetic data.

In [46]:
# count or return based on the condition
def count_ca(row: ray.data.impl.arrow_block.ArrowRow) -> int:
    return 1 if row['state'] == "CA" and row["defaulted"] else 0

In [47]:
arrow_ds_pipe = arrow_ds.window(blocks_per_window=50)
arrow_ds_pipe

DatasetPipeline(num_windows=4, num_stages=1)

In [48]:
arrow_ds_pipe = arrow_ds_pipe.map(count_ca)
arrow_ds_pipe

DatasetPipeline(num_windows=4, num_stages=2)

In [49]:
results=[]
for row in arrow_ds_pipe.iter_rows():
    results.append(row)
print(f"Total rows for CA state and defaulted loans rows: {sum(results)}")

Stage 0:   0%|          | 0/4 [00:00<?, ?it/s]
Stage 1:   0%|          | 0/4 [00:00<?, ?it/s]
Stage 0:  50%|█████     | 2/4 [00:01<00:01,  1.02it/s]
Stage 0:  75%|███████▌  | 3/4 [00:02<00:00,  1.04it/s]

Total rows for CA state and defaulted loans rows: 70765





### Ingesting into Model Trainers
Let's define a dummy trainer that takes a shard of our synthetic data, "trains" a model on it, and returns the final loss.

In [50]:
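def model(input):
    # Dummy model: ignores its input and returns a random "loss" in [0, 1]
    return random.uniform(0, 1)

@ray.remote
class Trainer:
    def __init__(self, rank, model):
        self.rank = rank
        self.model = model
        self.loss = 0.0

    def train(self, shard: ray.data.Dataset) -> float:
        for batch in shard.iter_batches(batch_size=1024):
            # Run 20 "epochs" on each batch before moving to the next one
            for epoch in range(1, 21):
                output = self.model(batch)
                self.loss = output
        # This check sits outside both loops, so it runs once, after the
        # last batch, when epoch is 20 (assumes the shard is not empty)
        if epoch % 5 == 0:
            print(f'epoch {epoch}, loss: {self.loss:.3f}')
        return self.loss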
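The dummy trainer above runs its epoch loop inside the batch loop and logs only once, after the final batch. A more conventional structure, sketched below in plain Python without Ray, makes each epoch a full pass over the shard and logs at a fixed epoch interval. The names `dummy_model` and `train` and their parameters are illustrative assumptions, not part of the tutorial's code.

```python
import random


def dummy_model(batch):
    """Stand-in model: ignores the batch and returns a random loss."""
    return random.uniform(0, 1)


def train(shard, model, num_epochs=20, batch_size=1024, log_every=5):
    """Run num_epochs passes over the shard, logging every log_every epochs."""
    logged_epochs = []
    loss = 0.0
    for epoch in range(1, num_epochs + 1):
        # One epoch is one full pass over all batches in the shard
        for start in range(0, len(shard), batch_size):
            batch = shard[start:start + batch_size]
            loss = model(batch)
        if epoch % log_every == 0:
            print(f"epoch {epoch}, loss: {loss:.3f}")
            logged_epochs.append(epoch)
    return loss, logged_epochs


final_loss, logged = train(list(range(5000)), dummy_model)
```

With this structure the log shows the loss at epochs 5, 10, 15, and 20 instead of a single line at the end.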

In [51]:
trainers = [Trainer.remote(i, model) for i in range(1, 6)]
trainers

[Actor(Trainer, 4db2622e1f9b65f2113c86d901000000),
 Actor(Trainer, 17b052035cf001bb4d68581d01000000),
 Actor(Trainer, 760c1cf11447dd72c8daefda01000000),
 Actor(Trainer, b3d8b2cf153d1e9922c1e93d01000000),
 Actor(Trainer, e2bf5cf7fcc09cf709b4eb3301000000)]

In [52]:
shards = arrow_ds.split(n=len(trainers), locality_hints=trainers)
shards

[Dataset(num_blocks=40, num_rows=170000, schema={id: int64, ssn: null, name: null, amount: double, interest: double, state: string, marital_status: string, property: string, dependents: int64, defaulted: int64, gender: string}),
 Dataset(num_blocks=40, num_rows=170000, schema={id: int64, ssn: null, name: null, amount: double, interest: double, state: string, marital_status: string, property: string, dependents: int64, defaulted: int64, gender: string}),
 Dataset(num_blocks=40, num_rows=170000, schema={id: int64, ssn: null, name: null, amount: double, interest: double, state: string, marital_status: string, property: string, dependents: int64, defaulted: int64, gender: string}),
 Dataset(num_blocks=40, num_rows=170000, schema={id: int64, ssn: null, name: null, amount: double, interest: double, state: string, marital_status: string, property: string, dependents: int64, defaulted: int64, gender: string}),
 Dataset(num_blocks=40, num_rows=170000, schema={id: int64, ssn: null, name: null, a

In [53]:
object_refs = [t.train.remote(s) for t, s in zip(trainers, shards)]
ray.get(object_refs)

[0.14435674286580868,
 0.10737865179730799,
 0.8368477658282624,
 0.7149714607805753,
 0.1402075824517085]

[2m[36m(Trainer pid=89411)[0m epoch 20, loss: 0.144
[2m[36m(Trainer pid=89414)[0m epoch 20, loss: 0.715
[2m[36m(Trainer pid=89412)[0m epoch 20, loss: 0.107
[2m[36m(Trainer pid=89413)[0m epoch 20, loss: 0.837
[2m[36m(Trainer pid=89415)[0m epoch 20, loss: 0.140


In [54]:
ray.shutdown()

### Exercises
 1. Write some transformers, filters, and aggregators with our synthetic data
 2. Add additional pipeline stages with our synthetic data

### Homework

1. Work through the NYC example tutorial