Skip to content

airflow.contrib.operators.pubsub_operator.PubSubPublishOperator can't handle multiple messages in a template #11983

@prodhype

Description

@prodhype

Apache Airflow version: 1.10.10

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.19

Environment: composer

  • Cloud provider or hardware configuration: GCP
  • OS (e.g. from /etc/os-release): Debian Buster
  • Kernel (e.g. uname -a): Linux cs-1078880547532-default-default-cnxpx 5.4.49+ #1 SMP Sun Oct 18 19:43:35 PDT 2020 x86_64 GNU/Linux
  • Install tools: GCP
  • Others: none

What happened: When a template is used to send multiple pubsub messages in a list, pubsub returns the error ERROR - ('Error publishing to topic projects/my-cloud-composer/topics/my-devtest', <HttpError 400 when requesting https://pubsub.googleapis.com/v1/projects/my-cloud-composer/topics/mytopic:publish?alt=json returned "Invalid value at 'messages' (type.googleapis.com/google.pubsub.v1.PubsubMessage), "[{'data': 'eyJwb2 (... trimmed ...) Details: "[{'@type': 'type.googleapis.com/google.rpc.BadRequest', 'fieldViolations': [{'field': 'messages', 'description': 'Invalid value at \'messages\' (type.googleapis.com/google.pubsub.v1.PubsubMessage), .... For example, if the following template contains a list of messages I get the error:

with dag:
    my_messages = "{{ ti.xcom_pull(task_ids='transform', key='messages') }}"

    enqueue_data = PubSubPublishOperator(
        task_id="enqueue",
        project=PUBSUB_PROJECT,
        topic=PUBSUB_TOPIC,
        messages=my_messages,
        retries=2,
        retry_delay=30,
        sla=datetime.timedelta(seconds=1200),
        dag=dag,
    )

When I test with a single message that is not templated it works as intended.

When I try it with a single message in a template I get the error as well

What you expected to happen: The messages in the list should get enqueued to pubsub correctly.

How to reproduce it: See the above example dag and run it.

Anything else we need to know: none

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions