Skip to content

Commit

Permalink
Fix Azure Data Explorer Operator (#13520)
Browse files Browse the repository at this point in the history
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
  • Loading branch information
kentdanas and kaxil committed Jan 7, 2021
1 parent cc79117 commit b2cb6ee
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
10 changes: 7 additions & 3 deletions airflow/providers/microsoft/azure/operators/adx.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
#

"""This module contains Azure Data Explorer operators"""
from typing import Optional
from typing import Optional, Union

from azure.kusto.data._models import KustoResultTable

from airflow.configuration import conf
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook
from airflow.utils.decorators import apply_defaults
Expand Down Expand Up @@ -66,12 +67,15 @@ def get_hook(self) -> AzureDataExplorerHook:
"""Returns new instance of AzureDataExplorerHook"""
return AzureDataExplorerHook(self.azure_data_explorer_conn_id)

def execute(self, context: dict) -> KustoResultTable:
def execute(self, context: dict) -> Union[KustoResultTable, str]:
"""
Run KQL Query on Azure Data Explorer (Kusto).
Returns `PrimaryResult` of Query v2 HTTP response contents
(https://docs.microsoft.com/en-us/azure/kusto/api/rest/response2)
"""
hook = self.get_hook()
response = hook.run_query(self.query, self.database, self.options)
return response.primary_results[0]
if conf.getboolean('core', 'enable_xcom_pickling'):
return response.primary_results[0]
else:
return str(response.primary_results[0])
33 changes: 19 additions & 14 deletions tests/providers/microsoft/azure/operators/test_adx.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import unittest
from unittest import mock

from azure.kusto.data._models import KustoResultTable

from airflow.models import DAG, TaskInstance
from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator
Expand All @@ -36,19 +38,22 @@
'options': {'option1': 'option_value'},
}

MOCK_RESULT = {
'name': 'getschema',
'kind': 'PrimaryResult',
'data': [
{'ColumnName': 'Source', 'ColumnOrdinal': 0, 'DataType': 'System.String', 'ColumnType': 'string'},
{
'ColumnName': 'Timestamp',
'ColumnOrdinal': 1,
'DataType': 'System.DateTime',
'ColumnType': 'datetime',
},
],
}
MOCK_RESULT = KustoResultTable(
json_table={
'TableName': 'getschema',
"TableId": 1,
'TableKind': 'PrimaryResult',
'Columns': [
{"ColumnName": "Source", "ColumnType": "string", 'DataType': 'System.String'},
{
"ColumnName": "Timestamp",
"ColumnType": "datetime",
'DataType': 'System.DateTime',
},
],
"Rows": [["hi", "2017-01-01T01:01:01.0000003Z"], ["hello", "2017-01-01T01:01:01.0000003Z"]],
}
)


class MockResponse:
Expand Down Expand Up @@ -82,4 +87,4 @@ def test_xcom_push_and_pull(self, mock_conn, mock_run_query):
ti = TaskInstance(task=self.operator, execution_date=timezone.utcnow())
ti.run()

self.assertEqual(ti.xcom_pull(task_ids=MOCK_DATA['task_id']), MOCK_RESULT)
self.assertEqual(ti.xcom_pull(task_ids=MOCK_DATA['task_id']), str(MOCK_RESULT))

0 comments on commit b2cb6ee

Please sign in to comment.