-
Notifications
You must be signed in to change notification settings - Fork 16.8k
AzureDataExplorerQueryOperator fails with ERROR - Object of type KustoResultTable is not JSON serializable #13354
Description
Apache Airflow version: 2.0
What happened:
The AzureDataExplorerQueryOperator fails because it attempts to return the results of the query but the results can't be pushed to xcom.
2020-12-28 23:10:08,458] {xcom.py:238} ERROR - Could not serialize the XCom value into JSON. If you are using pickles instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2020-12-28 23:10:08,459] {taskinstance.py:1454} ERROR - Object of type KustoResultTable is not JSON serializable
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1284, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1317, in _execute_task
self.xcom_push(key=XCOM_RETURN_KEY, value=result)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1898, in xcom_push
session=session,
File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/models/xcom.py", line 88, in set
value = XCom.serialize_value(value)
File "/usr/local/lib/python3.7/site-packages/airflow/models/xcom.py", line 235, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/usr/local/lib/python3.7/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.7/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type KustoResultTable is not JSON serializable
[2020-12-28 23:10:08,461] {taskinstance.py:1502} INFO - Marking task as UP_FOR_RETRY. dag_id=azure_data_explorer, task_id=adx_query, execution_date=20201228T231006, start_date=20201228T231007, end_date=20201228T231008
[2020-12-28 23:10:08,503] {local_task_job.py:142} INFO - Task exited with return code 1
I get the same error no matter what query I run. Execution of the query works fine, but it appears to fail on the final return; from the operator code: return response.primary_results[0]
Not sure if there's a way to change the query output format by passing in options to the operator, but if there is I couldn't find anything. The operator also doesn't have an xcom_push param, so I couldn't find a way to turn off this functionality.
What you expected to happen:
Operator would push results of query to xcom and task would complete successfully.
How to reproduce it:
Call the operator with basic input params as shown in the docs, something like this:
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator
adx_query = '''StormEvents
| sort by StartTime desc
| take 10'''
opr_adx_query = AzureDataExplorerQueryOperator(
task_id='adx_query',
query=adx_query,
database='storm_demo',
azure_data_explorer_conn_id='adx'
)