Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
# Externally fetched variables:
ROLE_ARN_KEY = "ROLE_ARN"

# Valid MWAA Serverless YAML: tasks as mapping, FQN operators, flat parameters.
# Quick workflow - checks for a key that exists, completes immediately
WORKFLOW_YAML = """\
systest_mwaa_serverless:
schedule: null
Expand All @@ -59,8 +59,23 @@
bucket_key: workflow.yaml
"""

sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
# Long-running workflow - waits for a key that never arrives, used to test stopping
STOPPABLE_WORKFLOW_YAML = """\
systest_mwaa_serverless_stoppable:
schedule: null
description: "System test: long-running workflow for stop testing"
tasks:
wait_for_stop:
task_id: wait_for_stop
operator: airflow.providers.amazon.aws.sensors.s3.S3KeySensor
bucket_name: {bucket}
bucket_key: never_exists.txt
poke_interval: 30
timeout: 600
soft_fail: true
"""

sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()

with DAG(
dag_id=DAG_ID,
Expand All @@ -82,6 +97,13 @@
data=WORKFLOW_YAML.format(bucket=bucket_name),
)

upload_stoppable_workflow_yaml = S3CreateObjectOperator(
task_id="upload_stoppable_workflow_yaml",
s3_bucket=bucket_name,
s3_key="stoppable_workflow.yaml",
data=STOPPABLE_WORKFLOW_YAML.format(bucket=bucket_name),
)

# [START howto_operator_mwaa_serverless_create_workflow]
create_workflow = MwaaServerlessCreateWorkflowOperator(
task_id="create_workflow",
Expand Down Expand Up @@ -123,13 +145,14 @@
update_workflow = MwaaServerlessUpdateWorkflowOperator(
task_id="update_workflow",
workflow_arn=workflow_arn,
definition_s3_location={"Bucket": bucket_name, "ObjectKey": "workflow.yaml"},
definition_s3_location={"Bucket": bucket_name, "ObjectKey": "stoppable_workflow.yaml"},
role_arn=role_arn,
description="Updated system test workflow",
description="Updated to stoppable workflow for stop testing",
)
# [END howto_operator_mwaa_serverless_update_workflow]

# Start a second run to test stopping
# Start a second run to test stopping - this uses the stoppable workflow
# that will keep running until explicitly stopped
start_workflow_2 = MwaaServerlessStartWorkflowRunOperator(
task_id="start_workflow_2",
workflow_arn=workflow_arn,
Expand Down Expand Up @@ -162,7 +185,7 @@
# TEST SETUP
test_context,
create_bucket,
upload_workflow_yaml,
[upload_workflow_yaml, upload_stoppable_workflow_yaml],
workflow_arn,
# TEST BODY
create_workflow_again,
Expand Down
Loading