Record the writer info for every asset store write for better cross linkage#67902
Record the writer info for every asset store write for better cross linkage#67902amoghrajesh wants to merge 4 commits into
Conversation
jroachgolf84
left a comment
There was a problem hiding this comment.
What's going to happen when AssetStore is updated from a BaseEventTrigger? Is that going to cause an issue (as there is no dag_id, task_id, etc.)?
|
cc @kaxil I think I need your input on this one |
|
@jroachgolf84 good question, but I don't think a The asset store holds producer-side state: a task's incremental watermark/cursor and the AIP-103 checkpointing state. That's authored by a task, which always has a TI. A watcher doesn't produce that. Its job is to detect an external event and fire an AssetEvent, and it delegates cursor/durability to the source it watches (SQS visibility, PubSub/Redis consumer offsets, Kafka offsets). Anything it carries with an event goes in the AssetEvent payload, not the store. So the writers are tasks (via the Execution API, recording Nothing writes the store from a trigger today afaik. If we ever do write asset state from the triggerer, it'd be from deferral triggers (a task that deferred), and those keep their task instance attached ( |
Using the Asset Store in a BaseEventTrigger is actually a core use-case here. This is needed to allow for watermarking in event-driven scheduling (which was a driving factor when drafting this AIP). |
|
Check out this PR - it's quite pertinent: #67839 |
|
IMO, we should keep Asset Store such that it is Task unaware. Thoughts? Not being able to persist state from a BaseEventTrigger is one of the biggest blockers to creating Triggers for Asset watching (and one of the primary blockers for widespread community adoption). |
| ti1 = create_task_instance(task_id="task1") | ||
| ti2 = create_task_instance(task_id="task2") |
There was a problem hiding this comment.
This test fails on every backend in CI (it's why the run is red), not flaky:
sqlalchemy.exc.IntegrityError: UNIQUE constraint failed: dag_run.dag_id, dag_run.run_id
create_task_instance defaults to dag_id="dag" with a default run_id and creates a fresh dag_run each call, so these two calls collide on the dag_run unique key. Only task_id differs, which doesn't help since the collision is on the dag_run, not the TI. Distinct dag runs fix it:
ti1 = create_task_instance(task_id="task1", dag_id="dag1")
ti2 = create_task_instance(task_id="task2", dag_id="dag2")(distinct run_id works too). The test only needs two distinct TI ids, so separate dag runs satisfy the intent.
Aah, hmm, let me check that PR and get back! |
|
Sounds good, thanks! |
|
@jroachgolf84 I read your PR and the AIP use-case you pointed to, and I agree it's a valid use-case. So you're right, especially the S3 use-case and watermarking. I'm also on board with keeping the store task-unaware. The one thing I'd flag so we don't over-correct: tasks do write the Asset Store too (a producer maintaining an asset's watermark across runs), and for those entries it's genuinely useful to be able to jump to which run set a value, e.g. when a watermark looks wrong. So I'd keep a per-entry "where was this written", just not as a task-only field. The problem with On storage, the asset store is long-lived (a watermark outlives the runs that touch it), so for the task case I'd lean to plain |
|
Thanks for taking a look at that - I'm on board with your comments here. |
|
Great conversation folks, I redid this as flat denormalized columns instead of a FK to Now this is how it looks, recording these fields (
Validation is enforced at write time via |
| ["asset_id", "key"], | ||
| values, | ||
| dict(value=value, updated_at=now), | ||
| dict(value=value, updated_at=now, **writer_info), |
There was a problem hiding this comment.
The upsert always lists the last_updated_by_* columns in the update set, so a plain set() on an AssetScope (which reaches _set_asset_store with kind=None) overwrites previously recorded writer info back to NULL on conflict. No in-tree Metastore caller hits that path today since both routes go through set_asset_store, but the follow-up watcher write (#67839) and the worker-side AssetStoreAccessor also write asset state, and if either uses set() instead of set_asset_store(), attribution gets silently cleared with no error. Consider dropping the writer columns from the update set when kind is None so an attribution-less write leaves the existing values untouched.
There was a problem hiding this comment.
Fixed. When kind is None (plain set() call), writer columns are now excluded from
the ON CONFLICT UPDATE set and only value and updated_at are updated on conflict, leaving any existing writer info untouched. Writer columns are still included in the INSERT values (so new rows get NULL writer fields as expected). The writer-aware path (set_asset_store / aset_asset_store) continues to include writer columns in both the insert and update sets.
| ).one_or_none() | ||
| if row is None: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
There was a problem hiding this comment.
A missing TI here returns 500, but the sibling execution-API routes in task_instances.py return 404 {"reason": "not_found", ...} for the same stale-TI condition (e.g. the TI row was cleaned up between token mint and this PUT). A 500 reads as a server bug and can trigger client retries / 5xx alerts. Returning 404 to match the convention would be more accurate.
| ) | ||
|
|
||
| @property | ||
| def last_updated_by_kind_enum(self) -> AssetStoreWriterKind | None: |
There was a problem hiding this comment.
Is last_updated_by_kind_enum used anywhere? The routes build the response straight off the last_updated_by_kind string column, so this looks unused in the PR. If a UI/API consumer is coming, fine to keep; otherwise it's dead code.
There was a problem hiding this comment.
Removed it. Had added it speculatively but the routes and UI consume last_updated_by_kind as a plain string via the JSON response, so the ORM property has no use here.
There was a problem hiding this comment.
Handled in 7250b423f3
|
@kaxil would you mind taking another look when you can pls? |
Was generative AI tooling used to co-author this PR?
related to UI PR: #67292
What problem are we solving?
Asset store entries can be written by any task, watcher, or admin via core API, but there is no way to know who made the last write. This makes it impossible to link a stored value back to the run that produced it, a gap for UI attribution and auditability.
Current behaviour
The
asset_storetable has no record of the writing actor. All writes are anonymous: the value and timestamp are stored, but there is no way to trace which task instance, watcher, or API call was responsible.How this helps
last_updated_byfields to build a provenance trail from stored value back to the producing run.dag_id,run_id,task_id, andmap_indexfields are exactly what the Grid view needs to cross link from an asset store entry to the task instance that wrote itProposed change
Adds five flat writer columns to
asset_store:last_updated_by_kindlast_updated_by_dag_idlast_updated_by_run_idlast_updated_by_task_idlast_updated_by_map_indexIntroduces
AssetStoreWriterKind(task,watcher,api) with avalidate_writer_fieldsmethod that enforces per-kind contracts:taskrequires all four task fields to be set;watcherandapirequire them all to be null.kinddag_idrun_idtask_idmap_indextaskwatchernullnullnullnullapinullnullnullnullA
match/case _: raise AssertionErrorguard ensures any future new kind is handled explicitly.The execution API PUT endpoints extract writer fields from the task instance and record them with
kind=task. The core API PUT recordskind=api. Both GET endpoints return alast_updated_byblock includingkind,dag_id,run_id,task_id, andmap_index— null task fields for non-task writes.Design decisions worth flagging
Why flat denormalized columns instead of a FK to
task_instance: watchers(
BaseEventTrigger) write asset store entries but have no task instance, so a FKcannot be the universal reference. Flat columns also survive task instance cleanup
— a FK would be cleared on delete, losing the provenance trail.
Why
set_asset_storelives onMetastoreStoreBackend, notBaseStoreBackend:recording writer kind and task fields is a database centric concern. Adding these parameters to the base interface would force every alternative backend (Redis, S3, etc.) to implement a concept that has no meaning for them. Instead,
MetastoreStoreBackendgets dedicatedset_asset_store/aset_asset_storemethods, and the API routes dispatch to them viaisinstancecheck, falling back to the genericset()for other backends.Testing
This is how response would look for:
kind: task{ "asset_store": [ { "key": "last_run_summary", "value": { "rows_loaded": 668, "prev_watermark": "2026-01-01T00:00:00+00:00", "completed_at": "2026-06-03T05:59:35.512421+00:00" }, "updated_at": "2026-06-03T05:59:35.546524Z", "last_updated_by": { "kind": "task", "dag_id": "example_asset_store_producer", "run_id": "manual__2026-06-03T05:59:31.611729+00:00", "task_id": "load", "map_index": -1 } }, { "key": "total_runs", "value": 1, "updated_at": "2026-06-03T05:59:35.536352Z", "last_updated_by": { "kind": "task", "dag_id": "example_asset_store_producer", "run_id": "manual__2026-06-03T05:59:31.611729+00:00", "task_id": "load", "map_index": -1 } }, { "key": "watermark", "value": "2026-06-03T05:59:35.512421+00:00", "updated_at": "2026-06-03T05:59:35.518394Z", "last_updated_by": { "kind": "task", "dag_id": "example_asset_store_producer", "run_id": "manual__2026-06-03T05:59:31.611729+00:00", "task_id": "load", "map_index": -1 } } ], "total_entries": 3 }{ "asset_store": [ { "key": "dict-value", "value": { "example-key": "example-value" }, "updated_at": "2026-06-03T06:03:27.434455Z", "last_updated_by": { "kind": "api", "dag_id": null, "run_id": null, "task_id": null, "map_index": null } }, { "key": "int-value", "value": 7, "updated_at": "2026-06-03T06:03:12.783128Z", "last_updated_by": { "kind": "api", "dag_id": null, "run_id": null, "task_id": null, "map_index": null } }, { "key": "some-value", "value": "2026-05-01T00:00:00Z", "updated_at": "2026-06-03T06:03:03.147371Z", "last_updated_by": { "kind": "api", "dag_id": null, "run_id": null, "task_id": null, "map_index": null } } ], "total_entries": 3 }What's next
Worker-side
AssetStoreAccessorSDK class and the watcher write path inBaseEventTriggerare follow-up PRs in the AIP-103 series which are being tracked separately: #67839{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.