[Regression] Argument 'extra' should be optional when pushing metadata to alias #47779

@vatsrahul1001

Description

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The mapped asset alias task fails.
[screenshot of the failing mapped task]

Error

[2025-03-14, 10:48:23] ERROR - Task failed with exception source="task" error_detail=[{"exc_type":"TypeError","exc_value":"Key should be either an asset or an asset alias, not <class 'str'>","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":609,"name":"run"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":734,"name":"_execute_task"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":373,"name":"wrapper"},{"filename":"/opt/airflow/airflow/decorators/base.py","lineno":252,"name":"execute"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":373,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":196,"name":"execute"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":220,"name":"execute_callable"},{"filename":"/opt/airflow/airflow/utils/operator_helpers.py","lineno":274,"name":"run"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py","lineno":238,"name":"__getitem__"}]}]
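The last frame of the traceback (`context.py`, `__getitem__`) suggests the outlet-event accessor rejects plain-string keys and only accepts asset / asset-alias objects. A minimal, self-contained sketch of that kind of key validation (illustrative only — class and method names mirror the traceback but this is NOT Airflow's actual code):

```python
# Sketch (NOT Airflow's real implementation) of the key check implied by the
# traceback: a plain str key raises the TypeError seen in the error above.
from dataclasses import dataclass


@dataclass(frozen=True)
class Asset:
    name: str


@dataclass(frozen=True)
class AssetAlias:
    name: str


class OutletEventAccessors:
    """Stand-in for the accessor that yielded Metadata goes through."""

    def __getitem__(self, key):
        if not isinstance(key, (Asset, AssetAlias)):
            raise TypeError(
                f"Key should be either an asset or an asset alias, not {type(key)}"
            )
        return []


accessors = OutletEventAccessors()
accessors[AssetAlias("alias-dataset-1")]  # object key: accepted
try:
    accessors["alias-dataset-1"]  # str key, as alias=<str> implies
except TypeError as exc:
    print(exc)  # Key should be either an asset or an asset alias, not <class 'str'>
```

If this is indeed where the failure originates, the string alias passed via `Metadata(..., alias=my_alias_name)` would need to be wrapped into an alias object before the lookup.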

What you think should happen instead?

No response

How to reproduce

Try running the DAG below. This used to work in Airflow 2.


```python
from airflow.decorators import dag, task
from airflow.datasets import Dataset, DatasetAlias
from airflow.sdk.definitions.asset.metadata import Metadata
from pendulum import datetime

my_alias_name = "alias-dataset-1"


@dag(
    dag_display_name="example_dataset_alias_mapped",
    start_date=datetime(2024, 8, 1),
    schedule=None,
    catchup=False,
    tags=["datasets"],
)
def dataset_alias_dynamic_test():
    @task
    def upstream_task():
        return ["a", "b"]

    @task(outlets=[DatasetAlias(my_alias_name)])
    def use_metadata(name):
        yield Metadata(
            Dataset(name),
            alias=my_alias_name,
            extra={},  # extra is NOT optional
        )

    use_metadata.expand(name=upstream_task())


dataset_alias_dynamic_test()


@dag(
    start_date=datetime(2024, 8, 1),
    schedule=[DatasetAlias(my_alias_name)],
    catchup=False,
    tags=["dataset"],
)
def downstream_alias():
    @task
    def t1():
        return 0

    t1()


downstream_alias()
```

Operating System

linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
