Executor Synchronous callback workload #61153
base: main
Conversation
…eryExecutor Add support for the Callback workload to be run in the executors. Other executors will need to be updated before they can support the workload, but I tried to make it as non-invasive as I could.
key = workload.callback.id
try:
    _execute_callback(log, workload, team_conf)
    output.put((key, TaskInstanceState.SUCCESS, None))
Mostly just curious: we still use TaskInstanceState here even though these are callbacks?
I chewed on this one a bit... It's either a SUCCESS or a FAIL, and I felt heavy-handed adding yet another state machine enum to repeat those values. I have an idea I'll include in the next revision and see what you think.
# Find the appropriate executor
executor = None
if executor_name:
    # Find executor by name - try multiple matching strategies
    for exec in self.job.executors:
        # Match by class name (e.g., "CeleryExecutor")
        if exec.__class__.__name__ == executor_name:
            executor = exec
            break
        # Match by executor name attribute if available
        if hasattr(exec, "name") and exec.name and str(exec.name) == executor_name:
            executor = exec
            break
        # Match by executor name attribute if available
        if hasattr(exec, "executor_name") and exec.executor_name == executor_name:
            executor = exec
            break

# Default to first executor if no specific executor found
if executor is None:
    executor = self.job.executors[0] if self.job.executors else None

if executor is None:
    self.log.warning("No executor available for callback %s", callback.id)
    continue
This is also missing the multi-team logic, which we need to stay up to date with at this point. It is also duplicating a lot of the work in _try_to_load_executor, which is made to do exactly this kind of lookup. I think it's going to save you a bunch of effort and future maintenance to update _try_to_load_executor to support workloads generally instead of just TIs (basically exactly the type of coding you did in the base executor and local executor changes); a rough sketch follows below.
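For illustration, the callback path could lean on the existing lookup helper instead of re-implementing it. This is only a sketch: the callbacks_to_queue variable and the callback.executor attribute are assumptions, not code from this PR.

```python
# Sketch: reuse the scheduler's existing executor lookup for callback workloads.
for callback in callbacks_to_queue:  # hypothetical local variable for illustration
    executor = self._try_to_load_executor(callback.executor)  # assumed attribute holding the executor name
    if executor is None and self.job.executors:
        # Fall back to the default (first configured) executor, mirroring the TI path.
        executor = self.job.executors[0]
    if executor is None:
        self.log.warning("No executor available for callback %s", callback.id)
```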
self.log.warning("No executor available for callback %s", callback.id)
continue

executor_to_callbacks[executor].append(callback)
Similar to the above, there is already an _executor_to_tis which does exactly this but for TIs; it could be generalized.
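As a sketch, that grouping could be generalized to any workload type. The helper name and the generic executor attribute below are assumptions for illustration, not existing code:

```python
from collections import defaultdict


def _build_executor_to_workloads(self, items):
    """Group workloads (TIs or callbacks alike) by the executor that should run them. Sketch only."""
    executor_to_workloads = defaultdict(list)
    for item in items:
        executor = self._try_to_load_executor(getattr(item, "executor", None))
        if executor is not None:
            executor_to_workloads[executor].append(item)
    return executor_to_workloads
```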
.where(ExecutorCallback.type == CallbackType.EXECUTOR)
.where(ExecutorCallback.state == CallbackState.QUEUED)
.order_by(ExecutorCallback.priority_weight.desc())
.limit(conf.getint("scheduler", "max_callback_workloads_per_loop", fallback=100))
Here and down below in the final loop over executors/workloads we're just queueing a static amount each time. But it is the scheduler's responsibility now (in the world of multiple executors and now multi-team) to ensure we don't ever schedule more tasks (now, workloads) than we have executor slots for. You can see how we do this math for tasks currently here:
airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py
Lines 854 to 871 in 056e24e
# The user can either request a certain number of tis to schedule per main scheduler loop (default
# is non-zero). If that value has been set to zero, that means use the value of core.parallelism (or
# however many free slots are left). core.parallelism represents the max number of running TIs per
# scheduler. Historically this value was stored in the executor, who's job it was to control/enforce
# it. However, with multiple executors, any of which can run up to core.parallelism TIs individually,
# we need to make sure in the scheduler now that we don't schedule more than core.parallelism totally
# across all executors.
num_occupied_slots = sum([executor.slots_occupied for executor in self.job.executors])
parallelism = conf.getint("core", "parallelism")
if self.job.max_tis_per_query == 0:
    max_tis = parallelism - num_occupied_slots
else:
    max_tis = min(self.job.max_tis_per_query, parallelism - num_occupied_slots)
if max_tis <= 0:
    self.log.debug("max_tis query size is less than or equal to zero. No query will be performed!")
    return 0
queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
We need to ensure that math now includes callbacks because they also take up worker slots.
I think this will work for now, as long as this method is always called before the critical section, since callbacks will increase the occupied slots in the executors, which should then be taken into account in the critical section. BUT this code here needs to ensure it doesn't oversubscribe the executors. So some logic similar to the critical section needs to be done here. E.g. we're taking a flat 100 here (by default anyway) but there may only be 20 free executor slots.
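Concretely, that could mean bounding the callback query by the same free-slot math. The config key below comes from this PR's draft; the rest is a sketch, not the final code:

```python
# Sketch: cap queued callbacks by free executor slots instead of a flat number.
num_occupied_slots = sum(executor.slots_occupied for executor in self.job.executors)
free_slots = conf.getint("core", "parallelism") - num_occupied_slots

max_callbacks = min(
    conf.getint("scheduler", "max_callback_workloads_per_loop", fallback=100),
    free_slots,
)
if max_callbacks <= 0:
    # Every worker slot is occupied; don't queue any callback workloads this loop.
    return 0
```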
Ah. I think I see what you are saying, I missed that. I may need to chat offline to clarify, but I'll look into it. Thanks for catching that.
This should be addressed if I understood correctly.
if callback:
    # Note: We receive TaskInstanceState from executor (SUCCESS/FAILED) but convert to CallbackState here.
    # This is intentional - executor layer uses generic completion states, scheduler converts to proper types.
    if state == TaskInstanceState.SUCCESS:
I think this is fine for now, but it would be cool if Callbacks were fully first-class citizens in executors, including executors reporting the right state back.
This is going to fall under the same fix as #61153 (comment)
I was on the fence about this but you've convinced me... The next revision will have a more generic WorkloadState and WorkloadKey. The code is done; just testing it out now.
That's up now, let me know what you think
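For readers following along, a generic state enum along these lines is presumably what that revision introduces; this is a sketch for context, not the actual code from the PR:

```python
import enum


class WorkloadState(str, enum.Enum):
    """Terminal states an executor can report for any workload, task or callback alike (sketch only)."""

    SUCCESS = "success"
    FAILED = "failed"
```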
  def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
      # Airflow V3 version -- have to delay imports until we know we are on v3
-     from airflow.executors.workloads import ExecuteTask
+     from airflow.executors.workloads import ExecuteCallback, ExecuteTask
The ExecuteCallback was added later than 3.0.0, so if you import it here it will break on core versions <3.2.0?
Can you either make a (very) late or lazy import, such that no upgrade dependency on Airflow core is generated and it still works with 3.0.0?
Yeah, for sure, good catch. I can do a soft test like:
if isinstance(workload, ExecuteTask):
    ...  # handle task
elif type(workload).__name__ == "ExecuteCallback":
    # Import only if a callback workload is found
    from airflow.executors.workloads import ExecuteCallback
or a hard gate like:
def _process_workloads(self, workloads):
    from airflow.executors.workloads import ExecuteTask

    if AIRFLOW_V_3_2_PLUS:
        from airflow.executors.workloads import ExecuteCallback
The first option feels better, but the second looks more explicit. Any preference?
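For what it's worth, here is a rough end-to-end sketch of how the first (soft-test) option could read in a provider executor. The _run_task_workload and _run_callback_workload helpers are assumptions for illustration, not names from this PR:

```python
from collections.abc import Sequence


def _process_workloads(self, workloads: Sequence) -> None:
    # ExecuteTask has existed since Airflow 3.0.0, so importing it up front is always safe.
    from airflow.executors.workloads import ExecuteTask

    for workload in workloads:
        if isinstance(workload, ExecuteTask):
            self._run_task_workload(workload)  # hypothetical helper
        elif type(workload).__name__ == "ExecuteCallback":
            # Only reached on cores that actually define ExecuteCallback, so this
            # import cannot break older (<3.2.0) installations.
            from airflow.executors.workloads import ExecuteCallback  # noqa: F401

            self._run_callback_workload(workload)  # hypothetical helper
        else:
            raise ValueError(f"Unsupported workload type: {type(workload)}")
```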
Add support for the Callback workload to be run in the executors. Other executors will need to be updated before they can support the workload, but I tried to make it as non-invasive as I could.
This is the bulk of the work required to allow synchronous callbacks to be used in DeadlineAlerts. For example, this now works in LocalExecutor:
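A minimal sketch of the kind of DAG definition this enables; the import paths and the exact DeadlineAlert/DeadlineReference signatures here are assumptions for illustration, not copied from the PR:

```python
from datetime import timedelta

# NOTE: import paths and signatures below are assumptions for illustration;
# check the Airflow deadline alert docs for the exact API.
from airflow.sdk import DAG
from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference


def notify_on_missed_deadline(**kwargs):
    # Synchronous callback that the configured executor (e.g. LocalExecutor) runs as a workload.
    print("DagRun missed its deadline!")


with DAG(
    dag_id="deadline_demo",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(minutes=30),
        callback=notify_on_missed_deadline,
    ),
):
    ...
```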
Co-author: Builds on work handed off by @seanghaeli and research from @ramitkataria; if I did this right then they should be getting co-author credits, I think?
Was generative AI tooling used to co-author this PR?
Cline (Claude Sonnet 4.5) was used for debugging and suggesting some unit test edge cases.
{pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.