Apache Airflow version
Other Airflow 2/3 version (please specify below)
If "Other Airflow 2/3 version" selected, which one?
2.10.3
What happened?
When calling `.execute()` and retrieving the query_ids, I only get the parent statement ID, but when I run multiple statements I want all the child query IDs so I can retrieve results for any of them.
In the logs I can see the API response contains them, but I don't see a method to get at it.
While checking the code, I see the operator calls into the hook, and in the hook I can see:
def _process_response(self, status_code, resp):
    self.log.info("Snowflake SQL GET statements status API response: %s", resp)
    if status_code == 202:
        return {"status": "running", "message": "Query statements are still running"}
    elif status_code == 422:
        return {"status": "error", "message": resp["message"]}
    elif status_code == 200:
        if resp_statement_handles := resp.get("statementHandles"):
            statement_handles = resp_statement_handles
        elif resp_statement_handle := resp.get("statementHandle"):
            statement_handles = [resp_statement_handle]
        else:
            statement_handles = []
        return {
            "status": "success",
            "message": resp["message"],
            "statement_handles": statement_handles,
        }
    else:
        return {"status": "error", "message": resp["message"]}
So, from this I'd say the hook is correct, because when the response has statementHandles it returns them. Although even that is not entirely right, because you may want/need both: the parent ID, which is statementHandle, and the IDs of the children, which are statementHandles.
But the operator only returns the parent ID, even though the full response is visible in resp. I think that is because it calls check_query_output(), which only logs the JSON response but doesn't return it, while the hook's get_sql_api_query_status parses it correctly.
I also think poll_queries would be a nice method to expose on the hook, since right now the hook seems to run the query and forget about it.
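For illustration, the parsing I'm describing can be mirrored in a small standalone helper that keeps the parent handle and the children separately instead of collapsing them. This is only a sketch of what I'd like to get back; `extract_handles` is a hypothetical name, not part of the provider:

```python
def extract_handles(resp: dict) -> dict:
    """Parse a Snowflake SQL API GET-status body (like the one logged below),
    keeping the parent statementHandle and the child statementHandles apart.
    Illustrative only; not provider code."""
    parent = resp.get("statementHandle")         # parent / single-statement ID
    children = resp.get("statementHandles", [])  # child IDs for multi-statement runs
    # Fall back to the parent when there are no children, matching the hook's logic.
    return {"parent": parent, "children": children or ([parent] if parent else [])}


resp = {
    "statementHandle": "parent-id",
    "statementHandles": ["child-1", "child-2"],
}
print(extract_handles(resp))
# {'parent': 'parent-id', 'children': ['child-1', 'child-2']}
```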
resp:
[TIMESTAMP] {snowflake_sql_api.py:287} INFO - Snowflake SQL GET statements status API response: {'resultSetMetaData': {'numRows': 1, 'format': 'jsonv2', 'partitionInfo': [{'rowCount': 1, 'uncompressedSize': 57}], 'rowType': [{'name': 'multiple statement execution', 'database': '', 'schema': '', 'table': '', 'scale': None, 'nullable': False, 'byteLength': 16777216, 'precision': None, 'collation': None, 'length': 16777216, 'type': 'text'}]}, 'data': [['Multiple statements executed successfully.']], 'code': '090001', 'statementHandles': ['XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'], 'statementStatusUrl': '/api/v2/statements/XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?requestId=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'requestId': 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'sqlState': '00000', 'statementHandle': 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'message': 'Statement executed successfully.', 'createdOn': 1763984429070}
What you think should happen instead?
No response
How to reproduce
I don't have a minimal example since I'm debugging production code, but it should be something like:
sql_statement = """
ALTER SESSION SET TRANSACTION_ABORT_ON_ERROR = TRUE;
BEGIN TRANSACTION;
SELECT 4 as "number of rows inserted";
SELECT 2 as "number of rows updated";
COMMIT;
"""

op = SnowflakeSqlApiOperator(
    task_id="run_sql",
    snowflake_conn_id=snflk_conn_id,
    sql=sql_statement,
    database=sql_params["database"],
    schema=sql_params["schema_db"],
    deferrable=False,
    statement_count=stmt_count,
    warehouse=kwargs.get("warehouse", "").upper(),
)
Operating System
AWS MWAA
Versions of Apache Airflow Providers
Providers
| Package Name | Version | Description |
|---|---|---|
| apache-airflow-providers-amazon | 9.0.0 | Amazon integration (including Amazon Web Services (AWS)). |
| apache-airflow-providers-celery | 3.8.3 | Celery |
| apache-airflow-providers-common-compat | 1.7.3 | Common Compatibility Provider - providing compatibility code for previous Airflow versions |
| apache-airflow-providers-common-io | 1.4.2 | Common IO Provider |
| apache-airflow-providers-common-sql | 1.28.0 | Common SQL Provider |
| apache-airflow-providers-fab | 1.5.0 | Flask App Builder |
| apache-airflow-providers-ftp | 3.11.1 | File Transfer Protocol (FTP) |
| apache-airflow-providers-http | 4.13.2 | Hypertext Transfer Protocol (HTTP) |
| apache-airflow-providers-imap | 3.7.0 | Internet Message Access Protocol (IMAP) |
| apache-airflow-providers-microsoft-mssql | 3.9.1 | Microsoft SQL Server (MSSQL) |
| apache-airflow-providers-pagerduty | 5.0.2 | Pagerduty |
| apache-airflow-providers-postgres | 5.13.1 | PostgreSQL |
| apache-airflow-providers-slack | 9.2.0 | Slack services integration including: - Slack API - Slack Incoming Webhook |
| apache-airflow-providers-smtp | 1.8.0 | Simple Mail Transfer Protocol (SMTP) |
| apache-airflow-providers-snowflake | 6.5.3 | Snowflake |
| apache-airflow-providers-sqlite | 3.9.0 | SQLite |
Deployment
Amazon (AWS) MWAA
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct