
AWS EC2 Spot Interrupt fails entire flow #13013

Closed
trahloff opened this issue Feb 16, 2023 · 6 comments

@trahloff
Expectation / Proposal

Prefect-Ray can run tasks on Ray clusters that utilize AWS EC2 spot instances.

Current behavior

Flows run with prefect-ray on EC2 spot clusters immediately fail in their entirety as soon as one spot-type worker node receives a spot interrupt.

Minimal Reproducible Example

from datetime import datetime
from time import sleep

from prefect import flow, task, get_run_logger
from prefect_ray import RayTaskRunner


@task
def wait_for_an_eternity():
    logger = get_run_logger()

    logger.info(f"STARTING TASK: {datetime.now()}")
    sleep(180000)  # 180,000 seconds = 50 h
    logger.info(f"STOPPING TASK: {datetime.now()}")


@flow(
    name="EC2-Spot-Interrupt-Lab",
    task_runner=RayTaskRunner(address="cluster-with-ec2-spot-workers"),
)
def main():
    logger = get_run_logger()
    logger.info("start")

    wait_for_an_eternity.submit()

Steps to reproduce:

  1. Set up a Ray cluster that does not run tasks on the head node and consists only of EC2 spot worker nodes
  2. Run the flow above
  3. Either wait for a worker node to receive a spot interrupt notice (and subsequently be "stolen") or manually trigger a failed worker node in the cluster by terminating the worker node's EC2 instance

Result:

  • Task run '...' received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state.
  • Task is still in state RUNNING
  • Flow is immediately in state FAILED

Business perspective

Running workloads on EC2 spot instances significantly reduces computation costs compared to on-demand instances. Especially for our data processing workloads with little to no side effects, rerunning a task in case of a spot instance termination is no problem from an application point of view. Ray as our distributed compute framework is already capable of provisioning replacement instances in case of spot instance termination and rerunning the interrupted task.
Therefore, we need to run our processing with Prefect and Ray on EC2 Spot Instances. This issue blocks us from using EC2 spot and crashes every workload that runs for an extended time.
This issue was discovered after rolling out the solution to production pipelines.

Our error analysis

Prefect's PreventRedundantTransitions orchestration policy prevents the task from being executed again: the task remains in state Running when the instance is terminated, and the new replacement instance then tries to start the task from scratch, proposing the state Running once more. That proposal is aborted, the state transition fails, and the task fails with it. This effectively makes it impossible to leverage Ray's retry mechanism and use spot instances, which has direct cost implications.
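
A minimal sketch of what the replacement worker effectively runs into (illustrative only, not our actual code; it reuses the set_task_run_state call shown later in this thread and assumes the returned OrchestrationResult exposes the abort status):

# Hypothetical illustration of the aborted transition described above.
from prefect import get_client
from prefect.states import Running


async def repropose_running(task_run_id):
    async with get_client() as client:
        # The task run is still RUNNING on the server because the original
        # worker never reported a terminal state before the spot interrupt.
        result = await client.set_task_run_state(
            task_run_id=task_run_id,
            state=Running(),  # the replacement worker starts from scratch
        )
        # PreventRedundantTransitions aborts the RUNNING -> RUNNING proposal,
        # so the task never re-executes and the flow run fails.
        print(result.status)  # expected: ABORT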

Our initial ideas for potential mitigation

Possible solutions that would enable using spot instances and automatic retries on termination could include:

  1. An adaptation of the PreventRedundantTransitions policy. The intended use case for this policy is to prevent the flow from progressing backwards or multiple agents from picking up the same task. The policy could potentially be adapted to be aware of the origin of the transition request and handle this specific case separately.
  2. Gracefully handling instance termination. The application or orchestration code could react to a spot instance termination notice, which is usually published 2 minutes before the instance actually terminates, and transition the task into a state that can later transition back into Running.
    1. This could potentially be modeled as transitioning to Paused and then transitioning the same task to Running again
    2. or as a transition to Failed/Crashed and some shutdown logic that submits an entirely new task for the rerun.
@prefectcboyd
Contributor

This seems related to #7116

@desertaxle
Member

Thanks for this great writeup @trahloff! This looks like an issue that might be addressable within prefect-ray, but it may require a change to prefect. I'm going to consult with the team to see how we can best resolve this issue.

@trahloff
Author

trahloff commented Feb 17, 2023

Quick update after investigating mitigation idea 2, "Gracefully handling instance termination":

(All EC2 Spot Interrupts were simulated with the AWS Fault Injection Simulator)

Signal Handler

The assumption is that the instance receives a SIGTERM two minutes before a SIGKILL when a spot interrupt happens. This is not really documented, but it is at least the behavior when using EC2 Spot with ECS (ref.).

import signal
from datetime import datetime
from time import sleep

from prefect import task, get_run_logger


@task
def wait_for_an_eternity():
    logger = get_run_logger()

    def exit_gracefully(signum, frame):
        sig_name = signal.Signals(signum).name
        logger.info(f"Received {sig_name} signal")
        if sig_name == "SIGTERM":
            logger.info("SIGTERM Signal Received. TODO: Pause task")

    signal.signal(signal.SIGINT, exit_gracefully)
    signal.signal(signal.SIGTERM, exit_gracefully)

    logger.info(f"STARTING TASK: {datetime.now()}")
    sleep(180000)  # 50 h
    logger.info(f"STOPPING TASK: {datetime.now()}")

Unfortunately, this is not a valid option because the tasks are not executed in the main thread: ValueError: signal only works in main thread of the main interpreter.
Maybe you know of a way to listen for a potential SIGTERM on a per-flow basis?

Instance Metadata Interrupt Notice

Another idea was to periodically query the EC2 instance metadata service. According to the documentation, the endpoint http://169.254.169.254/latest/meta-data/spot/instance-action should return a termination notice 2 minutes before the instance is terminated.

Quick code to test this hypothesis:

import urllib.request
from datetime import datetime
from time import sleep
from urllib.error import HTTPError

from prefect import task, get_run_logger


@task
def wait_for_an_eternity():
    logger = get_run_logger()

    logger.info(f"STARTING TASK: {datetime.now()}")

    def watch_interruption_notice():
        # Poll the metadata endpoint; it returns an error until an
        # interruption notice has actually been issued.
        while True:
            try:
                metadata_url = 'http://169.254.169.254/latest/meta-data/spot/instance-action'
                instance_action = urllib.request.urlopen(metadata_url).read().decode()
                logger.info(instance_action)
            except HTTPError:
                logger.info("No Interruption Notice available")

            sleep(30)

    watch_interruption_notice()

Results:
[Screenshot: task run logs showing the interruption notice being detected]

That would actually work when it comes to detecting spot terminations in advance.
I'm currently trying to manually set the task state to "Paused" as a reaction to the interrupt notice:

from prefect import get_client
from prefect.context import get_run_context
from prefect.states import Paused
from prefect.utilities.asyncutils import sync_compatible

# ...

@task
def wait_for_an_eternity():

# ...

    @sync_compatible
    async def set_task_state_paused(task_id):
        async with get_client() as client:
            logger.info(f"trying to change state for: {task_id}")
            await client.set_task_run_state(
                task_run_id=task_id,
                state=Paused(),
                force=True,
            )
    
    run_context = get_run_context()
    task_run = run_context.task_run
    task_id = task_run.id
    
    set_task_state_paused(task_id)

# ...

This is more for R&D than an actual fix because it completely blocks the task's code execution, but it succeeds in catching the interrupt notice and placing the task in "Paused" so it can be restarted once the new EC2 Spot instance is allocated.

@desertaxle desertaxle self-assigned this Feb 17, 2023
@desertaxle
Member

@trahloff Working with some of the ideas that you posted, I think that a decorator like this could work to resolve this issue:

import asyncio
from functools import wraps

from httpx import AsyncClient, HTTPStatusError

from prefect import get_client, get_run_logger
from prefect.context import get_run_context
from prefect.states import Paused


def listen_for_interruption_notice(fn):
    async def poll():
        logger = get_run_logger()
        run_context = get_run_context()
        task_run = run_context.task_run
        task_id = task_run.id

        async with AsyncClient() as client:
            while True:
                try:
                    response = await client.get(
                        "http://169.254.169.254/latest/meta-data/spot/instance-action"
                    )
                    response.raise_for_status()
                    logger.info(response.text)
                    async with get_client() as prefect_client:
                        logger.info(f"trying to change state for: {task_id}")
                        await prefect_client.set_task_run_state(
                            task_run_id=task_id,
                            state=Paused(),
                            force=True,
                        )
                except HTTPStatusError:
                    logger.info("No Interruption Notice available")

                await asyncio.sleep(30)

    @wraps(fn)
    async def wrapped(*args, **kwargs):
        asyncio.create_task(poll())
        return await fn(*args, **kwargs)

    return wrapped

The decorator would poll the instance-action endpoint and update the state of the task if a termination signal is received.

This decorator could be applied to an async task like so:

@task
@listen_for_interruption_notice
async def some_task(input):
    for i in range(10):
        await asyncio.sleep(2)
        print(input, i)

This isn't a long-term solution, but it could help with this issue while we continue to work on a permanent fix.

@j-tr
Contributor

j-tr commented Mar 3, 2023

@desertaxle I was looking into your decorator and I think it works great for async flows. But our flow codebase is currently completely synchronous, and integrating this on our side would require refactoring all of the flows to async. That's why I looked into using threading instead of async to poll the metadata.
I found that when running a sync version of the poll function in a thread, the Prefect context doesn't seem to be available there, which causes issues when using get_client: it finds an empty context and therefore connects to a newly created ephemeral API instead of Prefect Cloud.
I'm wondering if you have any ideas on how to run this in a sync flow, or if there are any known issues with using the Prefect context across multiple threads or with multithreading in Prefect in general.
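
One possible direction (an untested sketch, assuming Prefect's run contexts are backed by contextvars, so copying the current context into the worker thread would let get_run_context and get_client resolve the same run context and API settings as the task):

# Hypothetical sketch: run the synchronous poll loop in a background thread
# while propagating the current contextvars, which Prefect's contexts use,
# so the thread sees the same run context and settings as the calling task.
import contextvars
import threading


def start_interruption_watcher(poll_fn):
    ctx = contextvars.copy_context()  # snapshot the current (Prefect) context
    thread = threading.Thread(
        target=ctx.run,               # execute poll_fn inside that context
        args=(poll_fn,),
        daemon=True,                  # don't keep the process alive after the task exits
    )
    thread.start()
    return thread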

@desertaxle desertaxle transferred this issue from PrefectHQ/prefect-ray Apr 26, 2024
@trahloff
Author

Resolved via #8802
