-
Notifications
You must be signed in to change notification settings - Fork 51
Add Airflow variable used to configure overrides for task timeouts #976
Conversation
Dang, this is really tough! The flexibility of defining agnostic, per-task timeouts is neat. But I think about what I had to do for WordPress/openverse#1301 - because of the reingestion workflows, we'd probably also need to make this a task ID pattern for matching timeouts 😕 How much additional complexity would that add do you think? I think it's fair to try and target the typical |
@AetherUnbound It ended up being easier than I thought! Updated version of the PR now allows arbitrarily overriding timeout on any task in a provider DAG, including iNaturalist. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woohoo! What an excellent feature this will unlock for us. I have one request regarding the timeout format that I'd like your thoughts on 🕐
# Override the dag_id | ||
self.dag_id = f"{self.provider_name}_reingestion_workflow" | ||
return | ||
_, provider_name = self._get_module_info() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a much better approach! 😄
# If the task is a MappedOperator, apply the timeout to partial | ||
# kwargs to ensure it is set on each mapped task. | ||
if isinstance(task, MappedOperator): | ||
task.partial_kwargs["execution_timeout"] = timeout_override | ||
else: | ||
task.execution_timeout = timeout_override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow, what a find! 😱
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an excellent change! I know it can be hard to test, but I love the ability to just have the time range we need specified (e.g. 10h
). Very excited for this!
match unit: | ||
case "d": | ||
days = int(count) | ||
case "h": | ||
hours = int(count) | ||
case "m": | ||
minutes = int(count) | ||
case "s": | ||
seconds = int(count) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A case statement!! Wow, my first ever! 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's so flexible! Awesome, it worked like a charm! 💯
Fixes
Fixes WordPress/openverse#1437 by @AetherUnbound
Description
Adds an Airflow Variable that can be used optionally to temporarily alter the execution timeouts for any task in a provider DAG, using a
task_id_pattern
. The timeout overrides are configured in the format<days>d:<hours>h:<minutes>m:<seconds>s
. Example using theCONFIGURATION_OVERRIDES
Airflow variable:This is intentionally implemented in such a way that it can be extended very quickly to override other properties of tasks. I had
retries
in mind but did not include it in this PR for simplicity.Testing Instructions
You can use the example
CONFIGURATION_OVERRIDES
provided above, and play around with adding other configurations. For each timeout you configure, run the DAG and check theTask Instance Details
page for the task to verify that theexecution_timeout
is set to the one you configured. You can try setting really short timeouts on long-running tasks likepull_data
to verify that the timeouts will actually be hit.Make sure to test:
pull_image_data
upsert_data
and verify that the timeout gets applied to allupsert_data_day_shift_x
tasks within the workflowload_transformed_data
task, which is a Mapped task. The timeout should be applied to each mapped taskNotes:
Checklist
Update index.md
).main
) ora parent feature branch.
errors.
Developer Certificate of Origin
Developer Certificate of Origin