# Introducing composability in the middle layer of Cosmos's API #895
Hey @dwreeves! Great to see you back with more insightful feedback for Cosmos. How have you been? Thank you for your detailed proposal and the effort you've put into identifying areas for improvement in the Cosmos API. Your suggestions for making the API more composable and accessible via subclassing of `DbtToAirflowConverter` are appreciated. Let's address your points one by one:

However, implementing these changes would require a significant refactoring of the codebase. As a maintainer, I can't make these changes directly, but I encourage you to submit a pull request with your proposed changes. This would allow the Cosmos community to review your proposal in detail and consider its potential benefits and drawbacks. In the meantime, you might consider creating a subclass of `DbtToAirflowConverter` for your own use. I hope this helps, and I look forward to seeing your pull request!
[Daniel Reeves](https://www.linkedin.com/in/daniel-reeves-27700545/) (@dwreeves) is an experienced open-source developer currently working as a Data Architect at Battery Ventures. He has significant experience with Apache Airflow, SQL, and Python, and has contributed to many [OSS projects](https://github.com/dwreeve). Not only has he been using Cosmos since its early stages, but since January 2023 he has actively contributed to the project:

![Screenshot 2024-05-14 at 10 47 30](https://github.com/astronomer/astronomer-cosmos/assets/272048/57829cb6-7eee-4b02-998b-46cc7746f15a)

He has been a critical driver of the Cosmos 1.4 release, and his contributions include new features, bug fixes, and documentation improvements, including:

* Creation of an Airflow plugin to render dbt docs: #737
* Support using the dbt partial parsing file: #800
* Add more template fields to `DbtBaseOperator`: #786
* Add cancel-on-kill functionality: #101
* Make region optional in Snowflake profile mapping: #100
* Fix the dbt docs operator to not look for `graph.pickle`: #883

He thinks about the project long-term and proposes thorough solutions to problems faced by the community, as can be seen in GitHub tickets:

* Introducing composability in the middle layer of Cosmos's API: #895
* Establish a general pattern for uploading artifacts to storage: #894
* Support `operator_arguments` injection at a node level: #881

One of Daniel's notable traits is his collaborative and supportive approach. He has actively engaged with users in the #airflow-dbt Slack channel, demonstrating his commitment to fostering a supportive community. For all of this, we want to promote him to Cosmos committer and maintainer, recognising his constant efforts and achievements towards our community. Thank you very much, @dwreeves!
Hi, @dwreeves. I believe there are many great ideas in this proposal. What do you think of breaking them into smaller changes, so we can split them across the next releases of Cosmos? We can have a call to talk about this if that would help you. I'd love to see progress in this direction for 1.6, but as the task is described now, we wouldn't have the bandwidth.
## TLDR

Sorry, this is long! But there is a lot to cover: this is a massive proposal to reorganize large chunks of Cosmos.

Here's a TLDR:
## Introduction
At my last company, I built a dbt-Airflow integration prior to the existence of Cosmos. At my current company, we are using Cosmos.

There are a handful of features I really miss from that custom integration. The big one: when a model had a `ref()` to a task in a different DAG, we'd automatically create an external task sensor pointing to the dbt node in that separate DAG. I don't think Cosmos should necessarily implement that feature, but I do think that right now it would be extremely complicated to build as a custom feature in an Airflow deployment.

## Background: Issues with custom implementations today
Right now, the only real way to dive deep into Cosmos's internals is to use the `node_converters` API. This is typed as a `dict[DbtResourceType, Callable[..., Any]]`, but a more accurate typing would be `dict[DbtResourceType, NodeConverterProtocol]`, with the following definition for `NodeConverterProtocol`:
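The original code block was not preserved in this copy; below is a sketch of what such a protocol might look like. The parameter names are assumptions reconstructed from the args discussed later in this issue, not Cosmos's actual signature:

```python
from __future__ import annotations

from typing import Any, Optional, Protocol


class NodeConverterProtocol(Protocol):
    """Hypothetical shape of a node converter callback.

    The 8 positional parameters mirror the args discussed below; the exact
    names and types are assumptions, not Cosmos's real API.
    """

    def __call__(
        self,
        dag: Any,                     # the airflow.DAG being built
        task_group: Optional[Any],    # enclosing TaskGroup, if any
        node: Any,                    # the Cosmos DbtNode being converted
        execution_mode: Any,          # e.g. ExecutionMode.LOCAL
        task_args: dict[str, Any],    # operator kwargs
        test_behavior: Any,           # e.g. TestBehavior.AFTER_EACH
        test_indirect_selection: Any, # assumed test-related arg
        on_warning_callback: Any,     # test-only callback
        **kwargs: Any,                # accepted but unused
    ) -> Any: ...
```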
Where the `kwargs` don't do anything.

This API is hard to work with, to be honest. Imagine, for example, that you want to use a variable in the `DbtNode`'s `config` to implement custom functionality. You'd have to do something like this:

This is essentially a method override, but with a few oddities.
First, you're not actually overriding a method; you're wrapping a function.

Second, you need to implement the custom node converter for each node type in the dict. Instead of the node converter callback dispatching the logic by node type, the node converter callback itself is dispatched. This is despite the fact that the node type is already inside the args (in `node.resource_type`), and the only real difference across node types is the dbt operator type.

Third, the API is very large. There are 8 total args, including one feature (`on_warning_callback`) that is very niche and is just an operator arg for tests. `on_warning_callback` specifically is not very well future-proofed: if more test-specific operator args are added, the API currently demands that they be laid out as new kwargs to the node converter protocol.

Fourth, many of the args are not (reasonably) task-specific; inside a single `DbtTaskGroup` or `DbtDag`, a lot of these variables can be accessed via shared state in a `DbtToAirflowConverter` object via `self`:

* `dag`
* `task_group`
* `execution_mode`\*
* `test_behavior`\*

\*For the latter two, you can imagine a user who wants to dispatch custom execution-mode or test-behavior logic in a node-specific way, rather than using a shared implementation across each task in the node. These niche situations for dispatching implementations can and should be supported, but they don't need to be supported by expanding the function signature; they can be implemented via method overrides.
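As a sketch of that method-override alternative (the method name `resolve_test_behavior` is hypothetical, not part of Cosmos's API):

```python
# Hypothetical: if the converter exposed a method per decision point,
# node-specific dispatch becomes a plain override instead of a new kwarg.
class DbtToAirflowConverter:
    def resolve_test_behavior(self, node: dict) -> str:
        return "after_each"  # pretend default sourced from config state


class MyConverter(DbtToAirflowConverter):
    def resolve_test_behavior(self, node: dict) -> str:
        # Node-specific dispatch, with no signature changes upstream.
        if "no_tests" in node.get("tags", []):
            return "none"
        return super().resolve_test_behavior(node)


conv = MyConverter()
print(conv.resolve_test_behavior({"tags": ["no_tests"]}))  # none
print(conv.resolve_test_behavior({"tags": []}))            # after_each
```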
After more dissecting of the Cosmos API, there are other places where things don't feel quite right. For example, `DbtGraph().load()` uses two args: `method` and `execution_mode`. What doesn't make sense is that these args are already available in the state of the `DbtGraph` object, since the `DbtGraph` is initialized with the `render_config` and `execution_config`. I think it would make sense for the load method to be an optional kwarg that can override the config, rather than a duplicate of the config supplied by the caller. I'm slightly picking on `DbtGraph().load()`
because (A) this is a common theme, and (B) the idea of using kwargs to override config state, rather than just duplicating config state, is a good pattern for supporting subclassing. In my Proposed API, I do this a couple of times: a method is called, by default, without any args/kwargs, but the implementation supports args/kwargs. The idea is that, by default, all args should originate from config state, but a caller has an interface for overriding the config without actually mutating the config's state.

## Proposed API
I think that a more composable interface should be accessible via subclassing of `DbtToAirflowConverter`.

Here is a rough draft proposal. I've excluded a lot of the code necessary to make this work; I've focused only on the important bits, and added inline comments about the philosophies behind these changes.
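The draft code itself was not preserved in this copy. The core pattern it argues for — config state as the default source of args, with kwargs as non-mutating per-call overrides — can be sketched like this (all names here are illustrative stand-ins, not the actual proposal):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RenderConfig:
    load_method: str = "automatic"


# Hypothetical middle layer: methods take no required args. By default they
# read config state, but callers/subclasses may override per call without
# mutating the config itself.
class DbtGraph:
    def __init__(self, render_config: RenderConfig):
        self.render_config = render_config

    def load(self, method: Optional[str] = None) -> str:
        # kwarg overrides config state; config state is never mutated.
        method = method or self.render_config.load_method
        return f"loaded via {method}"


graph = DbtGraph(RenderConfig(load_method="dbt_ls"))
print(graph.load())                     # loaded via dbt_ls
print(graph.load(method="manifest"))    # loaded via manifest
print(graph.render_config.load_method)  # dbt_ls  (config untouched)
```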
This code is very far from done, and I've skipped over a lot of details of the implementation. I put a lot of effort into defining the topmost layer of the API, and defined the relevant methods for the middle layer, but I leave a few things beyond that up to the imagination.
A lot of this will be controversial, I am aware. This is not a final proposal. I hope that this can be discussed and iterated on with others!
## What this enables
I want to show just a small handful of the many cool things that all of these API changes enable for end users!
### Custom business logic
Let's say a user has 1000 dbt models, and 3 of them (`foo`, `bar`, and `baz`) need special operator args; for the sake of argument, let's say it's just to set `retries` to 5. The user could then do that with this:
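The original snippet was lost in this copy; a minimal sketch of the idea, assuming the proposed API exposes an overridable per-node hook (the name `operator_args_for_node` is hypothetical):

```python
# Stand-in for the proposed subclassable DbtDag, so this sketch runs alone.
class DbtDag:
    def operator_args_for_node(self, node: dict) -> dict:
        return {}  # default: no extra operator args


class MyDbtDag(DbtDag):
    def operator_args_for_node(self, node: dict) -> dict:
        args = super().operator_args_for_node(node)
        if node["name"] in {"foo", "bar", "baz"}:
            args["retries"] = 5  # special-case just these three models
        return args


dag = MyDbtDag()
print(dag.operator_args_for_node({"name": "foo"}))    # {'retries': 5}
print(dag.operator_args_for_node({"name": "other"}))  # {}
```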
`retries` is a little contrived, but you can imagine something less contrived, like a custom `profile_config` for writing to a database with different credentials than the default (e.g. a Snowflake deployment with a complex IAM structure), or using a custom `pool` to control concurrency. The `profile_config` example in particular is noteworthy because there is a proposal right now to allow custom operator args to be supported via the dbt YAML files, but that approach would not support un-serializable objects, so you wouldn't be able to support a custom `profile_config` through this API.

### External Task Sensors and external DAG knowledge
This is my own white whale. What I really want to do is implement a system where I can split my single dbt project up into multiple DAGs running on multiple schedules, but where these DAGs are all aware of tasks in other DAGs.
Say, for example, a user has a custom tagging system that marks which DAG a model runs in. What they want is: whenever `foo` is `ref()`'d, have that `ref()` point to an `ExternalTaskSensor` that points to `my_airflow_dag.foo_run`. This can be implemented in just a few lines of custom code, using the above API changes:
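The few lines in question were not preserved here; below is a self-contained sketch of the idea. The `convert_node` hook and the `dag:` tag convention are assumptions, and `ExternalTaskSensor` is a stand-in for Airflow's real sensor class:

```python
# Stand-in for airflow.sensors.external_task.ExternalTaskSensor.
class ExternalTaskSensor:
    def __init__(self, task_id: str, external_dag_id: str, external_task_id: str):
        self.task_id = task_id
        self.external_dag_id = external_dag_id
        self.external_task_id = external_task_id


# Stand-in for the proposed subclassable converter.
class DbtToAirflowConverter:
    def convert_node(self, node: dict):
        return f"dbt task for {node['name']}"  # pretend default conversion


class CrossDagConverter(DbtToAirflowConverter):
    def convert_node(self, node: dict):
        # Hypothetical tag convention, e.g. tags: ["dag:my_airflow_dag"],
        # meaning this node actually runs in another DAG.
        for tag in node["config"].get("tags", []):
            if tag.startswith("dag:"):
                external_dag = tag.split(":", 1)[1]
                return ExternalTaskSensor(
                    task_id=f"wait_for_{node['name']}",
                    external_dag_id=external_dag,
                    external_task_id=f"{node['name']}_run",
                )
        return super().convert_node(node)


conv = CrossDagConverter()
sensor = conv.convert_node(
    {"name": "foo", "config": {"tags": ["dag:my_airflow_dag"]}}
)
print(sensor.external_dag_id, sensor.external_task_id)  # my_airflow_dag foo_run
```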
Now the user has a super cool custom feature for their own Cosmos deployment!
### XComs in dbt vars / env with `LoadMode.DBT_LS`

One of my changes is, essentially, to walk back the deprecation enforced in `validate_initial_user_config()`. I think it's correct to walk this back, and there is a practical situation for when and why you need to distinguish `env` and `vars` in different command invocation contexts: Airflow's own Jinja template rendering.

Right now, using XComs with `--vars` and using `LoadMode.DBT_LS` do not always mix. The reason is simple: `{{ ti.xcom_pull(task_ids="foo") }}` gets rendered appropriately in task execution, but it is a string literal that never touches a Jinja environment when the DAG is rendered. More concretely:

* `dbt_vars={"logical_date": "{{ logical_date }}"}` will render as e.g. `{"logical_date": "2024-03-14"}` when executing a task.
* `dbt_vars={"logical_date": "{{ logical_date }}"}` will render as literally `{"logical_date": "{{ logical_date }}"}` when loading the DAG.

A user who is using `LoadMode.DBT_LS` will possibly want to set a default for the logical date when loading, one which parses as a valid ISO date, e.g. `"1970-01-01"` or `date.today()`. For example, the dbt context may do something like `{% set year, month, date = get_var("logical_date", "1970-01-01").split("-") %}`, which would work in execution but would fail in rendering/loading.

### Other niche things
These are more niche, but still cool that they're supported!
The most useful aspect of these "niche things" is that these features open up development of Cosmos itself. Right now, testing an experimental feature change in a production instance of Cosmos is annoying, as there is no easy way to override chunks of the code. Unless you want to put a ton of effort into it, you are mostly just beholden to whatever the features are in the latest release.
So although there are some niche things you can do with the API changes that don't provide direct support to the vast majority of users, they allow people who are developing Cosmos itself to do so much more easily.
Here is one example that uses custom `DbtGraph` parsing. When I say custom, I mean implementing your own method for parsing, not the custom load method. Say, for example, you want to experiment with whether adding `lru_cache` to `DbtGraph.load()` improves performance somehow. You could write the following:

And now you have code you can test immediately in your production instance. Experimenting with a feature like this in an existing Airflow deployment is next to impossible in the current iteration of Cosmos. But with the API changes I've proposed, there is now an avenue for testing something experimental like this in a real production instance.
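The snippet was not preserved here; a sketch of what that experiment might look like, using a stand-in `DbtGraph` (the real one lives in `cosmos.dbt.graph`) so the example runs on its own:

```python
from functools import lru_cache


# Stand-in for cosmos.dbt.graph.DbtGraph, to keep this sketch self-contained.
class DbtGraph:
    load_calls = 0

    def load(self):
        DbtGraph.load_calls += 1  # count how often the expensive parse runs
        return {"nodes": ["model.my_project.foo"]}  # pretend parse result


# Hypothetical experiment: cache load() so repeated parses within one
# process reuse the already-parsed graph.
class CachedDbtGraph(DbtGraph):
    @lru_cache(maxsize=1)
    def load(self):
        return super().load()


graph = CachedDbtGraph()
graph.load()
graph.load()
print(DbtGraph.load_calls)  # 1 — the underlying parse ran only once
```

(Note that `lru_cache` on a method keys the cache on `self`, so each graph instance gets its own cached result; that's fine for a quick experiment like this.)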
### Endless possibilities
I actually don't know what all the features are that users want. It's hard to come up with things. I think that's the point, though! Cosmos does not need to anticipate every single user need, it just needs to anticipate what users need to have access to in order to implement their own needs.
## Things still left

* … `getattr()`, but there is probably a better way than that which uses a more explicit interface.
* … `DbtGraph.load` into two methods (`load` and `load_with_automatic_method`) is something that can be implemented in a fully backwards-compatible way.