RFC: Add register_run_asset event #7098
Conversation
I think my preference is to not show these events at all in the event log (run view). I also think we should reconsider putting this code in the executor. One advantage of inserting events upon run creation as opposed to run execution is that you have insight into pre-start runs (e.g. queued runs, run launcher failures, etc).
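A minimal sketch of the suggestion above: record "materialization planned" entries at run *creation* time rather than at execution time, so queued runs that never start are still visible in asset-based queries. All names here are illustrative, not the actual Dagster APIs.

```python
import time

class RunStore:
    """Toy stand-in for run/event storage (hypothetical, not Dagster's API)."""

    def __init__(self):
        self.runs = {}
        self.planned_events = []  # (run_id, asset_key, timestamp)

    def create_run(self, run_id, planned_asset_keys):
        # Events are written at run *creation*, before any execution begins.
        self.runs[run_id] = {"status": "QUEUED"}
        for key in planned_asset_keys:
            self.planned_events.append((run_id, key, time.time()))
        return run_id

    def runs_planning_asset(self, asset_key):
        # Even runs that are still QUEUED (never executed) show up here.
        return [run_id for (run_id, key, _) in self.planned_events if key == asset_key]

store = RunStore()
store.create_run("run_1", ["my_table"])
print(store.runs_planning_asset("run_1" and "my_table"))  # ['run_1']
```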
+1 to both things @prha said
@alangenfeld I've changed the event name to
ASSET_EVENTS = {
    DagsterEventType.ASSET_MATERIALIZATION,
stale?
Ah, I was referring to a code comment; I think I was commenting on a stale version of the PR.
event = DagsterEvent.asset_materialization_planned(pipeline_name, asset_key)
event_record = EventLogEntry(
    user_message="",
    level=logging.DEBUG,
    pipeline_name=pipeline_name,
    run_id=pipeline_run.run_id,
    error_info=None,
    timestamp=time.time(),
    dagster_event=event,
)
self.handle_new_event(event_record)
Thoughts on putting this in line with the other DagsterEvent static constructors and logging there? It's a weird pattern, but I think there is value in consistency.
Sure--I've updated the code now to log the dagster event in a static constructor.
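A rough sketch of the "static constructor that also logs" pattern being discussed (illustrative names only, not the actual Dagster source): the constructor both builds the event and writes it through the log manager, keeping call sites consistent with the other DagsterEvent constructors.

```python
import logging

class DagsterEventSketch:
    """Hypothetical stand-in for DagsterEvent, for illustration only."""

    def __init__(self, event_type, message):
        self.event_type = event_type
        self.message = message

    @staticmethod
    def asset_materialization_planned(log_manager, pipeline_name, asset_key):
        event = DagsterEventSketch(
            "ASSET_MATERIALIZATION_PLANNED",
            f"{pipeline_name} intends to materialize asset {asset_key}",
        )
        # Logging inside the static constructor keeps call sites consistent
        # with the other event constructors, per the review suggestion.
        log_manager.log(logging.DEBUG, event.message)
        return event

logger = logging.getLogger("sketch")
event = DagsterEventSketch.asset_materialization_planned(logger, "my_job", "my_table")
print(event.event_type)  # ASSET_MATERIALIZATION_PLANNED
```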
)

# mirror the event in the cross-run index database
with self.index_connection() as conn:
    conn.execute(insert_event_statement)
[Re: lines 235 to 239]
why remove instead of update?
No strong reason here--I removed it earlier because I didn't feel that the check was necessary. I've updated it now, though, to contain the new event type.
event = DagsterEvent.asset_materialization_planned(pipeline_name, asset_key)
event_record = EventLogEntry(
    user_message="",
    level=logging.DEBUG,
might be good to leave reasoning next to the debug log level setting
Okay, I've added a comment next to the setting.
@prha what are your thoughts on this approach of using the DEBUG log level to hide the event?
const l =
  node.__typename === 'LogMessageEvent' ||
  node.__typename === 'AssetMaterializationPlannedEvent'
    ? node.level
    : 'EVENT';
Eesh - let's definitely leave a comment here explaining what's going on. I forgot that we classified events differently.
def log_asset_materialization_planned_event(
    log_manager: DagsterLogManager, event: "DagsterEvent"
) -> None:
    # asset_materialization_planned events have a log level "DEBUG" in order to hide these
    # events by default in Dagit. Modifying filtering to select DEBUG events will show these
    # events in Dagit run logs.
    log_level = logging.DEBUG
    log_manager.log_dagster_event(level=log_level, msg=event.message or "", dagster_event=event)
No need to cargo-cult the pattern here and refactor when it's single-use - this can just be inlined.
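For illustration, inlining the single-use helper might look roughly like this (the log manager is mocked here; names are illustrative, not Dagster's actual classes):

```python
import logging

class LogManagerSketch:
    """Toy log manager that records what was logged (hypothetical)."""

    def __init__(self):
        self.records = []

    def log_dagster_event(self, level, msg, dagster_event):
        self.records.append((level, msg, dagster_event))

def emit_planned_event(log_manager, event_message):
    # Inlined: no separate helper function. The DEBUG level hides the event
    # by default in Dagit's run-log filter, as discussed above.
    log_manager.log_dagster_event(
        level=logging.DEBUG, msg=event_message or "", dagster_event=None
    )

lm = LogManagerSketch()
emit_planned_event(lm, "planned")
print(lm.records[0][0] == logging.DEBUG)  # True
```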
✅
if execution_plan_snapshot:
    self._log_asset_materialization_planned_events(pipeline_run, execution_plan_snapshot)

return self._run_storage.add_run(pipeline_run)
Might be good to sequence the event writes after the run gets added to the db, in case the run add fails.
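A sketch of the suggested ordering (names illustrative): add the run to storage first, and only write the planned-asset events once that insert succeeds, so a failed add_run can't leave dangling events.

```python
class RunStorageSketch:
    """Toy run storage that can be made to fail, for illustration."""

    def __init__(self, fail=False):
        self.fail = fail
        self.runs = []

    def add_run(self, run):
        if self.fail:
            raise RuntimeError("db error")
        self.runs.append(run)
        return run

def add_run_with_planned_events(run_storage, event_log, pipeline_run, planned_asset_keys):
    # Run insert happens first; if it raises, no events have been written yet.
    run = run_storage.add_run(pipeline_run)
    for asset_key in planned_asset_keys:
        event_log.append(("ASSET_MATERIALIZATION_PLANNED", run, asset_key))
    return run

ok_log = []
add_run_with_planned_events(RunStorageSketch(), ok_log, "run_1", ["my_table"])

failed_log = []
try:
    add_run_with_planned_events(RunStorageSketch(fail=True), failed_log, "run_2", ["my_table"])
except RuntimeError:
    pass
print(len(failed_log))  # 0 -- no dangling events when add_run fails
```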
✅
I'm more inclined to just not show the events, regardless of level. We're only putting this in the event log so that we can query runs more efficiently by asset key. If there were a better implementation option that didn't result in visible events, we would pick that instead. I think we should log them in debug mode but just not display them at all in the frontend. And we could change this later without major repercussions.
@alangenfeld I don't feel strongly either way about whether to make these events viewable in Dagit or not, though I don't think users will find them very useful (the run will already show which assets intend to be materialized, and these events are only responsible for populating asset run information in Dagit). Defer to you to make the final call though
> Defer to you to make the final call though

I don't think I'm the best person to make the final call on asset product-experience questions. I don't have broader context, so for things like

> the run will already show which assets intend to be materialized

I don't know how that is currently presented.
As prha pointed out, this last bit is very easy to change so I will accept the PR and you can land it with the behavior you feel best about.
[1] I believe filtering here is going to cause problems with the offset-based pagination we currently have.
events = [
    event
    for event in events
    if event.dagster_event_type != DagsterEventType.ASSET_MATERIALIZATION_PLANNED
[1]
events = [
    event
    for event in events
    if event.dagster_event_type != DagsterEventType.ASSET_MATERIALIZATION_PLANNED
[1]
I forgot about the offset cursors... I guess easiest thing to do is to just filter these events client-side?
ya, that or fix pagination to not be offset based 🙂
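A toy illustration of the pagination concern (simplified event strings, not real Dagster records): with offset-based cursors, the client asks for rows 0-2, then rows 3-5, and so on. If the server filtered out ASSET_MATERIALIZATION_PLANNED rows *before* applying the offset, the offsets the client computed from previous pages would no longer line up and events could be skipped or duplicated. Filtering client-side keeps offsets indexing the unfiltered log.

```python
EVENTS = ["STEP_START", "PLANNED", "MATERIALIZATION", "PLANNED", "STEP_SUCCESS"]

def server_page(events, offset, limit):
    # Server returns raw pages; offsets always index the unfiltered log.
    return events[offset : offset + limit]

# Client pages through with stable offsets, then filters locally.
page1 = server_page(EVENTS, 0, 3)  # ['STEP_START', 'PLANNED', 'MATERIALIZATION']
page2 = server_page(EVENTS, 3, 3)  # ['PLANNED', 'STEP_SUCCESS']
visible = [e for e in page1 + page2 if e != "PLANNED"]
print(visible)  # ['STEP_START', 'MATERIALIZATION', 'STEP_SUCCESS']
```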
@alangenfeld got it. I can adjust the filtering to filter client-side instead of here.
Adds a new Dagster event type, REGISTER_RUN_ASSET. This event is yielded after an asset job run begins, once for every asset selected in the job. The goal of this feature is to enable querying for runs that intended to generate certain assets (e.g. a job that intends to materialize three assets but fails during materialization of the second).
This will enable warnings in Dagit on the asset details page and on asset graph nodes. In the future, we can extend this to partitioned assets to generate views of all runs across a given asset's partitions. If this event becomes too noisy in run logs, we can consider ways to condense them.
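A sketch of the query this event enables (illustrative tuples, not the actual event-log schema): find assets a run *planned* to materialize but for which it never emitted a materialization, e.g. a job that intended three assets but failed on the second.

```python
def unmaterialized_planned_assets(event_log, run_id):
    # Compare the set of planned asset keys against the set actually
    # materialized within the same run.
    planned = {key for (etype, rid, key) in event_log if rid == run_id and etype == "PLANNED"}
    done = {key for (etype, rid, key) in event_log if rid == run_id and etype == "MATERIALIZATION"}
    return sorted(planned - done)

log = [
    ("PLANNED", "run_1", "a"),
    ("PLANNED", "run_1", "b"),
    ("PLANNED", "run_1", "c"),
    ("MATERIALIZATION", "run_1", "a"),
    # run failed while materializing "b"; "b" and "c" never completed
]
print(unmaterialized_planned_assets(log, "run_1"))  # ['b', 'c']
```

Dagit could surface a warning on the asset details page for any asset key returned by a query like this.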