### Dependencies

In [1]:
## Only for this notebook
from pprint import pprint
from typing import Any, Dict

import ray

## Package
from ezRay import MultiCoreExecutionTool

## Step 1: Specify ezRay

In [2]:
## Instance metadata
##### IMPORTANT: In this step you can specify any other keyword argument of `ray.init`,
##### see https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html.

# -> 'num_cpus' is the number of virtual CPU cores to link with your instance.
# -> 'num_gpus' is the number of GPU devices to link with your instance.
# -> 'address' is the address of the Ray cluster to connect to. If None, a new cluster will be created locally.
instance_metadata: dict = {"num_cpus": 1, "num_gpus": 0, "address": None}

## Task metadata
##### IMPORTANT: In this step you can specify any other keyword argument labeled "for remote functions"
##### found in https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote.html.

# We keep a few default values for the task metadata.
# -> 'num_cpus' is the number of CPU cores to allocate for each task.
# -> 'num_gpus' is the number of GPU devices to allocate for each task.
# -> 'num_returns' is the number of return values from each task.
task_metadata: dict = {"num_cpus": 1, "num_gpus": 0, "num_returns": 1}

## Behavioral flags
# -> 'AutoArchive' forces archiving of results between runs and upon data update.
# -> 'AutoContinue' makes the tool automatically prepare for the next run once the
#    current run is completed. If 'AutoArchive' is enabled, the results are archived
#    before the next run is prepared.
# -> 'SingleShot' makes the tool run once and return the results. Archiving is disabled
#    and the next run is prepared automatically. This is meant for using the tool as a
#    function or as part of a pipeline.
# -> 'AutoLaunchDashboard' launches the Ray dashboard on cluster initialization.
# -> 'ListenerSleeptime' adds an artificial delay when monitoring the cluster status
#    (presumably in seconds — TODO confirm against the ezRay docs).
# -> 'silent' suppresses all output.
# -> 'DEBUG' enables debug output.
# NOTE: despite its name, `verbosity_flags` holds all behavioral flags above; it is
# unpacked into the MultiCoreExecutionTool constructor alongside `RuntimeMetadata`.
verbosity_flags: dict = {
    "AutoArchive": True,
    "AutoContinue": False,
    "SingleShot": False,
    "AutoLaunchDashboard": False,
    "ListenerSleeptime": 0.0,
    "silent": False,
    "DEBUG": False,
}

## Assembly
RuntimeMetadata: Dict[str, Any] = {
    "instance_metadata": instance_metadata,
    "task_metadata": task_metadata,
}
print("Runtime Metadata:")
pprint(RuntimeMetadata)
print("\n")


# Example data: one entry per task to execute (each entry becomes one worker in Step 3).
N_TASKS: int = 10  # number of example tasks to generate
RuntimeData: dict = {i: {"ID": i} for i in range(N_TASKS)}
print("Example data:")
pprint(RuntimeData)

Runtime Metadata:
{'instance_metadata': {'address': None, 'num_cpus': 1, 'num_gpus': 0},
 'task_metadata': {'num_cpus': 1, 'num_gpus': 0, 'num_returns': 1}}


Example data:
{0: {'ID': 0},
 1: {'ID': 1},
 2: {'ID': 2},
 3: {'ID': 3},
 4: {'ID': 4},
 5: {'ID': 5},
 6: {'ID': 6},
 7: {'ID': 7},
 8: {'ID': 8},
 9: {'ID': 9}}


## Step 2: Initialize ezRay Instance

In [3]:
## We create a MultiCoreExecutionTool instance
# -> We can either directly provide the RuntimeData or use the 'update_data' method to add data.
# -> 'instance_metadata' and 'task_metadata' (from RuntimeMetadata) and the behavioral
#    flags are unpacked as keyword arguments. With 'address' set to None, creating the
#    instance started a local Ray instance in this run (see the log below).
MultiCore: MultiCoreExecutionTool = MultiCoreExecutionTool(
    RuntimeData, **RuntimeMetadata, **verbosity_flags
)

2025-03-09 12:45:25,320	INFO worker.py:1832 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265


In [6]:
## Update data
# -> Data can be supplied directly to the constructor (as above) or added afterwards
#    with the 'update_data' method, e.g.:
MultiCore.update_data(RuntimeData)

## Update metadata
# -> Both the task metadata and the instance metadata can be changed afterwards with
#    the 'update_metadata' method, e.g.:
# NOTE(review): unlike Step 1, 'address' is not passed in instance_metadata here —
# confirm whether update_metadata merges into or replaces the existing metadata.
MultiCore.update_metadata(
    task_metadata={"num_cpus": 1, "num_gpus": 0, "num_returns": 1},
    instance_metadata={"num_cpus": 1, "num_gpus": 0},
)

### Step 2.5: Launch the Ray dashboard (optional)

In [7]:
## Launching the dashboard to keep track of the cluster and the tasks
# The cell displays the value returned by launch_dashboard() (True in this run).
MultiCore.launch_dashboard()

True

## Step 3: Run ezRay

In [8]:
## Run a ray-remote test function
# Any function already decorated with `@ray.remote` can be forwarded to the
# MultiCoreExecutionTool instance, which executes it on the Ray cluster.
# Defining it directly in the notebook works, as the run below shows.

## DEMO function
# Returns a shallow copy of the dictionary it receives.
@ray.remote(num_cpus=1, num_returns=1)
def remote_test_function(kwargs: Dict[Any, Any]) -> Dict[Any, Any]:
    """Test function for the framework that merely forwards its input.

    Args:
        kwargs: A single dictionary argument (not ``**kwargs``). Judging by the
            results below, this is one value of ``RuntimeData``, e.g. ``{"ID": 0}``.

    Returns:
        A new dictionary with the same key/value pairs as ``kwargs``.
    """
    # dict() makes the same shallow copy the original identity comprehension did.
    return dict(kwargs)


## Hand the Ray remote function over to the MultiCoreExecutionTool instance
MultiCore.run(remote_test_function)

## Collect the per-run results and print them
remote_results = MultiCore.get_results()
pprint(remote_results)

## Advance the tool to the next task (the cell displays the returned value)
MultiCore.next()

Scheduling Workers:   0%|          | 0/10 [00:00<?, ?it/s]

Workers:   0%|          | 0/10 [00:00<?, ?it/s]

CPU usage:   0%|          |

RAM usage:   0%|          |

{0: {'result': {'ID': 0}, 'status': 'retrieved'},
 1: {'result': {'ID': 1}, 'status': 'retrieved'},
 2: {'result': {'ID': 2}, 'status': 'retrieved'},
 3: {'result': {'ID': 3}, 'status': 'retrieved'},
 4: {'result': {'ID': 4}, 'status': 'retrieved'},
 5: {'result': {'ID': 5}, 'status': 'retrieved'},
 6: {'result': {'ID': 6}, 'status': 'retrieved'},
 7: {'result': {'ID': 7}, 'status': 'retrieved'},
 8: {'result': {'ID': 8}, 'status': 'retrieved'},
 9: {'result': {'ID': 9}, 'status': 'retrieved'}}


True

In [9]:
## Run a local test function
# A plain (undecorated) Python function can be passed as well: the tool wraps it
# as a remote worker before executing it on the Ray cluster.
# Defining it directly in the notebook works, as the run below shows.

## DEMO function
# Returns the keyword arguments it receives as a new dictionary.
def local_test_function(**kwargs: Any) -> Dict[str, Any]:
    """Test function for the framework that merely forwards its keyword arguments.

    Returns:
        A new dictionary containing exactly the keyword arguments passed in.
    """
    # `kwargs` is already a fresh dict; dict() keeps returning an independent copy.
    return dict(kwargs)


## Hand the plain Python function over to the MultiCoreExecutionTool instance
MultiCore.run(local_test_function)

## Collect the per-run results and print them
local_results = MultiCore.get_results()
pprint(local_results)

## Advance the tool to the next task (the cell displays the returned value)
MultiCore.next()

Scheduling Workers:   0%|          | 0/10 [00:00<?, ?it/s]

Workers:   0%|          | 0/10 [00:00<?, ?it/s]

CPU usage:   0%|          |

RAM usage:   0%|          |

{0: {'result': {'ID': 0}, 'status': 'retrieved'},
 1: {'result': {'ID': 1}, 'status': 'retrieved'},
 2: {'result': {'ID': 2}, 'status': 'retrieved'},
 3: {'result': {'ID': 3}, 'status': 'retrieved'},
 4: {'result': {'ID': 4}, 'status': 'retrieved'},
 5: {'result': {'ID': 5}, 'status': 'retrieved'},
 6: {'result': {'ID': 6}, 'status': 'retrieved'},
 7: {'result': {'ID': 7}, 'status': 'retrieved'},
 8: {'result': {'ID': 8}, 'status': 'retrieved'},
 9: {'result': {'ID': 9}, 'status': 'retrieved'}}


True

In [10]:
## Run in batch mode
# `batch()` executes only part of the queued data in one call. Judging by the output
# below, `runIDs=2` ran the first two entries (IDs 0 and 1), while the remaining
# entries stayed 'pending'.
# NOTE(review): confirm in the ezRay docs whether `runIDs` is a count or a selection
# of run IDs.
#
# This cell reuses `local_test_function` from the previous cell. Redefining an
# identical function here would silently shadow the earlier definition.

## Pass the function to the MultiCoreExecutionTool instance
MultiCore.batch(local_test_function, runIDs=2)

## Get the results (entries not covered by the batch are reported as 'pending')
pprint(MultiCore.get_results())

## Move to next task. Per the message it prints below, the tool moves on even though
## some results are still pending.
MultiCore.next()

Scheduling Workers:   0%|          | 0/2 [00:00<?, ?it/s]

Workers:   0%|          | 0/2 [00:00<?, ?it/s]

CPU usage:   0%|          |

RAM usage:   0%|          |

Pending results found. Use the "run()" method to get results.
{0: {'result': {'ID': 0}, 'status': 'retrieved'},
 1: {'result': {'ID': 1}, 'status': 'retrieved'},
 2: {'result': None, 'status': 'pending'},
 3: {'result': None, 'status': 'pending'},
 4: {'result': None, 'status': 'pending'},
 5: {'result': None, 'status': 'pending'},
 6: {'result': None, 'status': 'pending'},
 7: {'result': None, 'status': 'pending'},
 8: {'result': None, 'status': 'pending'},
 9: {'result': None, 'status': 'pending'}}
Pending results found. Continuing to next task.


True

In [11]:
## Archive the results collected so far
# NOTE(review): with 'AutoArchive' enabled in Step 1, archiving may already happen
# between runs; this explicit call presumably covers the last run — confirm.
MultiCore.archive_results()

True

### Don't forget to shut down the cluster after use.

In [12]:
## Shut down the cluster
# Run this last, once all runs and archiving are finished.
MultiCore.shutdown()