RFC: default_asset_attrs #21387
Conversation
Mechanically, I think this is the best way to do this type of feature. It's a nice power feature for power users.
What does this get us over splatting or `functools.partial`?
Writing these out so we can compare:

splatting:

```python
my_asset_args = dict(group_name="abc", auto_materialize_policy=AutoMaterializePolicy.eager())

@asset(**my_asset_args)
def asset1(): ...

@asset(owners="bar@foo.com", **my_asset_args)
def asset2(): ...
```

Some downsides here:
functools.partial:

```python
import functools

my_asset = functools.partial(asset, group_name="abc", auto_materialize_policy=AutoMaterializePolicy.eager())

@my_asset
def asset1(): ...

@my_asset
def asset2(): ...
```

Some downsides here:
[1] merging metadata entries and tags

It would be useful to be able to do this:

```python
with default_asset_attrs(tags={"domain": "marketing"}):
    @asset(tags={"is_pii": "true"})
    def asset1(): ...

    @asset
    def asset2(): ...

assert asset1.tags == {"domain": "marketing", "is_pii": "true"}
```
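To pin down the merge semantics footnote [1] asks for, here is a minimal self-contained sketch (not Dagster code; the `default_asset_attrs` stand-in and the `merge_tags` helper are hypothetical) in which context-manager defaults combine with per-asset tags and the explicit decorator value wins on key conflicts:

```python
import contextlib
from typing import Dict, Iterator

_default_tags: Dict[str, str] = {}

@contextlib.contextmanager
def default_asset_attrs(tags: Dict[str, str]) -> Iterator[None]:
    # Hypothetical stand-in: push tag defaults for the duration of the `with` block.
    global _default_tags
    previous, _default_tags = _default_tags, {**_default_tags, **tags}
    try:
        yield
    finally:
        _default_tags = previous

def merge_tags(explicit: Dict[str, str]) -> Dict[str, str]:
    # Defaults first; explicit per-asset values win on key conflicts.
    return {**_default_tags, **explicit}

with default_asset_attrs(tags={"domain": "marketing"}):
    asset1_tags = merge_tags({"is_pii": "true"})
    asset2_tags = merge_tags({})

assert asset1_tags == {"domain": "marketing", "is_pii": "true"}
assert asset2_tags == {"domain": "marketing"}
```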
I think this is kinda masking a problem, in that we can't pass partial args around as a first-class value to the `@asset` decorator. My initial reaction is to do something like this:

```python
# We have AssetSpec, which is a bag of all the params to the asset decorator,
# and PartialAssetSpec, which is some subset of them set (i.e. all are optional).
# (We could bikeshed the PartialAssetSpec name a lot, btw. Perhaps it should just be
# AssetSpec.)
from dagster import PartialAssetSpec

default_spec = PartialAssetSpec(
    group_name="marketing",
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)

# Create 2 assets that use this spec
@asset(default_spec)
def asset1(): ...

@asset(default_spec)
def asset2(): ...

# Create an asset that merges two PartialAssetSpecs
default_spec = PartialAssetSpec(tags={"domain": "marketing"})

# this `extends` thing does a smart merge of the tags
@asset(PartialAssetSpec(tags={"is_pii": "true"}, extends=[default_spec]))  # open to something other than `extends`
def asset1(): ...

@asset(default_spec)
def asset2(): ...

# We can also have a sugared version of this if we treat the args to `@asset`
# as identical to those of `AssetSpec` or `PartialAssetSpec`
@asset(tags={"is_pii": "true"}, extends=[default_spec])
def asset1(): ...
```

I also have some concerns about the nonlocal behavior of this context manager. I think it could lead to really confusing bugs where an integration author assumes that an asset is getting the default behavior, when it is overridden higher up in the stack. It would be really hard for a user to debug the interaction between the integration code (which they don't normally see) and the context manager, which could be far removed from that integration depending on how many layers of abstraction the user has put in place. Explicitly threading through the `PartialAssetSpec` avoids this.
Yeah this is definitely a downside of the context manager approach and one of the main things that makes me not fully confident in it.
To be a little pedantic, that sounds like a solution more than a problem. Though I agree that it's a solution worth having in play. It addresses the partial-metadata issue I raised above. As a lover of functional programming and as someone who's very suspicious of any mutations to global state, I personally find it very appealing. Main cons that come to mind:

```python
@asset
def asset1(): ...

@asset(tags={"foo": "bar"})
def asset2(): ...

@multi_asset(specs=[AssetSpec("asset3"), AssetSpec("asset4")])
def asset3_and_4(): ...

asset5 = my_utils.make_s3_ingest_asset("asset5", ...)
```
Backing up a bit, one of the things I've been thinking about while approaching this problem is how this would look in YAML. No idea on the specifics, but something like one of these seems perhaps the simplest:

```yaml
assets:
  shared_attrs:
    group: marketing
    schedule: daily
  list:
    - type: python
      module: a.b
    - type: shell
      command: a.sh
```

or

```yaml
context:
  asset_attrs:
    group: marketing
    schedule: daily
assets:
  - type: python
    module: a.b
  - type: shell
    command: a.sh
```
My initial reaction is that it's a bad idea, tbh. These implicit-context things tend to only work if they are used sparingly. The more code that depends on this implicit context, the harder it gets to reason about what's going on.
I disagree on this one. The asset factory pattern shows up everywhere. We will definitely want a way to pass around subsets of these parameters without copy/pasting long arg lists up and down the callstack.
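As a concrete illustration of that pain, a sketch of how the factory pattern tends to look today (all names here are hypothetical, and the body is simplified to plain dicts); the long keyword list is exactly what a first-class partial-spec value would collapse into a single parameter:

```python
from typing import Dict, Optional, Sequence

def make_s3_ingest_asset(bucket: str, table: str, **asset_attrs) -> Dict[str, object]:
    # Stand-in for a real asset factory; returns a plain dict just for illustration.
    return {"key": f"{bucket}/{table}", **asset_attrs}

def make_s3_ingest_assets(
    bucket: str,
    tables: Sequence[str],
    group_name: Optional[str] = None,
    owners: Optional[Sequence[str]] = None,
    tags: Optional[Dict[str, str]] = None,
):
    # Every shared attribute gets threaded through each layer by hand; adding a
    # new attribute means touching every signature up and down the call stack.
    return [
        make_s3_ingest_asset(bucket, table, group_name=group_name, owners=owners, tags=tags)
        for table in tables
    ]

assets = make_s3_ingest_assets("my-bucket", ["orders", "users"], group_name="ingest")
```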
Agreed but I think that 99% of the time users will be using the last example in my comment, where the
I think this is a feature, not a bug. The trade-off is, fundamentally, implicit-with-nonlocal-behavior vs explicit-with-verbosity. I think avoiding nonlocal behavior is more important than avoiding a bit of (expressive IMO) verbosity.
I think it is nice in YAML because (depending on how you implement it) it wouldn't have nonlocal semantics. In Python that's a different story. Let me show you what I mean. I would imagine the YAML example you provided:

```yaml
context:
  asset_attrs:
    group: marketing
    schedule: daily
assets:
  - type: python
    module: a.b
  - type: shell
    command: a.sh
```

would desugar to something like this:

```python
asset_attrs = PartialAssetSpec(group_name='marketing', ...)

defs = Definitions(assets=[
    *run_plugin(type='python', asset_attrs=asset_attrs, config={'module': 'a.b'}),
    *run_plugin(type='shell', asset_attrs=asset_attrs, config={'command': 'a.sh'}),
])
```

You could imagine the `run_plugin` function looking something like this:

```python
from importlib import import_module

def run_plugin(type: str, asset_attrs: PartialAssetSpec, config: dict[str, Any]) -> Sequence[AssetsDefinition]:
    # plugins install themselves in a well-known location
    plugin_module = import_module(f'dagster.yaml.plugins.{type}')
    # plugins follow a protocol where they expose a create_assets() function
    # that returns Sequence[AssetsDefinition]
    return plugin_module.create_assets(asset_attrs, config)
```

If we did it this way, the user at the YAML layer gets their nice concise config file, and the plugin authors don't need to worry about spooky action at a distance.
I'm pretty skeptical of this.
To motivate this a little more, one of the things I'm thinking about is what the Dagster starter example looks like. What we're heading towards is something like the following. While this isn't a mountain of code, a message I get from reading it is "man, Dagster's asset-based approach means I need to copy/paste my schedule in a lot more places than I would with orchestrator X".

```python
daily_schedule = AssetSchedulingPolicy.cron("@daily")

@asset(scheduling_policy=daily_schedule)
def asset1(): ...

@asset(deps=[asset1], scheduling_policy=daily_schedule)
def asset2(): ...

@asset(deps=[asset1, asset2], scheduling_policy=daily_schedule)
def asset3(): ...
```

(Btw, this is another potential solution to this problem: https://github.com/dagster-io/internal/discussions/8216)
I am willing to be convinced that this is a problem, but I am not there yet. When React came out it didn't have any of the dynamic scoping or two-way binding stuff that competitors had. This had the result of making it much more verbose than alternatives (and it still is), since everything had to be passed and handled explicitly. (Side note: at the time I felt like this was a big problem and shipped a userland feature to address it that no one ended up using.) The explicitness had the nice side effect of making everything introspectable, explainable, and user-customizable.

With that said, I think some sugar could help here. This:

```python
daily_schedule = AssetSchedulingPolicy.cron("@daily")

@asset(scheduling_policy=daily_schedule)
def asset1(): ...

@asset(deps=[asset1], scheduling_policy=daily_schedule)
def asset2(): ...

@asset(deps=[asset1, asset2], scheduling_policy=daily_schedule)
def asset3(): ...

defs = Definitions(assets=[asset1, asset2, asset3])
```

could be expressed as:

```python
default_spec = PartialAssetSpec(scheduling_policy=AssetSchedulingPolicy.cron("@daily"))

defs = Definitions(
    assets=assets_with_defaults([asset1, asset2, asset3], default_spec)
)
```

And then we'd offer a more sugared version:

```python
defs = Definitions(
    assets=assets_with_defaults(
        [asset1, asset2, asset3],
        scheduling_policy=AssetSchedulingPolicy.cron('@daily'),
    ),
)
```

There are probably other ways we could do this too, but I think this is a nice conservative version that is more of a two-way door.
FWIW, I think the default scheduling behavior should be for an asset to be included in an explicit cron schedule upstream, but that is a separate topic and discussion.
Going to close this for now on account of:
I think the next step here is to write up a fuller problem statement and slate of options.
## Summary & Motivation

There are diverse situations where it's useful to be able to pass around collections of definitions. Doing this in Dagster right now is quite awkward. Relevant situations:

- Writing a factory that returns a few assets, a few checks, and a job that executes them together.
- A submodule defines a few assets, a job, and a schedule. A different submodule does the same. They should all be in the same code location.
- #21387 (comment)
- https://github.com/dagster-io/internal/discussions/8216

Unlike the [prior version of this PR](#21746), this PR does _not_ turn `Definitions` into a "dumb" data class, i.e. it still validates that all assets and jobs can be bound at construction time.

## How I Tested These Changes
Summary & Motivation
It's a pain to apply the same attribute to a bunch of different assets. As we make auto-materialize policies / scheduling conditions more central, this pain will show up more often.
This PR proposes using a context manager to set asset attributes in bulk. This is not an idea I am entirely convinced is a good one yet, but wanted to float it and get reactions.
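As a rough usage sketch of what's being floated (the `default_asset_attrs` name comes from this PR, but the import path and the specific attributes shown are assumptions, since the API does not exist in Dagster today):

```python
from dagster import AutoMaterializePolicy, asset
from dagster import default_asset_attrs  # hypothetical: the context manager proposed by this PR

# Inside the `with` block, every asset picks up these attributes unless it
# sets them explicitly on the decorator.
with default_asset_attrs(
    group_name="marketing",
    auto_materialize_policy=AutoMaterializePolicy.eager(),
):
    @asset
    def asset1(): ...  # picks up both defaults

    @asset(group_name="finance")  # an explicit value would win over the default
    def asset2(): ...
```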
I just wanted to get the idea across, so, out of laziness, I only implemented a couple of asset attributes. But the idea would be to make this work for at least the following asset attributes:
How I Tested These Changes