Set dependencies in MappedOperator via XComArgs#20931
Conversation
There was a problem hiding this comment.
"badly" as part of the serialization code
airflow/airflow/serialization/serialized_objects.py
Lines 970 to 971 in 14a057f
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
There was a problem hiding this comment.
| def __init__(self, operator: "Union[BaseOperator, MappedOperator]", key: str = XCOM_RETURN_KEY): | |
| def __init__(self, operator: Union[BaseOperator, MappedOperator], key: str = XCOM_RETURN_KEY): |
Any reason for this change?
There was a problem hiding this comment.
To avoid uncessary imports/reduce chance import cycles.
There was a problem hiding this comment.
After #20945 we can probably change this to Operator, which would not cause import cycles.
Set upstream dependencies when an XComArg is used in a MappedOperator, and (de)serialize them correctly. We can only re-create XComArg at the DAG level as we need to get hold of the Operator (not just a task_id) so we need a two phase approach here: when deserializing operators we create them as a place-holder class (`_XcomRef`) and then "up a level" when deserializing the DAG we turn these back in to XComArg objects. (And in so doing we needed to fix a bug or two in serializing MappedOperator that have a DAG -- it caused a recursion error.)
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com> Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
d28eb4f to
c692668
Compare
Set upstream dependencies when an XComArg is used in a MappedOperator, and (de)serialize them correctly.
We can only re-create XComArg at the DAG level as we need to get hold of the Operator (not just a task_id) so we need a two phase approach here: when deserializing operators we create them as a place-holder class (
_XcomRef) and then "up a level" when deserializing the DAG we turn these back in to XComArg objects.(And in so doing we needed to fix a bug or two in serializing MappedOperator that have a DAG -- it caused a recursion error.)
This PR could possibly be split in to two (one to set deps, a second to serialize them.)
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.