
Task SDK not able to read asset events #46508

@vatsrahul1001

Description


Apache Airflow version

3.0.0a1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

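The Task SDK cannot read asset events. inlet_events is passed to the task as None, so indexing it fails with TypeError: 'NoneType' object is not subscriptable.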

Error

[{\"name\":\"s3://output/1.txt\",\"uri\":\"s3://output/1.txt\",\"asset_type\":\"Asset\"}],\"outlets\":[],\"type\":\"RuntimeCheckOnTask\"}\n","logger":"task"}
{"timestamp":"2025-02-06T09:59:33.498327Z","level":"info","event":"inlet_events are None","chan":"stdout","logger":"task"}
{"timestamp":"2025-02-06T09:59:33.498183","level":"error","event":"Task failed with exception","logger":"task","error_detail":[{"exc_type":"TypeError","exc_value":"'NoneType' object is not subscriptable","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":545,"name":"run"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":645,"name":"_execute_task"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":173,"name":"wrapper"},{"filename":"/opt/airflow/airflow/decorators/base.py","lineno":252,"name":"execute"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":173,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":196,"name":"execute"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":222,"name":"execute_callable"},{"filename":"/opt/airflow/airflow/utils/operator_helpers.py","lineno":261,"name":"run"},{"filename":"/files/dags/example_inlet_event_extra.py","lineno":30,"name":"read_dataset_event"}]}]}
{"timestamp":"2025-02-06T09:59:33.500653","level":"debug","event":"Sending request","json":"{\"state\":\"failed\",\"end_date\":\"2025-02-06T09:59:33.500437Z\",\"type\":\"TaskState\"}\n","logger":"task"}
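The failing frame can be extracted from a structured log line like the "Task failed with exception" entry above. The following is a minimal standard-library sketch. It assumes each log line is a single JSON object that uses the field names in this log (error_detail, exc_type, exc_value, frames). The helper name summarize_error is hypothetical, and the sample record is a trimmed copy of the real line.

```python
import json


def summarize_error(log_line: str) -> str:
    """Return 'ExcType: message at file:line in func' for the innermost frame."""
    record = json.loads(log_line)
    # Take the first exception entry (the log in this report contains exactly one)
    detail = record["error_detail"][0]
    # Frames are listed outermost first, so the last one is where the error occurred
    frame = detail["frames"][-1]
    return (
        f"{detail['exc_type']}: {detail['exc_value']} "
        f"at {frame['filename']}:{frame['lineno']} in {frame['name']}"
    )


# Trimmed copy of the error line from this report (middle frames omitted)
line = json.dumps({
    "level": "error",
    "event": "Task failed with exception",
    "error_detail": [{
        "exc_type": "TypeError",
        "exc_value": "'NoneType' object is not subscriptable",
        "frames": [
            {"filename": "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py",
             "lineno": 545, "name": "run"},
            {"filename": "/files/dags/example_inlet_event_extra.py",
             "lineno": 30, "name": "read_dataset_event"},
        ],
    }],
})
print(summarize_error(line))
```

For this log, the sketch points to the subscript of inlet_events inside read_dataset_event, not to anything in the runner itself.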

What do you think should happen instead?

We should be able to fetch asset events in Airflow 3, just as we can in Airflow 2.
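The consumer DAG in this report expects inlet_events[ds] to return a sequence of events that supports negative slicing. The plain-Python stand-in below sketches that expected contract. It is not Airflow's implementation. FakeEvent and FakeInletEvents are hypothetical names, and the sketch assumes events are ordered oldest to newest, so [:-2] skips the two most recent events.

```python
from dataclasses import dataclass, field


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an asset event; only the `extra` payload is modeled."""

    extra: dict = field(default_factory=dict)


class FakeInletEvents:
    """Hypothetical accessor: maps an asset key to its events, assumed oldest first."""

    def __init__(self, events_by_asset):
        self._events = events_by_asset

    def __getitem__(self, asset):
        # A plain list supports the same negative slicing the DAG uses ([:-2])
        return list(self._events.get(asset, []))


# Keyed by URI string for simplicity; the consumer DAG indexes with the Dataset object ds
inlet_events = FakeInletEvents(
    {"s3://output/1.txt": [FakeEvent({"hi": "bye"}), FakeEvent({"hi": "bye"}), FakeEvent({"hi": "bye"})]}
)

# With three events, [:-2] drops the two most recent and leaves only the oldest
for event in inlet_events["s3://output/1.txt"][:-2]:
    print(event.extra["hi"])
```

With fewer than three events, [:-2] yields an empty list and the loop prints nothing, which is not an error. The reported failure happens earlier, because inlet_events itself is None.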

How to reproduce

Create asset events with the DAGs below:

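# Imports assumed (omitted in the original report); Airflow 2.x-style paths
import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.datasets.metadata import Metadata
from airflow.decorators import task

ds = Dataset("s3://output/1.txt")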

with DAG(
    dag_id="dataset_with_extra_by_yield",
    catchup=False,
    start_date=datetime.datetime.min,
    schedule="@daily",
    tags=["datasets"],
):

    @task(outlets=[ds])
    def dataset_with_extra_by_yield():
        yield Metadata(ds, {"hi": "bye"})

    dataset_with_extra_by_yield()

with DAG(
    dag_id="dataset_with_extra_by_context",
    catchup=False,
    start_date=datetime.datetime.min,
    schedule="@daily",
    tags=["dataset"],
):

    @task(outlets=[ds])
    def dataset_with_extra_by_context(*, outlet_events=None):
        outlet_events[ds].extra = {"hi": "bye"}

    dataset_with_extra_by_context()

Then try to read them with the DAG below:

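# Imports assumed (omitted in the original report); Airflow 2.x-style paths
import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.decorators import task

ds = Dataset("s3://output/1.txt")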

with DAG(
    dag_id="read_dataset_event",
    catchup=False,
    start_date=datetime.datetime.min,
    schedule="@daily",
    tags=["datasets"],
):

    @task(inlets=[ds])
    def read_dataset_event(*, inlet_events=None):
        print(f"inlet_events are {inlet_events}")
        for event in inlet_events[ds][:-2]:
            print(event.extra["hi"])

    read_dataset_event()
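Until the Task SDK populates inlet_events, a guard in the task body can replace the opaque TypeError with an explicit failure message. The following is a minimal sketch using a hypothetical helper, read_extras. It takes the accessor and the asset as arguments so it can be exercised without Airflow. It is a diagnostic workaround and does not fix the underlying bug.

```python
from types import SimpleNamespace


def read_extras(inlet_events, asset, key="hi"):
    """Collect extra[key] from the events selected by [:-2], as in the DAG above."""
    if inlet_events is None:
        # The failure in this report: the runtime passed None instead of an accessor
        raise RuntimeError(
            f"inlet_events was not provided for {asset!r}; "
            "the Task SDK did not inject inlet events into the task"
        )
    return [event.extra[key] for event in inlet_events[asset][:-2]]


# Stand-in accessor: a dict of objects exposing an `extra` attribute
fake_events = {"s3://output/1.txt": [SimpleNamespace(extra={"hi": str(i)}) for i in range(4)]}
print(read_extras(fake_events, "s3://output/1.txt"))  # ['0', '1']
```

Inside read_dataset_event, the loop could then become: for value in read_extras(inlet_events, ds): print(value).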

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata


Assignees

Labels

  • area:core
  • area:data-aware-scheduling (assets, datasets, AIP-48)
  • kind:bug (This is clearly a bug)
  • priority:high (High priority bug that should be patched quickly but does not require immediate new release)

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests
