
XCom unable to parse tuple response from DatabricksSQLOperator on SQL query execution #39448

Open

KanikaAdik opened this issue May 6, 2024 · 1 comment
Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-databricks==6.3.0

Apache Airflow version

2.7.3

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Other Docker-based deployment

Deployment details

No response

What happened

I am trying to execute a few queries using DatabricksSqlOperator in an Airflow DAG. It appears the output returned by the operator cannot be serialized by XCom, so the task fails with the following error log:
```
[2024-05-06T20:17:44.253+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
[2024-05-06T20:17:44.254+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-05-06T20:17:44.259+0000] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=ct_sql_xcom, task_id=select_data, execution_date=20240506T170918, start_date=20240506T201741, end_date=20240506T201744
[2024-05-06T20:17:44.266+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 238 for task select_data (Object of type tuple is not JSON serializable; 1850)
```
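The failing call can be reproduced without running a DAG. Below is a minimal sketch, assuming (as the class path in the traceback suggests) that Row is a namedtuple built dynamically inside the hook module; BaseXCom.serialize_value is the same call that fails at xcom.py line 659 above:

```python
from collections import namedtuple

from airflow.models.xcom import BaseXCom

# Hypothetical stand-in for the provider's Row: the traceback's class path
# (***.providers.databricks.hooks.databricks_sql.Row) suggests a namedtuple
# created dynamically inside the hook rather than a plain tuple.
Row = namedtuple("Row", ["table_name"])

rows = [[Row("some_table"), Row("another_table")]]

# Same code path as xcom_push: json.dumps(value, cls=XComEncoder), which is
# expected, per the traceback above, to raise
# TypeError: Object of type tuple is not JSON serializable
BaseXCom.serialize_value(rows)
```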

What you think should happen instead

XCom should either handle the JSON serialization of these rows, or the Databricks provider should standardize on an XCom-serializable response type so that the generic case works.
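For illustration, one shape such a standard could take (a sketch only, not an existing provider API; the helper name rows_to_serializable is hypothetical): normalize each tuple-like Row into plain built-ins before the value is pushed to XCom.

```python
from typing import Any, List, Sequence


def rows_to_serializable(rows: Sequence[Sequence[Any]]) -> List[List[Any]]:
    """Convert tuple-like rows (e.g. namedtuple Row objects) into plain
    lists, which the JSON-based XCom backend serializes without issue."""
    return [list(row) for row in rows]
```

With something like this applied in the operator (or in user code), the downstream task would receive ordinary nested lists instead of Row objects.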

How to reproduce

  1. Create a DAG with a DatabricksSqlOperator task.
  2. Set do_xcom_push=True on that task.
  3. Add a separate downstream task that pulls and parses the SQL query result (see the DAG below).

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

env = Variable.get('AIRFLOW_VAR_ENV_NAME')

with DAG(
    'ct_sql_xcom',
    start_date=datetime(2024, 1, 30),
    schedule_interval=None,
) as dag:

    create_file = DatabricksSqlOperator(
        databricks_conn_id='databricks',
        sql_endpoint_name='Serverless',
        task_id='create_and_populate_from_file',
        sql="select table_name from system.information_schema.tables where table_catalog = 'rsg_prod'",
        do_xcom_push=True,
    )

    def parse_result(**kwargs):
        # Pull by the upstream task_id, not the Python variable name.
        result = kwargs['task_instance'].xcom_pull(task_ids='create_and_populate_from_file')
        print("Received result from XCom:", result)

    downstream_task = PythonOperator(
        task_id='select_data',
        python_callable=parse_result,
    )

    create_file >> downstream_task
```
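Until the provider returns XCom-serializable rows, one workaround is to run the query through DatabricksSqlHook inside a PythonOperator and convert the rows before they are returned (and hence pushed to XCom). A minimal sketch, assuming the generic DbApiHook.get_records() API and reusing the connection and endpoint names from the DAG above; the task would be declared inside the same `with DAG(...)` block:

```python
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook


def select_tables():
    hook = DatabricksSqlHook(
        databricks_conn_id='databricks',
        sql_endpoint_name='Serverless',
    )
    rows = hook.get_records(
        "select table_name from system.information_schema.tables "
        "where table_catalog = 'rsg_prod'"
    )
    # Plain lists survive JSON-based XCom serialization, unlike Row objects.
    return [list(row) for row in rows]


select_serializable = PythonOperator(
    task_id='select_serializable',
    python_callable=select_tables,
)
```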

Anything else

```
[2024-05-06T19:13:57.614+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
[2024-05-06T19:13:57.615+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-05-06T19:13:57.620+0000] {taskinstance.py:1400} INFO - Marking task as UP_FOR_RETRY. dag_id=ct_sql_xcom, task_id=create_file, execution_date=20240506T170918, start_date=20240506T191355, end_date=20240506T191357
[2024-05-06T19:13:57.627+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 233 for task create_file (Object of type tuple is not JSON serializable; 842)
```

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
KanikaAdik added the area:providers, kind:bug and needs-triage labels May 6, 2024

boring-cyborg bot commented May 6, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

eladkal added the good first issue label and removed the needs-triage label May 7, 2024