8 changes: 8 additions & 0 deletions orchestrator/cli/resources/operation/create.py
@@ -40,6 +40,7 @@ def create_operation(parameters: AdoCreateCommandParameters):

import orchestrator.modules.operators.orchestrate
from orchestrator.modules.actuators.base import MeasurementError
from orchestrator.modules.operators.base import InterruptedOperationError

try:
op_resource_configuration = (
@@ -164,6 +165,13 @@ def create_operation(parameters: AdoCreateCommandParameters):
stderr=True,
)
raise typer.Exit(1) from e
except InterruptedOperationError as e:
console_print(
f"{ERROR}Created operation with identifier {magenta(e.operation_identifier)} "
"but it was interrupted.",
stderr=True,
)
raise typer.Exit(3) from None
except KeyboardInterrupt as e:
console_print(
f"{INFO}Operation creation has been stopped due to a keyboard interrupt.",
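The hunk above maps an interrupted-but-created operation onto a dedicated exit code so callers can tell "created but interrupted" apart from an ordinary failure. The sketch below illustrates that convention in isolation; it is a minimal, hypothetical example (the exception class, helper, and messages are stand-ins rather than the orchestrator's actual module layout), assuming only that the real InterruptedOperationError exposes an operation_identifier attribute.

import typer

app = typer.Typer()


class InterruptedOperationError(Exception):
    """Hypothetical stand-in; assumes the real exception carries the operation id."""

    def __init__(self, operation_identifier: str):
        super().__init__(operation_identifier)
        self.operation_identifier = operation_identifier


def run_operation() -> None:
    # Placeholder for the real operation runner; raises to exercise the handler.
    raise InterruptedOperationError("operation-1234")


@app.command()
def create() -> None:
    try:
        run_operation()
    except InterruptedOperationError as e:
        typer.echo(
            f"Created operation with identifier {e.operation_identifier} but it was interrupted.",
            err=True,
        )
        # Exit code 3 distinguishes "created but interrupted" from a generic failure (1).
        raise typer.Exit(3) from None


if __name__ == "__main__":
    app()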
69 changes: 29 additions & 40 deletions orchestrator/modules/operators/_cleanup.py
@@ -2,56 +2,50 @@
# SPDX-License-Identifier: MIT

import logging
import signal
import typing
from collections import OrderedDict
from collections.abc import Callable

import ray
from ray.actor import ActorHandle

shutdown = False
shutdown_signal_received = False
CLEANER_ACTOR = "resource_cleaner"

moduleLog = logging.getLogger("orchestration_cleanup")
cleanup_callback_functions: dict[str, Callable[[], None]] = OrderedDict()


def graceful_operation_shutdown():

global shutdown

if not shutdown:
import time

moduleLog.info("Shutting down gracefully")

shutdown = True

moduleLog.debug("Cleanup custom actors")
try:
cleaner_handle = ray.get_actor(name=CLEANER_ACTOR)
ray.get(cleaner_handle.cleanup.remote())
# Deleting the cleaner actor. It is a detached one, so it has to be deleted explicitly
ray.kill(cleaner_handle)
except Exception as e:
moduleLog.warning(f"Failed to cleanup custom actors {e}")

moduleLog.info("Shutting down Ray...")
ray.shutdown()
moduleLog.info("Waiting for logs to flush ...")
time.sleep(10)
moduleLog.info("Graceful shutdown complete")
else:
moduleLog.info("Graceful shutdown already completed")


def graceful_operation_shutdown_handler() -> (
def graceful_operation_shutdown_signal_handler() -> (
typing.Callable[[int, typing.Any | None], None]
):
"""Handler which executes cleanup callbacks registered by operations on receiving a signal"""

def handler(sig, frame):

moduleLog.warning(f"Got signal {sig}")
moduleLog.warning("Calling graceful shutdown")
graceful_operation_shutdown()
global shutdown_signal_received
global cleanup_callback_functions

if not shutdown_signal_received:
shutdown_signal_received = True
moduleLog.debug("Cleanup custom actors")
try:
cleaner_handle = ray.get_actor(name=CLEANER_ACTOR)
ray.get(cleaner_handle.cleanup.remote())
# The cleaner actor is detached, so it requires explicit deletion
ray.kill(cleaner_handle)
except ValueError:
moduleLog.debug("No Ray cleaner actors were found")
except Exception as e:
moduleLog.warning(f"Failed to cleanup custom actors {e}")

moduleLog.warning("Calling cleanup callbacks")
for entry in cleanup_callback_functions:
moduleLog.warning(f"Cleaning {entry}")
cleanup_callback_functions[entry]()
else:
moduleLog.info("Graceful shutdown already completed")

return handler
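
The new handler above is deliberately stateless: each operation registers its own cleanup callable in cleanup_callback_functions, and a single SIGTERM handler walks that registry exactly once. The sketch below is a self-contained illustration of the pattern under those assumptions; the names here are illustrative, not the module's public API.

import logging
import signal
from collections import OrderedDict
from collections.abc import Callable

log = logging.getLogger("cleanup_sketch")
callbacks: dict[str, Callable[[], None]] = OrderedDict()
signal_received = False


def make_signal_handler():
    def handler(sig, frame):
        global signal_received
        if not signal_received:
            signal_received = True
            for name in list(callbacks):
                log.warning("Cleaning %s", name)
                callbacks[name]()

    return handler


# An operation registers its cleanup callable before starting work ...
callbacks["operation-1234"] = lambda: log.info("flushing operation-1234")
# ... and the handler can be (re)installed any number of times without changing behaviour.
signal.signal(signal.SIGTERM, make_signal_handler())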

@@ -91,16 +85,11 @@ def cleanup(self) -> None:
moduleLog.info(f"cleaned {len(done)}, clean failed {len(not_done)}")


def initialize_resource_cleaner():
def initialize_ray_resource_cleaner():
# Create a cleaner actor.
# We create a named detached actor (https://docs.ray.io/en/latest/ray-core/actors/named-actors.html)
# so that we do not need to pass its handle around (it can be retrieved by name) and it does not
# go out of scope until we explicitly kill it
ResourceCleaner.options(
name=CLEANER_ACTOR, get_if_exists=True, lifetime="detached"
).remote()
# Create a default handler that will clean up the ResourceCleaner
# Orchestration functions that require more complex shutdown can replace this handler
signal.signal(
signalnum=signal.SIGTERM, handler=graceful_operation_shutdown_handler()
)
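
As background for the named, detached cleaner actor used throughout this file, the standalone sketch below shows the lifecycle the comments describe: get_if_exists=True makes creation idempotent, lifetime="detached" keeps the actor alive independently of the creating driver, and ray.kill() is the explicit deletion a detached actor requires. The actor class here is a simplified stand-in, not the orchestrator's ResourceCleaner.

import ray

ray.init()


@ray.remote
class ToyCleaner:
    def cleanup(self) -> str:
        return "cleaned"


# Create (or reuse) the named detached actor without keeping the returned handle.
ToyCleaner.options(
    name="resource_cleaner", get_if_exists=True, lifetime="detached"
).remote()

# Any later code can look the actor up by name ...
handle = ray.get_actor("resource_cleaner")
print(ray.get(handle.cleanup.remote()))

# ... and because it is detached it must be killed explicitly, or it outlives the driver.
ray.kill(handle)
ray.shutdown()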
179 changes: 72 additions & 107 deletions orchestrator/modules/operators/_explore_orchestration.py
@@ -8,9 +8,6 @@
import ray
import ray.util.queue

import orchestrator.core
import orchestrator.modules
import orchestrator.modules.operators._cleanup
from orchestrator.core import OperationResource
from orchestrator.core.discoveryspace.space import DiscoverySpace
from orchestrator.core.operation.config import (
@@ -24,8 +21,9 @@
from orchestrator.modules.actuators.registry import ActuatorRegistry
from orchestrator.modules.module import load_module_class_or_function
from orchestrator.modules.operators._cleanup import (
CLEANER_ACTOR,
initialize_resource_cleaner,
cleanup_callback_functions,
graceful_operation_shutdown_signal_handler,
shutdown_signal_received,
)
from orchestrator.modules.operators._orchestrate_core import (
_run_operation_harness,
@@ -48,112 +46,69 @@


def graceful_explore_operation_shutdown(
identifier: str,
operator: "OperatorActor",
state: "DiscoverySpaceManagerActor",
actuators: list["ActuatorActor"],
timeout=60,
):

if not orchestrator.modules.operators._cleanup.shutdown:
import time
from rich.console import Console

from rich.console import Console
moduleLog.info(f"Shutting down operation {identifier} gracefully")

moduleLog.info("Shutting down gracefully")

orchestrator.modules.operators._cleanup.shutdown = True

#
# Shutdown process
# 1. Shutdown state calling onComplete on operation and metricServer and ensuring metrics are flushed
# 2. Shutdown custom actors
# 3. Send graceful __ray_terminate__ to metric_server, operation and actuators
#
# Shutdown process
# 1. Shutdown state calling onComplete on operation and metricServer and ensuring metrics are flushed
# 2. Shutdown custom actors
# 3. Send graceful __ray_terminate__ to metric_server, operation and actuators

# This should not return until the metric server has processed all updates.
# This should not return until the metric server has processed all updates.

console = Console()
with console.status(
"Shutdown - waiting on all samples to be stored", spinner="dots"
) as status:
console = Console()
with console.status(
f"Shutdown ({identifier}) - waiting on all samples to be stored", spinner="dots"
) as status:

moduleLog.debug("Shutting down state")
promise = state.shutdown.remote()
ray.get(promise)
moduleLog.debug("Shutting down state")
ray.get(state.shutdown.remote())

status.update("Shutdown - cleanup")
status.update(
f"Shutdown ({identifier}) - waiting for actors to terminate (max 60s)"
)

moduleLog.debug("Cleanup custom actors")
try:
cleaner_handle = ray.get_actor(name=CLEANER_ACTOR)
ray.get(cleaner_handle.cleanup.remote())
# Deleting the cleaner actor. It is a detached one, so it has to be deleted explicitly
ray.kill(cleaner_handle)
except Exception as e:
moduleLog.warning(f"Failed to cleanup custom actors {e}")
terminating_actors = [
operator.__ray_terminate__.remote(),
state.__ray_terminate__.remote(),
]
# __ray_terminate__ allows atexit handlers of actors to run
# see https://docs.ray.io/en/latest/ray-core/api/doc/ray.kill.html
terminating_actors.extend([a.__ray_terminate__.remote() for a in actuators])
n_actors = len(terminating_actors)
moduleLog.debug(f"waiting for graceful shutdown of {n_actors} actors")

status.update("Shutdown - waiting for actors to terminate")
actors = [operator]
actors.extend(actuators)

wait_graceful = [
operator.__ray_terminate__.remote(),
state.__ray_terminate__.remote(),
]
# __ray_terminate__ allows atexit handlers of actors to run
# see https://docs.ray.io/en/latest/ray-core/api/doc/ray.kill.html
wait_graceful.extend([a.__ray_terminate__.remote() for a in actuators])
n_actors = len(wait_graceful)
moduleLog.debug(f"waiting for graceful shutdown of {n_actors} actors")
lookup = dict(zip(terminating_actors, actors))

actors = [operator]
actors.extend(actuators)
moduleLog.debug(f"Shutdown waiting on {lookup}")
moduleLog.debug(
f"Gracefully stopping actors - will wait {timeout} seconds ..."
)
terminated, active = ray.wait(
ray_waitables=terminating_actors, num_returns=n_actors, timeout=60.0
)

lookup = dict(zip(wait_graceful, actors))
moduleLog.debug(f"Terminated: {terminated}")
moduleLog.debug(f"Active: {active}")

moduleLog.debug(f"Shutdown waiting on {lookup}")
moduleLog.debug(
f"Gracefully stopping actors - will wait {timeout} seconds ..."
if active:
status.update(
f"Some actors have not completed after {timeout} grace period - killing"
)
terminated, active = ray.wait(
ray_waitables=wait_graceful, num_returns=n_actors, timeout=60.0
)

moduleLog.debug(f"Terminated: {terminated}")
moduleLog.debug(f"Active: {active}")

if active:
moduleLog.warning(
f"Some actors have not completed after {timeout} grace period - killing"
)
for actor_ref in active:
print(f"... {lookup[actor_ref]}")
ray.kill(lookup[actor_ref])

moduleLog.info("Shutting down Ray...")
ray.shutdown()
status.update("Shutdown - waiting for logs to flush")
moduleLog.info("Waiting for logs to flush ...")
time.sleep(10)
moduleLog.info("Graceful shutdown complete")
else:
moduleLog.info("Graceful shutdown already completed")


def graceful_explore_operation_shutdown_handler(
operation, state, actuators, timeout=60
) -> typing.Callable[[int, typing.Any | None], None]:
"""Return a signal handler that sh."""

def handler(sig, frame):

moduleLog.warning(f"Got signal {sig}")
moduleLog.warning("Calling graceful shutdown")
graceful_explore_operation_shutdown(
operator=operation,
state=state,
actuators=actuators,
timeout=timeout,
)

return handler
for actor_ref in active:
ray.kill(lookup[actor_ref])
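
The shutdown helper above asks each actor to exit via __ray_terminate__ (so its atexit handlers run), waits out a grace period, and only then force-kills stragglers. Below is a condensed, self-contained sketch of that termination pattern under simplified assumptions; the Worker actor is a placeholder rather than one of the orchestrator's actors.

import ray

ray.init()


@ray.remote
class Worker:
    def ping(self) -> str:
        return "pong"


actors = [Worker.remote() for _ in range(3)]

# __ray_terminate__ requests a graceful exit that still runs each actor's atexit handlers.
termination_refs = [a.__ray_terminate__.remote() for a in actors]
lookup = dict(zip(termination_refs, actors))

# Wait up to the grace period for every actor to acknowledge termination.
terminated, active = ray.wait(
    termination_refs, num_returns=len(termination_refs), timeout=60.0
)

# Anything still alive after the grace period is killed outright.
for ref in active:
    ray.kill(lookup[ref])

ray.shutdown()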


def run_explore_operation_core_closure(
@@ -236,8 +191,6 @@ def orchestrate_explore_operation(
f"{operator_module.moduleClass}-namespace-{str(uuid.uuid4())[:8]}"
)

initialize_resource_cleaner()

project_context = discovery_space.project_context

# Check the space
@@ -323,15 +276,24 @@ def orchestrate_explore_operation(

explore_run_closure = run_explore_operation_core_closure(operator, state)

orchestrator.modules.operators._cleanup.shutdown = False

signal.signal(
signalnum=signal.SIGTERM,
handler=graceful_explore_operation_shutdown_handler(
operation=operator,
# Handling SIGTERM
# First register a callback which will clean up if SIGTERM is sent
# while the handler is in place.
# Note that we can't register the callback until the actors are created, so there
# is a short window where graceful cleanup is not possible on SIGTERM.
cleanup_callback_functions[identifier] = (
lambda: graceful_explore_operation_shutdown(
identifier=identifier,
operator=operator,
state=state,
actuators=actuators,
),
actuators=list(actuators.values()),
)
)
# Next register the handler in case it was not registered already
# Since all operations register the same stateless handler, setting
# it multiple times does not change behaviour
signal.signal(
signalnum=signal.SIGTERM, handler=graceful_operation_shutdown_signal_handler()
)

def finalize_callback_closure(operator_actor: "OperatorActor"):
Expand All @@ -356,10 +318,13 @@ def finalize_callback(operation_resource: OperationResource):
finalize_callback=finalize_callback_closure(operator),
)

graceful_explore_operation_shutdown(
operator=operator,
state=state,
actuators=list(actuators.values()),
)
if not shutdown_signal_received:
graceful_explore_operation_shutdown(
identifier=identifier,
operator=operator,
state=state,
actuators=list(actuators.values()),
)
cleanup_callback_functions.pop(identifier)

return operation_output
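
Putting the pieces of this file together, the flow is: register a per-operation cleanup callback keyed by the operation identifier, install the shared SIGTERM handler, run the operation, perform the normal-path shutdown only if no signal was received, and finally drop the callback. The outline below is an illustrative, self-contained reduction of that flow; every name is a placeholder for the orchestrator's real objects.

import signal
from collections import OrderedDict

cleanup_callbacks = OrderedDict()
signal_received = False


def sigterm_handler(sig, frame):
    global signal_received
    signal_received = True
    for name in list(cleanup_callbacks):
        cleanup_callbacks[name]()


def run_operation(identifier, do_work, do_shutdown):
    # Register cleanup first so a SIGTERM during the run still shuts things down.
    cleanup_callbacks[identifier] = do_shutdown
    signal.signal(signal.SIGTERM, sigterm_handler)
    result = do_work()
    # Normal completion: shut down here unless the signal handler already did.
    if not signal_received:
        do_shutdown()
    cleanup_callbacks.pop(identifier)
    return result


print(run_operation("op-1", lambda: 42, lambda: print("shutting down op-1")))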