Skip to content

feat: Add dynamically emitted asset events to OpenLineage#65727

Merged
mobuchowski merged 1 commit intoapache:mainfrom
kacpermuda:feat-ol-asset-aliases
Apr 29, 2026
Merged

feat: Add dynamically emitted asset events to OpenLineage#65727
mobuchowski merged 1 commit intoapache:mainfrom
kacpermuda:feat-ol-asset-aliases

Conversation

@kacpermuda
Copy link
Copy Markdown
Collaborator

On Airflow 3, tasks can emit asset events at runtime in several ways beyond statically declaring inlets=/outlets= on the operator. Today OpenLineage only sees the static declarations - anything the task produces dynamically is invisible to OL unless a dedicated extractor or hook-lineage reader picks it up.

This PR closes this gap and adds those dynamic assets into:

  • AirflowRunFacet.task.inlets / outlets — so the serialized task snapshot reflects what actually ran.
  • The OL job's inputs / outputs datasets (the "no extractor, no hook lineage" fallback) - via the existing translate_airflow_asset converter, deduped by (namespace, name) against static inlets/outlets and against each other.

Static inlets / outlets behavior is unchanged. Dynamic assets are appended to the static ones; nothing static is dropped, replaced, or rewritten. If a task has no dynamic events, the emitted facet is identical to before.

Inlets are unaffected by this PR - Airflow has no runtime inlet_events emission path that adds new inlets, so the static list remains the source of truth on the input side.

There is also a small change to use ProvidersManagerTaskRuntime from task sdk instead of legacy ProvidersManager from core, as it's raising deprecation warning.

How users emit these events

  1. Via AssetAlias + outlet_events (alias resolved at runtime):
@task(outlets=[AssetAlias("my-alias")])
def producer(*, outlet_events):
    outlet_events[AssetAlias("my-alias")].add(
        Asset("s3://bucket/file.txt"), extra={"rows": 42}
    )

-> OL outlet = "[{'uri': 's3://bucket/file.txt', 'extra': {'rows': '42'}, 'asset_type': 'asset_event_from_alias', 'source_alias': 'my-alias'}]"

  1. Via outlet_events directly on a statically-declared Asset (no alias):
@task(outlets=[Asset("s3://bucket/file.txt")])
def producer(*, outlet_events):
    outlet_events[Asset("s3://bucket/file.txt")].extra = {"rows": 42}

-> OL outlet = "[{'uri': 's3://bucket/file.txt', 'extra': {'rows': 42}, 'asset_type': 'asset_event'}]"

  1. Via yield Metadata(...) from the task:
@task(outlets=[Asset("s3://bucket/file.txt")])
def producer():
    yield Metadata(Asset("s3://bucket/file.txt"), extra={"rows": 42})

-> Same as .2 above


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude following the guidelines


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@kacpermuda kacpermuda requested a review from mobuchowski as a code owner April 23, 2026 15:00
@kacpermuda kacpermuda force-pushed the feat-ol-asset-aliases branch 3 times, most recently from e3a8f87 to 475a7fe Compare April 24, 2026 11:48
@kacpermuda kacpermuda force-pushed the feat-ol-asset-aliases branch from 475a7fe to 50c05d5 Compare April 24, 2026 14:02
Copy link
Copy Markdown
Contributor

@mobuchowski mobuchowski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some nits, no blockers.

@mobuchowski mobuchowski merged commit 93173b8 into apache:main Apr 29, 2026
141 checks passed
@kacpermuda kacpermuda deleted the feat-ol-asset-aliases branch April 29, 2026 12:11
seruman pushed a commit to seruman/airflow that referenced this pull request Apr 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants