
Jinja templating throws bug when using a PostgresOperator within the Taskflow API #15547

@Robstaa


Apache Airflow version:
2.0.1

Environment:

  • OS (e.g. from /etc/os-release): macOS Big Sur 11.2.3
  • Kernel (e.g. uname -a): Darwin Kernel Version 20.3.0

What happened:

When I execute a PostgresOperator inside a @task function (which runs as a PythonOperator under the new TaskFlow API), Jinja templating is not applied. As usual, I pass the path to a SQL file as the sql parameter. As a standalone task, the PostgresOperator behaves as expected: it resolves the path, reads the file, and executes the SQL it contains. Templating with params works as well.
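For context, here is a minimal sketch of the file-resolution step described above. It assumes the operator declares sql as a templated field and .sql as a template extension, which is how the provider's PostgresOperator is set up. It is a simplified, stdlib-only model, not Airflow's code: render_field is a hypothetical name, a dict stands in for the DAG's template search path, and string.Template (with $ placeholders) stands in for Jinja.

```python
from string import Template
from typing import Iterable, Mapping


def render_field(
    value: str,
    template_ext: Iterable[str],
    loader: Mapping[str, str],
    context: Mapping[str, object],
) -> str:
    """Simplified model of rendering one templated operator field.

    A value ending in a template extension is treated as a path and looked up
    in `loader` (stand-in for the template search path); any other value is
    used as the template text directly. string.Template stands in for Jinja.
    """
    if any(value.endswith(ext) for ext in template_ext):
        template_text = loader[value]  # KeyError plays the role of "template not found"
    else:
        template_text = value
    return Template(template_text).safe_substitute(context)


# The .sql path is replaced by the file contents, which are then rendered.
files = {"sql/test_sql.sql": "SELECT * FROM $table;"}
print(render_field("sql/test_sql.sql", (".sql",), files, {"table": "test_table"}))
# prints: SELECT * FROM test_table;
```

A plain SQL string that does not end in .sql is rendered as-is, which is why passing raw SQL and passing a path both work for the standalone task.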

What you expected to happen:
I expect Jinja templating to work the same way when the PostgresOperator is executed inside a @task function.

How to reproduce it:

  1. Define a connection "pg_connection" to a Postgres database, or use the default connection (and change postgres_conn_id accordingly).
  2. In that database, create a table test_table.
  3. In dags/test_dag.py:
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago


DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['admin@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'start_date': days_ago(2)
}

@dag(default_args=DEFAULT_ARGS, schedule_interval=None)
def test_dag():

    outside_pg = PostgresOperator(
        task_id='outside_pg',
        postgres_conn_id='pg_connection',
        sql='sql/test_sql.sql'
    )

    @task()
    def inside_pg():
        context = get_current_context()
        inside_pg = PostgresOperator(
            task_id='inside_pg',
            postgres_conn_id='pg_connection',
            sql='sql/test_sql.sql'
        )
        inside_pg.execute(context=context)

    execute_inside_pg = inside_pg()
    outside_pg >> execute_inside_pg

dag = test_dag()
  4. In dags/sql/test_sql.sql:
SELECT * FROM test_table;
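For step 1, one way to define pg_connection is an environment variable named AIRFLOW_CONN_<CONN_ID> (upper-cased) that holds a connection URI. The sketch below builds that pair, assuming the connection values that appear later in the log (host localhost, port 5432, schema and login postgres, no password). connection_env_var is a hypothetical helper; `airflow connections add` or the web UI work just as well.

```python
from typing import Optional, Tuple
from urllib.parse import quote


def connection_env_var(
    conn_id: str,
    login: str,
    host: str,
    port: int,
    schema: str,
    password: Optional[str] = None,
) -> Tuple[str, str]:
    """Build an (env var name, URI) pair for a Postgres Airflow connection.

    Credentials and schema are percent-encoded so special characters
    (e.g. '@' in a password) do not break the URI.
    """
    auth = quote(login, safe="")
    if password:
        auth += ":" + quote(password, safe="")
    uri = f"postgres://{auth}@{host}:{port}/{quote(schema, safe='')}"
    return f"AIRFLOW_CONN_{conn_id.upper()}", uri


name, uri = connection_env_var("pg_connection", "postgres", "localhost", 5432, "postgres")
print(f"export {name}='{uri}'")
# prints: export AIRFLOW_CONN_PG_CONNECTION='postgres://postgres@localhost:5432/postgres'
```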

The error logs I get:

airflow dags test test_dag -1
[2021-04-27 13:01:25,488] {dagbag.py:448} INFO - Filling up the DagBag from /Users/robinzuschke/code/valyria/airflow/dags
[2021-04-27 13:01:25,649] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_dag.outside_pg 2021-04-01 00:00:00+00:00 [queued]>']
[2021-04-27 13:01:30,669] {taskinstance.py:1257} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=admin@airflow.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_dag
AIRFLOW_CTX_TASK_ID=outside_pg
AIRFLOW_CTX_EXECUTION_DATE=2021-04-01T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-04-01T00:00:00+00:00
[2021-04-27 13:01:30,675] {base.py:74} INFO - Using connection to: id: pg_connection. Host: localhost, Port: 5432, Schema: postgres, Login: postgres, Password: None, extra: None
[2021-04-27 13:01:30,682] {dbapi.py:180} INFO - Running statement: SELECT * FROM test_table;, parameters: None
[2021-04-27 13:01:30,686] {dbapi.py:186} INFO - Rows affected: 0
[2021-04-27 13:01:30,690] {taskinstance.py:1166} INFO - Marking task as SUCCESS. dag_id=test_dag, task_id=outside_pg, execution_date=20210401T000000, start_date=20210427T104518, end_date=20210427T110130
[2021-04-27 13:01:30,704] {taskinstance.py:1220} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-04-27 13:01:30,722] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-04-27 13:01:30,744] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_dag.inside_pg 2021-04-01 00:00:00+00:00 [queued]>']
[2021-04-27 13:01:35,615] {taskinstance.py:1257} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=admin@airflow.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_dag
AIRFLOW_CTX_TASK_ID=inside_pg
AIRFLOW_CTX_EXECUTION_DATE=2021-04-01T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-04-01T00:00:00+00:00
[2021-04-27 13:01:35,619] {base.py:74} INFO - Using connection to: id: pg_connection. Host: localhost, Port: 5432, Schema: postgres, Login: postgres, Password: None, extra: None
[2021-04-27 13:01:35,624] {dbapi.py:180} INFO - Running statement: sql/test_sql.sql, parameters: None
[2021-04-27 13:01:35,625] {taskinstance.py:1455} ERROR - syntax error at or near "sql"
LINE 1: sql/test_sql.sql
        ^

Traceback (most recent call last):
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/operators/python.py", line 233, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/robinzuschke/code/valyria/airflow/dags/test_dag.py", line 34, in inside_pg
    inside_pg.execute(context=context)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/providers/postgres/operators/postgres.py", line 71, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 184, in run
    cur.execute(sql_statement)
psycopg2.errors.SyntaxError: syntax error at or near "sql"
LINE 1: sql/test_sql.sql
        ^

[2021-04-27 13:01:35,630] {taskinstance.py:1503} INFO - Marking task as UP_FOR_RETRY. dag_id=test_dag, task_id=inside_pg, execution_date=20210401T000000, start_date=20210427T104518, end_date=20210427T110135
[2021-04-27 13:01:35,643] {debug_executor.py:87} ERROR - Failed to execute task: syntax error at or near "sql"
LINE 1: sql/test_sql.sql
        ^
.
Traceback (most recent call last):
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
    ti._run_raw_task(job_id=ti.job_id, **params)  # pylint: disable=protected-access
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/operators/python.py", line 233, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/robinzuschke/code/valyria/airflow/dags/test_dag.py", line 34, in inside_pg
    inside_pg.execute(context=context)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/providers/postgres/operators/postgres.py", line 71, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 184, in run
    cur.execute(sql_statement)
psycopg2.errors.SyntaxError: syntax error at or near "sql"
LINE 1: sql/test_sql.sql
        ^

[2021-04-27 13:01:35,651] {backfill_job.py:219} ERROR - Task instance <TaskInstance: test_dag.inside_pg 2021-04-01 00:00:00+00:00 [failed]> failed
[2021-04-27 13:01:35,655] {dagrun.py:430} ERROR - Marking run <DagRun test_dag @ 2021-04-01 00:00:00+00:00: backfill__2021-04-01T00:00:00+00:00, externally triggered: False> failed
[2021-04-27 13:01:35,657] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 | running: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 0
Some task instances failed:
DAG ID    Task ID    Execution date               Try number
--------  ---------  -------------------------  ------------
test_dag  inside_pg  2021-04-01 00:00:00+00:00             1

As you can see, outside_pg succeeds, while inside_pg fails because it sends the file path itself to Postgres as SQL.
Any help is appreciated!
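The difference between the two tasks can be illustrated with a toy model. When Airflow runs a task instance, it renders the operator's templated fields before calling execute() (in 2.0 this appears to happen in the task-instance path shown in the traceback, before _execute_task). Calling execute() by hand, as inside_pg does, skips that step, so sql still holds the raw path. This is a hypothetical, stdlib-only sketch: FakeSqlOperator and run_as_task are illustrative names, a dict stands in for the template search path, and string.Template stands in for Jinja.

```python
from string import Template
from typing import Mapping, Tuple


class FakeSqlOperator:
    """Toy stand-in for an operator with a templated `sql` field (not Airflow code)."""

    template_fields: Tuple[str, ...] = ("sql",)
    template_ext: Tuple[str, ...] = (".sql",)

    def __init__(self, sql: str, loader: Mapping[str, str]) -> None:
        self.sql = sql
        self.loader = loader  # stand-in for the DAG's template search path

    def render_template_fields(self, context: Mapping[str, object]) -> None:
        # Replace each templated field with its rendered value, in place.
        for field in self.template_fields:
            value = getattr(self, field)
            if value.endswith(self.template_ext):
                value = self.loader[value]  # swap the path for the file contents
            setattr(self, field, Template(value).safe_substitute(context))

    def execute(self, context: Mapping[str, object]) -> str:
        # execute() uses the field as it currently is; it never renders anything.
        return self.sql


def run_as_task(op: FakeSqlOperator, context: Mapping[str, object]) -> str:
    """Model of the task-instance path: render templates first, then execute."""
    op.render_template_fields(context)
    return op.execute(context)


files = {"sql/test_sql.sql": "SELECT * FROM test_table;"}

outside = FakeSqlOperator("sql/test_sql.sql", files)
print(run_as_task(outside, {}))  # prints: SELECT * FROM test_table;

inside = FakeSqlOperator("sql/test_sql.sql", files)
print(inside.execute({}))  # prints: sql/test_sql.sql (the path is sent as SQL)
```

In real Airflow, the analogous workaround would presumably be calling the operator's render_template_fields(context) before execute(). Because an operator created inside the task is not attached to the DAG, it may also need the DAG's Jinja environment (for example jinja_env=context['dag'].get_template_env()) so that sql/test_sql.sql is resolved relative to the DAG folder; this is an untested assumption.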
