Expanded parameters are rendered as MappedArgument #40604

@AronsonDan

Description

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.9.0

What happened?

When expanding a task group as follows:

    copy_into_postgres_by_company_task = copy_into_postgres_by_company_task_group.partial(
        table_name="table_name",
        s3_bucket="s3_bucket",
        aws_region="aws_region",
    ).expand(company_id=["1", "2", "3"])

and, inside the task group, using the company ID as follows:

    from airflow.decorators import task_group
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


    @task_group(group_id="task_group", prefix_group_id=False)
    def copy_into_postgres_by_company_task_group(
        *,
        company_id: str,
        table_name: str = "table_name",
        s3_bucket: str = "s3_bucket",
        aws_region: str = "aws_region",
        postgres_conn_id: str = "postgres_default",
    ):
        create_postgres_temp_table_destination = SQLExecuteQueryOperator(
            task_id="create_postgres_temp_table_destination_task",
            conn_id=postgres_conn_id,
            sql="sql/create_postgres_temp_table_per_company_query.sql",
            parameters={"table_name": f"{table_name}_temp_{company_id}"},
        )

the parameter is rendered as:

    {'table_name': "inventory_transactions_temp_MappedArgument(_input=DictOfListsExpandInput(value={'company_id': XComArg(<Task(_PythonDecoratedOperator): prepare_company_ids>)}), _key='company_id')"}
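The likely mechanism (a minimal sketch, not using real Airflow classes): inside a mapped task group, the expanded argument is a lazy proxy object, and the f-string in the task-group body is evaluated eagerly at DAG-parse time, so the proxy's repr gets baked into the string before any expansion happens. The `MappedArgument` class below is a hypothetical stand-in that only mimics the repr.

```python
class MappedArgument:
    """Hypothetical stand-in for Airflow's lazy expand-input proxy."""

    def __init__(self, key: str):
        self._key = key

    def __repr__(self) -> str:
        return f"MappedArgument(_key={self._key!r})"


# What the task-group body really receives at parse time:
company_id = MappedArgument("company_id")

# The f-string is evaluated immediately, stringifying the proxy
# instead of the per-map-index runtime value ('1', '2', '3'):
parameters = {"table_name": f"inventory_transactions_temp_{company_id}"}
print(parameters["table_name"])
```

This reproduces the shape of the observed rendering: the proxy's repr, not the expanded value, ends up in the parameter string.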

What you think should happen instead?

I would have expected the parameters to render as:

    {'table_name': "inventory_transactions_temp_1"}
    {'table_name': "inventory_transactions_temp_2"}
    {'table_name': "inventory_transactions_temp_3"}
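Until this is fixed, one hedged workaround sketch is to defer the string construction to runtime, where the mapped value has been resolved. In Airflow terms that means building the string inside a `@task` (so it runs per map index, after expansion) and feeding the result to the operator as an XComArg; the pattern is shown here with a plain callable so it is visible without an Airflow install. The name `build_table_name` is illustrative, not from the report.

```python
def build_table_name(table_name: str, company_id: str) -> str:
    # Called at runtime per mapped instance, so company_id is the
    # real expanded value here, not a MappedArgument proxy.
    return f"{table_name}_temp_{company_id}"


# In a DAG this function would be decorated with @task and its
# result passed into the operator's templated parameters.
for cid in ["1", "2", "3"]:
    print(build_table_name("inventory_transactions", cid))
```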

How to reproduce

Create the following DAG (using the task group defined above):

    from airflow.decorators import dag


    @dag(
        dag_id="kukuriku",
    )
    def kukuriku():
        copy_into_postgres_by_company_task = copy_into_postgres_by_company_task_group.partial(
            table_name="table_name",
            s3_bucket="s3_bucket",
            aws_region="aws_region",
        ).expand(company_id=["1", "2", "3"])


    kukuriku()

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
