[AIP-31] Create XComArg model #8052
Comments
Thanks for opening your first issue here! Be sure to follow the issue template! |
Hey @casassg any update on this issue? :) |
I've been a bit overwhelmed with the work from home situation. Will try to get to this on the weekend or next week. Otherwise, please feel free to take it from me 😄 |
Hey guys, the databand team and I decided to implement [AIP-31] starting tomorrow. |
Sounds great to me! I can help review and give directions. Let's sync some time! I also do plan to contribute some part |
Now, the main purpose of XComArg is to inject a “dependency” on another operator, plus a route to fetch that dependency from XCom. I assume a generic class is required for baking Jinja templates and defining dependencies in the Arg. Let’s create a class ArgBase that will have these 3 interfaces defined:
|
Should "normal" operators also be XComArgs? Once we have functional operators, should I as a user expect this DAG to work?

    with DAG("example"):
        op1 = CreateClusterOperator("op", ...)
        op2 = RunJobOperator("op2", cluster_name=op1)

    # Or with custom XCom key
    with DAG("example"):
        op1 = CreateClusterOperator("op", ...)
        op2 = RunJobOperator("op2", cluster_name=op1["cluster_id"])

If no, why not? |
I do not agree with this. Why do we need to stick to Jinja templates? It seems we are trying to fit the new solution to the existing workaround, when we should think of this as a more generic solution. I don't see why we need a
No, let's use #8055 instead:

    with DAG("example"):
        op1 = CreateClusterOperator("op", ...)
        op2 = RunJobOperator("op2", cluster_name=op1.output)

    # Or with custom XCom key
    with DAG("example"):
        op1 = CreateClusterOperator("op", ...)
        op2 = RunJobOperator("op2", cluster_name=op1.output["cluster_id"]) |
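For readers following along, here is a rough sketch of how an `.output` property like the one used above could work. It assumes that `.output` returns an XComArg and that an operator receiving an XComArg as an argument records the producing task as upstream. `OperatorSketch`, `XComArgSketch`, and `upstream_task_ids` are hypothetical stand-ins written for this illustration, not Airflow's actual classes.

```python
from typing import Any, Set


class XComArgSketch:
    """Hypothetical reference to one XCom entry of an operator."""

    def __init__(self, operator: "OperatorSketch", key: str = "return_value") -> None:
        self.operator = operator
        self.key = key

    def __getitem__(self, key: str) -> "XComArgSketch":
        # Same producing operator, different XCom key.
        return XComArgSketch(self.operator, key)


class OperatorSketch:
    """Hypothetical stand-in for an operator; not airflow's BaseOperator."""

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id
        self.kwargs = kwargs
        self.upstream_task_ids: Set[str] = set()
        # Assumption: any XComArg passed as an argument implies a dependency
        # on the operator that produces it.
        for value in kwargs.values():
            if isinstance(value, XComArgSketch):
                self.upstream_task_ids.add(value.operator.task_id)

    @property
    def output(self) -> XComArgSketch:
        # Reference to this operator's default (return value) XCom entry.
        return XComArgSketch(self)


op1 = OperatorSketch("op")
op2 = OperatorSketch("op2", cluster_name=op1.output["cluster_id"])
```

With this shape, `op2` ends up depending on `op1` without an explicit `op1 >> op2`, which is the behaviour the snippet above relies on.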
An example would be cases when people build command lines, text messages, SQL queries, and so on based on the previous operator's results. I personally don't like using Jinja templating, but until now this was the only option for Airflow. I would implement it for "backward compatibility," not only on the technical side but also on the philosophical side. Also, it is worth mentioning that I would not do that as a separate feature: everything else (Jinja, Jinja resolving, templated_params) is already part of Airflow, so we just need to implement a "to string" representation for this feature. |
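To make the "to string" idea concrete, here is a minimal sketch, assuming the string form of an XComArg is a Jinja expression that pulls the value at render time. `XComArgStr` is a hypothetical class for this illustration; the template text assumes `task_instance.xcom_pull(task_ids=..., key=...)` is available in the template context, as it is when XCom values are pulled from Jinja today.

```python
class XComArgStr:
    """Hypothetical XComArg that only knows the producing task id and key."""

    def __init__(self, task_id: str, key: str = "return_value") -> None:
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        # Quadruple braces in an f-string produce literal "{{" and "}}".
        return (
            f"{{{{ task_instance.xcom_pull("
            f"task_ids={self.task_id!r}, key={self.key!r}) }}}}"
        )


cluster = XComArgStr("create_cluster", key="cluster_id")
# Building a command line embeds the template instead of the value.
command = f"run-job --cluster {cluster}"
```

The resulting `command` is an ordinary templated string, so the existing Jinja rendering of templated fields would resolve it when the task runs. Note that the rendered value is always a string, which is the concern raised later in the thread about non-string values.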
The reasoning for a base class is only that there is a proposal about having different implementations of the class, so it is worth defining in the base class what is expected from a new implementation (for example, what if some implementation doesn't use XCom at all; why should it inherit from the XComArg class?). These are non-critical questions; I am OK with no base class at all. |
Imho here we should be opinionated and "purely functional". Thus I would suggest using factory function (operator) like |
It's definitely an option for people who want a "purely functional" approach from the beginning. Most Airflow users are already very invested in the current implementation without functional operators.
IMHO (as I wrote on the current PR), we should decide before we push the implementation into Airflow/master, and we should base our decision on some examples written with end-to-end usage of the AIP-31 implementation. I see the point in keeping everything as simple as possible; at the same time, I want all users to be happy if it doesn't affect the implementation in a bad way :) |
Regarding templating, I understand why you say that now. An option we implemented internally to do this was to add a
My main worry with enabling the Jinja string is that it may cause some unexpected behaviour. For example, what happens when the value is not a string? I'd rather not enable it, mostly because it still can be done using
Regarding the BaseArg class, I would say no, as we are moving the extensibility piece to XCom itself. That should allow injecting custom serialization/deserialization behaviour consistently across Airflow and not only in XComArg. I can't think of any other reason to override XComArg apart from custom serialization/deserialization logic. |
I agree that Jinja + XCom was always problematic; however, most of the time it does work. If the object is not a str, Jinja will convert it into a str, as it does for any other object coming directly from
My working assumption is that replacing only XCom is not going to be enough to implement a "real" replacement for serialization/deserialization, but if I am wrong, we can easily implement that base class. So let's just wait until somebody does that for real. |
Sounds good. Regarding Jinja, maybe a way is to make it
If we keep XComArg as a simple XCom wrapper, that should work. I want to make sure we keep XComArg as simple as possible, and if we need some extra XCom logic, add it to the XCom class itself instead. |
Description
XComArg is a class that references an XCom entry from an operator. It can be used to explicitly set XCom dependencies and explicitly pass results between operators.
Class attributes:
operator: BaseOperator
: Origin operator instance.
key: str
: Stores key for XCom value. Defaults to airflow.models.xcom.XCOM_RETURN_KEY
Class methods:
get(context: dict) -> Any
: Resolves XCom value given operator and key.
__getitem__(key: str) -> XComArg
: Easy method to create new XComArg using the same operator but a different key.
add_downstream_dependent(operator: BaseOperator) -> None
: Add an operator as a downstream dependency of operator. [Optional] Makes this task simpler.
Proposed implementation: https://github.com/casassg/corrent/blob/master/corrent/xcom_arg.py
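The interface above could look roughly like the following. This is a minimal sketch written for this description, not the linked implementation: `FakeOperator` and `FakeTaskInstance` are hypothetical stand-ins so the example runs without Airflow, and it assumes the task context exposes a task instance under `"ti"` with an `xcom_pull(task_ids=..., key=...)` method.

```python
from typing import Any, Dict, List, Tuple

XCOM_RETURN_KEY = "return_value"  # same value as airflow.models.xcom.XCOM_RETURN_KEY


class FakeOperator:
    """Hypothetical stand-in for BaseOperator with only what the sketch needs."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: List["FakeOperator"] = []

    def set_downstream(self, other: "FakeOperator") -> None:
        self.downstream.append(other)


class FakeTaskInstance:
    """Hypothetical task instance backed by an in-memory XCom store."""

    def __init__(self, store: Dict[Tuple[str, str], Any]) -> None:
        self._store = store

    def xcom_pull(self, task_ids: str, key: str = XCOM_RETURN_KEY) -> Any:
        return self._store.get((task_ids, key))


class XComArg:
    def __init__(self, operator: FakeOperator, key: str = XCOM_RETURN_KEY) -> None:
        self.operator = operator  # origin operator instance
        self.key = key            # XCom key to read

    def get(self, context: Dict[str, Any]) -> Any:
        # Resolve the XCom value for (operator, key) at execution time.
        return context["ti"].xcom_pull(task_ids=self.operator.task_id, key=self.key)

    def __getitem__(self, key: str) -> "XComArg":
        # New reference to the same operator under a different key.
        return XComArg(self.operator, key)

    def add_downstream_dependent(self, operator: FakeOperator) -> None:
        # Register `operator` as downstream of the origin operator.
        self.operator.set_downstream(operator)


create = FakeOperator("create_cluster")
run = FakeOperator("run_job")
arg = XComArg(create)
arg.add_downstream_dependent(run)
ti = FakeTaskInstance({("create_cluster", "cluster_id"): "c-42"})
cluster_id = arg["cluster_id"].get({"ti": ti})
```

Keeping the class this small matches the intent discussed in the comments: XComArg stays a thin wrapper, and any custom serialization logic would live in XCom itself.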
Use case / motivation
Stretch goal