
DataflowStartFlexTemplateOperator could not execute due to KeyError 'Job' from Response  #24265


Description

@myang-clgx

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-apache-beam 3.4.0
apache-airflow-providers-ftp 2.1.2
apache-airflow-providers-google 7.0.0
apache-airflow-providers-http 2.1.2
apache-airflow-providers-imap 2.2.3
apache-airflow-providers-sqlite 2.1.3

Apache Airflow version

2.3.1

Operating System

macOS 12.4

Deployment

Other

Deployment details

Local deployment without Docker

What happened

I try to run a DAG with DataflowStartFlexTemplateOperator, as below:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowStartFlexTemplateOperator,
)

@task
def get_main_config():
    ...
    return config

@task
def get_dataflow_node_config(main_json_config: dict):
    ...
    return config

@dag(schedule_interval=None, start_date=datetime(2022, 6, 1))
def custom_dataflow_etl(**kwargs):
    fetch_main_config = get_main_config()
    fetch_dataflow_config_dic = get_dataflow_node_config(fetch_main_config)
    # The config dict built upstream is passed via XCom as the launch body.
    process_dataflow = DataflowStartFlexTemplateOperator(
        task_id="start_dataflow_flex_template",
        body=fetch_dataflow_config_dic,
        do_xcom_push=True,
        location=DATAFLOW_LOCATION,
    )
    fetch_dataflow_config_dic >> process_dataflow

custom_dataflow = custom_dataflow_etl()
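
For reference, the dict that get_dataflow_node_config returns is forwarded by the operator to the Dataflow flexTemplates.launch API, so it has the launchParameter shape below. This is only a sketch; the job name, spec path, and parameters are placeholders, not my real config values:

# Sketch of the launch body (placeholder values, not my real config):
# the operator forwards this dict to projects.locations.flexTemplates.launch.
body = {
    "launchParameter": {
        "jobName": "my-flex-job",
        "containerSpecGcsPath": "gs://my-bucket/templates/spec.json",
        "parameters": {"input": "gs://my-bucket/in", "output": "gs://my-bucket/out"},
    }
}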

It raises an exception during the DAG run:

[2022-06-03, 13:34:29 UTC] {taskinstance.py:1890} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/Users/myang/opt/miniconda3/envs/airflow-2_3_1/lib/python3.9/site-packages/airflow/providers/google/cloud/operators/dataflow.py", line 773, in execute
    job = self.hook.start_flex_template(
  File "/Users/myang/opt/miniconda3/envs/airflow-2_3_1/lib/python3.9/site-packages/airflow/providers/google/common/hooks/base_google.py", line 439, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/Users/myang/opt/miniconda3/envs/airflow-2_3_1/lib/python3.9/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 754, in start_flex_template
    job = response["job"]
KeyError: 'job'
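
The failing line indexes the launch response directly, so any response that lacks a "job" key (for example an error payload) surfaces as this bare KeyError. Below is a minimal sketch of a guard that would expose the real response instead; it is an illustration, not the provider's actual code beyond the one line shown in the traceback:

from airflow.exceptions import AirflowException

# Hypothetical guard around the line from dataflow.py in the traceback:
# surface the full API response when no "job" key comes back.
job = response.get("job")
if job is None:
    raise AirflowException(f"Flex Template launch returned no job: {response}")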

What you think should happen instead

The job should launch successfully: I have tested the same Flex Template with the same parameter values through the Google Cloud CLI, and it executes.
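
To compare, the same launch can be attempted outside Airflow with the discovery-based client the hook uses; on success the raw response should contain a "job" key. A sketch, assuming Application Default Credentials and placeholder project, region, and body values:

from googleapiclient.discovery import build

service = build("dataflow", "v1b3", cache_discovery=False)
response = (
    service.projects()
    .locations()
    .flexTemplates()
    .launch(
        projectId="my-project",   # placeholder
        location="us-central1",   # placeholder
        body={
            "launchParameter": {
                "jobName": "my-flex-job",
                "containerSpecGcsPath": "gs://my-bucket/templates/spec.json",
                "parameters": {},
            }
        },
    )
    .execute()
)
print(response)  # expected to contain a "job" key on success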

How to reproduce

Create a DAG that uses DataflowStartFlexTemplateOperator as in the snippet under "What happened" above, then trigger a DAG run.


Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
