-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DS][29/n] Create ParentNewerCondition #21641
Conversation
745c94c
to
ff7943c
Compare
9eba99d
to
20b9c4a
Compare
ff7943c
to
5b377a0
Compare
20b9c4a
to
b9c8dfd
Compare
5b377a0
to
449ff84
Compare
b9c8dfd
to
01aafa4
Compare
01aafa4
to
ef8815e
Compare
d01bb66
to
389c08d
Compare
b39eb21
to
aa9b7f1
Compare
389c08d
to
6a8d1b0
Compare
aa9b7f1
to
393055f
Compare
def compute_asset_partitions(self) -> AbstractSet[AssetKeyPartitionKey]: | ||
return self._compatible_subset.asset_partitions | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should consider a scarier name for this to indicate that you should really understand the constraints of the callsite before calling it, as this will materialize the entire partition range in memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that functionally all that different from the potential impact of other types of compute_*()
functions? Many of those involve actual round trips to the database, which is comparatively expensive to iterating through a ton of time window partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I basically think operation should be illegal except in very specific contexts, and it should be named accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm I disagree -- I think this is a fairly standard operation that is a sub routine of plenty of other similarly-named operations (e.g. intersecting two AssetSlices requires computing all the keys in each slice), and so if we were to make this illegal, we'd need to make intersections and unions etc. illegal as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel pretty strongly this. We can change this later, but for now I would prefer the name
expensively_materialize_partition_key_range_in_memory
or something ridiculous like that is very easily greppable. This is not a public API so we can codemod it later.
But that will make people think about this in the short-term and make sure that we are in a context where there aren't huge partition ranges.
@gibsondan keeps on running into this in py-spy
runs so I consider this "on fire" at the moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I updated the name to compute_asset_partitions_EXPENSIVE
, which I think is a sufficient deterrent for now, and we can move this discussion to a future diff
.../dagster/dagster/_core/definitions/declarative_scheduling/operands/parent_newer_condition.py
Outdated
Show resolved
Hide resolved
def description(self) -> str: | ||
return "At least one parent has been updated more recently than the candidate." | ||
|
||
def compute_newer_parents( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the relationship between newer parents and our new language around "unsynced"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now, the main differences are:
- this does not consider data versions for materialized assets, meaning a parent update which does not modify the data version will still result in the asset being marked as having a parent updated (it does consider data versions for asset observations, however)
Note that currently, code-versioned assets with no parents will only get new data versions when their code versions are updated. This means that for dbt assets (which are assigned code versions by default), parents would almost never be considered updated if we didn't have this weird definition.
- this definition handles a slight edge case bug with observable source asset versioning, wherein if you make an observation with a data version, then another observation without any data version, then that counts as a "new data version".
.../dagster/dagster/_core/definitions/declarative_scheduling/operands/parent_newer_condition.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super supportive of the direction and thanks for the clear PR summary. It looks very good overall.
I think this is worth a brief in-person convo to talk through the subtleties, if nothing else so that I fully understand the semantics here. Getting this as right as possible upfront is important as this is such a critical rule.
6a8d1b0
to
7732a0a
Compare
0da2923
to
0d2ac17
Compare
7732a0a
to
09e1d91
Compare
0d2ac17
to
96789db
Compare
All caps too aggressive aesthetically imo
…On Tue, May 14 2024 at 9:43 AM, OwenKephart < ***@***.*** > wrote:
***@***.**** commented on this pull request.
In python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py
( #21641 (comment)
) :
> + def compute_asset_partitions(self) ->
AbstractSet[AssetKeyPartitionKey]:
+ return
self._compatible_subset.asset_partitions
+
Ok I updated the name to compute_asset_partitions_EXPENSIVE , which I think
is a sufficient deterrent for now, and we can move this discussion to a
future diff
—
Reply to this email directly, view it on GitHub (
#21641 (comment) ) ,
or unsubscribe (
https://github.com/notifications/unsubscribe-auth/AG3IK6KLIK7SC35FNPNQAY3ZCIIH7AVCNFSM6AAAAABHF7TBCOVHI2DSMVQWIX3LMV43YUDVNRWFEZLROVSXG5CSMV3GSZLXHMZDANJVGQZDIMJXGY
).
You are receiving this because your review was requested. Message ID: <dagster-io/dagster/pull/21641/review/2055424176
@ github. com>
|
bc62b76
to
e5a2f0d
Compare
Merge activity
|
09e1d91
to
1a917ab
Compare
e5a2f0d
to
7ec169c
Compare
7ec169c
to
2013a99
Compare
## Summary & Motivation Originally, this was conceived as a condition that would be used as part of an `any_deps_match()` sort of context. There are a few problems with this: 1. All other conditions are simply true or false for a given asset partition, regardless of which child_partition we consider to be the "downstream". e.g. an asset partition is unambiguously missing or not missing. However, if you have A -> B with A unpartitioned and B partitioned, then it's possible for A to be newer than some partitions of B but older than others. This means that if we wanted to evaluate something like: `any_deps_match(missing() | newer_than_child())`, then we would have to evaluate the inner expression of missing | newer_than_child on a per-child-partition basis, which is both expensive and extremely challenging to visualize 2. We want to eventually implement this via the StaleStatusResolver, which fits way better with the mental model of "a child partition can have the status (parent is newer)" rather than "a child partition can have the status (any parent has the status (newer than child partition))" The downside is that this does not compose as nicely with other conditions, but I think this is actually ok. For example, we don't support "all parents newer" out of the box, but I think this is a good thing, as that sort of behavior is actually a bit of a footgun, which leads to weird synchronization issues if you (e.g.) materialize one of the two parents, then manually materialize the child. Now you're on a weird new cadence in which your child will _not_ materialize after the second parent updates. For most cases in which you want to wait for more than one parent to update, you're either filling in time partitions as they come in (and so you can just wait for no parent partitions to be missing, rather than all parent partitions to update), or you want to materialize an unpartitioned asset regularly (in which you can wait for all parent partitions to be updated since a given cron tick, rather than waiting for them all to be updated more recently than you have updated). ## How I Tested These Changes
Summary & Motivation
Originally, this was conceived as a condition that would be used as part of an
any_deps_match()
sort of context. There are a few problems with this:any_deps_match(missing() | newer_than_child())
, then we would have to evaluate the inner expression of missing | newer_than_child on a per-child-partition basis, which is both expensive and extremely challenging to visualizeThe downside is that this does not compose as nicely with other conditions, but I think this is actually ok. For example, we don't support "all parents newer" out of the box, but I think this is a good thing, as that sort of behavior is actually a bit of a footgun, which leads to weird synchronization issues if you (e.g.) materialize one of the two parents, then manually materialize the child. Now you're on a weird new cadence in which your child will not materialize after the second parent updates.
For most cases in which you want to wait for more than one parent to update, you're either filling in time partitions as they come in (and so you can just wait for no parent partitions to be missing, rather than all parent partitions to update), or you want to materialize an unpartitioned asset regularly (in which you can wait for all parent partitions to be updated since a given cron tick, rather than waiting for them all to be updated more recently than you have updated).
How I Tested These Changes