-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resolve upstream tasks when template field is XComArg #8805
Conversation
d40c9d9
to
252be10
Compare
for field in self.template_fields: | ||
arg = getattr(self, field) | ||
|
||
def apply_set_upstream(arg: Any): | ||
if isinstance(arg, XComArg): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this traversing part should be very "error prune". wee are starting to traverse all user templated args with no exception (old code is affected), it can be that something will fail and this feature will become a "blocker" to update to the new version
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can be that something will fail and this feature will become a "blocker" to update to the new version
Can you provide an example? We perform an action only when we encounter XComArg
so self.set_upstream
should work. If it doesn't work then it means that our implementation is corrupted.
airflow/providers/google/marketing_platform/operators/display_video.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a test that checks the up/down stream dependencies work with Dag Serialization enabled.
0660ed3
to
4b4af57
Compare
@ash @kaxil @evgenyshulman are we still sure that this way it's better than having a metaclass? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
templated classes aren't handled yet.
Additionally, https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-31%3A+Airflow+functional+DAG+definition doesn't have this .output
attribute.
Where has this concept come from? It wasn't in the AIP.
It was proposed in #8055 . Without it mixing functional and non-functional operators will be hard. |
46eab09
to
6743e4e
Compare
@ashb @evgenyshulman @casassg I think all questions were addressed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm overall 🚀
3536855
to
745d0a3
Compare
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
* Resolve upstream tasks when template field is XComArg closes: apache#8054 * fixup! Resolve upstream tasks when template field is XComArg * Resolve task relations in DagRun and DagBag * Add tests for serialized DAG * Set dependencies only in bag_dag, refactor tests * Traverse template_fields attribute * Use provide_test_dag_bag in all tests * fixup! Use provide_test_dag_bag in all tests * Use metaclass + setattr * Add prepare_for_execution method * Check signature of __init__ not class * Apply suggestions from code review Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> * Update airflow/models/baseoperator.py Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
When we are referring ``{{task}}` in jinja templates we can also refer some of the fields, which are templated. We are not able to solve all the problems with such rendering (specifically recursive rendering of the fields used in JINJA templating might be problematic. Currently whether you see original, or rendered field depends solely on the sequence in templated_fields. However that would not even explain the rendering problem described in apache#13559 where kwargs were defined after opargs and the rendering of opargs **should** work. It turned out that the problem was with a change introduced in apache#8805 which made the context effectively holds a DIFFERENT task than the current one. Context held an original task, and the curren task was actually a locked copy of it (to allow resolving upstream args before locking). As a result, any changes done by rendering templates were not visible in the task accessed via {{ task }} jinja variable. This change replaces the the task stored in context with the same copy that is then used later during execution so that at least the "sequential" rendering works and templated fields which are 'earlier' in the list of templated fields can be used (and render correctly) in the following fields. Fixes: apache#13559
When we are referring ``{{task}}` in jinja templates we can also refer some of the fields, which are templated. We are not able to solve all the problems with such rendering (specifically recursive rendering of the fields used in JINJA templating might be problematic. Currently whether you see original, or rendered field depends solely on the sequence in templated_fields. However that would not even explain the rendering problem described in #13559 where kwargs were defined after opargs and the rendering of opargs **should** work. It turned out that the problem was with a change introduced in #8805 which made the context effectively holds a DIFFERENT task than the current one. Context held an original task, and the curren task was actually a locked copy of it (to allow resolving upstream args before locking). As a result, any changes done by rendering templates were not visible in the task accessed via {{ task }} jinja variable. This change replaces the the task stored in context with the same copy that is then used later during execution so that at least the "sequential" rendering works and templated fields which are 'earlier' in the list of templated fields can be used (and render correctly) in the following fields. Fixes: #13559
closes: #8054
Make sure to mark the boxes below before creating PR: [x]
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.