Starts execution directly from triggerer without going to worker #38674

Merged
merged 21 commits into from
Apr 29, 2024

Conversation

Lee-W
Member

@Lee-W Lee-W commented Apr 2, 2024

Why

For some operators, such as S3KeySensor with deferrable set to True, running the execute method on a worker might not be necessary. It would be better if we had a way to run a task in the triggerer directly, without going to the worker.

In the current solution, we still need to run next_method/execute_complete on the worker. This PR serves as a POC / first step toward running the whole execution in the triggerer asynchronously.

What

Introduce _start_trigger and _next_method to BaseOperator. If an operator defines both _start_trigger and _next_method in the __init__ method, the scheduler will directly defer the task without going to the worker.

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.testing import SuccessTrigger


class StartFromTriggerOperator(BaseOperator):
    _start_trigger = SuccessTrigger()
    _next_method = "execute_complete"

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")

These attributes can also be assigned at the instance level. However, dynamic task mapping is not supported in this setup.
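The rule described above (both attributes set means the task is deferred directly) can be sketched without Airflow installed. This is a minimal illustration only: `FakeTrigger`, `FakeBaseOperator`, `InstanceLevelOperator`, and `starts_from_triggerer` are hypothetical stand-ins, not Airflow classes or the scheduler's actual check.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeTrigger:
    """Hypothetical stand-in for a real trigger such as SuccessTrigger."""

    name: str = "success"


class FakeBaseOperator:
    """Hypothetical stand-in for BaseOperator; models only the two new attributes."""

    _start_trigger: FakeTrigger | None = None
    _next_method: str | None = None


class ClassLevelOperator(FakeBaseOperator):
    # Class-level assignment, as in the PR description's example.
    _start_trigger = FakeTrigger()
    _next_method = "execute_complete"


class InstanceLevelOperator(FakeBaseOperator):
    # Instance-level assignment: each instance decides in __init__.
    def __init__(self, start_from_trigger: bool) -> None:
        if start_from_trigger:
            self._start_trigger = FakeTrigger()
            self._next_method = "execute_complete"


def starts_from_triggerer(op: FakeBaseOperator) -> bool:
    """Assumed check: defer directly only when both attributes are set."""
    return op._start_trigger is not None and op._next_method is not None
```

With these stand-ins, `starts_from_triggerer(InstanceLevelOperator(start_from_trigger=False))` is false, so that instance would still go to a worker, while the class-level variant always starts from the triggerer.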



@Lee-W Lee-W force-pushed the starts-execution-from-triggerer branch from 56faab9 to edb7071 Compare April 2, 2024 11:23
@Lee-W Lee-W changed the title Starts execution from triggerer Starts execution directly from triggerer without going to worker Apr 2, 2024
@Lee-W Lee-W force-pushed the starts-execution-from-triggerer branch 7 times, most recently from babd63a to a5ada83 Compare April 4, 2024 06:44
@tirkarthi
Contributor

Related:

#31808
#31718

@Lee-W Lee-W force-pushed the starts-execution-from-triggerer branch from a5ada83 to 80c5e3c Compare April 8, 2024 08:31
@uranusjr
Member

uranusjr commented Apr 8, 2024

I wonder if we could somehow infer starts_execution_from_triggerer from the combination of other arguments, instead of setting the flag explicitly.

@Lee-W Lee-W force-pushed the starts-execution-from-triggerer branch 2 times, most recently from d907d27 to d58856a Compare April 8, 2024 09:43
@Lee-W
Member Author

Lee-W commented Apr 8, 2024

I wonder if we could somehow infer starts_execution_from_triggerer from the combination of other arguments, instead of setting the flag explicitly.

For this specific case, I would say yes. We could check whether start_trigger is None to decide. Not sure whether we would want to use this chance to refactor all the existing operators with deferrable support. (e.g., Moving the trigger definition to __init__ or somewhere else maybe?)

Also, the name is set to start_trigger instead of trigger because https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/cloud_build.html already takes this argument name. Without changing the name to something else, mypy and many tests complain. start_trigger is more of a temporary name. Suggestions are welcome.

@Lee-W Lee-W force-pushed the starts-execution-from-triggerer branch from d58856a to 3001b54 Compare April 8, 2024 10:44
@uranusjr
Member

uranusjr commented Apr 9, 2024

If we also infer starts_execution_from_triggerer from it, start_trigger is not so bad to me, and actually better than trigger because it more clearly describes that the attribute controls which trigger the task is started from, not the trigger the task uses. There are other possible names (e.g. start_from_trigger), but trigger is not a good one IMO.

@Lee-W Lee-W force-pushed the starts-execution-from-triggerer branch from 3001b54 to 320544a Compare April 10, 2024 10:58
@Lee-W
Member Author

Lee-W commented Apr 10, 2024

This comment is for the original design and is outdated


In the current design, I introduced start_trigger and next_method to the BaseOperator. When start_trigger is not None, the scheduler will call the defer_task method of the task instance object, and everything should work as it does when TaskDeferred is raised.

Developers who want to implement an operator that starts the execution in a triggerer instead of a worker can write something like the following.

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.testing import SuccessTrigger


class AsyncOperator(BaseOperator):
    start_trigger = SuccessTrigger()
    next_method = "execute_complete"

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")
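To make the flow concrete, here is a rough single-process simulation of "start from the trigger, then resume in next_method". It is a sketch under stated assumptions: `FakeSuccessTrigger`, `FakeAsyncOperator`, `wait_for_first_event`, and `simulate_deferred_run` are hypothetical names, and in Airflow itself the trigger runs in the triggerer while the resume method still runs on a worker.

```python
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator


class FakeSuccessTrigger:
    """Hypothetical stand-in for a trigger that fires one success event."""

    async def run(self) -> AsyncIterator[dict[str, Any]]:
        await asyncio.sleep(0)  # pretend to wait for an external condition
        yield {"status": "success"}


class FakeAsyncOperator:
    """Hypothetical operator mirroring the attribute names of this design."""

    start_trigger = FakeSuccessTrigger()
    next_method = "execute_complete"

    def execute_complete(self, context: dict[str, Any], event: Any = None) -> str:
        return f"execute complete: {event['status']}"


async def wait_for_first_event(trigger: FakeSuccessTrigger) -> dict[str, Any]:
    # Return the first event the trigger fires.
    async for event in trigger.run():
        return event
    raise RuntimeError("trigger finished without firing an event")


def simulate_deferred_run(op: FakeAsyncOperator, context: dict[str, Any]) -> str:
    # Step 1: skip execute() entirely and wait on the start trigger.
    event = asyncio.run(wait_for_first_event(op.start_trigger))
    # Step 2: resume in the method named by next_method, passing the event.
    resume = getattr(op, op.next_method)
    return resume(context, event=event)
```

Calling `simulate_deferred_run(FakeAsyncOperator(), {})` never touches an `execute` method; the operator's only synchronous code is the resume method.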

However, it raises a few issues.

  1. The DAG developer can overwrite the functionality of the whole operator by passing start_trigger and next_method, as in the following. Imagine the user uses RdsCreateDbSnapshotOperator but passes a GCSBlobTrigger. We probably don't want the DAG developer to change the behavior of the operator, so maybe we can try something like inherits_from_empty_operator.
with DAG(...) as dag:
    task = SomeOperator(
        task_id="task",
        start_trigger=SuccessTrigger(),
        next_method="execute_complete",
    )
  2. If we do what point 1 suggests and do not allow the DAG developer to change the operator's behavior, will we let them decide whether to run op.execute or the start_trigger implemented by the operator author? In the current implementation, if start_trigger is not None, op.execute is ignored. This raises another issue: what if the deployed Airflow does not have a triggerer running? Should we try something like _run_inline_trigger, or should we just fall back to op.execute? Or maybe we should add back the flag (introduced a few commits ago in this PR) to decide whether to start the execution from the triggerer? Yet another option is to block operator authors from implementing start_trigger and execute at the same time.
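One of the options raised in point 2, falling back to op.execute when no triggerer is running, could look roughly like the sketch below. This illustrates only that one option and is not what the PR merged; `StartMode` and `choose_start_mode` are hypothetical names, and how a deployment would detect a running triggerer is left as an assumed boolean input.

```python
from __future__ import annotations

from enum import Enum


class StartMode(Enum):
    TRIGGERER = "triggerer"
    WORKER = "worker"


def choose_start_mode(
    has_start_trigger: bool,
    has_execute: bool,
    triggerer_running: bool,
) -> StartMode:
    """Pick where a task starts, under the assumed fallback policy."""
    # Prefer the triggerer when the operator defines a start trigger
    # and a triggerer is actually available.
    if has_start_trigger and triggerer_running:
        return StartMode.TRIGGERER
    # Otherwise fall back to the classic path if execute() is implemented.
    if has_execute:
        return StartMode.WORKER
    # Trigger-only operator with no triggerer: nothing can run it.
    raise RuntimeError(
        "operator defines only a start trigger, but no triggerer is running"
    )
```

The last branch shows why the fallback alone does not settle the question: an operator that implements only start_trigger has nothing to fall back to.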

@Lee-W Lee-W force-pushed the starts-execution-from-triggerer branch 10 times, most recently from 81cf1d1 to 454c57c Compare April 17, 2024 00:22
Lee-W added 15 commits April 29, 2024 14:51
…r and use start_trigger to infer"

This reverts commit 8dd583f.
…o arg conflict with google provider"

This reverts commit ed20305.
…cution_from_triggerer attributes instead of properties"

This reverts commit 06f544a.
…er and _next_method can be directly deferred during scheduling
…od should be used

noted that dynamic task mapping is not supported when assigned in instance level
@Lee-W Lee-W force-pushed the starts-execution-from-triggerer branch from c71be36 to 25e12a3 Compare April 29, 2024 06:55
@Lee-W Lee-W force-pushed the starts-execution-from-triggerer branch from 25e12a3 to 26f044c Compare April 29, 2024 07:26
@Lee-W Lee-W merged commit 6112745 into apache:main Apr 29, 2024
41 checks passed
@Lee-W Lee-W deleted the starts-execution-from-triggerer branch April 29, 2024 11:19
RodrigoGanancia pushed a commit to RodrigoGanancia/airflow that referenced this pull request May 10, 2024