
[docs] - Partitioned multi asset sensor examples #9722

Merged

Conversation

clairelin135 (Contributor):

Adds examples to multi_asset_sensor documentation to detail several use cases for monitoring asset partitions:

  • Triggering a partitioned run after a corresponding upstream partition is materialized
  • Updating a weekly asset partition when upstream daily partitions are materialized or replaced
  • Materializing a daily asset partition if both upstream daily partitions are materialized

vercel bot commented Sep 16, 2022: 3 Ignored Deployments (dagit-storybook, dagster, dagster-oss-cloud-consolidated), last updated Oct 11, 2022 at 6:46PM (UTC)

clairelin135 (Contributor Author) commented Sep 16, 2022:

Side note: these code examples all cover the OR case (e.g. trigger if ANY upstream partition is replaced). We don't handle the AND case as well; for example, materializing a partition only if both upstreams have been replaced.

I think the majority of cases will be the OR case, though I wouldn't be surprised if users also wanted support for the AND case. It's a challenge because the current cursor approach prevents us from evaluating materializations out of order, so we may need to consider other options depending on how high-priority this use case is.
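The OR/AND distinction above can be illustrated with a plain-Python sketch (no Dagster; the function names and inputs here are hypothetical): the OR case triggers on the union of newly materialized upstream partitions, while the AND case triggers only on their intersection.

```python
# Illustrative sketch of OR vs. AND trigger semantics for a multi-asset
# sensor. Input: the partitions newly materialized per monitored upstream.
def or_case_partitions(new_partitions_by_asset):
    # OR: any upstream materializing a partition triggers that partition
    triggered = set()
    for partitions in new_partitions_by_asset.values():
        triggered |= set(partitions)
    return sorted(triggered)

def and_case_partitions(new_partitions_by_asset):
    # AND: every upstream must have materialized the partition
    partition_sets = [set(p) for p in new_partitions_by_asset.values()]
    return sorted(set.intersection(*partition_sets)) if partition_sets else []
```

The real difficulty discussed in this thread is not computing these sets but advancing the per-asset cursor safely in the AND case.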

@clairelin135 clairelin135 marked this pull request as ready for review September 16, 2022 22:18

Returns a dictionary mapping the `AssetKey` for each monitored asset to the most recent materialization record. If there is no materialization event, the mapped value will be `None`
| Method | Description |
Reviewer (Contributor):

This is duplicative of the API reference, and I think we should just point people there to avoid the concept page getting massive. Thoughts?

Reviewer (Contributor):

I agree with this - I feel like this page is already really unwieldy.

clairelin135 (Contributor Author):

got it--I will remove this

```python
if downstream_partitions:  # Check that a downstream daily partition exists
    # Upstream daily partition can only map to at most one downstream daily partition
    yield downstream_daily_job.run_request_for_partition(
        downstream_partitions[0], run_key=None
```
Reviewer (Contributor):
We can omit run_key now

Reviewer (Contributor):

I think we now prefer returning a list of RunRequests rather than yielding them one by one, because it makes it clear that runs aren't requested at yield time but rather once the function has returned.
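A minimal sketch of the return-a-list pattern described above, with hypothetical stand-ins for the Dagster objects (in a real sensor this would be the body of a `@multi_asset_sensor`-decorated function, and `run_request_for_partition` would be the job's method):

```python
# Sketch of the "return a list of RunRequests" pattern. Nothing is launched
# while the list is being built; runs are requested only once the sensor
# function returns the full list.
def sensor_body(downstream_partitions, run_request_for_partition):
    run_requests = []
    if downstream_partitions:
        # The upstream partition maps to at most one downstream partition
        run_requests.append(run_request_for_partition(downstream_partitions[0]))
    return run_requests
```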

```python
    to_asset_key=AssetKey("downstream_daily_asset"),
)
if downstream_partitions:  # Check that a downstream daily partition exists
    # Upstream daily partition can only map to at most one downstream daily partition
```
Reviewer (Contributor):
It's a little unclear to me what this comment means. Does this apply to all partitioned asset sensors, or just to this particular situation?

clairelin135 (Contributor Author):
This applies to just this particular situation. In any case, I am removing this code example in favor of only showing the example that monitors multiple upstream assets.

```python
)


@multi_asset_sensor(
```
Reviewer (Contributor):
Did you consider including multiple assets here? It would make the code more complex, but I think there's value in showing the general case. It's easy for users to adapt a multi-asset example into a single-asset example, but harder to go the other way around.

clairelin135 (Contributor Author):
That makes sense. I've updated this code example to cover the multi-asset case instead.

@erinkcochran87 erinkcochran87 changed the title Partitioned multi asset sensor examples [docs] - Partitioned multi asset sensor examples Sep 22, 2022
@erinkcochran87 erinkcochran87 added the area: docs Related to documentation in general label Sep 22, 2022



If a partition mapping is not defined, Dagster will use the default partition mapping, which is the <PyObject object="TimeWindowPartitionMapping"/> for time window partitions definitions and the <PyObject object="IdentityPartitionMapping"/> for other partitions definitions. The <PyObject object="TimeWindowPartitionMapping"/> will map an upstream partition to the downstream partitions that overlap with it.

#### Additional examples
Reviewer (Contributor):
Thoughts on making this a callout instead of a section with a heading? I almost missed it when looking at the preview, and a callout could make it stand out.

Ex:

Looking for more? Check out the Examples section!

- [Updating a weekly asset partition when upstream daily partitions are materialized]()
- [Materializing a daily asset partition when both upstream daily partitions are materialized]()

@@ -721,6 +761,89 @@ def uses_db_connection():

If a resource you want to initialize has dependencies on other resources, those can be included in the dictionary passed to <PyObject object="build_resources"/>. For more in-depth usage, check out the [Initializing Resources Outside of Execution](/concepts/resources#initializing-resources-outside-of-execution) section.

### Monitoring asset partitions with sensors

Reviewer (Contributor):
Could you add a list of links to the subsections here? It makes it easy to see what's in this bit and jump around. Ex:

- [Updating a weekly asset partition when upstream daily partitions are materialized]()
- [Materializing a daily asset partition if both upstream daily partitions are materialized]()

### Monitoring asset partitions with sensors

#### Updating a weekly asset partition when upstream daily partitions are materialized
Reviewer (Contributor):
This is probably a nit, but if there's a way to shorten this heading (and the other one in this section) I think it's worth a shot. It's getting squashed in the page nav and is pretty long for a heading in general.


#### Materializing a daily asset partition if both upstream daily partitions are materialized

The following example monitors two upstream daily-partitioned assets, kicking off a run in the downstream daily-partitioned asset if any upstream daily partition is replaced and the other upstream daily partition has an existing materialization.
Reviewer (Contributor):
I recommend breaking this sentence up into two, at a minimum. It's currently a bit hard to read and absorb due to its length.

…laire/docs/partitioned-multi-asset-sensor-examples
clairelin135 (Contributor Author):
@sryza @erinkcochran87 I added updates to this page per your feedback, including:

  • Featuring the multi-asset monitoring example at the forefront (materializing a daily asset when upstream partitions from 2 assets are materialized). Also deleted the original daily -> daily partition asset example for brevity.
  • Formatting changes and text rewording
  • Miscellaneous code changes

I think this is ready for you to take another look!

```python
    asset_keys=[AssetKey("upstream_daily_1"), AssetKey("upstream_daily_2")],
    job=downstream_daily_job,
)
def trigger_daily_asset_if_all_upstream_partitions_materialized(context):
```
Reviewer (Contributor):
This is a pretty complex implementation for such a common use case. I wonder if there are additional utility methods we could supply to make it simpler?

clairelin135 (Contributor Author):

Hm... I updated this example to take your feedback into account.

One small optimization I can think of is to allow `all_partitions_materialized` to accept a list of asset keys, or use all monitored asset keys by default. But aside from that, it's hard to think of more, because we are constrained to advancing the cursor for each monitored asset key sequentially.

Reviewer (Contributor):

What if we had a method that returned a dictionary that mapped each partition key to a list or set of asset keys that had materializations with that partition? (We would probably want to exclude partitions with no materializations or something). Then the user could just iterate through that list?

clairelin135 (Contributor Author):

Yeah, we could do that. If we did, we'd have to advance all of the cursors in that call (unless we plan on passing the materialization objects to the user within the call).

Reviewer (Contributor):
> (unless we plan on passing the materialization objects to the user within the call)

Is there a downside to doing that? I think in general it's better to separate the methods that return information from the methods that advance the cursor.

clairelin135 (Contributor Author):

I think the only downside is that the returned object gets a little more complicated: it would be `Mapping[str, Mapping[AssetKey, List[EventLogEntry]]]`.

clairelin135 (Contributor Author):

Agree that we want to avoid advancing the cursor in this call, though.

Reviewer (Contributor):

Could it just be `Mapping[str, Tuple[AssetKey, EventLogEntry]]`? I.e., only return the latest entry for that asset-partition combo?

clairelin135 (Contributor Author):

Yeah, we can do that.

One nuance with doing something like this is that the materializations will be returned out of order (relative to the order in which they occurred). So if we provided this method, users would have to use `context.advance_all_cursors` instead of `context.advance_cursor`.

For example, if asset A materialized partition A and then partition B, and asset B materialized partition B and then partition A, this method would return:

```
{
    partition A: [(asset A, partition A materialization), (asset B, partition A materialization)],
    partition B: [(asset A, partition B materialization), (asset B, partition B materialization)],
}
```

And so calling `advance_cursor` (instead of `advance_all_cursors`) for partition B's records after partition A's would backtrack the cursor for asset B, whose partition B materialization is older than its partition A materialization.
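The backtracking hazard in this thread can be modeled in plain Python (no Dagster; the names and storage ids below are made up). Each asset's cursor is effectively a single "latest handled storage id", and records grouped by partition are not in storage-id order, so advancing the cursor group by group can move it backwards:

```python
# Plain-Python model of the per-asset cursor hazard described above.
def cursor_history_by_partition_order(records_by_partition):
    """records_by_partition: {partition_key: {asset: storage_id}} holding the
    latest record per (partition, asset). Returns, per asset, the sequence of
    storage ids its cursor would be set to if advanced once per partition
    group (the advance_cursor-per-group approach)."""
    history = {}
    for partition_key in sorted(records_by_partition):
        for asset, storage_id in records_by_partition[partition_key].items():
            history.setdefault(asset, []).append(storage_id)
    return history
```

If asset B materialized partition B (storage id 3) and then partition A (storage id 4), handling partition A's group first sets B's cursor to 4, and partition B's group then drags it back to 3; advancing all cursors at once to the newest records avoids the backtrack.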

sryza (Contributor) commented Sep 27, 2022:

This is looking in the right direction! I like how it is with 2 examples instead of 3.

clairelin135 (Contributor Author):

@sryza The code example of kicking off a downstream partition when either of the corresponding upstream partitions are materialized has been updated, following the implementation added in #9856

clairelin135 (Contributor Author):

@sryza This PR is now updated with the `latest_materialization_records_by_partition_and_asset` method merged in #9856

I think it's ready for you to take another look!

```python
    partition,
    materializations_by_asset,
) in context.latest_materialization_records_by_partition_and_asset().items():
    for asset_key, materialization in materializations_by_asset.items():
```
Reviewer (Contributor):

could this inner for loop be replaced with:

```python
if materializations_by_asset.keys() == context.asset_keys:
    run_requests.append(downstream_daily_job.run_request_for_partition(partition))
    for asset_key, record in materializations_by_asset.items():
        context.advance_cursor({asset_key: record})
```

clairelin135 (Contributor Author):

This sensor is currently designed to update a downstream asset when a partitioned materialization occurs and the same partition in the other upstream assets is also materialized.

If we replaced the inner loop, then this sensor would only produce a run request when new materializations occur for the same partition for every upstream asset (and not when one of those partitions is individually replaced).

Reviewer (Contributor):

I see. Is there a reason we prefer that behavior over behavior that only rematerializes if all parents are rematerialized? I could see the case for either, but smaller simpler example code seems like a good tiebreaker.

clairelin135 (Contributor Author):

I think the "and" case is actually more complex. Say:

  • The downstream asset monitors upstreams A and B
  • A materializes partitions 1 and 2, in that order
  • B materializes partitions 3, 2, 1, in that order

The cursor can't actually be updated unless the set of partitions in the first N materializations is the same for upstreams A and B. So in this case, until we get a partition 3 materialization in asset A, we can't advance the cursor for B (and thus can't advance the cursor for A)
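The constraint described above can be sketched in plain Python (illustrative only, not Dagster code): with one sequential cursor per asset, the AND case can only advance through the longest prefix whose set of partitions matches across both upstreams. For the example above (A materializes 1, 2; B materializes 3, 2, 1) that prefix is empty, so neither cursor can move.

```python
# Illustrative check of how far sequential per-asset cursors could safely
# advance in the AND case: the first N materializations of both assets must
# cover the same set of partitions before those N events can be marked handled.
def advanceable_prefix(order_a, order_b):
    best = 0
    for n in range(1, min(len(order_a), len(order_b)) + 1):
        if set(order_a[:n]) == set(order_b[:n]):
            best = n
    return best
```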

Reviewer (Contributor):

Oh man, this is gnarly. Thinking through all this stuff makes me think about how easy it would be for a user to mess this up.

If we had infinite resources, I think we'd ideally store a cursor component for every (asset key, partition) tuple instead of just for every asset key, right? Would that make programming against this significantly simpler? If so, I wonder if there's some data structure that we could use to make this possible.

sryza (Contributor) commented Oct 3, 2022:

For example, what if the cursor was a `Mapping[AssetKey, Sequence[StorageIdPartitionKeyRange]]`? Where each `StorageIdPartitionKeyRange` had `start_storage_id`, `end_storage_id`, `start_partition_key`, and `end_partition_key` fields.

`{AssetKey("asset1"): [StorageIdPartitionKeyRange(1, 5, "2020-01-01", "2020-05-05")]}` would mean that our sensor has handled every event that both has a storage id between 1 and 5 and has a partition key between "2020-01-01" and "2020-05-05".
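A sketch of this proposed cursor component (hypothetical, a discussion idea only, not an existing Dagster API):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class StorageIdPartitionKeyRange:
    """Hypothetical cursor component: marks as handled every event whose
    storage id AND partition key both fall within this range's bounds."""
    start_storage_id: int
    end_storage_id: int
    start_partition_key: str
    end_partition_key: str

    def covers(self, storage_id: int, partition_key: str) -> bool:
        # ISO-formatted date keys compare correctly as strings
        return (
            self.start_storage_id <= storage_id <= self.end_storage_id
            and self.start_partition_key <= partition_key <= self.end_partition_key
        )
```

A cursor like `{"asset1": [StorageIdPartitionKeyRange(1, 5, "2020-01-01", "2020-05-05")]}` would then support out-of-order handling, since handled (storage id, partition) regions no longer have to form a single prefix.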

clairelin135 (Contributor Author):

@erinkcochran87 @sryza The implementation for multi assets is now merged! I've updated the docs to contain examples of the new functionality and broken the asset sensor content out into its own page to keep the concepts page from growing too unwieldy.

Let me know how this looks! https://dagster-hkdzmk6y2-elementl.vercel.app/concepts/partitions-schedules-sensors/asset-sensors

sryza (Contributor) left a comment:
Left a few small comments, but otherwise, this looks great to me.

clairelin135 (Contributor Author):

@sryza oddly, I'm not seeing any new comments from you

sryza (Contributor) commented Oct 11, 2022:

@clairelin135 they're on the Vercel preview

@clairelin135 clairelin135 merged commit f91dc3b into master Oct 11, 2022
@clairelin135 clairelin135 deleted the claire/docs/partitioned-multi-asset-sensor-examples branch October 11, 2022 19:11
Labels: area: docs (Related to documentation in general)