diff --git a/orchestrator/api/state/queue.py b/orchestrator/api/state/queue.py index e94b79e8..31ae7f57 100644 --- a/orchestrator/api/state/queue.py +++ b/orchestrator/api/state/queue.py @@ -42,7 +42,7 @@ def __init__(self): starts the background monitoring coroutine. """ self.logger = logging.getLogger("QueueMonitorActor") - self.shared_queue = MeasurementQueue.get_measurement_queue() + self.shared_queue = MeasurementQueue() # The in-memory store: experiment reference → request ID → request instance. self.requests_memory_storage: dict[ diff --git a/orchestrator/cli/resources/operation/create.py b/orchestrator/cli/resources/operation/create.py index 40bd5e58..d7840593 100644 --- a/orchestrator/cli/resources/operation/create.py +++ b/orchestrator/cli/resources/operation/create.py @@ -40,6 +40,7 @@ def create_operation(parameters: AdoCreateCommandParameters): import orchestrator.modules.operators.orchestrate from orchestrator.modules.actuators.base import MeasurementError + from orchestrator.modules.operators.base import InterruptedOperationError try: op_resource_configuration = ( @@ -164,6 +165,13 @@ def create_operation(parameters: AdoCreateCommandParameters): stderr=True, ) raise typer.Exit(1) from e + except InterruptedOperationError as e: + console_print( + f"{ERROR}Created operation with identifier {magenta(e.operation_identifier)} " + "but it was interrupted.", + stderr=True, + ) + raise typer.Exit(3) from None except KeyboardInterrupt as e: console_print( f"{INFO}Operation creation has been stopped due to a keyboard interrupt.", diff --git a/orchestrator/modules/actuators/measurement_queue.py b/orchestrator/modules/actuators/measurement_queue.py index 58d4b089..1948d485 100644 --- a/orchestrator/modules/actuators/measurement_queue.py +++ b/orchestrator/modules/actuators/measurement_queue.py @@ -1,25 +1,27 @@ # Copyright (c) IBM Corporation # SPDX-License-Identifier: MIT -import logging - import ray.util.queue class MeasurementQueue(ray.util.queue.Queue): - """Actuators place completed MeasurementRequests in the singleton instance of this queue for addition to the active DiscoverySpace""" + """Class used to relay measurements for an explore operation + + The DiscoverySpaceManager and all Actuators in an explore operation MUST + use the same MeasurementQueue instance.""" - # stateUpdateQueue = None + def __init__(self, maxsize=0, ray_namespace: str | None = None): + """Parameters: - @classmethod - def get_measurement_queue(cls, maxsize=0): - """This returns the singleton measurement queue for the current explore operation + ray_namespace: The namespace of the operation this queue is for. + Can be None in which case this indicates it is the default ray namespace for the job + (get_runtime_context().namespace). + """ - However, its only singleton in the process that creates it. - i.e. if you execute locally and in a remote actor you will get different objects. - Solution: Pass the queue to any remote objects that need it""" + super().__init__(maxsize=maxsize) + self._ray_namespace = ray_namespace - log = logging.getLogger() - log.debug(f"Getting measurement queue via {cls}") + def ray_namespace(self) -> str: + """Returns the ray namespace the operation actors are using""" - return cls(maxsize=maxsize) + return self._ray_namespace diff --git a/orchestrator/modules/operators/_cleanup.py b/orchestrator/modules/operators/_cleanup.py index 37031ffd..970e02c8 100644 --- a/orchestrator/modules/operators/_cleanup.py +++ b/orchestrator/modules/operators/_cleanup.py @@ -2,56 +2,39 @@ # SPDX-License-Identifier: MIT import logging -import signal import typing +from collections import OrderedDict +from collections.abc import Callable import ray from ray.actor import ActorHandle -shutdown = False +shutdown_signal_received = False CLEANER_ACTOR = "resource_cleaner" moduleLog = logging.getLogger("orchestration_cleanup") +cleanup_callback_functions: dict[str, Callable[[], None]] = OrderedDict() -def graceful_operation_shutdown(): - - global shutdown - - if not shutdown: - import time - - moduleLog.info("Shutting down gracefully") - - shutdown = True - - moduleLog.debug("Cleanup custom actors") - try: - cleaner_handle = ray.get_actor(name=CLEANER_ACTOR) - ray.get(cleaner_handle.cleanup.remote()) - # deleting a cleaner actor. It is detached one, so has to be deleted explicitly - ray.kill(cleaner_handle) - except Exception as e: - moduleLog.warning(f"Failed to cleanup custom actors {e}") - - moduleLog.info("Shutting down Ray...") - ray.shutdown() - moduleLog.info("Waiting for logs to flush ...") - time.sleep(10) - moduleLog.info("Graceful shutdown complete") - else: - moduleLog.info("Graceful shutdown already completed") - - -def graceful_operation_shutdown_handler() -> ( +def graceful_operation_shutdown_signal_handler() -> ( typing.Callable[[int, typing.Any | None], None] ): + """Handler which executes cleanup callbacks registered by operations on receiving a signal""" def handler(sig, frame): - moduleLog.warning(f"Got signal {sig}") - moduleLog.warning("Calling graceful shutdown") - graceful_operation_shutdown() + moduleLog.critical(f"Got signal {sig}") + global shutdown_signal_received + global cleanup_callback_functions + + if shutdown_signal_received: + moduleLog.info("Graceful shutdown already completed") + + shutdown_signal_received = True + moduleLog.warning("Calling cleanup callbacks") + for entry in cleanup_callback_functions: + moduleLog.info(f"Cleaning {entry}") + cleanup_callback_functions[entry]() return handler @@ -91,16 +74,11 @@ def cleanup(self) -> None: moduleLog.info(f"cleaned {len(done)}, clean failed {len(not_done)}") -def initialize_resource_cleaner(): +def initialize_ray_resource_cleaner(namespace=None): # create a cleaner actor. # We are creating Named detached actor (https://docs.ray.io/en/latest/ray-core/actors/named-actors.html) # so that we do not need to pass its handle (can get it by name) and it does not go out of scope, until # we explicitly kill it ResourceCleaner.options( - name=CLEANER_ACTOR, get_if_exists=True, lifetime="detached" + name=CLEANER_ACTOR, get_if_exists=True, lifetime="detached", namespace=namespace ).remote() - # Create a default handler that will clean up the ResourceCleaner - # Orchestration functions that require more complex shutdown can replace this handler - signal.signal( - signalnum=signal.SIGTERM, handler=graceful_operation_shutdown_handler() - ) diff --git a/orchestrator/modules/operators/_explore_orchestration.py b/orchestrator/modules/operators/_explore_orchestration.py index 05766735..8235d55c 100644 --- a/orchestrator/modules/operators/_explore_orchestration.py +++ b/orchestrator/modules/operators/_explore_orchestration.py @@ -8,24 +8,21 @@ import ray import ray.util.queue -import orchestrator.core -import orchestrator.modules -import orchestrator.modules.operators._cleanup from orchestrator.core import OperationResource from orchestrator.core.discoveryspace.space import DiscoverySpace from orchestrator.core.operation.config import ( FunctionOperationInfo, OperatorModuleConf, - get_actuator_configurations, - validate_actuator_configurations_against_space_configuration, ) from orchestrator.core.operation.operation import OperationOutput from orchestrator.modules.actuators.measurement_queue import MeasurementQueue -from orchestrator.modules.actuators.registry import ActuatorRegistry from orchestrator.modules.module import load_module_class_or_function from orchestrator.modules.operators._cleanup import ( CLEANER_ACTOR, - initialize_resource_cleaner, + cleanup_callback_functions, + graceful_operation_shutdown_signal_handler, + initialize_ray_resource_cleaner, + shutdown_signal_received, ) from orchestrator.modules.operators._orchestrate_core import ( _run_operation_harness, @@ -41,119 +38,90 @@ if typing.TYPE_CHECKING: from orchestrator.modules.actuators.base import ActuatorActor - from orchestrator.modules.operators.base import OperatorActor + from orchestrator.modules.operators.base import ( + OperatorActor, + ) from orchestrator.modules.operators.discovery_space_manager import ( DiscoverySpaceManagerActor, ) def graceful_explore_operation_shutdown( + identifier: str, operator: "OperatorActor", state: "DiscoverySpaceManagerActor", actuators: list["ActuatorActor"], + namespace: str, timeout=60, ): - if not orchestrator.modules.operators._cleanup.shutdown: - import time - - from rich.console import Console - - moduleLog.info("Shutting down gracefully") - - orchestrator.modules.operators._cleanup.shutdown = True - - # - # Shutdown process - # 1. Shutdown state calling onComplete on operation and metricServer and ensuring metrics are flushed - # 2. Shutdown custom actors - # 3. Send graceful __ray_terminate__ to metric_server, operation and actuators - - # This should not return until the metric server has processed all updates. - - console = Console() - with console.status( - "Shutdown - waiting on all samples to be stored", spinner="dots" - ) as status: + from rich.console import Console - moduleLog.debug("Shutting down state") - promise = state.shutdown.remote() - ray.get(promise) + moduleLog.info(f"Shutting down operation {identifier} gracefully") - status.update("Shutdown - cleanup") + # + # Shutdown process + # 1. Shutdown state calling onComplete on operation and metricServer and ensuring metrics are flushed + # 2. Shutdown custom actors + # 3. Send graceful __ray_terminate__ to metric_server, operation and actuators + + # This should not return until the metric server has processed all updates. + + console = Console() + with console.status( + f"Shutdown ({identifier}) - waiting on all samples to be stored", spinner="dots" + ) as status: + + moduleLog.debug("Shutting down state") + ray.get(state.shutdown.remote()) + + status.update(f"Shutdown ({identifier}) - cleaning up custom actors") + + # ResourceCleaner cleanup before killing actors + try: + cleaner_handle = ray.get_actor(name=CLEANER_ACTOR, namespace=namespace) + moduleLog.debug(f"Calling cleanup on {cleaner_handle}") + ray.get(cleaner_handle.cleanup.remote()) + ray.kill(cleaner_handle) + except Exception as e: + moduleLog.warning(f"Failed to cleanup custom actors {e}") + + status.update( + f"Shutdown ({identifier}) - waiting for actors to terminate (max 60s)" + ) - moduleLog.debug("Cleanup custom actors") - try: - cleaner_handle = ray.get_actor(name=CLEANER_ACTOR) - ray.get(cleaner_handle.cleanup.remote()) - # deleting a cleaner actor. It is detached one, so has to be deleted explicitly - ray.kill(cleaner_handle) - except Exception as e: - moduleLog.warning(f"Failed to cleanup custom actors {e}") + terminating_actors = [ + operator.__ray_terminate__.remote(), + state.__ray_terminate__.remote(), + ] + # __ray_terminate allows atexit handlers of actors to run + # see https://docs.ray.io/en/latest/ray-core/api/doc/ray.kill.html + terminating_actors.extend([a.__ray_terminate__.remote() for a in actuators]) + n_actors = len(terminating_actors) + moduleLog.debug(f"waiting for graceful shutdown of {n_actors} actors") - status.update("Shutdown - waiting for actors to terminate") + actors = [operator] + actors.extend(actuators) - wait_graceful = [ - operator.__ray_terminate__.remote(), - state.__ray_terminate__.remote(), - ] - # __ray_terminate allows atexit handlers of actors to run - # see https://docs.ray.io/en/latest/ray-core/api/doc/ray.kill.html - wait_graceful.extend([a.__ray_terminate__.remote() for a in actuators]) - n_actors = len(wait_graceful) - moduleLog.debug(f"waiting for graceful shutdown of {n_actors} actors") + lookup = dict(zip(terminating_actors, actors)) - actors = [operator] - actors.extend(actuators) + moduleLog.debug(f"Shutdown waiting on {lookup}") + moduleLog.debug( + f"Gracefully stopping actors - will wait {timeout} seconds ..." + ) + terminated, active = ray.wait( + ray_waitables=terminating_actors, num_returns=n_actors, timeout=timeout + ) - lookup = dict(zip(wait_graceful, actors)) + moduleLog.debug(f"Terminated: {terminated}") + moduleLog.debug(f"Active: {active}") - moduleLog.debug(f"Shutdown waiting on {lookup}") - moduleLog.debug( - f"Gracefully stopping actors - will wait {timeout} seconds ..." + if active: + status.update( + f"Some actors have not completed after the {timeout}s grace period - killing" ) - terminated, active = ray.wait( - ray_waitables=wait_graceful, num_returns=n_actors, timeout=60.0 - ) - - moduleLog.debug(f"Terminated: {terminated}") - moduleLog.debug(f"Active: {active}") - - if active: - moduleLog.warning( - f"Some actors have not completed after {timeout} grace period - killing" - ) - for actor_ref in active: - print(f"... {lookup[actor_ref]}") - ray.kill(lookup[actor_ref]) - - moduleLog.info("Shutting down Ray...") - ray.shutdown() - status.update("Shutdown - waiting for logs to flush") - moduleLog.info("Waiting for logs to flush ...") - time.sleep(10) - moduleLog.info("Graceful shutdown complete") - else: - moduleLog.info("Graceful shutdown already completed") - - -def graceful_explore_operation_shutdown_handler( - operation, state, actuators, timeout=60 -) -> typing.Callable[[int, typing.Any | None], None]: - """Return a signal handler that sh.""" - - def handler(sig, frame): - - moduleLog.warning(f"Got signal {sig}") - moduleLog.warning("Calling graceful shutdown") - graceful_explore_operation_shutdown( - operator=operation, - state=state, - actuators=actuators, - timeout=timeout, - ) - - return handler + for actor_ref in active: + ray.kill(lookup[actor_ref]) def run_explore_operation_core_closure( @@ -236,70 +204,55 @@ def orchestrate_explore_operation( f"{operator_module.moduleClass}-namespace-{str(uuid.uuid4())[:8]}" ) - initialize_resource_cleaner() - - project_context = discovery_space.project_context - # Check the space if not discovery_space.measurementSpace.isConsistent: moduleLog.critical("Measurement space is inconsistent - aborting") raise ValueError("Measurement space is inconsistent") - if issues := ActuatorRegistry.globalRegistry().checkMeasurementSpaceSupported( - discovery_space.measurementSpace - ): - moduleLog.critical( - "The measurement space is not supported by the known actuators - aborting" - ) - for issue in issues: - moduleLog.critical(issue) - raise ValueError( - "The measurement space is not supported by the known actuators" - ) - log_space_details(discovery_space) - actuator_configurations = get_actuator_configurations( - actuator_configuration_identifiers=operation_info.actuatorConfigurationIdentifiers, - project_context=project_context, - ) - - validate_actuator_configurations_against_space_configuration( - actuator_configurations=actuator_configurations, - discovery_space_configuration=discovery_space.config, - ) + # create cleaner for this namespace + initialize_ray_resource_cleaner(namespace=operation_info.ray_namespace) # - # STATE - # Create State actor + # MEASUREMENT QUEUE # - queue = MeasurementQueue.get_measurement_queue() - - # noinspection PyUnresolvedReferences - state = DiscoverySpaceManager.options( - namespace=operation_info.ray_namespace - ).remote( - queue=queue, space=discovery_space, namespace=operation_info.ray_namespace - ) # type: "InternalStateActor" - moduleLog.debug(f"Waiting for discovery state actor to be ready: {state}") - _ = ray.get(state.__ray_ready__.remote()) - moduleLog.debug("Discovery state actor is ready") + # For communication between actuators -> discovery space manager -> operator + measurement_queue = MeasurementQueue(ray_namespace=operation_info.ray_namespace) # # ACTUATORS # - # Will raise ray.exceptions.ActorDiedError if any actuator died - # during init + # Will raise ray.exceptions.ActorDiedError if any actuator died during init + # Will raise ValueError if there is a mismatch between the Actuators and + # the actuator configurations actuators = orchestrator.modules.operators.setup.setup_actuators( - namespace=operation_info.ray_namespace, - actuator_configurations=actuator_configurations, + actuator_configuration_identifiers=operation_info.actuatorConfigurationIdentifiers, discovery_space=discovery_space, - queue=queue, + measurement_queue=measurement_queue, ) # FIXME: This is only necessary for mock actuator - but does it actually need to use it? for actuator in actuators.values(): actuator.setMeasurementSpace.remote(discovery_space.measurementSpace) + # + # DISCOVERY SPACE MANAGER + # + + # noinspection PyUnresolvedReferences + discovery_space_manager = DiscoverySpaceManager.options( + namespace=operation_info.ray_namespace + ).remote( + queue=measurement_queue, + space=discovery_space, + namespace=operation_info.ray_namespace, + ) # type: "InternalStateActor" + moduleLog.debug( + f"Waiting for discovery space manager to be ready: {discovery_space_manager}" + ) + _ = ray.get(discovery_space_manager.__ray_ready__.remote()) + moduleLog.debug("Discovery space manager is ready") + # # OPERATOR # @@ -317,21 +270,33 @@ def orchestrate_explore_operation( discovery_space=discovery_space, actuators=actuators, namespace=operation_info.ray_namespace, - state=state, + state=discovery_space_manager, ) # type: "OperatorActor" identifier = ray.get(operator.operationIdentifier.remote()) - explore_run_closure = run_explore_operation_core_closure(operator, state) - - orchestrator.modules.operators._cleanup.shutdown = False + explore_run_closure = run_explore_operation_core_closure( + operator, discovery_space_manager + ) + # Handling SIGTERM + # First register a callback which will clean up if SIGTERM is sent + # and the handler is in place + # Note we can't register the callback until the actors are created so there + # is a short window where graceful cleanup is not possible on SIGTERM + cleanup_callback_functions[identifier] = ( + lambda: graceful_explore_operation_shutdown( + identifier=identifier, + operator=operator, + state=discovery_space_manager, + actuators=list(actuators.values()), + namespace=operation_info.ray_namespace, + ) + ) + # Next register the handler in case it was not registered already + # Since all operations register the same stateless handler, setting + # it multiple times does not change behaviour signal.signal( - signalnum=signal.SIGTERM, - handler=graceful_explore_operation_shutdown_handler( - operation=operator, - state=state, - actuators=actuators, - ), + signalnum=signal.SIGTERM, handler=graceful_operation_shutdown_signal_handler() ) def finalize_callback_closure(operator_actor: "OperatorActor"): @@ -346,20 +311,27 @@ def finalize_callback(operation_resource: OperationResource): return finalize_callback - operation_output = _run_operation_harness( - run_closure=explore_run_closure, - discovery_space=discovery_space, - operator_module=operator_module, - operation_parameters=parameters, - operation_info=operation_info, - operation_identifier=identifier, - finalize_callback=finalize_callback_closure(operator), - ) - - graceful_explore_operation_shutdown( - operator=operator, - state=state, - actuators=list(actuators.values()), - ) + try: + operation_output = _run_operation_harness( + run_closure=explore_run_closure, + discovery_space=discovery_space, + operator_module=operator_module, + operation_parameters=parameters, + operation_info=operation_info, + operation_identifier=identifier, + finalize_callback=finalize_callback_closure(operator), + ) + finally: + # Need to ensure shutdown is processed if an exception + # is raised + if not shutdown_signal_received: + graceful_explore_operation_shutdown( + identifier=identifier, + operator=operator, + state=discovery_space_manager, + actuators=list(actuators.values()), + namespace=operation_info.ray_namespace, + ) + cleanup_callback_functions.pop(identifier) return operation_output diff --git a/orchestrator/modules/operators/_general_orchestration.py b/orchestrator/modules/operators/_general_orchestration.py index 77e1d798..dd7f0a17 100644 --- a/orchestrator/modules/operators/_general_orchestration.py +++ b/orchestrator/modules/operators/_general_orchestration.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: MIT import logging -import signal import typing import pydantic @@ -18,10 +17,6 @@ validate_actuator_configurations_against_space_configuration, ) from orchestrator.core.operation.operation import OperationOutput -from orchestrator.modules.operators._cleanup import ( - graceful_operation_shutdown, - graceful_operation_shutdown_handler, -) from orchestrator.modules.operators._orchestrate_core import ( _run_operation_harness, log_space_details, @@ -104,6 +99,10 @@ def orchestrate_general_operation( import uuid + # Note on signals: Since there is no specific cleanup logic + # for general operations it makes no difference + # if a signal handler for SIGTERM is in place or not + if not operation_info.ray_namespace: operation_info.ray_namespace = ( f"{operator_function.__name__}-namespace-{str(uuid.uuid4())[:8]}" @@ -142,20 +141,10 @@ def orchestrate_general_operation( operation_parameters=operation_parameters, ) - orchestrator.modules.operators._cleanup.shutdown = False - - signal.signal( - signalnum=signal.SIGTERM, handler=graceful_operation_shutdown_handler() - ) - - output = _run_operation_harness( + return _run_operation_harness( run_closure=operation_run_closure, discovery_space=discovery_space, operator_module=operator_module, operation_parameters=operation_parameters, operation_info=operation_info, ) - - graceful_operation_shutdown() - - return output diff --git a/orchestrator/modules/operators/_orchestrate_core.py b/orchestrator/modules/operators/_orchestrate_core.py index ae1259c7..c05c8c79 100644 --- a/orchestrator/modules/operators/_orchestrate_core.py +++ b/orchestrator/modules/operators/_orchestrate_core.py @@ -22,8 +22,9 @@ OperationResourceEventEnum, OperationResourceStatus, ) -from orchestrator.modules.operators._cleanup import shutdown +from orchestrator.modules.operators._cleanup import shutdown_signal_received from orchestrator.modules.operators.base import ( + InterruptedOperationError, add_operation_output_to_metastore, create_operation_and_add_to_metastore, ) @@ -106,7 +107,7 @@ def _run_operation_harness( ) discovery_space.metadataStore.updateResource(operation_resource) operation_output: OperationOutput | None = run_closure() - except KeyboardInterrupt: + except KeyboardInterrupt as error: sys.stdout.flush() moduleLog.warning("Caught keyboard interrupt - initiating graceful shutdown") operationStatus = OperationResourceStatus( @@ -114,6 +115,7 @@ def _run_operation_harness( exit_state=OperationExitStateEnum.ERROR, message="Operation exited due to SIGINT", ) + raise InterruptedOperationError(operation_resource.identifier) from error except RayTaskError as error: sys.stdout.flush() e = error.as_instanceof_cause() @@ -143,7 +145,7 @@ def _run_operation_harness( else: time.sleep(1) sys.stdout.flush() - if shutdown: + if shutdown_signal_received: moduleLog.warning( "Operation exited normally but an external event e.g. SIGTERM, has already initiated shutdown" ) @@ -194,7 +196,7 @@ def _run_operation_harness( # Add the final status to the operation resource operation_resource.status.append(operation_output.exitStatus) - if not shutdown and finalize_callback: + if not shutdown_signal_received and finalize_callback: finalize_callback(operation_resource) discovery_space.metadataStore.updateResource(operation_resource) diff --git a/orchestrator/modules/operators/base.py b/orchestrator/modules/operators/base.py index 91245f3e..fa355971 100644 --- a/orchestrator/modules/operators/base.py +++ b/orchestrator/modules/operators/base.py @@ -440,5 +440,17 @@ def warn_deprecated_operator_parameters_model_in_use( ) +class InterruptedOperationError(KeyboardInterrupt): + """Exception raised when an operation is interrupted (e.g., by SIGINT) + + This exception inherits from KeyboardInterrupt and includes the operation identifier + to provide context about which operation was interrupted. + """ + + def __init__(self, operation_identifier: str): + self.operation_identifier = operation_identifier + super().__init__(f"Operation {operation_identifier} was interrupted") + + if typing.TYPE_CHECKING: OperatorActor = type[ActorHandle[DiscoverySpaceSubscribingDiscoveryOperation]] diff --git a/orchestrator/modules/operators/discovery_space_manager.py b/orchestrator/modules/operators/discovery_space_manager.py index 1caae577..74b66148 100644 --- a/orchestrator/modules/operators/discovery_space_manager.py +++ b/orchestrator/modules/operators/discovery_space_manager.py @@ -47,10 +47,13 @@ def onError(self, error: Exception): class DiscoverySpaceManager: """A Ray actor wrapping a DiscoverySpace - Provides remote/async access to discovery space properties - Handles insertion of new entities and measurements into the space coming from a MeasurementQueue. - Notifies subscribers of update events. - Notifies subscribers of shutdown + ray namespace scoped i.e. All ray actors accessing a DiscoverySpaceManager instance + should be in the same ray namespace as that instance. + + - Provides remote/async access to discovery space properties + - Handles insertion of new entities and measurements into the space coming from a MeasurementQueue. + - Notifies subscribers of update events. + - Notifies subscribers of shutdown """ @classmethod @@ -114,8 +117,8 @@ def __init__( namespace=None, ): """ - :param queue: A ray.util.queue.Queue instance. - Usually the singleton instance returned by StateUpdateQueue.get_measurement_queue + :param queue: A MeasurementQueue instance for this operation + All actuators in the same operation must use this queue :param space: The DiscoverySpace instance """ @@ -125,6 +128,11 @@ def __init__( self.log.debug("Initialising DiscoverySpaceManager") self._namespace = namespace + if self._namespace != queue.ray_namespace(): + raise ValueError( + f"The provided measurement queues ray namespace, {queue.ray_namespace()}, does " + f"not match the namespace provided to DiscoverySpaceManager, {self._namespace} " + ) # This ivar will be used to mimic receiving updates on Measurements self._measurement_queue = queue diff --git a/orchestrator/modules/operators/orchestrate.py b/orchestrator/modules/operators/orchestrate.py index fdb398eb..1c8d0170 100644 --- a/orchestrator/modules/operators/orchestrate.py +++ b/orchestrator/modules/operators/orchestrate.py @@ -5,6 +5,7 @@ import logging import os +import signal import typing import pydantic @@ -23,7 +24,8 @@ from orchestrator.modules.operators._cleanup import ( CLEANER_ACTOR, # noqa: F401 ResourceCleaner, # noqa: F401 - graceful_operation_shutdown, + cleanup_callback_functions, + graceful_operation_shutdown_signal_handler, ) from orchestrator.modules.operators._explore_orchestration import ( orchestrate_explore_operation, @@ -42,6 +44,24 @@ moduleLog = logging.getLogger("orch") +def graceful_orchestrate_shutdown(): + """Clean resources set up by orchestrate() + + This includes ray.shutdown and waiting for logs to flush.""" + + import time + + from rich.console import Console + + console = Console() + with console.status("Shutdown - shutting down Ray", spinner="dots") as status: + ray.shutdown() + status.update("Shutdown - waiting for logs to flush") + moduleLog.info("Waiting for logs to flush ...") + time.sleep(10) + moduleLog.info("Graceful shutdown complete") + + def orchestrate( operation_resource_configuration: DiscoveryOperationResourceConfiguration, project_context: ProjectContext, @@ -97,6 +117,14 @@ def orchestrate( for key, value in ray_env_vars.items(): os.environ[key] = value + # + # Register signal handler + # + signal.signal( + signalnum=signal.SIGTERM, handler=graceful_operation_shutdown_signal_handler() + ) + cleanup_callback_functions["orchestrate"] = graceful_orchestrate_shutdown + # # GET SPACE # @@ -167,10 +195,8 @@ def orchestrate( ) raise finally: - if not orchestrator.modules.operators._cleanup.shutdown: - # If we get here the exception must have been raised before the operation started. - # Therefore, we don't need to wait in DiscoverySpaceManager, Actuators etc. to shut down - # as they never processed any date. - graceful_operation_shutdown() + if not orchestrator.modules.operators._cleanup.shutdown_signal_received: + graceful_orchestrate_shutdown() + cleanup_callback_functions.pop("orchestrate") return output diff --git a/orchestrator/modules/operators/setup.py b/orchestrator/modules/operators/setup.py index 0afb1ded..78fb1ec5 100644 --- a/orchestrator/modules/operators/setup.py +++ b/orchestrator/modules/operators/setup.py @@ -8,13 +8,12 @@ import pydantic -from orchestrator.core.actuatorconfiguration.config import ( - ActuatorConfiguration, -) from orchestrator.core.discoveryspace.space import DiscoverySpace from orchestrator.core.operation.config import ( DiscoveryOperationConfiguration, OperatorModuleConf, + get_actuator_configurations, + validate_actuator_configurations_against_space_configuration, ) from orchestrator.modules.actuators.measurement_queue import MeasurementQueue from orchestrator.modules.module import load_module_class_or_function @@ -55,16 +54,17 @@ def load_secrets_from_env(vars_to_load, env_var_dict): def setup_actuators( - namespace: str, - actuator_configurations: list[ActuatorConfiguration], + actuator_configuration_identifiers: list[str], discovery_space: DiscoverySpace, - queue: MeasurementQueue, + measurement_queue: MeasurementQueue, ) -> dict[str, "ActuatorActor"]: """ + Creates all the actuators required by discovery_space + Params: - namespace: The namespace to set up in - config: Configuration of the orchestrator - queue: the update queue + discovery_space: The discovery space to create the actuators for + actuator_configurations: A set of (optional) configurations for actuators in the discoveryspace + queue: the measurement queue Raises: ray.exceptions.ActorDiedError if any actuator @@ -79,6 +79,29 @@ def setup_actuators( moduleLog.info("Initialising requested actuators") registry = orchestrator.modules.actuators.registry.ActuatorRegistry.globalRegistry() actuators = {} + namespace = measurement_queue.ray_namespace() + + if issues := registry.checkMeasurementSpaceSupported( + discovery_space.measurementSpace + ): + moduleLog.critical( + "The measurement space is not supported by the known actuators - aborting" + ) + for issue in issues: + moduleLog.critical(issue) + raise ValueError( + "The measurement space is not supported by the known actuators" + ) + + actuator_configurations = get_actuator_configurations( + actuator_configuration_identifiers=actuator_configuration_identifiers, + project_context=discovery_space.project_context, + ) + + validate_actuator_configurations_against_space_configuration( + actuator_configurations=actuator_configurations, + discovery_space_configuration=discovery_space.config, + ) # First instantiate any actuators passed in actuatorConfigurations @@ -88,7 +111,7 @@ def setup_actuators( actuator: ActuatorActor = ( registry.actuatorForIdentifier(actuatorIdentifier) .options(name=actuatorIdentifier, namespace=namespace) - .remote(queue=queue, params=actuatorConfig.parameters) + .remote(queue=measurement_queue, params=actuatorConfig.parameters) ) actuators[actuatorIdentifier] = actuator @@ -118,7 +141,7 @@ def setup_actuators( actuator: ActuatorActor = cls.options( name=actuatorIdentifier, namespace=namespace ).remote( - queue=queue, + queue=measurement_queue, params=default_actuator_parameters, ) diff --git a/orchestrator/utilities/run_experiment.py b/orchestrator/utilities/run_experiment.py index f6abaa5d..263257b6 100644 --- a/orchestrator/utilities/run_experiment.py +++ b/orchestrator/utilities/run_experiment.py @@ -17,9 +17,9 @@ from orchestrator.modules.actuators.measurement_queue import MeasurementQueue from orchestrator.modules.actuators.registry import ActuatorRegistry from orchestrator.modules.operators._cleanup import ( - graceful_operation_shutdown, - initialize_resource_cleaner, + initialize_ray_resource_cleaner, ) +from orchestrator.modules.operators.orchestrate import graceful_orchestrate_shutdown from orchestrator.schema.entity import Entity from orchestrator.schema.point import SpacePoint from orchestrator.schema.reference import ExperimentReference @@ -43,7 +43,7 @@ def local_execution_closure( A callable that submits a local measurement request. """ actuators: dict[str, ActorHandle[ActuatorBase]] = {} - queue = MeasurementQueue.get_measurement_queue() + queue = MeasurementQueue() actuator_configurations = {} if actuator_configuration_identifiers: @@ -248,7 +248,7 @@ def run( ) if not remote: - initialize_resource_cleaner() + initialize_ray_resource_cleaner() try: for reference in point.experiments: @@ -274,7 +274,7 @@ def run( print("Entity is not valid") finally: if not remote: - graceful_operation_shutdown() + graceful_orchestrate_shutdown() def main(): diff --git a/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/actuator.py b/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/actuator.py index 28a58729..fc58cb29 100644 --- a/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/actuator.py +++ b/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/actuator.py @@ -102,7 +102,9 @@ def __init__( else: # add to clean up try: - cleaner_handle = ray.get_actor(name=CLEANER_ACTOR) + cleaner_handle = ray.get_actor( + name=CLEANER_ACTOR, namespace=queue.ray_namespace() + ) cleaner_handle.add_to_cleanup.remote(handle=self.env_manager) except Exception as e: logger.warning( diff --git a/tests/core/test_group_samplers.py b/tests/core/test_group_samplers.py index 7e708ec1..7a789b2b 100644 --- a/tests/core/test_group_samplers.py +++ b/tests/core/test_group_samplers.py @@ -203,7 +203,7 @@ async def test_group_sampler_remote( ), "Expected 42 entities in ml cloud sample store" # Test Remote Sequential Iterator - queue = MeasurementQueue.get_measurement_queue() + queue = MeasurementQueue() manager = DiscoverySpaceManager.remote(space=space, queue=queue) assert sampler.samplerCompatibleWithDiscoverySpaceRemote(manager) @@ -268,7 +268,7 @@ async def test_group_sampler_sequential_remote( ), "Expected 42 entities in ml cloud sample store" # Test Remote Sequential Iterator - queue = MeasurementQueue.get_measurement_queue() + queue = MeasurementQueue() manager = DiscoverySpaceManager.remote(space=space, queue=queue) assert RandomGroupSampleSelector.samplerCompatibleWithDiscoverySpaceRemote(manager) diff --git a/tests/core/test_samplers.py b/tests/core/test_samplers.py index 20b296a9..fa536197 100644 --- a/tests/core/test_samplers.py +++ b/tests/core/test_samplers.py @@ -132,7 +132,7 @@ async def test_explicit_space_grid_sampler_async_entity_iterator( sampler = ExplicitEntitySpaceGridSampleGenerator(mode=walk_mode) count = 0 entity = None - queue = MeasurementQueue.get_measurement_queue() + queue = MeasurementQueue() manager = DiscoverySpaceManager.remote(space=space, queue=queue) assert ( @@ -282,7 +282,7 @@ async def test_random_sample_selector( space.matchingEntities() ), "Expected the number of entities iterated was equal to number matching entities in source" - queue = MeasurementQueue.get_measurement_queue() + queue = MeasurementQueue() manager = DiscoverySpaceManager.remote(space=space, queue=queue) assert RandomSampleSelector.samplerCompatibleWithDiscoverySpaceRemote(manager) @@ -326,7 +326,7 @@ async def test_sequential_sample_selector( space.matchingEntities() ), "Expected the number of entities iterated was equal to number matching entities in source" - queue = MeasurementQueue.get_measurement_queue() + queue = MeasurementQueue() manager = DiscoverySpaceManager.remote(space=space, queue=queue) assert SequentialSampleSelector.samplerCompatibleWithDiscoverySpaceRemote(manager) diff --git a/tests/operators/test_discovery_space_manager.py b/tests/operators/test_discovery_space_manager.py index 7a2593b4..2a3e5c44 100644 --- a/tests/operators/test_discovery_space_manager.py +++ b/tests/operators/test_discovery_space_manager.py @@ -15,7 +15,7 @@ def test_internal_state_direct_init( ): """Tests InternalState actor can be initialised with a DiscoverySpace instance""" - queue = MeasurementQueue.get_measurement_queue() + queue = MeasurementQueue() state = DiscoverySpaceManager.remote(queue=queue, space=pfas_space) try: @@ -50,7 +50,7 @@ def test_internal_state_conf_init( pfas_space_configuration.sampleStoreIdentifier = pfas_space.sample_store.identifier - queue = MeasurementQueue.get_measurement_queue() + queue = MeasurementQueue() state = DiscoverySpaceManager.fromConfiguration( queue=queue, name="State",