# Parallel computation with Ray

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/coobas/europython-25/blob/main/04-ray.ipynb)

[Ray](https://docs.ray.io/en/latest/index.html) is a set of libraries that, among other things, make it easy to parallelise Python tasks, both locally and on clusters. This part is called **Ray Core**.

Apart from this, it also provides specialised libraries for data processing (**Ray Data**), for machine learning (**Ray Train**, **Ray Tune**, ...) and even for reinforcement learning (**RLlib**). We will not deal with those in this workshop.

In [None]:
# Run this in Google Colab; not needed if you install the package locally
!pip install "ray[default]"

In [None]:
# Obligatory imports
import numpy as np
import pandas as pd
import ray
import plotly.express as px
from IPython import get_ipython

from pathlib import Path

In [None]:
is_colab = "google.colab" in str(get_ipython())

# Download the data which are part of this repo
if is_colab:
    import urllib.request
    url = "https://github.com/coobas/europython-25/raw/refs/heads/main/data.parquet"
    urllib.request.urlretrieve(url, "data.parquet")

In [None]:
def long_running(i: int) -> int:
    """A long running task that we will parallelise."""
    import time
    time.sleep(1)
    return i * i

In [None]:
%%time
long_running(1)

In [None]:
%%time
[long_running(i) for i in range(10)]
    

If these tasks are independent, we can run them in parallel. There are of course options in Python itself:

- multiprocessing (https://docs.python.org/3.13/library/multiprocessing.html) 
- threading (with GIL-releasing code or with caution in free-threaded Python 3.13+)

This does not scale that well, though, once there are more tasks than CPUs / GPUs on your machine...
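
For comparison, here is a minimal sketch of the threading option using the standard library's `concurrent.futures.ThreadPoolExecutor`. It speeds things up here only because `time.sleep` releases the GIL. The function `long_running_short` is a hypothetical variant of `long_running` with a 0.1 s sleep, chosen only to keep the demo quick.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def long_running_short(i: int) -> int:
    """Hypothetical variant of `long_running` that sleeps only 0.1 s."""
    time.sleep(0.1)
    return i * i


start = time.perf_counter()
# One thread per task: the ten sleeps overlap because time.sleep releases the GIL
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(long_running_short, range(10)))
elapsed = time.perf_counter() - start

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(f"{elapsed:.2f} s")  # roughly 0.1 s, compared to about 1 s sequentially
```

Threads help here because the work is waiting rather than computing; for CPU-bound pure-Python code on a GIL-enabled interpreter, processes (or Ray) are the usual route.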

Other options, each with their own strengths and weaknesses:
- [celery](https://docs.celeryq.dev/en/stable/)
- [dask](https://docs.dask.org/en/stable/index.html)

## Use ray

Ray always runs a server (started implicitly if you do not start one yourself) and executes tasks in worker processes on the nodes that it manages.

In [None]:
ray.init()


This either connects to an existing local server or starts a new one. In the same way, we can connect to a different (e.g. remote) cluster by passing its address to `ray.init`.

### Remote functions

Any function we decorate with the `ray.remote` decorator becomes a remote function: each call to it is submitted to this server as a **task**.

In [None]:
@ray.remote
def long_running_ray(i: int) -> int:
    import time
    time.sleep(1)
    return i * i

In [None]:
long_running_ray(1)  # This will fail

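Well, the error message is right: a remote function cannot be called directly. We have to call its `.remote()` method instead (a different `remote` than the decorator!).

Better (note that we pass the arguments the same way as in a normal call):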

In [None]:
%%time
long_running_ray.remote(1)

What happened? The task was submitted to the cluster, but asynchronously: `.remote()` returns immediately without waiting for the result. We have to capture the returned **future** object (an object reference)...

In [None]:
task_id = long_running_ray.remote(2)

...and get its value (synchronously):

In [None]:
ray.get(task_id)

In [None]:
%%time
task_ids = [long_running_ray.remote(i) for i in range(10)]
ray.get(task_ids)
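
If the future concept is new, the standard library's `concurrent.futures` follows the same submit-then-get pattern and can serve as an analogy (this is not Ray's API): `submit` returns a `Future` immediately, much like `.remote()`, and `result()` blocks until the value is ready, much like `ray.get`. The task function below is a hypothetical stand-in.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def square_later(i: int) -> int:
    """Hypothetical stand-in for a slow task."""
    time.sleep(0.1)
    return i * i


with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(square_later, 2)  # returns at once, like .remote()
    print(future.done())  # most likely False: the task is still sleeping
    value = future.result()  # blocks until the value is ready, like ray.get()

print(value)  # 4
```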

## Exercise: Compute k nearest neighbours remotely

We will reuse our (slightly modified) kNN functions to find the nearest neighbours of random points:

In [None]:
DEFAULT_K = 4   # How many nearest neighbors to consider

def calculate_distances(query_points: np.ndarray, reference_points: np.ndarray, *, n_dim: int = 3) -> np.ndarray:
    """
    Calculate mutual Euclidean distances between M query and N reference points.

    Parameters:
    ----------
    query_points: np.ndarray
        (M, n_dim+) array of query points
    reference_points: np.ndarray
        (N, n_dim+) array of reference points
    n_dim: int
        Number of dimensions to consider (default: 3, for x, y, floor)

    Returns:
    --------
    distances: np.ndarray
        (M, N) array of the distances
    """
    # Expand for broadcasting
    query_points = query_points[:, np.newaxis,:n_dim]
    reference_points = reference_points[np.newaxis, :, :n_dim]
    return np.sqrt(np.sum((reference_points - query_points) ** 2, axis=-1))


def knn_search(
    query_points: np.ndarray,
    reference_points: np.ndarray,
    k: int,
):
    """
    Find k nearest neighbour reference point indices for N query points.

    Returns:
    --------
    indices: np.ndarray
        (N, k) matrix of integral indices
    """
    distances = calculate_distances(query_points, reference_points).T
    return np.argpartition(distances, k, axis=0)[:k].T
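
`np.argpartition(a, k)` only guarantees that the element at position `k` is where it would be after a full sort, with nothing larger before it; the first `k` indices are therefore the `k` smallest, but in no particular order. Because `kth` must be a valid index, this also requires more than `k` reference points. A small check against a full `argsort`, on made-up distances:

```python
import numpy as np

distances = np.array([0.9, 0.1, 0.5, 0.3, 0.7, 0.2])
k = 3

nearest = np.argpartition(distances, k)[:k]  # k smallest, unordered
print(sorted(nearest.tolist()))  # [1, 3, 5]

# A full sort gives the same set, in ascending order, at O(n log n) cost
print(np.argsort(distances)[:k].tolist())  # [1, 5, 3]

# If the neighbours must be ordered by distance, sort only the k candidates
ordered = nearest[np.argsort(distances[nearest])]
print(ordered.tolist())  # [1, 5, 3]
```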

Let's create some points to test this on:

In [None]:
def create_random_points(
    n_points: int, *, n_dim: int = 3, seed: int = 42
) -> np.ndarray:
    # TODO: Fix floor!
    np.random.seed(seed)
    return np.random.sample((n_points, n_dim))

In [None]:
query_points = create_random_points(5, seed=42)
reference_points = create_random_points(10, seed=84)

query_points

In [None]:
calculate_distances(query_points, reference_points)
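
To see what the broadcasting in `calculate_distances` does, here is a worked check on hypothetical shapes (M=2 query points, N=5 reference points, 3 dimensions). The intermediate difference array has shape (M, N, 3), and summing over the last axis leaves the (M, N) distance matrix, which matches an explicit double loop.

```python
import numpy as np

rng = np.random.default_rng(0)
query = rng.random((2, 3))      # M=2 query points
reference = rng.random((5, 3))  # N=5 reference points

# (1, 5, 3) - (2, 1, 3) broadcasts to (2, 5, 3)
diff = reference[np.newaxis, :, :] - query[:, np.newaxis, :]
print(diff.shape)  # (2, 5, 3)

distances = np.sqrt(np.sum(diff ** 2, axis=-1))
print(distances.shape)  # (2, 5)

# Same thing with explicit loops, for comparison
expected = np.array([[np.linalg.norm(r - q) for r in reference] for q in query])
print(np.allclose(distances, expected))  # True
```

The intermediate array holds M × N × 3 floats. For the 10,000,000 query points and 100 reference points used later in this notebook, that is 3 × 10⁹ float64 values, roughly 24 GB.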

First, we run this without Ray on larger arrays:

In [None]:
%%time
query_points = create_random_points(32768, seed=42)
reference_points = create_random_points(1000, seed=84)
knn_search(query_points, reference_points, k=DEFAULT_K)

**Task**

1. Turn the `knn_search` function into a Ray task.
2. Submit the computation to Ray and compare the results.
3. Look at the execution time and compare it to the local run.

In [None]:
@ray.remote
def knn_search_ray(
    query_points: np.ndarray,
    reference_points: np.ndarray,
    k: int,
):
    """
    Find k nearest neighbour reference point indices for N query points.

    Returns:
    --------
    indices: np.ndarray
        (N, k) matrix of integral indices
    """
    distances = calculate_distances(query_points, reference_points).T
    return np.argpartition(distances, k, axis=0)[:k].T

In [None]:
%%time
knn_id = knn_search_ray.remote(query_points, reference_points, k=DEFAULT_K)
knn_results = ray.get(knn_id)

**Question:** Is there any improvement yet?

## Monitoring ray

Ray comes with a nice dashboard that allows you to observe running jobs. It is served by a local web server, most likely at http://localhost:8265. This address is not reachable when running within Google Colab, so there we use a trick: Colab can forward the port and show the dashboard in a small embedded window.

In [None]:
is_colab = "google.colab" in str(get_ipython())

if is_colab:
    from google.colab import output
    output.serve_kernel_port_as_iframe(8265)  # The port may differ!
else:
    print("Not in Google Colab. Open http://localhost:8265 in your browser (the port may differ).")

Now we submit something really huge:

In [None]:
query_points = create_random_points(10000000, seed=42)
reference_points = create_random_points(100, seed=84)
task_id = knn_search_ray.remote(query_points, reference_points, k=DEFAULT_K)


In [None]:
%%time
ray.get(task_id)

## Exercise: Parallelize kNN execution

Ray by itself does not optimise or split the tasks you send it, so you need to split the work yourself. Sometimes this is easy, sometimes it requires deeper thought (or a library ;-)).

If you want to speed up the nearest-neighbour computation for many **independent** query points, this is really easy (an "embarrassingly parallel" problem). Just take the 10,000,000 query points and split them into **batches**:

In [None]:
def split_into_batches(array: np.ndarray, max_size: int) -> list[np.ndarray]:
    """
    Split an array into smaller batches of a maximum size.

    Parameters:
    ----------
    array: np.ndarray
        The array to split.
    max_size: int
        The maximum size of each batch.

    Returns:
    --------
    list[np.ndarray]
        A list of arrays, each with a maximum size of `max_size`.
    """
    return [array[i:i + max_size] for i in range(0, len(array), max_size)]

In [None]:
split_into_batches(np.arange(10), 3)
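
NumPy also has a built-in helper, `np.array_split`, which takes the number of chunks rather than the maximum chunk size and balances the chunk sizes. A sketch comparing the two on the same input:

```python
import numpy as np


def split_into_batches(array: np.ndarray, max_size: int) -> list[np.ndarray]:
    """Same helper as above: consecutive slices of at most `max_size` rows."""
    return [array[i:i + max_size] for i in range(0, len(array), max_size)]


data = np.arange(10)

by_size = split_into_batches(data, 3)
print([b.tolist() for b in by_size])  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

# np.array_split takes the number of sections and balances their sizes
by_count = np.array_split(data, 4)
print([b.tolist() for b in by_count])  # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
```

Splitting by count is handy when you want, say, one batch per CPU; splitting by size keeps the memory needed per task bounded.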

**Task**: 
1. Split the query points into batches (experiment with the batch size; 1000 works reasonably well)
2. Submit and retrieve the results for the batches
3. Combine the results back into one array ([np.vstack](https://numpy.org/doc/stable/reference/generated/numpy.vstack.html) is your friend)
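
Because each query point is handled independently, processing the batches separately and stacking the (batch_size, k) results with `np.vstack` should give the same neighbours as processing everything at once. A local check without Ray, using a compact re-statement of `knn_search` (it partitions along axis 1 instead of transposing) and hypothetical small sizes:

```python
import numpy as np


def knn_search_compact(query_points, reference_points, k):
    """Compact re-statement of knn_search: (M, k) indices of the k nearest reference points."""
    diff = reference_points[np.newaxis, :, :3] - query_points[:, np.newaxis, :3]
    distances = np.sqrt(np.sum(diff ** 2, axis=-1))  # (M, N)
    return np.argpartition(distances, k, axis=1)[:, :k]  # (M, k), unordered per row


rng = np.random.default_rng(42)
query_points = rng.random((2500, 3))
reference_points = rng.random((100, 3))
k = 4

whole = knn_search_compact(query_points, reference_points, k)

# Split into batches of at most 1000 rows, process each one, stack the results
batches = [query_points[i:i + 1000] for i in range(0, len(query_points), 1000)]
batched = np.vstack([knn_search_compact(batch, reference_points, k) for batch in batches])

print(batched.shape)  # (2500, 4)
# argpartition returns neighbours in no particular order, so compare sorted rows
print(np.array_equal(np.sort(whole, axis=1), np.sort(batched, axis=1)))  # True
```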

In [None]:
query_points = create_random_points(10000000, seed=42)
reference_points = create_random_points(100, seed=84)

batches = split_into_batches(query_points, 1000)
batches

In [None]:
%%time
task_ids = [
    knn_search_ray.remote(batch, reference_points, k=DEFAULT_K) for batch in batches
]
knn_results = ray.get(task_ids)
knn_results

In [None]:
np.vstack(knn_results)

**Question:** Can we parallelise in the reference points dimension?
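
One possible approach (a sketch, not the only answer): split the reference points into chunks, find the k nearest candidates within each chunk while keeping their distances and *global* indices, then merge by taking the k smallest distances among all candidates. Each chunk could become a separate Ray task; here everything runs locally, and the helper names `knn_in_chunk` and `merge_candidates` are hypothetical.

```python
import numpy as np


def knn_in_chunk(query_points, ref_chunk, offset, k):
    """k nearest candidates within one reference chunk: (distances, global indices)."""
    diff = ref_chunk[np.newaxis, :, :] - query_points[:, np.newaxis, :]
    distances = np.sqrt(np.sum(diff ** 2, axis=-1))  # (M, chunk_size)
    k_local = min(k, ref_chunk.shape[0])
    idx = np.argsort(distances, axis=1)[:, :k_local]  # local indices of the candidates
    return np.take_along_axis(distances, idx, axis=1), idx + offset


def merge_candidates(candidates, k):
    """Keep the k closest among the candidates from all chunks."""
    distances = np.hstack([d for d, _ in candidates])
    indices = np.hstack([i for _, i in candidates])
    best = np.argsort(distances, axis=1)[:, :k]
    return np.take_along_axis(indices, best, axis=1)


rng = np.random.default_rng(0)
query_points = rng.random((50, 3))
reference_points = rng.random((1000, 3))
k = 4
chunk_size = 300

candidates = [
    knn_in_chunk(query_points, reference_points[start:start + chunk_size], start, k)
    for start in range(0, len(reference_points), chunk_size)
]
merged = merge_candidates(candidates, k)

# Reference result: brute force over all reference points at once
diff = reference_points[np.newaxis] - query_points[:, np.newaxis]
expected = np.argsort(np.sqrt(np.sum(diff ** 2, axis=-1)), axis=1)[:, :k]
print(np.array_equal(merged, expected))  # True
```

This works because each of the global k nearest neighbours is necessarily among the k nearest of its own chunk.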

## Example: Predicting the housing prices with a database of reference points

We can use the nearest neighbours to predict housing prices (in arbitrary units, perhaps the monthly rent in $ for a single-bedroom apartment?) with a simple kNN regression: the predicted price of a query point is the mean price of its k nearest reference points.

In [None]:
def compute_prices(query_points, reference_points, k: int = DEFAULT_K):
    """
    Predict prices for N query points as the mean price of their k nearest reference points.

    Parameters:
    ----------
    query_points: np.ndarray
        (N, 3) array of query points
    reference_points: np.ndarray
        (M, 4) array of data points with x, y, floor, and price
    k: int
        Number of nearest neighbors to consider

    Returns:
    --------
    prices: np.ndarray
        (N,) array of prices
    """
    indices = knn_search(query_points, reference_points, k)
    prices: np.ndarray = reference_points[indices, 3]
    return prices.mean(axis=1)
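
The indexing `reference_points[indices, 3]` picks the price column (index 3) for every neighbour index, producing an (N, k) array; averaging over axis 1 gives one prediction per query point. A tiny worked example with made-up reference data and made-up neighbour indices:

```python
import numpy as np

# Columns: x, y, floor, price (made-up values)
reference_points = np.array([
    [0.0, 0.0, 1.0, 100.0],
    [1.0, 0.0, 1.0, 200.0],
    [5.0, 5.0, 1.0, 900.0],
])
# Suppose knn_search returned these neighbour indices for two query points (k=2)
indices = np.array([
    [0, 1],
    [2, 1],
])

neighbour_prices = reference_points[indices, 3]
print(neighbour_prices)
# [[100. 200.]
#  [900. 200.]]
print(neighbour_prices.mean(axis=1))  # [150. 550.]
```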


We have predefined reference points in an external file, modeled using an (unknown?) analytical function:

In [None]:
is_colab = "google.colab" in str(get_ipython())

# Download the data which are part of this repo
if is_colab:
    import urllib.request
    url = "https://github.com/coobas/europython-25/raw/refs/heads/main/data.parquet"
    urllib.request.urlretrieve(url, "data.parquet")
    print("Data downloaded to data.parquet.")
else:
    print("Not in Google Colab, skipping data download.")

In [None]:
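def load_reference_points_df(path: Path | None = None) -> pd.DataFrame:
    """
    Load reference data points from a parquet file.

    Parameters:
    ----------
    path: Path | None
        Path to the parquet file (default: "data.parquet" in Colab, "../data.parquet" otherwise)

    Returns:
    --------
    data_points: pd.DataFrame
        (N, 4) DataFrame of data points with x, y, floor, and price columns
    """
    if path is None:
        # In Colab, the file was downloaded into the current working directory
        path = Path("data.parquet") if is_colab else Path("../data.parquet")
    return pd.read_parquet(path)
    # return df[["x", "y", "floor", "price"]].to_numpy().astype(float)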

reference_points_df = load_reference_points_df()
reference_points_df

We also need query points to run against this data. Let's start with random ones:

In [None]:
def create_random_3d_points(
    n_points: int = 100,
    *,
    ranges: list[tuple[float, float]] = [(-10, 10), (-10, 10), (1, 20)],
) -> np.ndarray:
    """
    Create random 3D points within specified ranges.

    Parameters:
    ----------
    n_points: int
        Number of points to generate.
    ranges: list[tuple[float, float]]
        List of (low, high) tuples for the x, y, and floor dimensions.

    Returns:
    --------
    points: np.ndarray
        (n_points, 3) array of random points.
    """
    if not ranges or len(ranges) != 3:
        raise ValueError("Ranges must be a list of three tuples for x, y, and floor dimensions.")
    points = np.random.uniform(
        low=[r[0] for r in ranges],
        high=[r[1] for r in ranges],
        size=(n_points, len(ranges)),
    )
    points[:, 2] = points[:, 2].astype(int)  # Truncate the floor to a whole number
    return points

query_points = create_random_3d_points()
query_points

Our query points need to live in the same space (x and y between -10 and 10, floor between 1 and 20). Note that the real floor distribution is probably not uniform, but for demonstration purposes that should not matter much.
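
One detail of `create_random_3d_points`: uniform sampling draws from the half-open interval [low, high), and `astype(int)` truncates, so the floors end up in 1..19 and floor 20 never appears. If whole floors 1..20 are wanted, drawing integers directly is one option; this sketch uses NumPy's `Generator.integers`, whose `high` bound is exclusive by default.

```python
import numpy as np

rng = np.random.default_rng(42)
n_points = 10_000

# Current approach: uniform floats in [1, 20), truncated to int
truncated = rng.uniform(1, 20, size=n_points).astype(int)
print(truncated.min(), truncated.max())  # 1 19

# Alternative: integers in [1, 21), i.e. floors 1..20 inclusive
floors = rng.integers(1, 21, size=n_points)
print(floors.min(), floors.max())  # 1 20
```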

Let's see how the whole thing looks without ray:

In [None]:
prices = compute_prices(query_points, reference_points_df.to_numpy(), k=DEFAULT_K)
prices

In [None]:
def combine_points_and_prices(
    query_points: np.ndarray, prices: np.ndarray
) -> pd.DataFrame:
    """
    Prepare human-friendly output from numpy arrays.

    Parameters:
    ----------
    query_points: np.ndarray
        (N, 3) array of query points
    prices: np.ndarray
        (N,) array of prices

    Returns:
    --------
    df: pd.DataFrame
        DataFrame with columns x, y, floor, price
    """
    return pd.DataFrame(
        {
            "x": query_points[:, 0],
            "y": query_points[:, 1],
            "floor": query_points[:, 2].astype(int),
            "price": prices,
        }
    )

In [None]:
combine_points_and_prices(query_points, prices)

We already know how to parallelise this:

In [None]:
# Note that `ray.remote` can also be called as a plain function on an existing function, instead of using the decorator syntax
compute_prices_ray = ray.remote(compute_prices)

In [None]:
%%time
query_points = create_random_3d_points(100_000)
batches = split_into_batches(query_points, 1_000)
task_ids = [
    compute_prices_ray.remote(batch, reference_points_df.to_numpy(), k=DEFAULT_K) 
    for batch in batches
]
np.hstack(ray.get(task_ids))

This works, but note that we pass the same reference points to Ray over and over again, which means serialising them for every task. It does not matter much in our case, but with a larger reference dataset we might see a significant performance penalty.

### Storing objects in ray

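Instead, we can put the object into Ray's object store just once and pass its **object reference** (ID) to the tasks. When an object reference is passed as a top-level task argument, Ray automatically resolves it to the stored value before the task runs.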

Let's take it to the extreme: first we compute the prices for 10,000 points in batches of 1,000, then with every point in a separate task:

In [None]:
%%time
query_points = create_random_3d_points(10_000)
batches = split_into_batches(query_points, 1_000)
task_ids = [
    compute_prices_ray.remote(batch, reference_points_df.to_numpy(), k=DEFAULT_K) 
    for batch in batches
]
ray.get(task_ids);

In [None]:
%%time
query_points = create_random_3d_points(10_000)
batches = split_into_batches(query_points, 1)
task_ids = [
    compute_prices_ray.remote(batch, reference_points_df.to_numpy(), k=DEFAULT_K) 
    for batch in batches
]
ray.get(task_ids);

Do you see the degraded performance?

Most of it is of course due to creating one task per point, but part of the penalty comes from serialising and passing the reference array 10,000 times.
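
To get a feeling for that overhead, here is a back-of-the-envelope estimate of how much data gets serialised when the array is passed to every task. The array shape is an assumption (the real size depends on `data.parquet`), and `pickle` is used only as a stand-in for Ray's own serialisation.

```python
import pickle

import numpy as np

# Hypothetical reference array: 10,000 rows of x, y, floor, price
reference_points = np.zeros((10_000, 4))
n_tasks = 10_000  # one task per query point, as in the cell above

size_once = len(pickle.dumps(reference_points))
print(f"one copy: {size_once / 1e6:.2f} MB")  # about 0.32 MB
print(f"passed to every task: {n_tasks * size_once / 1e9:.1f} GB")  # about 3.2 GB
```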

Putting objects into the Ray object store is actually very simple:

In [None]:
i = 9
arg_id = ray.put(i)
arg_id

In [None]:
ray.get(long_running_ray.remote(arg_id))

**Task**

1. Modify the call to `compute_prices_ray` above so that `reference_points_df.to_numpy()` is stored in the Ray object store once and reused by each task.
2. Compare the execution time.

In [None]:
%%time
query_points = create_random_3d_points(10000)
batches = split_into_batches(query_points, 1)
reference_id = ray.put(reference_points_df.to_numpy())
task_ids = [
    compute_prices_ray.remote(batch, reference_id, k=DEFAULT_K) 
    for batch in batches
]
ray.get(task_ids);

Is it better now?

**Question**: Would putting query points (either whole or batched) to ray help as well?

## TODO: Optional: Actors

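We would like to create a map of prices, i.e. sample a grid of points over the allowed area and use our kNN model to predict a price for each of them (with the floor as a parameter). A Ray **actor** is the stateful counterpart of a task: decorating a class with `ray.remote` creates its instance in a dedicated worker process, and its methods are called with `.remote()` just like remote functions.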

In [74]:
class PriceMap:
    DEFAULT_N_POINTS = 10
    DEFAULT_LIMIT = 10
    DEFAULT_BATCH_SIZE = 1000

    def __init__(self, reference_points: np.ndarray, *, floor: int = 1, n_points: int = DEFAULT_N_POINTS):
        self.floor = floor
        self.n_points = n_points
        self.reference_points = reference_points
        self.limit = self.DEFAULT_LIMIT
        self.create_query_points()

    def create_query_points(self) -> None:
        """
        Create a homogeneous grid of query points for the map.

        Sets `self.query_points` to an (n_points x n_points, 3) array
        of x, y, floor coordinates.
        """
        x = np.linspace(-self.limit, self.limit, self.n_points)
        y = np.linspace(-self.limit, self.limit, self.n_points)
        x, y = np.meshgrid(x, y)
        self.query_points = np.vstack([x.flatten(), y.flatten(), np.full(x.size, self.floor)]).T
    
    def compute_prices(self) -> np.ndarray:
        """
        Compute prices for the query points based on the reference points.

        Returns:
        --------
        prices: np.ndarray
            (n_points x n_points,) array of prices for each query point
        """
        return compute_prices(self.query_points, self.reference_points, k=DEFAULT_K)
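
The grid construction in `create_query_points` can be checked on a tiny case (a 3 × 3 grid with limit 10 and floor 2, values chosen only for illustration): `np.meshgrid` produces two 2-D coordinate arrays, which are flattened and stacked with a constant floor column into an (n_points², 3) array.

```python
import numpy as np

n_points, limit, floor = 3, 10, 2

x, y = np.meshgrid(
    np.linspace(-limit, limit, n_points),
    np.linspace(-limit, limit, n_points),
)
# Flatten both coordinate grids and add a constant floor column
query_points = np.column_stack([x.ravel(), y.ravel(), np.full(x.size, floor)])

print(query_points.shape)  # (9, 3)
print(query_points[:4].tolist())
# [[-10.0, -10.0, 2.0], [0.0, -10.0, 2.0], [10.0, -10.0, 2.0], [-10.0, 0.0, 2.0]]
```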


In [77]:
%%time
price_map = PriceMap(reference_points_df.to_numpy(), floor=1, n_points=100)
price_map.compute_prices()

CPU times: user 1.62 s, sys: 295 ms, total: 1.92 s
Wall time: 1.93 s


array([502.62571959, 502.62571959, 502.62571959, ..., 497.50754281,
       497.50167414, 497.50167414], shape=(10000,))

In [None]:
%%time
@ray.remote
class MapActor(PriceMap):
    ...
    

map_actor = MapActor.remote(reference_points_df.to_numpy(), floor=1, n_points=100)
prices_id = map_actor.compute_prices.remote()
ray.get(prices_id)

CPU times: user 20.4 ms, sys: 4.72 ms, total: 25.1 ms
Wall time: 2.45 s


array([502.62571959, 502.62571959, 502.62571959, ..., 497.50754281,
       497.50167414, 497.50167414], shape=(10000,))

In [81]:
%%time
@ray.remote
class MapBatchActor(PriceMap):
    ... 
    
    def compute_prices(self):
        """
        Compute prices for the query points based on the reference points.

        Returns:
        --------
        prices: np.ndarray
            (n_points x n_points,) array of prices for each query point
        """
        prices = [
            compute_prices_ray.remote(batch, self.reference_points, k=DEFAULT_K) 
            for batch in split_into_batches(self.query_points, self.DEFAULT_BATCH_SIZE)
        ]
        return np.hstack(ray.get(prices))
    
map_actor = MapBatchActor.remote(reference_points_df.to_numpy(), floor=1, n_points=100)
prices_id = map_actor.compute_prices.remote()
ray.get(prices_id)

CPU times: user 711 ms, sys: 258 ms, total: 969 ms
Wall time: 2min 36s


array([502.62571959, 502.62571959, 502.62571959, ..., 497.50754281,
       497.50167414, 497.50167414], shape=(10000,))

# TODO: Check that this works in Colab

In [87]:
import jax.numpy as jnp


def calculate_distances_jax(query_points: jnp.ndarray, reference_points: jnp.ndarray, *, n_dim: int = 3) -> jnp.ndarray:
    query_points = query_points[:, jnp.newaxis, :n_dim]
    reference_points = reference_points[jnp.newaxis, :, :n_dim]
    return jnp.sqrt(jnp.sum((reference_points - query_points) ** 2, axis=-1))


@ray.remote
def compute_prices_jax_ray(query_points, reference_points, k=DEFAULT_K):
    distances = calculate_distances_jax(query_points, reference_points).T
    indices = jnp.argpartition(distances, k, axis=0)[:k].T
    return jnp.mean(reference_points[indices, 3], axis=1)


@ray.remote
class MapJaxBatchActor(PriceMap):
    ...

    def compute_prices(self):
        """
        Compute prices for the query points based on the reference points.

        Returns:
        --------
        prices: np.ndarray
            (n_points x n_points,) array of prices for each query point
        """
        prices = [
            compute_prices_jax_ray.remote(batch, self.reference_points, k=DEFAULT_K) 
            for batch in split_into_batches(self.query_points, self.DEFAULT_BATCH_SIZE)
        ]
        return jnp.hstack(ray.get(prices))

map_actor = MapJaxBatchActor.remote(reference_points_df.to_numpy(), floor=1, n_points=5)
prices_id = map_actor.compute_prices.remote()
ray.get(prices_id)

RayTaskError(RuntimeError): ray::MapJaxBatchActor.compute_prices() (pid=150796, ip=192.168.1.23, actor_id=ebede1e1a65b7accf6c7e74401000000, repr=<__main__.MapJaxBatchActor object at 0x7f5288a96f60>)
  File "/tmp/ipykernel_119217/294526014.py", line 34, in compute_prices
ray.exceptions.RayTaskError(RuntimeError): ray::compute_prices_jax_ray() (pid=119973, ip=192.168.1.23)
  File "/home/jan/code/collaboration/europython-25/.venv/lib/python3.12/site-packages/jax/_src/xla_bridge.py", line 943, in _init_backend
    backend = registration.factory()
              ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jan/code/collaboration/europython-25/.venv/lib/python3.12/site-packages/jax/_src/xla_bridge.py", line 633, in factory
    return xla_client.make_c_api_client(plugin_name, updated_options, None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jan/code/collaboration/europython-25/.venv/lib/python3.12/site-packages/jaxlib/xla_client.py", line 159, in make_c_api_client
    return _xla.get_c_api_client(plugin_name, options, distributed_client)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
jaxlib.xla_extension.XlaRuntimeError: FAILED_PRECONDITION: No visible GPU devices.

During handling of the above exception, another exception occurred:

ray::compute_prices_jax_ray() (pid=119973, ip=192.168.1.23)
  File "/tmp/ipykernel_119217/294526014.py", line 12, in compute_prices_jax_ray
  File "/tmp/ipykernel_119217/294526014.py", line 7, in calculate_distances_jax
  File "/home/jan/code/collaboration/europython-25/.venv/lib/python3.12/site-packages/jax/_src/numpy/reductions.py", line 310, in sum
    return _reduce_sum(a, axis=_ensure_optional_axes(axis), dtype=dtype, out=out,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: Unable to initialize backend 'cuda': FAILED_PRECONDITION: No visible GPU devices. (you may need to uninstall the failing plugin package, or set JAX_PLATFORMS=cpu to skip this backend.)
--------------------
For simplicity, JAX has removed its internal frames from the traceback of the following exception. Set JAX_TRACEBACK_FILTERING=off to include these.

TODO: Create an actor?

In [None]:
def draw_points(points: np.ndarray) -> None:
    """
    Draw points on a map.

    Parameters:
    -----------
    points: np.ndarray
        (N, 3) array of points to draw
    """
    df = pd.DataFrame({"x": points[:,0], "y": points[:,1]})
    fig = px.scatter(df, x="x", y="y", title="Query points")
    fig.show()

draw_points(PriceMap(reference_points_df.to_numpy(), n_points=5).query_points)

## TODO: Optional: ray + jax