diff --git a/integration/airflow/openlineage/airflow/listener.py b/integration/airflow/openlineage/airflow/listener.py index 18104a3423..b1f1d598f1 100644 --- a/integration/airflow/openlineage/airflow/listener.py +++ b/integration/airflow/openlineage/airflow/listener.py @@ -89,6 +89,8 @@ def on_task_instance_running(previous_state, task_instance: "TaskInstance", sess parent_run_id = str(uuid.uuid3(uuid.NAMESPACE_URL, f'{dag.dag_id}.{dagrun.run_id}')) def on_running(): + task_instance.render_templates() + task_metadata = extractor_manager.extract_metadata(dagrun, task) adapter.start_task( diff --git a/integration/airflow/tests/integration/tests/airflow/dags/postgres_orders_popular_day_of_week.py b/integration/airflow/tests/integration/tests/airflow/dags/postgres_orders_popular_day_of_week.py index 499971ab8d..4b91c8bb01 100644 --- a/integration/airflow/tests/integration/tests/airflow/dags/postgres_orders_popular_day_of_week.py +++ b/integration/airflow/tests/integration/tests/airflow/dags/postgres_orders_popular_day_of_week.py @@ -6,6 +6,16 @@ from openlineage.client import set_producer set_producer("https://github.com/OpenLineage/OpenLineage/tree/0.0.1/integration/airflow") + +def get_sql() -> str: + return ''' + CREATE TABLE IF NOT EXISTS popular_orders_day_of_week ( + order_day_of_week VARCHAR(64) NOT NULL, + order_placed_on TIMESTAMP NOT NULL, + orders_placed INTEGER NOT NULL + );''' + + from airflow.version import version as AIRFLOW_VERSION from pkg_resources import parse_version if parse_version(AIRFLOW_VERSION) < parse_version("2.0.0"): @@ -23,25 +33,34 @@ 'email': ['datascience@example.com'] } + + + dag = DAG( 'postgres_orders_popular_day_of_week', schedule_interval='@once', default_args=default_args, + user_defined_macros={ + "get_sql": get_sql + }, description='Determines the popular day of week orders are placed.' ) -t1 = PostgresOperator( - task_id='postgres_if_not_exists', - postgres_conn_id='food_delivery_db', - sql=''' - CREATE TABLE IF NOT EXISTS popular_orders_day_of_week ( - order_day_of_week VARCHAR(64) NOT NULL, - order_placed_on TIMESTAMP NOT NULL, - orders_placed INTEGER NOT NULL - );''', - dag=dag -) +if parse_version(AIRFLOW_VERSION) < parse_version("2.0.0"): + t1 = PostgresOperator( + task_id='postgres_if_not_exists', + postgres_conn_id='food_delivery_db', + sql=get_sql(), + dag=dag + ) +else: + t1 = PostgresOperator( + task_id='postgres_if_not_exists', + postgres_conn_id='food_delivery_db', + sql="{{ get_sql() }}", + dag=dag + ) t2 = PostgresOperator( task_id='postgres_insert',