From 29f19bc03bd49f7a0046a9c38ea1dc831f698d83 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Wed, 22 Jun 2022 17:29:52 +0200 Subject: [PATCH] airflow: render templates at start Signed-off-by: Maciej Obuchowski --- .../airflow/openlineage/airflow/listener.py | 2 ++ .../postgres_orders_popular_day_of_week.py | 19 +++++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/integration/airflow/openlineage/airflow/listener.py b/integration/airflow/openlineage/airflow/listener.py index 18104a3423..b1f1d598f1 100644 --- a/integration/airflow/openlineage/airflow/listener.py +++ b/integration/airflow/openlineage/airflow/listener.py @@ -89,6 +89,8 @@ def on_task_instance_running(previous_state, task_instance: "TaskInstance", sess parent_run_id = str(uuid.uuid3(uuid.NAMESPACE_URL, f'{dag.dag_id}.{dagrun.run_id}')) def on_running(): + task_instance.render_templates() + task_metadata = extractor_manager.extract_metadata(dagrun, task) adapter.start_task( diff --git a/integration/airflow/tests/integration/tests/airflow/dags/postgres_orders_popular_day_of_week.py b/integration/airflow/tests/integration/tests/airflow/dags/postgres_orders_popular_day_of_week.py index 499971ab8d..3d71d7ea6a 100644 --- a/integration/airflow/tests/integration/tests/airflow/dags/postgres_orders_popular_day_of_week.py +++ b/integration/airflow/tests/integration/tests/airflow/dags/postgres_orders_popular_day_of_week.py @@ -23,10 +23,22 @@ 'email': ['datascience@example.com'] } + +def get_sql() -> str: + return ''' + CREATE TABLE IF NOT EXISTS popular_orders_day_of_week ( + order_day_of_week VARCHAR(64) NOT NULL, + order_placed_on TIMESTAMP NOT NULL, + orders_placed INTEGER NOT NULL + );''' + dag = DAG( 'postgres_orders_popular_day_of_week', schedule_interval='@once', default_args=default_args, + user_defined_macros={ + "get_sql": get_sql + }, description='Determines the popular day of week orders are placed.' ) @@ -34,12 +46,7 @@ t1 = PostgresOperator( task_id='postgres_if_not_exists', postgres_conn_id='food_delivery_db', - sql=''' - CREATE TABLE IF NOT EXISTS popular_orders_day_of_week ( - order_day_of_week VARCHAR(64) NOT NULL, - order_placed_on TIMESTAMP NOT NULL, - orders_placed INTEGER NOT NULL - );''', + sql="{{ get_sql() }}", dag=dag )