From e502b03fe7f783ed14fb83d6a82902f50d92ce58 Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Tue, 26 May 2026 15:02:56 -0700 Subject: [PATCH] Fix example_mwaa_serverless system test race condition with stop_workflow_run The stop_workflow_run task was failing with a ValidationException because the workflow run completed before the stop command could be issued. This was particularly problematic on high-latency executors (e.g., ECS) where task launch overhead (~90s) gave the workflow enough time to finish. The fix introduces a second workflow definition (stoppable_workflow.yaml) that sensors for a non-existent S3 key, keeping the workflow running indefinitely until explicitly stopped. --- .../amazon/aws/example_mwaa_serverless.py | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py index c36801884ccb3..5a54d159660f5 100644 --- a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py +++ b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py @@ -46,7 +46,7 @@ # Externally fetched variables: ROLE_ARN_KEY = "ROLE_ARN" -# Valid MWAA Serverless YAML: tasks as mapping, FQN operators, flat parameters. +# Quick workflow - checks for a key that exists, completes immediately WORKFLOW_YAML = """\ systest_mwaa_serverless: schedule: null @@ -59,8 +59,23 @@ bucket_key: workflow.yaml """ -sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build() +# Long-running workflow - waits for a key that never arrives, used to test stopping +STOPPABLE_WORKFLOW_YAML = """\ +systest_mwaa_serverless_stoppable: + schedule: null + description: "System test: long-running workflow for stop testing" + tasks: + wait_for_stop: + task_id: wait_for_stop + operator: airflow.providers.amazon.aws.sensors.s3.S3KeySensor + bucket_name: {bucket} + bucket_key: never_exists.txt + poke_interval: 30 + timeout: 600 + soft_fail: true +""" +sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build() with DAG( dag_id=DAG_ID, @@ -82,6 +97,13 @@ data=WORKFLOW_YAML.format(bucket=bucket_name), ) + upload_stoppable_workflow_yaml = S3CreateObjectOperator( + task_id="upload_stoppable_workflow_yaml", + s3_bucket=bucket_name, + s3_key="stoppable_workflow.yaml", + data=STOPPABLE_WORKFLOW_YAML.format(bucket=bucket_name), + ) + # [START howto_operator_mwaa_serverless_create_workflow] create_workflow = MwaaServerlessCreateWorkflowOperator( task_id="create_workflow", @@ -123,13 +145,14 @@ update_workflow = MwaaServerlessUpdateWorkflowOperator( task_id="update_workflow", workflow_arn=workflow_arn, - definition_s3_location={"Bucket": bucket_name, "ObjectKey": "workflow.yaml"}, + definition_s3_location={"Bucket": bucket_name, "ObjectKey": "stoppable_workflow.yaml"}, role_arn=role_arn, - description="Updated system test workflow", + description="Updated to stoppable workflow for stop testing", ) # [END howto_operator_mwaa_serverless_update_workflow] - # Start a second run to test stopping + # Start a second run to test stopping - this uses the stoppable workflow + # that will keep running until explicitly stopped start_workflow_2 = MwaaServerlessStartWorkflowRunOperator( task_id="start_workflow_2", workflow_arn=workflow_arn, @@ -162,7 +185,7 @@ # TEST SETUP test_context, create_bucket, - upload_workflow_yaml, + [upload_workflow_yaml, upload_stoppable_workflow_yaml], workflow_arn, # TEST BODY create_workflow_again,