# Parallel computation with Ray

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/coobas/europython-25/blob/main/98-ray.ipynb)

[Ray](https://docs.ray.io/en/latest/index.html) is a set of libraries that, among other things, make it easy to parallelise Python tasks, both locally and on clusters. This part is called **Ray Core**.

Apart from this, it also provides specialised libraries for data processing (**Ray Data**), machine learning and even reinforcement learning (**Ray Train**, **Ray Tune**, **RLlib**, ...). We will not deal with those in this workshop.

In [None]:
# Run this in Google Colab; not needed if you have installed these packages locally
!pip install numpy polars "ray[default]"

In [None]:
# Obligatory imports
import numpy as np
import polars as pl
import ray

from pathlib import Path

In [None]:
def long_running() -> int:
    """A long running task that we will parallelise."""
    import time
    time.sleep(1)
    return 42

In [None]:
%%time
long_running()

In [None]:
%%time
[long_running() for i in range(10)]
    

If these tasks are independent, we can run them in parallel. There are of course options in Python itself:

- multiprocessing (https://docs.python.org/3.13/library/multiprocessing.html) 
- threading (with GIL-releasing code or with caution in free-threaded Python 3.13+)

These do not scale well once there are more tasks than CPUs / GPUs on your machine, because they cannot spread the work across several machines.
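For comparison, the standard-library route for this toy example might look like the sketch below. `time.sleep` releases the GIL, so a thread pool is enough here. CPU-bound pure-Python work would need `ProcessPoolExecutor` (or free-threaded Python) instead. In both cases the work stays on one machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def long_running() -> int:
    """Same toy task as above: sleep for one second, then return 42."""
    time.sleep(1)
    return 42


start = time.perf_counter()
# time.sleep releases the GIL, so ten threads can wait at the same time
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = [pool.submit(long_running) for _ in range(10)]
    results = [f.result() for f in futures]
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.1f} s")  # roughly 1 s instead of 10 s
```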

Other options:
- [celery](https://docs.celeryq.dev/en/stable/)
- [dask](https://docs.dask.org/en/stable/index.html)

Each comes with its own strengths and weaknesses.

## Use ray

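Ray always runs a server. If you do not start one yourself, Ray starts it implicitly. It then executes tasks in worker processes on the nodes it manages. `ray.init()` starts this instance explicitly, or connects to an existing one.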

In [None]:
ray.init()

In [None]:
@ray.remote
def long_running_ray() -> int:
    import time
    time.sleep(1)
    return 42

In [None]:
long_running_ray()  # This fails with a TypeError: remote functions must be called with .remote()

In [None]:
long_running_ray.remote()  # Returns an ObjectRef (a future) immediately, not the result

In [None]:
task_id = long_running_ray.remote()

In [None]:
ray.get(task_id)

In [None]:
%%time
task_ids = [long_running_ray.remote() for i in range(10)]
ray.get(task_ids)

## Exercise: Compute prices at many points 

In [None]:
# Constants
N_POINTS = 10   # Number of points in each dimension for the grid
LIMIT = 10.0    # +/- Span of the grid
DEFAULT_K = 4   # How many nearest neighbors to consider

In [None]:
def calculate_distances(query_points: np.ndarray, reference_points: np.ndarray) -> np.ndarray:
    """
    Calculate mutual distances between M query and N reference points.

    Returns:
    --------
    distances: np.ndarray
        (N, M) array of the distances
    """
    # Expand for broadcasting
    query_points = query_points[:, :, np.newaxis]
    reference_points = reference_points[:3, np.newaxis]
    return np.sqrt(np.sum((reference_points - query_points) ** 2, axis=0))


def knn_search(
    query_points: np.ndarray,
    reference_points: np.ndarray,
    k: int,
    distances_func=calculate_distances,
):
    """
    Find k nearest neighbour reference point indices for N query points.

    Returns:
    --------
    indices: np.ndarray
        (N, k) matrix of integral indices

    """
    distances = distances_func(query_points, reference_points).T
    nearest_indices = np.argpartition(distances, k, axis=0)[:k].T
    return nearest_indices
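A small worked example helps check the broadcasting in `calculate_distances` and the `np.argpartition` trick in `knn_search`. The two functions are copied from above so the cell runs on its own. The points and prices are made-up numbers, chosen so you can check the answer by hand. `np.argpartition` only guarantees that the first `k` entries are the `k` smallest, not that they are sorted, which is why the output below is passed through `sorted`.

```python
import numpy as np


def calculate_distances(query_points: np.ndarray, reference_points: np.ndarray) -> np.ndarray:
    # (3, N, 1) - (3, 1, M) broadcasts to (3, N, M); summing over axis 0 gives (N, M)
    query_points = query_points[:, :, np.newaxis]
    reference_points = reference_points[:3, np.newaxis]
    return np.sqrt(np.sum((reference_points - query_points) ** 2, axis=0))


def knn_search(query_points, reference_points, k, distances_func=calculate_distances):
    distances = distances_func(query_points, reference_points).T  # (M, N)
    return np.argpartition(distances, k, axis=0)[:k].T            # (N, k)


# Made-up reference points, one per column: rows are x, y, floor, price (M = 5)
reference = np.array([
    [0.0, 1.0, 5.0, 10.0, -3.0],          # x
    [0.0, 0.0, 0.0, 0.0, 0.0],            # y
    [1.0, 1.0, 1.0, 1.0, 1.0],            # floor
    [100.0, 200.0, 300.0, 400.0, 500.0],  # price
])
# Two query points on the same floor (N = 2): x = 0.5 and x = 9.0
query = np.array([
    [0.5, 9.0],
    [0.0, 0.0],
    [1.0, 1.0],
])

distances = calculate_distances(query, reference)
# Brute-force check of the broadcasting version
expected = np.array([
    [np.linalg.norm(query[:, i] - reference[:3, j]) for j in range(reference.shape[1])]
    for i in range(query.shape[1])
])
assert np.allclose(distances, expected)

neighbours = knn_search(query, reference, k=2)
demo_prices = reference[3][neighbours].mean(axis=1)  # same step as in compute_prices
print(distances.shape, neighbours.shape)  # (2, 5) (2, 2)
print([sorted(row) for row in neighbours.tolist()], demo_prices)
```

The first query point's two nearest references are indices 0 and 1, giving a mean price of 150. The second query point's are indices 2 and 3, giving 350.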

In [None]:
def create_point_grid(n_points: int = N_POINTS) -> tuple[np.ndarray, ...]:
    """
    Create a homogenous grid of points to create a map.

    Returns:
    --------
    x: np.ndarray
        Flattened (N_POINTS x N_POINTS,) array of x values
    y: np.ndarray
        Flattened (N_POINTS x N_POINTS,) array of x values
    """
    # TODO: Add floor
    x = np.linspace(-LIMIT, LIMIT, n_points)
    y = np.linspace(-LIMIT, LIMIT, n_points)
    return tuple(arr.flatten() for arr in np.meshgrid(x, y))


def create_query_points(n_points: int = N_POINTS, floor: int = 1) -> np.ndarray:
    """
    Create a homogenous grid of points with a floor to create a map.

    Returns:
    --------
    query_points: np.ndarray
        (n_points x n_points, 3) array of query points
    """
    x, y = create_point_grid(n_points=n_points)
    return np.vstack([x, y, np.ones(x.shape[0]) * floor])
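The sketch below shows how the grid is laid out, using the same construction for a tiny 3x3 grid on floor 1. It is written out inline so it runs on its own. With its default `indexing="xy"`, `np.meshgrid` makes x vary fastest after flattening. Stacking then gives one column per query point.

```python
import numpy as np

limit, n_points, floor = 10.0, 3, 1
x = np.linspace(-limit, limit, n_points)  # [-10, 0, 10]
y = np.linspace(-limit, limit, n_points)
grid_x, grid_y = (arr.flatten() for arr in np.meshgrid(x, y))  # default indexing="xy"
tiny_query_points = np.vstack([grid_x, grid_y, np.ones(grid_x.shape[0]) * floor])

print(tiny_query_points.shape)  # (3, 9)
print(grid_x)  # x cycles fastest: -10, 0, 10, -10, 0, 10, ...
print(grid_y)  # y changes every n_points entries
```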

In [None]:
def compute_prices(query_points: np.ndarray, reference_points: np.ndarray) -> np.ndarray:
    """
    Estimate prices at N query points as the mean price of their DEFAULT_K nearest reference points.

    Returns:
    --------
    prices: np.ndarray
        (N,) array of prices
    """
    indices = knn_search(query_points, reference_points, DEFAULT_K)
    prices: np.ndarray = reference_points[3][indices]
    return prices.mean(axis=1)


def combine_points_and_prices(
    query_points: np.ndarray, prices: np.ndarray
) -> pl.DataFrame:
    """
    Prepare human-friendly output from numpy arrays.

    Returns:
    --------
    df: pl.DataFrame
        DataFrame with columns x, y, floor, price
    """
    return pl.DataFrame(
        {
            "x": query_points[0],
            "y": query_points[1],
            "floor": query_points[2],
            "price": prices,
        }
    )

In [None]:
def load_reference_points(path: Path = Path("local_data/data.parquet")) -> np.ndarray:
    """
    Load reference data points from a Parquet file.

    Returns:
    --------
    data_points: np.ndarray
        (4, N) array of data points with x, y, floor, and price rows
    """

    df = pl.read_parquet(path)
    return np.vstack(
        [
            df["x"].to_numpy(),
            df["y"].to_numpy(),
            df["floor"].to_numpy(),
            df["price"].to_numpy(),
        ]
    )

load_reference_points()
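`load_reference_points()` needs `local_data/data.parquet`, which may not exist on a fresh Colab runtime. If you only want to try out the code, you could use a hypothetical stand-in like the one below. It returns random data in the same (4, N) layout. The price formula is invented purely for illustration and says nothing about real prices.

```python
import numpy as np


def make_synthetic_reference_points(n: int = 1_000, limit: float = 10.0, seed: int = 0) -> np.ndarray:
    """Hypothetical stand-in for load_reference_points(): random (4, n) array of x, y, floor, price."""
    rng = np.random.default_rng(seed)
    x = rng.uniform(-limit, limit, n)
    y = rng.uniform(-limit, limit, n)
    floor = rng.integers(1, 6, n).astype(float)  # floors 1..5
    # Made-up price model: cheaper further from the centre, pricier higher up, plus noise
    price = 10_000 - 300 * np.hypot(x, y) + 200 * floor + rng.normal(0, 500, n)
    return np.vstack([x, y, floor, price])
```

To use it, replace the call to `load_reference_points()` below with `reference_points = make_synthetic_reference_points()`.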

## Run without ray

In [None]:
reference_points = load_reference_points()
query_points = create_query_points(n_points=21)  # 21x21 grid

In [None]:
%%time
prices = compute_prices(query_points, reference_points)
prices

In [None]:
points_and_prices = combine_points_and_prices(query_points=query_points, prices=prices)
points_and_prices

In [None]:
@ray.remote
def long_running_task():
    import time
    time.sleep(60)
    return "Finished long running task"

In [None]:
ray.get(long_running_task.remote())

## Monitoring ray

Ray comes with a nice dashboard that lets you observe running jobs. It runs on a local web server, most likely at http://localhost:8265. This address is not accessible when running in Google Colab. Instead, you have to use a trick: forward the port and show the dashboard in a small window (an iframe) inside the notebook.

In [None]:
try:
    from google.colab import output
    output.serve_kernel_port_as_iframe(8265)  # The port may differ!
except ImportError:
    print("Not in google Colab. Try the local link, it might work.")

## Compute prices in ray

In [None]:
@ray.remote
def compute_prices(query_points: np.ndarray, reference_points: np.ndarray) -> np.ndarray:
    """
    Estimate prices at N query points as the mean price of their DEFAULT_K nearest reference points.

    Returns:
    --------
    prices: np.ndarray
        (N,) array of prices
    """
    indices = knn_search(query_points, reference_points, DEFAULT_K)
    prices: np.ndarray = reference_points[3][indices]
    return prices.mean(axis=1)

In [None]:
reference_points = load_reference_points()
query_points = create_query_points(n_points=21)  

ray.init(ignore_reinit_error=True)

prices = compute_prices.remote(query_points, reference_points)
prices

In [None]:
ray.get(prices)