# Intermediate Ray (part 1): More Ray Core and Dealing with Large Data

This tutorial builds on the material in `1-Beginner-Ray-Core.ipynb`, and it is highly recommended to read and/or work through that notebook first.
In particular, this notebook continues to explore the sample problem introduced in the first notebook: approximating the value of Pi using a Monte Carlo approach.

In this notebook, that sample problem will be expanded to illustrate the following concepts:
* More options for remote Task management with ray.wait and ray.cancel
* Creating and scheduling remote Actors
* Using the Ray object store with ray.put
* Loading and processing data with Ray Data
* Loading and processing data with Modin
* Common pitfalls for larger-than-memory data
* Bonus - performance improvements with dedicated local storage

This tutorial is designed to serve two purposes:
1. Round out the conceptual introduction to fundamental Ray concepts from the beginner notebook.
2. Provide practical examples of loading and manipulating large data in Ray, which is a prerequisite and common stumbling block for making use of a wide variety of other packages and algorithms.

## Beginner notebook recap

This notebook assumes you have a Ray cluster set up in Domino following the `README.md`; see the Beginner notebook for a more detailed explanation.

### Connecting to and inspecting your Ray cluster

Always remember to connect to your Ray cluster in Domino like below, using the provided environment variables for the host and port.
Your Ray code may still run without this step, but it will not be running on the cluster correctly!

As in the Beginner notebook, it's recommended to duplicate the workspace tab in your browser, so you can have one showing this notebook side-by-side with the **Ray Web UI** as you work through the tutorial.

In [1]:
import ray
import os
import random
import time
import numpy as np

In [2]:
if not ray.is_initialized():
    service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
    service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
    ray.init(f"ray://{service_host}:{service_port}")

In the beginner notebook, we also inspected the cluster by looking at `ray.nodes()`. Two more useful functions for inspecting the cluster are:
* `ray.cluster_resources()` shows the total resources for the cluster (CPU, memory, object store memory), as well as one entry for each node
* `ray.available_resources()` is very similar, but shows the *idle/available* resources

In [3]:
ray.cluster_resources()

{'CPU': 18.0,
 'node:10.0.40.94': 1.0,
 'object_store_memory': 26028377700.0,
 'memory': 57840795651.0,
 'node:10.0.36.194': 1.0,
 'node:10.0.46.72': 1.0}

### Setting up our Monte Carlo approximation of Pi

Below we set up a remote function for running the Monte Carlo sampling to calculate Pi.

This is a consolidated version of the code in the Beginner notebook - note that we now return the estimate of Pi directly.
Multiple batches can still be combined to get a better estimate, by averaging the value of Pi for each.
Each batch must have equal numbers of points OR the average must weight each sample Pi accordingly.
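
The weighting requirement above can be illustrated with a small sketch in plain Python (no Ray needed). The helper name `combine_pi_estimates` and the tiny batch sizes are hypothetical and chosen only to keep the arithmetic easy to follow.

```python
def combine_pi_estimates(estimates, batch_sizes):
    """Combine per-batch Pi estimates, weighting each by its number of samples."""
    if len(estimates) != len(batch_sizes):
        raise ValueError("estimates and batch_sizes must have the same length")
    total_samples = sum(batch_sizes)
    weighted_sum = sum(est * n for est, n in zip(estimates, batch_sizes))
    return weighted_sum / total_samples


# Two batches of different sizes: 3 of 4 points inside, then 7 of 8 points inside
estimates = [4 * 3 / 4, 4 * 7 / 8]   # 3.0 and 3.5
batch_sizes = [4, 8]

weighted = combine_pi_estimates(estimates, batch_sizes)
pooled = 4 * (3 + 7) / (4 + 8)        # estimate from pooling all 12 points
unweighted = sum(estimates) / len(estimates)

print(f"weighted={weighted:.4f} pooled={pooled:.4f} unweighted={unweighted:.4f}")
```

The weighted average agrees with the estimate obtained by pooling all points, while the plain average does not. With equal batch sizes the two averages coincide, which is why the rest of this notebook can use a simple mean.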

In [4]:
@ray.remote
def mc_pi_single_batch(n_samples):
    n_inside = 0
    for _ in range(n_samples):
        x = random.uniform(0,1)
        y = random.uniform(0,1)
        if (x**2 + y**2) <= 1:
            n_inside += 1
    return 4 * n_inside / n_samples

Check that everything is set up correctly by running a small batch, and verify in the Ray Web UI that this runs on the cluster.
We expect to see a single process briefly run on the head node.

In [5]:
start = time.time()
n = 10**6
pi = ray.get(mc_pi_single_batch.remote(n))
print(f"Pi is approximately {pi} (from {n} samples in {time.time()-start:.2f}s)")

Pi is approximately 3.140888 (from 1000000 samples in 0.77s)


## More options for remote Task management

Our existing approximation of Pi works well enough, but has a few shortcomings:
* We must decide the number of batches and samples in advance
* We have no estimate of how accurate the approximation is

To fix this, we can rewrite the code to estimate the error on Pi.
Then, it would be convenient if we could decide what threshold of error is acceptable, and keep submitting more batches until we get the accuracy we want.
(In statistical terms, our estimate of the error on Pi is an example of the [standard error of the mean](https://en.wikipedia.org/wiki/Standard_error#Standard_error_of_the_sample_mean);
all that really matters here is that the error goes down as the number of batches goes up.)
This mimics the pattern of many machine learning problems.
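
As a rough sketch of that error estimate, here is the standard error of the mean written with only the Python standard library. The function name `standard_error` and the sample values are hypothetical. The sketch uses the population standard deviation (the same convention as numpy's default `np.std`), which is a simplifying choice rather than the only valid one.

```python
import math
import statistics


def standard_error(batch_estimates):
    """Standard error of the mean of equal-sized batch estimates."""
    n_batches = len(batch_estimates)
    # Population standard deviation (ddof=0), matching numpy's default np.std
    spread = statistics.pstdev(batch_estimates)
    return spread / math.sqrt(n_batches)


# Same spread, more batches: the error estimate shrinks
few = [3.10, 3.18] * 2     # 4 batches
many = [3.10, 3.18] * 50   # 100 batches
err_few = standard_error(few)
err_many = standard_error(many)

print(f"4 batches: {err_few:.4f}, 100 batches: {err_many:.4f}")
```

Both lists have the same spread between batches, so the only thing driving the error down is the square root of the number of batches.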

However, using `ray.get` to get the results from each batch is not very flexible;
it will **block** until all tasks you are getting results for have finished.
If we want to keep checking results and submitting more Tasks as others finish, we will want a better way to do it.

### Getting results as Tasks finish with ray.wait

To overcome the fact that `ray.get` will block until all the tasks passed into it are finished, we turn to `ray.wait`.
It does the following:
* Returns two lists: finished tasks and unfinished tasks.
* The number of finished tasks to wait for is controlled by `num_returns` and defaults to 1. (It still returns the finished task as a list of length 1.)
* It blocks until the requested number of tasks are finished, but allows processing those results while waiting for the remaining tasks.
* When no tasks remain, it returns an empty list for unfinished tasks.
* It does not matter what order tasks were submitted, or what order they are passed to `ray.wait` - it returns the first task(s) that are ready.

To get the results of the finished tasks, you will still use `ray.get`, but now you know that the results in question are ready.
All of this can be best illustrated with tasks that take very different amounts of time to complete, as below.
(Check out the [python docs for list comprehensions](https://docs.python.org/3/tutorial/datastructures.html#list-comprehensions) if you are not familiar with them.)

In [6]:
# Use ray.get to return all the results at once, and it waits for the slowest task
start = time.time()
uneven_tasks = [
    mc_pi_single_batch.remote(n)
    for n in [10**7, 3*10**6, 10**6]
]
all_results = ray.get(uneven_tasks)
print(f"Got all results {all_results} in {time.time()-start:.2f}s")

Got all results [3.1419688, 3.1424613333333333, 3.138148] in 7.64s


In [7]:
# Instead, get the first finished task with ray.wait
start = time.time()
uneven_tasks = [
    mc_pi_single_batch.remote(n)
    for n in [10**7, 3*10**6, 10**6]
]
finished_tasks, unfinished_tasks = ray.wait(uneven_tasks)
print(f"Got results {ray.get(finished_tasks)} in {time.time()-start:.2f}s")
print(f"There are {len(unfinished_tasks)} tasks unfinished")

Got results [3.140916] in 0.76s
There are 2 tasks unfinished


In [8]:
# Remember that ray.wait always returns two lists of futures, and not the results themselves
print(f"Finished tasks:\n  {finished_tasks}")
print(f"Unfinished tasks:\n  {unfinished_tasks}")

Finished tasks:
  [ClientObjectRef(623b26bdd75b28e9ffffffffffffffffffffffff0100000001000000)]
Unfinished tasks:
  [ClientObjectRef(32cccd03c567a254ffffffffffffffffffffffff0100000001000000), ClientObjectRef(480a853c2c4c6f27ffffffffffffffffffffffff0100000001000000)]


In [9]:
# You can also wait for more than one task
new_finished_tasks, new_unfinished_tasks = ray.wait(unfinished_tasks, num_returns=2)
print(f"Got more results {ray.get(new_finished_tasks)}")
print(f"There are now {len(new_unfinished_tasks)} tasks unfinished")

Got more results [3.1414092, 3.1416453333333334]
There are now 0 tasks unfinished


### Combining ray.wait, while loops, and ray.cancel

Most often, `ray.wait` will be used in conjunction with a `while` loop like below.
Usually the loop runs until all tasks have finished, but in rare cases it may be useful to cancel the remaining tasks with `ray.cancel`.
In this case, we've included one very large batch we do not want to wait for.
Watch the Ray Web UI while executing this cell to see the tasks in action.

Canceling a task can result in "unhandled errors". As long as you know they came from purposefully canceling a task, and have written your tasks to have no harmful side effects if interrupted, they are benign.
Without `force`, Ray will issue a `KeyboardInterrupt` and the error may look similar to what happens when interrupting a cell in a notebook.
With `force`, the task will immediately exit and the error may simply say a worker has died.
If the canceled task is still pending or is already finished, then there will be no error.

Also notice how both `finished` and `unfinished` are always treated as lists, even if they contain only 1 task!

In [10]:
start = time.time()
unfinished = [
    mc_pi_single_batch.remote(n)
    for n in [10**10, 10**7, 3*10**6]
]
results = []
while len(unfinished) > 1:
    finished, unfinished = ray.wait(unfinished)
    results.append(ray.get(finished[0]))
    print(f"Have {len(results)} results, {len(unfinished)} remaining, at {time.time()-start:.2f}s")
else:
    print(f"Canceling {len(unfinished)} unfinished tasks")
    for u in unfinished:
        ray.cancel(u, force=True)
print(f"Final results: {results} in {time.time()-start:.2f}s")

Have 1 results, 2 remaining, at 2.24s
Have 2 results, 1 remaining, at 7.40s
Canceling 1 unfinished tasks
Final results: [3.1427573333333334, 3.1411512] in 7.40s


### Improved Pi approximation with task management

Now we can rewrite our Pi approximation example to make use of this more flexible task management, which allows us to do a few things:
* Evaluate the accuracy of our Pi estimate as each task finishes
* Launch a new task as soon as an old task finishes
* Launch additional new tasks if the cluster scales up
* Clean up any in-flight tasks that are no longer needed when we reach the desired accuracy.

There are still a few simplifying assumptions here, such as requiring equal batch sizes.
We also assume this is the only work happening on the cluster, and that we can fit exactly one task per node running in parallel.
More robust code might use `ray.available_resources` instead of `ray.nodes`, and/or give the user some manual control over the degree of parallelism.

We require the error to converge over multiple batches as a simple way to avoid the problem of low statistics leading to bad (under)estimates of the error in the first few batches.
If you run this a few times, you may notice that this is not always effective!
Improving the statistical rigor of the Pi simulation is not our goal in this tutorial, so we will let it slide.
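
The convergence requirement can be sketched in isolation as a counter of consecutive batches below the error threshold. The helper `update_convergence` and the sequence of error values are hypothetical, for illustration only. Note that a single batch has zero spread, so its error estimate is exactly 0 and it counts as "converged"; this is the underestimate mentioned above.

```python
def update_convergence(conv, err, max_err):
    """Return the new count of consecutive batches with err below max_err."""
    return conv + 1 if err < max_err else 0


# Hypothetical sequence of error estimates, one per finished batch
errors = [0.0, 0.0003, 0.0009, 0.0007, 0.0004, 0.0004, 0.0003]
max_err = 0.0005

conv = 0
history = []
for err in errors:
    conv = update_convergence(conv, err, max_err)
    history.append(conv)

print(history)
```

The counter resets to zero as soon as one batch pushes the error back above the threshold, so a few early underestimates are not enough to stop the loop unless they happen `convergence_req` times in a row.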

In [11]:
def smarter_approximate_pi_on_ray(batch_size=10**6, max_err=0.0005, convergence_req=5):
    start = time.time()
    pi_samples = []
    # Initially, launch as many tasks as there are nodes in the cluster
    unfinished_tasks = [
        mc_pi_single_batch.remote(batch_size)
        for _ in ray.nodes()
    ]
    # Initialize counters:
    #  conv - how many times in a row the error is below max_err
    #  N - how many batches have been processed total 
    conv, N = 0, 0
    print("N    Pi       Err      STD      Conv  Elapsed(s)")
    while conv < convergence_req:
        
        # Get whichever task finishes first
        finished_tasks, unfinished_tasks = ray.wait(unfinished_tasks)
        N += 1
        pi_samples.append(ray.get(finished_tasks[0]))
        
        # Calculate overall error on Pi; this assumes equal-sized batches!
        pi = np.mean(pi_samples)
        pi_std = np.std(pi_samples)
        pi_err = pi_std / np.sqrt(N)
        if pi_err < max_err:
            conv += 1
        else:
            conv = 0
        print(f"{N:<4d} {pi:.6f} {pi_err:.6f} {pi_std:.6f} {conv:<5d} {time.time()-start:3.2f}")
        
        # Start more tasks, checking the number of nodes again in case the cluster has scaled up
        # Most of the time, we expect one available node, and don't want to clutter the output
        n_avail_nodes = len(ray.nodes()) - len(unfinished_tasks)
        if n_avail_nodes > 1:
            print(f"SCALE-UP: starting an additional {n_avail_nodes - 1} task(s) on new nodes")
        unfinished_tasks.extend([
            mc_pi_single_batch.remote(batch_size)
            for _ in range(n_avail_nodes)
        ])
    
    else:
        # Clean up leftover tasks
        print(f"Cleaning up {len(unfinished_tasks)} leftover tasks")
        for u in unfinished_tasks:
            ray.cancel(u, force=True)
    
    return pi, pi_err

In [12]:
pi, pi_err = smarter_approximate_pi_on_ray()
print(f"DONE! Final result: pi = {pi} +/- {pi_err}")

N    Pi       Err      STD      Conv  Elapsed(s)
1    3.139072 0.000000 0.000000 1     0.75
2    3.139536 0.000328 0.000464 2     0.76
3    3.140664 0.000947 0.001640 0     0.80
4    3.140535 0.000719 0.001437 0     1.54
5    3.140654 0.000585 0.001307 0     1.55
6    3.140731 0.000492 0.001206 1     1.57
7    3.141217 0.000617 0.001633 0     2.36
8    3.141002 0.000576 0.001630 0     2.37
9    3.141125 0.000525 0.001575 0     2.37
10   3.141235 0.000484 0.001530 1     3.11
11   3.141332 0.000449 0.001491 2     3.12
12   3.141586 0.000479 0.001658 3     3.12
13   3.141645 0.000446 0.001606 4     3.84
14   3.141553 0.000423 0.001583 5     3.87
Cleaning up 3 leftover tasks
DONE! Final result: pi = 3.1415528571428575 +/- 0.00042316864560587624


Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): The worker died unexpectedly while executing this task. Check python-core-worker-*.log files for more information.
Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): The worker died unexpectedly while executing this task. Check python-core-worker-*.log files for more information.


In [13]:
# Try aiming for higher accuracy if you have autoscaling enabled to see it in action
#pi, pi_err = smarter_approximate_pi_on_ray(batch_size=10**7, max_err=0.00005)
#print(f"DONE! Final result: pi = {pi} +/- {pi_err}")

## Ray Actors

In addition to defining remote functions (i.e. Tasks), you can also define remote classes, which are called Actors.

Before showing how Actors work, it's important to mention that most Ray users will not need to use them directly.
To quote [the docs](https://docs.ray.io/en/latest/ray-core/actors.html): "\[...\] tasks are scheduled more flexibly, \[and\] if you don’t need the stateful part of an actor, you’re mostly better off using tasks".
We will show the basics here, and mention a few caveats about scheduling and worker behavior.
However, you will be much more likely to interact with Actors indirectly through other libraries.

Some things to keep in mind about Actors:
* They are dedicated workers, and will execute only one task at a time.
* There are a few common patterns involving Actors which we will not fully explore in this notebook, for example a [tree of actors](https://docs.ray.io/en/latest/ray-core/actors/patterns/tree-of-actors.html) with one "Supervisor" and some number of "Workers".
* Typically, actors are automatically cleaned up when all references to them have gone out of scope, but they can also be cleaned up with `ray.kill()`.
* Actors can be scheduled with specific resource requests. Tasks can also be scheduled this way, but unlike Tasks, Actors require 0 resources by default for *running*, and typically require more care in scheduling because they are long-lived.

You can run the cells in this section immediately after the first two cells in this notebook (imports and initializing Ray).
It does not rely on any cells in the Task Management section above.

### Creating, using, and cleaning up Actors

Below, we create a worker for our Pi approximation that can run batch simulations of a given size.
Note that both initializing the worker and subsequent calls to `run_batch` make use of `remote`.

Watch the Ray Web UI to see the Actor lifecycle:
* It appears as a dedicated worker on one of the nodes, labeled with the class name
* It uses some CPU when running a simulation batch
* It disappears after `ray.kill`

In [14]:
@ray.remote
class MCPiWorker:
    def __init__(self, batch_size, worker_idx):
        self.batch_size = batch_size
        self.worker_idx = worker_idx
    
    def run_batch(self):
        n_inside_quadrant = 0
        for _ in range(self.batch_size):
            x = random.uniform(0,1)
            y = random.uniform(0,1)
            if (x**2 + y**2) <= 1:
                n_inside_quadrant += 1
        pi = 4*n_inside_quadrant/self.batch_size
        return pi, self.worker_idx

In [15]:
# Initialize the worker, and note the type
worker1 = MCPiWorker.remote(10**6, 1)
print(worker1)

ClientActorHandle(7ad8a12a33cd39e588f5215501000000)


In [16]:
# Run one batch of the pi simulation - very similar to using remote Tasks!
pi_future = worker1.run_batch.remote()
print(f"Starting work on the Actor returns a future: {pi_future}")
pi, worker_id = ray.get(pi_future)
print(f"Worker {worker_id} returned a batch result pi={pi}")

Starting work on the Actor returns a future: ClientObjectRef(4e2ab276f14c37c27ad8a12a33cd39e588f521550100000001000000)
Worker 1 returned a batch result pi=3.140976


In [17]:
# Kill the worker
ray.kill(worker1)

### Scheduling Actors

If you try creating multiple workers at once, you may see them scheduled on the same node.
This is not what we want for our current example - we don't want each batch of the simulation sharing single node resources!
Let's check the time it takes to run two batches this way.

In [18]:
# Check the Ray Web UI to see these (likely) appear on the same node
worker2 = MCPiWorker.remote(10**7, 2)
worker3 = MCPiWorker.remote(10**7, 3)

In [19]:
start = time.time()
pi_futures = [
    w.run_batch.remote()
    for w in [worker2, worker3]
]
pi_results = ray.get(pi_futures)
print(f"Got results {pi_results} in {time.time()-start:.2f}")

Got results [(3.141006, 2), (3.1412604, 3)] in 7.51


In [20]:
# Remove these workers so we can try again
ray.kill(worker2)
ray.kill(worker3)

To guarantee spreading out the Actors on different nodes, we'll make use of `options` for each worker.
Specifying resource requirements can force the actors to be spread across nodes.
The number of CPUs requested for this will depend on the Hardware Tier you choose for the cluster nodes.

If every copy of the Actor needs the same resources, they can also be specified in the decorator we first used to turn our class into a Ray Actor.
There is a lot more to scheduling that we won't cover here, and this is an area where Ray is adding functionality (especially in Ray 2.0), so be mindful of versions!

In [21]:
# Specify a resource request that will force Actors onto different nodes
worker4 = MCPiWorker.options(num_cpus=1).remote(10**7, 4)
worker5 = MCPiWorker.options(num_cpus=1).remote(10**7, 5)

In [22]:
start = time.time()
pi_futures = [
    w.run_batch.remote()
    for w in [worker4, worker5]
]
pi_results = ray.get(pi_futures)
print(f"Got results {pi_results} in {time.time()-start:.2f}")

Got results [(3.1423332, 4), (3.1413644, 5)] in 7.73


In [23]:
# Clean up
ray.kill(worker4)
ray.kill(worker5)

### Complete Pi approximation with Actors

Below we've rewritten the Pi approximation function to use Actors instead of Tasks.
Notice how we are manually keeping track of the worker ID for each Actor here, so we know which workers are idle and can accept more batches.

In [24]:
def approximate_pi_with_actors(batch_size=10**6, max_err=0.0005, convergence_req=5):
    start = time.time()
    pi_samples = []
    n_workers = len(ray.nodes())
    print(f"Initializing {n_workers} workers")
    workers = [MCPiWorker.options(num_cpus=1).remote(batch_size, i) for i in range(n_workers)]
    unfinished_tasks = [w.run_batch.remote() for w in workers]
    conv, N = 0, 0
    print("N    Pi       Err      STD      Conv  Elapsed(s)")
    while conv < convergence_req:
        finished_tasks, unfinished_tasks = ray.wait(unfinished_tasks)
        current_pi, idle_worker_idx = ray.get(finished_tasks[0])
        N += 1
        pi_samples.append(current_pi)
        pi = np.mean(pi_samples)
        pi_std = np.std(pi_samples)
        pi_err = pi_std / np.sqrt(N)
        # Note: conv is printed before it is updated below, so the Conv column lags one batch behind
        print(f"{N:<4d} {pi:.6f} {pi_err:.6f} {pi_std:.6f} {conv:<5d} {time.time()-start:3.2f}")
        if pi_err < max_err:
            conv += 1
        else:
            conv = 0
        unfinished_tasks.append(workers[idle_worker_idx].run_batch.remote())
    else:
        print(f"Cleaning up {len(workers)} workers ({len(unfinished_tasks)} with leftover tasks)")
        for w in workers:
            ray.kill(w)
    return pi, pi_err

In [25]:
pi, pi_err = approximate_pi_with_actors()
print(f"DONE! Final result: pi = {pi} +/- {pi_err}")

Initializing 3 workers
N    Pi       Err      STD      Conv  Elapsed(s)
1    3.139984 0.000000 0.000000 0     1.46
2    3.140812 0.000585 0.000828 1     1.48
3    3.141051 0.000436 0.000756 0     1.49
4    3.141045 0.000327 0.000654 1     2.22
5    3.141002 0.000265 0.000592 2     2.23
6    3.141135 0.000252 0.000616 3     2.24
7    3.141265 0.000247 0.000653 4     2.98
Cleaning up 3 workers (3 with leftover tasks)
DONE! Final result: pi = 3.1412651428571428 +/- 0.0002469189265087082


## Object store and ray.put

Until now, we have been running simulations that generate and discard one point at a time.
They have been CPU-intensive, but use almost no RAM.

Most applications for Ray will involve processing existing data, often large amounts of it.
What happens when we mimic that scenario by generating random points in advance, then passing them to a remote task?
Ray will complain about the large amount of data being passed.

In [26]:
# Pre-generate the points
points = np.random.uniform(size=(2, 10**6))

In [27]:
# Define a new streamlined remote function to process them and estimate pi
@ray.remote
def mc_pi(points):
    n_inside = sum(points[0]**2 + points[1]**2 < 1)
    n_samples = points.shape[1]
    return 4 * n_inside / n_samples

In general, it is more efficient to pass data around the Ray cluster by first loading it into the [Object Store](https://docs.ray.io/en/latest/ray-core/objects.html).
Then, pass it to Tasks via **object refs**.

Both Tasks and Actors already return object refs; let's summarize their properties:
* Objects are stored in the shared-memory object store. There is one object store per node, and memory usage of the object store is shown in the Plasma column of the Ray Web UI.
* Objects are referred to by object refs. An object ref is a unique pointer or ID that refers to an object without exposing the actual data.
* Remote function calls automatically return object refs, in which case they are essentially futures. The contents of the object are accessed using `ray.get`.

Now we want to introduce some **new** properties and ways to use object refs:
* You can also explicitly create object refs using `ray.put`.
* When object refs are passed as arguments to a remote function, they are automatically de-referenced inside the function (no need to explicitly call `ray.get`). This does not apply to object refs nested inside other arguments.
* Remote objects are **immutable**.

Watch the Plasma store usage in the Ray Web UI while running this section.

In [28]:
# First, pass the points directly to the function - notice the warning the first time you run this!
start = time.time()
print(f"Inspect the type of the argument we are passing: {type(points)}")
pi = ray.get(mc_pi.remote(points))
print(f"Got results {pi} in {time.time()-start:.2f}s")

Inspect the type of the argument we are passing: <class 'numpy.ndarray'>




Got results 3.140376 in 3.26s


In [29]:
# Now, use ray.put to load the points into the object store, and pass only the reference
# Notice that there is no need to add any "ray.get" into our function code!
start = time.time()
points_ref = ray.put(points)
print(f"Inspect the type of the argument we are passing: {type(points_ref)}")
pi = ray.get(mc_pi.remote(points_ref))
print(f"Got results {pi} in {time.time()-start:.2f}s")

Inspect the type of the argument we are passing: <class 'ray._raylet.ClientObjectRef'>
Got results 3.140376 in 2.15s


Explicitly loading the points into the object store before calling the function is a little faster.
The real benefits to passing references like this are in two situations:
* If multiple remote tasks will operate on the same data, using `ray.put` means it has to be copied into the object store only once instead of every time it is passed to a function.
* If data is generated as a result of another remote task, then fed into a subsequent task, there is no need to call `ray.get` in between, and the data need never leave the cluster. In fact, calling `ray.get` unnecessarily is listed as a [common antipattern](https://docs.ray.io/en/latest/ray-core/tasks/patterns/unnecessary-ray-get.html).

To see an example of the second case, we can have a remote task generate our points rather than generating them here in the client notebook.

In [30]:
@ray.remote
def mc_pi_generate_points(n_samples):
    points = np.random.uniform(size=(2, n_samples))
    return points

In [31]:
start = time.time()
n = 10**6
points_future = mc_pi_generate_points.remote(n)
print(f"Inspect the type of the argument we are passing: {type(points_future)}")
pi = ray.get(mc_pi.remote(points_future))
print(f"Got results {pi} in {time.time()-start:.2f}s")

Inspect the type of the argument we are passing: <class 'ray._raylet.ClientObjectRef'>
Got results 3.140308 in 2.04s


Note that numpy arrays are especially efficient for use in the Ray object store:
The [docs](https://docs.ray.io/en/releases-2.0.0/ray-core/objects.html) note that 
"If the object is a numpy array or a collection of numpy arrays, the get call is zero-copy and returns arrays backed by shared object store memory. Otherwise, we deserialize the object data into a Python object."
In other words, if a remote task is acting on a numpy array that is already in the corresponding node's object store, it can act directly on the array without making another copy.

## Loading large data with Ray Data

Dealing directly with `ray.put` may be simple and efficient with our example numpy arrays, but most problems will start with some data (potentially large) living in files on disk.
Ideally, we should avoid reading that data into the client notebook at all, and read it directly into the object store of the Ray cluster.

### Exploring the available data files

In this section, we will read in pre-generated points stored as parquet files.
The code to generate these files is in the `admin-notebooks`, and the recommended way to access the files during a workshop is to mount a **Dataset** from the public version of this project which contains those files.

Adjust the path below to match where the files are located in your project.

In [32]:
# dataset_path = f"/domino/datasets/local/{os.environ['DOMINO_PROJECT_NAME']}"
# dataset_path = "/domino/datasets/local/Points-For-Pi-Approximation"
# dataset_path = "/domino/datasets/Points-For-Pi-Approximation"
dataset_path = "/mnt/imported/data/Points-For-Pi-Approximation"

Ensure the files are present as expected.
Each file name shows how many points it contains in exponential notation, e.g. `3e3` for `3000`.
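
If the notation is unfamiliar, the point count can be recovered from a file name with a few lines of Python. This is only a sketch: the helper `points_in_filename` is hypothetical, and it assumes names of the form `points_<count>...` such as `points_1e6.parquet`.

```python
import re


def points_in_filename(filename):
    """Parse the point count from a name like 'points_3e6.parquet'."""
    match = re.search(r"points_(\d+e\d+)", filename)
    if match is None:
        raise ValueError(f"No point count found in {filename!r}")
    # float() understands exponential notation; the counts are whole numbers
    return int(float(match.group(1)))


n_small = points_in_filename("points_1e6.parquet")
n_split = points_in_filename("points_1e8_split10")

print(n_small, n_split)
```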

In [33]:
!ls -shS {dataset_path} | grep parquet

1.5G points_1e8.parquet
459M points_3e7.parquet
154M points_1e7.parquet
 47M points_3e6.parquet
 16M points_1e6.parquet


We also expect some directories of split parquet files, whose names also indicate the number of individual files.

In [34]:
!du -sh {dataset_path}/points_*_split*

1.6G	/domino/datasets/local/ray-tutorial/points_1e8_split10
478M	/domino/datasets/local/ray-tutorial/points_3e7_split10
4.7G	/domino/datasets/local/ray-tutorial/points_3e8_split30


### Reading in parquet files

To read a parquet file with Ray Data, simply use the `ray.data.read_parquet` function.
Watch the RAM and Plasma usage when reading in the files.
These cells assume Small hardware tier for the cluster, and pick "small" and "medium" files accordingly - you may want to experiment with different sizes on different hardware tiers.
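
When choosing a file size for a given hardware tier, a back-of-the-envelope estimate of the in-memory footprint can help. The sketch below assumes each point is stored as two 64-bit floats (`x` and `y`) and ignores any overhead from Arrow or pandas, so treat the numbers as a lower bound rather than a measurement. The function name `estimated_size_gib` is hypothetical.

```python
BYTES_PER_FLOAT64 = 8
N_COLUMNS = 2  # x and y


def estimated_size_gib(n_points):
    """Rough in-memory size of n_points rows of two float64 columns, in GiB."""
    return n_points * N_COLUMNS * BYTES_PER_FLOAT64 / 2**30


for n_points in (10**6, 10**8, 3 * 10**8):
    print(f"{n_points:>11,d} points: ~{estimated_size_gib(n_points):.2f} GiB")
```

Comparing these estimates against the `object_store_memory` value reported by `ray.cluster_resources()` gives a first indication of whether a dataset is likely to fit comfortably in the object store.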

In [35]:
small_points_ds = ray.data.read_parquet(os.path.join(dataset_path, "points_1e6.parquet"))

In [36]:
small_points_ds.show(3)

{'x': 0.15744372219962044, 'y': 0.9991738982470619}
{'x': 0.24704316132542592, 'y': 0.6131349061935508}
{'x': 0.443876643349869, 'y': 0.9617571214294107}


In [37]:
type(small_points_ds)

ray.data.dataset.Dataset

In [38]:
print(f"Small data has {small_points_ds.count()} rows.")

Small data has 1000000 rows.


### Iterating over data with map and map_batches

To process these points, we have two major options with Ray Data:
* Loop over each row individually with `map`
* Loop over batches with `map_batches`

In general, `map_batches` is much more efficient.
It can operate on the data as a pyarrow Table or as a pandas DataFrame, using vectorized operations.
Using `map_batches` does typically require some extra care in the formatting of input and output data - note the comments in the below cells!

In [39]:
# Map individual rows
start = time.time()
n_total = small_points_ds.count()
inside = small_points_ds.map(lambda row: (row['x']**2 + row['y']**2) < 1 )

print(f"Inspect the intermediate boolean results:\n {type(inside)}")
inside.show(3)
inside.schema()

n_inside = inside.sum()
print(f"Inspect the intermediate sum results:\n {type(n_inside)}")

pi = 4 * n_inside / n_total
print(f"Pi is approximately {pi} (from {n_total} samples in {time.time()-start:.2f}s)")

Map Progress: 100%|██████████| 1/1 [00:09<00:00,  9.97s/it]


Inspect the intermediate boolean results:
 <class 'ray.data.dataset.Dataset'>
False
True
False


GroupBy Map: 100%|██████████| 1/1 [00:00<00:00,  1.55it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 93.25it/s]

Inspect the intermediate sum results:
 <class 'int'>
Pi is approximately 3.14048 (from 1000000 samples in 11.00s)





In [40]:
# Define a function to process a batch.
# It will expect a pandas Dataframe as input, which we can specify in the next cell
# It must return a pandas Dataframe or pyarrow Table as output
# (This is true even if the output will be a single number!)

import pyarrow as pa

def process_batch(batch):
    result = (batch['x']**2 + batch['y']**2 < 1).sum()
    return pa.Table.from_pydict({"n_inside": [result]})

In [41]:
start = time.time()
n_total = small_points_ds.count()
n_inside = small_points_ds.map_batches(process_batch,batch_format='pandas')

print(f"Inspect the intermediate sum results:\n {type(n_inside)}")
n_inside.show(3)
n_inside.schema()

pi = 4 * n_inside.sum() / n_total
print(f"Pi is approximately {pi} (from {n_total} samples in {time.time()-start:.2f}s)")

Map Progress: 100%|██████████| 1/1 [00:00<00:00,  3.79it/s]


Inspect the intermediate sum results:
 <class 'ray.data.dataset.Dataset'>
{'n_inside': 785120}


GroupBy Map: 100%|██████████| 1/1 [00:00<00:00, 429.22it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 392.54it/s]

Pi is approximately 3.14048 (from 1000000 samples in 0.29s)





If you are watching the Ray Web UI, you will notice all the computation happening on a single node in the previous cells.
We are incurring overhead to do the computation in Ray, without any of the benefits.
In fact, using pure pandas is much faster than what we've been doing with Ray!
This is a good example of the principle that you should not distribute your code unless there is a real need to do so.

In [42]:
# Pure pandas comparison of previous section - it's actually faster!
import pandas as pd

small_points_df = pd.read_parquet(os.path.join(dataset_path, "points_1e6.parquet"))

In [43]:
start = time.time()
n_total = small_points_df.shape[0]
n_inside = (small_points_df['x']**2 + small_points_df['y']**2 < 1).sum()
pi = 4 * n_inside / n_total
print(f"Pi is approximately {pi} (from {n_total} samples in {time.time()-start:.2f}s)")

Pi is approximately 3.14048 (from 1000000 samples in 0.01s)


### Multiple blocks and partial reads

To get data on multiple nodes, we can repartition the data like below.

However, the real benefits of loading data directly into Ray (instead of, say, loading it here in the client notebook and then using `ray.put`) come when data is much larger - too large to easily load onto any single node.

With Ray Data, when we read in data that is already broken into multiple files, it will automatically partition it across nodes when doing computations.
The level of parallelism on file read defaults to 200 OR the number of individual parquet files, **whichever is smaller**.
This means that Ray Data is not well suited for reading in large monolithic parquet files.

In [44]:
small_points_ds.num_blocks()

1

In [45]:
# Repartition the small data
small_points_ds = small_points_ds.repartition(3)

Repartition: 100%|██████████| 3/3 [00:00<00:00,  4.30it/s]


In [46]:
small_points_ds.num_blocks()

3

In [47]:
# Read in the medium data, which is already split among multiple files
medium_points_ds = ray.data.read_parquet(os.path.join(dataset_path, "points_1e8_split10"))

In [48]:
medium_points_ds.show(3)

{'x': 0.37296319431756686, 'y': 0.3874623725627472}
{'x': 0.6090430917019034, 'y': 0.15193178219888936}
{'x': 0.028662288104923106, 'y': 0.6215241597148781}


In [49]:
medium_points_ds.num_blocks()

10

When reading in multiple parquet files, Ray Data will generally read only the first file until the rest are actually needed.
This is an example of **lazy execution**, which is in contrast to the **immediate execution** we have seen with Ray so far.
Watch the Ray Web UI to see how the Plasma memory usage has not increased much yet, but will go up when we actually need to transform the entire data.
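
To make the distinction concrete, here is a toy illustration of lazy execution using a plain Python generator. It is only an analogy - it is not how Ray Data is implemented - and the file names and the `lazy_read` helper are made up for this example.

```python
def lazy_read(paths, read_log):
    """Yield one 'file' at a time; nothing is read until a consumer asks for it."""
    for path in paths:
        read_log.append(path)  # stand-in for an expensive file read
        yield f"contents of {path}"


paths = [f"part_{i}.parquet" for i in range(10)]  # hypothetical file names
read_log = []

blocks = lazy_read(paths, read_log)   # creating the generator reads nothing
first = next(blocks)                  # peeking at the first block reads one file
files_read_after_peek = len(read_log)

remaining = list(blocks)              # consuming everything reads the rest
files_read_in_total = len(read_log)
```

Peeking at the data touches a single file, and the remaining files are only read once something consumes the whole dataset - the same pattern you should see in the object store memory when running the cells below.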

In [50]:
# Parquet files have some metadata already available without needing to read all the files.
medium_points_ds.count()

100000000

In [51]:
# Watch the object store memory go up as the rest of the medium dataset gets read in!
start = time.time()
n_total = medium_points_ds.count()
n_inside = medium_points_ds.map_batches(process_batch,batch_format='pandas')

print(f"Inspect the intermediate sum results:\n {type(n_inside)}")
n_inside.show(3)
n_inside.schema()

pi = 4 * n_inside.sum() / n_total
print(f"Pi is approximately {pi} (from {n_total} samples in {time.time()-start:.2f}s)")

Map Progress: 100%|██████████| 10/10 [00:02<00:00,  4.67it/s]


Inspect the intermediate sum results:
 <class 'ray.data.dataset.Dataset'>
{'n_inside': 7853759}
{'n_inside': 7853820}
{'n_inside': 7852750}


GroupBy Map: 100%|██████████| 10/10 [00:00<00:00, 416.36it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 333.01it/s]

Pi is approximately 3.14153152 (from 100000000 samples in 6.41s)





### Memory overflows and other bonus activities

We've seen a few basic applications of Ray Data, but we have not demonstrated the ways things can go *wrong*.

One of the most common problems is memory overflows - any time a single parquet file is too large to read in on a single machine, Ray Data will generate errors somewhat like the below:

```
ConnectionError: GRPC connection failed: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.NOT_FOUND
	details = "Failed to serialize response!"
	debug_error_string = "{"created":"@1664232188.123154377","description":"Error received from peer ipv4:172.20.98.49:10001","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"Failed to serialize response!","grpc_status":5}"
```

These can be very hard to interpret unless you know what you are looking for, and/or you see the memory run up to the max while you are watching in the Ray Web UI.
Data can also take several times more memory than you might expect from its size on disk - notice how much was required for the `medium` dataset in this notebook.
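
A rough lower bound on the in-memory size can help when judging whether a dataset will fit. The sketch below assumes every column is stored as an 8-byte float (the `x` and `y` values in this notebook look like float64, but that is an assumption) and ignores all overhead. The helper name `in_memory_bytes` is hypothetical.

```python
def in_memory_bytes(n_rows, n_columns, bytes_per_value=8):
    """Raw size of a dense numeric table, ignoring indexes, copies and overhead."""
    return n_rows * n_columns * bytes_per_value


# The medium dataset: 100,000,000 rows with two columns (x and y)
medium_bytes = in_memory_bytes(100_000_000, 2)
medium_gib = medium_bytes / 1024**3
print(f"At least {medium_gib:.2f} GiB of raw values")
```

This is only a floor. Parquet files are typically compressed on disk, and intermediate copies made during processing can multiply the real footprint.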

Try reading in the single medium dataset like below to see this problem first-hand.
If you do, pay attention to what happens if you try to run additional Ray code afterwards.
Because your connection to the cluster crashes, Ray is no longer initialized!
If you run more Ray code after this, Ray will implicitly do a `ray.init()` to re-establish a cluster connection... however, that will **not** be the correct connection to the Domino cluster.
(Remember the correct way to do `ray.init()` involves referencing the Domino-provided cluster address.)
When in doubt, restarting the workspace will fully reset the cluster and allow you to connect again the correct way.

In [52]:
#medium_monolith_points_ds = ray.data.read_parquet(os.path.join(dataset_path, "points_1e8.parquet"))

In [53]:
#medium_monolith_points_ds.show(3)

In [54]:
#ray.is_initialized()

In [55]:
#ray.nodes()

Ray Data is not designed to supplant generic ETL tools like Spark - see their [FAQ](https://docs.ray.io/en/latest/data/faq.html) for some more commentary.
It is mostly useful as an efficient way to load and pass data to other Ray libraries, so we won't go into any more detail in this tutorial.
However, if you find yourself needing to do some last-mile preprocessing with Ray Data, you may want to return to this example notebook and see what happens when you make some of the following changes to the code in this section:
* Try to return something that is NOT a table or dataframe from a function given to `map_batches` - see what error results.
* Return a pandas DataFrame instead of a pyarrow Table, and see how the resulting `ds.schema()` changes.
* Try to do math like `batch['x']**2` inside a function given to `map_batches` without specifying `batch_format='pandas'`, i.e. directly on a pyarrow table - see what error results.
* Run the cells in this section on larger data to really see the performance differences among the different methods.
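
As a starting point for the second experiment in the list above, here is a sketch of a batch function that returns a pandas DataFrame rather than a pyarrow Table. The name `process_batch_pandas` is made up for this example. The block runs on a tiny hand-made DataFrame so it can be checked without a cluster.

```python
import pandas as pd


def process_batch_pandas(batch: pd.DataFrame) -> pd.DataFrame:
    """Count the points inside the unit circle and return a one-row DataFrame."""
    n_inside = int((batch["x"] ** 2 + batch["y"] ** 2 < 1).sum())
    return pd.DataFrame({"n_inside": [n_inside]})


# Local check on three points: two lie inside the unit circle, one outside
sample = pd.DataFrame({"x": [0.1, 0.9, 0.5], "y": [0.2, 0.9, 0.5]})
result = process_batch_pandas(sample)

# On the cluster it would be passed to map_batches just like process_batch:
# n_inside = small_points_ds.map_batches(process_batch_pandas, batch_format='pandas')
```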

## Loading large data with Modin

Unlike Ray Data, [Modin](https://modin.readthedocs.io/en/stable/) is meant to be a drop-in replacement for Pandas that can seamlessly handle large data with a Ray cluster as the backend.
It does not have 100% coverage of the Pandas API, but where it does have coverage it aims for complete compatibility - simply replace `import pandas as pd` with `import modin.pandas as pd`!

To facilitate experimenting with both, we will instead import modin as follows:

In [56]:
import modin
import modin.pandas as mpd

In [57]:
# Confirm that Modin is using Ray
modin.utils.get_current_execution()

'PandasOnRay'

In [58]:
dataset_path = f"/domino/datasets/local/{os.environ['DOMINO_PROJECT_NAME']}"
#dataset_path = "/domino/datasets/local/Points-For-Pi-Approximation"
#dataset_path = "/domino/datasets/Points-For-Pi-Approximation"

### Reading in parquet files and manipulating data

Reading in parquet files - or any other format - is identical to Pandas.

In [59]:
small_points_mdf = mpd.read_parquet(os.path.join(dataset_path, "points_1e6.parquet"))

In [60]:
small_points_mdf.head(3)

          x         y
0  0.157444  0.999174
1  0.247043  0.613135
2  0.443877  0.961757


In [61]:
type(small_points_mdf)

modin.pandas.dataframe.DataFrame

In [62]:
small_points_inside = small_points_mdf['x']**2 + small_points_mdf['y']**2 < 1

In [63]:
type(small_points_inside)

modin.pandas.series.Series

In [64]:
sum(small_points_inside)

785120
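
Because Modin aims for compatibility with the pandas API, a function written against that API should accept either kind of DataFrame. The sketch below defines a hypothetical `estimate_pi` helper and checks it on a small pandas DataFrame. Passing `small_points_mdf` instead is expected to work the same way, assuming Modin covers the few operations used here. It also uses the `.sum()` method rather than the built-in `sum`, which avoids iterating over the Series element by element in Python.

```python
import pandas as pd


def estimate_pi(points):
    """Estimate Pi from a DataFrame-like object with 'x' and 'y' columns."""
    inside = points["x"] ** 2 + points["y"] ** 2 < 1
    return 4 * inside.sum() / len(points)


# Four hand-picked points: (0, 0) and (0.5, 0.5) are inside the unit circle,
# while (1, 1) and (1, 0) are not (the comparison is strict).
corners = pd.DataFrame({"x": [0.0, 1.0, 0.5, 1.0], "y": [0.0, 1.0, 0.5, 0.0]})
pi_estimate = estimate_pi(corners)

# With Modin on the cluster:
# estimate_pi(small_points_mdf)
```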

### Parallel reads with Modin

Modin can read single large parquet files even if they do not fit in a single node's memory - however, it is not always robust to a "messy" cluster with some memory already in use.
Running the below cells in the same session as all the preceding notebook sections will likely result in an error, so to see Modin in action with larger data restart the workspace first.
(Remember to initialize the Ray cluster correctly before skipping to this section!)

In [65]:
# Read in the larger data
medium_points_mdf = mpd.read_parquet(os.path.join(dataset_path, "points_1e8.parquet"))

In [66]:
start = time.time()
n_total = medium_points_mdf.shape[0]
n_inside = sum(medium_points_mdf['x']**2 + medium_points_mdf['y']**2 < 1)
pi = 4 * n_inside / n_total
print(f"Pi is approximately {pi} (from {n_total} samples in {time.time()-start:.2f}s)")

Pi is approximately 3.14153468 (from 100000000 samples in 12.04s)


## Common pitfalls for large data

To summarize and expand on what we've seen in the previous sections, here are a few good general practices:
1. Avoid copying data to and from the cluster unnecessarily - use `ray.put` and delay using `ray.get` where appropriate when dealing with Ray Core functionality.
2. Remote tasks and actors cannot access files in `/mnt`, but they can access files mounted via `/domino/datasets/`, so store large files in Datasets.
3. Watch the Ray Web UI to see whether memory usage (especially RAM) is close to the limit. If code is running correctly, high memory usage is no problem - Ray will usually try to **spill to disk** when needed. But if code is seeing errors, and memory usage is high, try running on a smaller subset of data for initial troubleshooting.
4. Ray Data can only parallelize data read up to the number of individual parquet files being read, and it usually requires several times the on-disk size in memory. Split data into smaller individual files to prepare for reading with Ray Data.
5. Modin can parallelize data read even for single large files, but is not always robust to a "messy" cluster. Read data into Modin early in the script with a "fresh" workspace and cluster.
6. Any time a large data operation kills cluster workers and causes an error, beware that it likely kills the cluster connection too. Even if subsequent Ray code seems to run, it may have implicitly run an incorrect `ray.init()` to restart the connection without pointing to the correct Domino cluster address. Check `ray.is_initialized()`, `ray.nodes()` or similar to verify whether you are correctly connected to your Domino cluster, and when in doubt restart the workspace entirely.
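
One way to guard against the last pitfall is to build the cluster address explicitly and fail loudly if the Domino-provided environment variables are missing, rather than letting Ray fall back to an implicit local `ray.init()`. The helper name `domino_ray_address` is hypothetical; the environment variable names are the ones used at the top of this notebook.

```python
import os


def domino_ray_address(env=None):
    """Build the ray:// address of the Domino cluster from environment variables.

    Raises RuntimeError instead of silently falling back to a local cluster.
    """
    env = os.environ if env is None else env
    required = ("RAY_HEAD_SERVICE_HOST", "RAY_HEAD_SERVICE_PORT")
    missing = [name for name in required if name not in env]
    if missing:
        raise RuntimeError(f"Missing cluster environment variables: {missing}")
    return f"ray://{env['RAY_HEAD_SERVICE_HOST']}:{env['RAY_HEAD_SERVICE_PORT']}"


# Sketch of a reconnect after a crash (run in the workspace, not here):
# if ray.is_initialized():
#     ray.shutdown()                 # drop a stale or implicit local connection
# ray.init(domino_ray_address())
```

If the connection still misbehaves after reconnecting this way, restarting the workspace remains the safest option.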

In practice, the best way to load and manipulate large data in Ray will depend heavily on what other packages and libraries you plan to use.

## Bonus - performance improvements with dedicated local storage

We've just alluded to the fact that Ray will **spill to disk** when the object store memory is full.
This is an incredibly useful mechanism that allows manipulating data even when the total data size is too large to fit in the entire cluster's memory.
But how do you know when this is happening, and how does it work?

First, inspect the folder where spilled objects will be put - this will give an error if nothing has been spilled to disk, but should show more files as larger data is read in.

In [67]:
@ray.remote
def ray_local_file_ls(path):
    print(os.listdir(path))

In [68]:
# List files in the spilled objects folder on each node
[ray_local_file_ls.remote("/tmp/ray/session_latest/ray_spilled_objects") for _ in ray.nodes()];

Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::ray_local_file_ls() (pid=778, ip=10.0.46.72)
  File "/tmp/ipykernel_1584/3441348648.py", line 3, in ray_local_file_ls
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/ray/session_latest/ray_spilled_objects'
Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::ray_local_file_ls() (pid=2234, ip=10.0.46.72)
  File "/tmp/ipykernel_1584/3441348648.py", line 3, in ray_local_file_ls
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/ray/session_latest/ray_spilled_objects'
Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::ray_local_file_ls() (pid=583, ip=10.0.46.72)
  File "/tmp/ipykernel_1584/3441348648.py", line 3, in ray_local_file_ls
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/ray/session_latest/ray_spilled_objects'
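
If you would rather not see an error when nothing has been spilled yet, a tolerant variant can return an empty list for a missing folder. This is a sketch only, and the function name `list_spilled_objects` is made up for this example. It returns the listing instead of printing it, so the results can be collected in the notebook with `ray.get`.

```python
import os


def list_spilled_objects(path):
    """Return the sorted file names in path, or an empty list if it does not exist."""
    try:
        return sorted(os.listdir(path))
    except FileNotFoundError:
        return []  # nothing has been spilled to disk yet


# Sketch of running it on the cluster (not executed here):
# remote_ls = ray.remote(list_spilled_objects)
# ray.get([remote_ls.remote("/tmp/ray/session_latest/ray_spilled_objects")
#          for _ in ray.nodes()])
```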


Second, inspect the total disk usage of the local filesystem.
Compare the results of this with and without dedicated local storage - without it, you'll probably see a larger total capacity with more of it already in use!
This is because each Ray node is always given a `/tmp` folder to work with, and by default it is mounted from the same storage used by Domino Datasets.

If you enable local storage, this will show exactly the amount of storage you request, and the `/tmp` folder will be located on the **local disk** of the node - meaning it gets better IO performance.
This is especially true if the shared Datasets storage is heavily used.

In [69]:
import shutil

@ray.remote
def ray_disk_usage(path):
    time.sleep(1)
    usage = shutil.disk_usage(path)
    print(f"{usage.used/(1024**3):.4f} of {usage.total/(1024**3):.4f}G used")

In [70]:
# Show the disk usage for each node
[ray_disk_usage.remote("/tmp/ray") for _ in ray.nodes()];

(ray_disk_usage pid=583) 26.2702 of 999.9883G used
(ray_disk_usage pid=770) 26.2702 of 999.9883G used
(ray_disk_usage pid=2233) 26.2702 of 999.9883G used


We can benchmark a large data operation with and without local storage to see the difference.
The below few cells can be run without running any previous cells in the notebook.
See if you get results similar to the test run of this notebook:
* 58.40s without local storage
* 48.24s with local storage
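
The benchmark cells in this notebook all repeat the same `start = time.time()` pattern. As a convenience, here is a small sketch of a reusable timer plus the arithmetic for comparing two runs. The names `timed` and `speedup` are hypothetical helpers, not part of Ray. Applied to the reference timings above, the ratio is roughly 1.21, meaning the run with local storage was about 17% shorter.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Store the wall-clock duration of the enclosed block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


def speedup(baseline_seconds, improved_seconds):
    """Return how many times faster the improved run was than the baseline."""
    return baseline_seconds / improved_seconds


timings = {}
with timed("demo", timings):
    sum(range(1000))  # stand-in for the large data operation being benchmarked

# Reference timings from the test run of this notebook
reference_speedup = speedup(58.40, 48.24)
print(f"Local storage speedup: {reference_speedup:.2f}x")
```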

In [71]:
import ray
import os
import random
import time
import numpy as np
import modin.pandas as mpd

In [72]:
if ray.is_initialized() == False:
    service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
    service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
    ray.init(f"ray://{service_host}:{service_port}")

In [76]:
dataset_path = f"/domino/datasets/local/{os.environ['DOMINO_PROJECT_NAME']}"
#dataset_path = "/domino/datasets/local/Points-For-Pi-Approximation"
#dataset_path = "/domino/datasets/Points-For-Pi-Approximation"

In [77]:
import pyarrow as pa

def process_batch(batch):
    result = (batch['x']**2 + batch['y']**2 < 1).sum()
    return pa.Table.from_pydict({"n_inside": [result]})

In [78]:
start = time.time()
large_points_ds = ray.data.read_parquet(os.path.join(dataset_path, "points_3e8_split30"))
n_total = large_points_ds.count()
n_inside = large_points_ds.map_batches(process_batch,batch_format='pandas')
pi = 4 * n_inside.sum() / n_total
print(f"Pi is approximately {pi} (from {n_total} samples in {time.time()-start:.2f}s)")

Metadata Fetch Progress: 100%|██████████| 5/5 [00:00<00:00, 65.30it/s]
Map Progress: 100%|██████████| 30/30 [00:02<00:00, 10.54it/s]
GroupBy Map: 100%|██████████| 30/30 [00:00<00:00, 297.29it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 167.48it/s]

Pi is approximately 3.1417218933333335 (from 300000000 samples in 11.47s)





With smaller data, the run takes about the same time with and without local storage.

### Congratulations!

You have finished the Intermediate tutorial (part 1), where we covered the following:
* More options for remote Task management with ray.wait and ray.cancel
* Creating and scheduling remote Actors
* Using the Ray object store with ray.put
* Loading and processing data with Ray Data
* Loading and processing data with Modin
* Common pitfalls for larger-than-memory data
* Bonus - performance improvements with dedicated local storage

### What's next?

You are now familiar with the fundamental ingredients of Ray Core, and two common options for loading and manipulating data in Ray.
Stay tuned for **part 2** of the Intermediate tutorial, where we will use additional libraries like XGBoost and Ray Tune to walk through a real-world distributed machine learning example on Ray!