Unable to execute a SQL file by passing its absolute path (ending in .sql) to the AWS SqlToS3Operator #37372

@Altonymous

Description

Apache Airflow version

2.8.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Unable to pass an absolute path to a SQL file/template and have it load properly in the Amazon SQL-to-S3 transfer class:
https://github.com/apache/airflow/blob/2.8.1/airflow/providers/amazon/aws/transfers/sql_to_s3.py#L64

Traceback (most recent call last):
  File "/opt/venv/lib/python3.11/site-packages/pandas/io/sql.py", line 2675, in execute
    cur.execute(sql, *args)
  File "/opt/venv/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/opt/venv/lib/python3.11/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
MySQLdb.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'extract/last_event.sql' at line 1")
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/opt/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/airflow/operators/python.py", line 199, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/airflow/operators/python.py", line 216, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/dags/aggregates/users_last_event.py", line 48, in extract
    ).execute(context=None)
      ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/airflow/providers/amazon/aws/transfers/sql_to_s3.py", line 178, in execute
    data_df = sql_hook.get_pandas_df(sql=self.query, parameters=self.parameters)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/airflow/providers/common/sql/hooks/sql.py", line 221, in get_pandas_df
    return psql.read_sql(sql, con=conn, params=parameters, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/pandas/io/sql.py", line 706, in read_sql
    return pandas_sql.read_query(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/pandas/io/sql.py", line 2739, in read_query
    cursor = self.execute(sql, params)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/pandas/io/sql.py", line 2687, in execute
    raise ex from exc
pandas.errors.DatabaseError: Execution failed on sql 'extract/last_event.sql': (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'extract/last_event.sql' at line 1")

What you think should happen instead?

I would expect it to read the file and execute the SQL/template it contains.
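For context, my reading of the code: the operator's query field is templated, and Airflow normally expands a field value ending in one of the operator's template_ext suffixes (such as .sql) into the rendered file contents before execute() runs; calling operator.execute(context=None) directly skips that rendering step, so the literal path string reaches MySQL. A toy stdlib-only sketch of that expansion (the real machinery uses Jinja; names here are illustrative, not Airflow's API):

```python
from pathlib import Path

TEMPLATE_EXT = (".sql",)  # mirrors the operator's template_ext

def resolve_templated_value(value: str, context: dict) -> str:
    """If value looks like a templated file path, return its rendered contents.

    Stand-in for Airflow's Jinja-based render_template_fields: a value
    ending in a template_ext suffix is read from disk and rendered, and the
    resulting text replaces the field value before execute() is called.
    """
    if value.endswith(TEMPLATE_EXT):
        text = Path(value).read_text()
        return text.format(**context)  # toy substitute for Jinja rendering
    return value  # plain SQL strings pass through unchanged
```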

How to reproduce

import pendulum

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    tags=["prototype"],
    template_searchpath=["/app/dags/templates/users"],
)
def users_last_event_taskflow():
    @task()
    def extract(s3_bucket, s3_key, sql_conn_id, aws_conn_id):
        SqlToS3Operator(
            query="/app/path/to/sql/template.sql",
            s3_bucket=s3_bucket,
            s3_key=s3_key,
            sql_conn_id=sql_conn_id,
            aws_conn_id=aws_conn_id,
            task_id="sql_to_s3_task",
            replace=True,
        ).execute(context=None)

    extract("s3_bucket", "s3_key", "mysql_default", "aws_default")

users_last_event_taskflow_etl = users_last_event_taskflow()
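A hypothetical workaround (not a fix for the operator itself): read the .sql file manually and pass its text as the query, so no templating is needed when calling execute() directly. The helper name below is my own, not part of Airflow:

```python
from pathlib import Path

def load_query(sql_path: str) -> str:
    """Return the SQL text stored at sql_path.

    Passing this result as query= hands the operator the statement itself,
    rather than a path string the database would reject with a syntax error.
    """
    return Path(sql_path).read_text()
```

Used as `query=load_query("/app/path/to/sql/template.sql")` in the SqlToS3Operator call above; note this bypasses Jinja, so any template variables in the file would go unrendered.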

Operating System

debian

Versions of Apache Airflow Providers

apache-airflow==2.8.1
apache-airflow-providers-amazon==8.17.0
apache-airflow-providers-mysql==5.1.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

Every time.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
