[Regression] Asset_events extra are not getting stored with classic operators in asset_events table #47767

@vatsrahul1001

Description

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Extra info is not stored in the asset_event table when the outlet event is set from a classic operator. This works fine with Airflow 2.10.5.

What you think should happen instead?

The extra should also be stored in the asset_event table when it is set from a classic operator.

How to reproduce

  1. The three DAGs below (dataset_with_extra_by_yield, dataset_with_extra_by_context, dataset_with_extra_from_classic_operator) each attach an extra to the same asset.
  2. Run all three and check the asset_event table: the extra column is empty for dataset_with_extra_from_classic_operator, while for dataset_with_extra_by_yield and dataset_with_extra_by_context the extra is stored.
from __future__ import annotations

import datetime

from airflow.datasets import Dataset
from airflow.sdk.definitions.asset.metadata import Metadata
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator

ds = Dataset("s3://output/1.txt")

with DAG(
    dag_id="dataset_with_extra_by_yield",
    catchup=False,
    start_date=datetime.datetime.min,
    schedule="@daily",
    tags=["datasets"],
):

    @task(outlets=[ds])
    def dataset_with_extra_by_yield():
        yield Metadata(ds, {"hi": "bye"})

    dataset_with_extra_by_yield()

with DAG(
    dag_id="dataset_with_extra_by_context",
    catchup=False,
    start_date=datetime.datetime.min,
    schedule="@daily",
    tags=["dataset"],
):

    @task(outlets=[ds])
    def dataset_with_extra_by_context(*, outlet_events=None):
        outlet_events[ds].extra = {"hi": "bye"}

    dataset_with_extra_by_context()

with DAG(
    dag_id="dataset_with_extra_from_classic_operator",
    catchup=False,
    start_date=datetime.datetime.min,
    schedule="@daily",
    tags=["dataset"],
):

    def _dataset_with_extra_from_classic_operator_post_execute(context, result):
        context["outlet_events"][ds].extra = {"hi": "bye"}

    BashOperator(
        task_id="dataset_with_extra_from_classic_operator",
        outlets=[ds],
        bash_command=":",
        post_execute=_dataset_with_extra_from_classic_operator_post_execute,
    )
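For reference, the expected vs. observed contents of the extra column can be sketched with an in-memory SQLite stand-in (the table and column names follow the issue; this is not the real Airflow metadata schema, just an illustration of what each DAG run should leave behind):

```python
import json
import sqlite3

# Hypothetical stand-in for the metadata DB's asset_event table,
# reduced to the two columns relevant to this issue.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE asset_event (source_task_id TEXT, extra TEXT)")

rows = [
    # Both TaskFlow variants store the extra as expected:
    ("dataset_with_extra_by_yield", json.dumps({"hi": "bye"})),
    ("dataset_with_extra_by_context", json.dumps({"hi": "bye"})),
    # Observed regression on main: the classic-operator run leaves it empty,
    # even though post_execute set outlet_events[ds].extra = {"hi": "bye"}.
    ("dataset_with_extra_from_classic_operator", json.dumps({})),
]
conn.executemany("INSERT INTO asset_event VALUES (?, ?)", rows)

for task_id, extra in conn.execute(
    "SELECT source_task_id, extra FROM asset_event"
):
    print(task_id, extra)
```

On a real deployment the equivalent check is a query like `SELECT source_task_id, extra FROM asset_event;` against the metadata database after triggering each DAG once.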

Operating System

linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
