Commit

Make pyodbc.Row and databricks.Row JSON-serializable via new `make_serializable` method (#32319)
Joffreybvn committed Nov 17, 2023
1 parent d089e1a commit 064fc2b
Showing 19 changed files with 353 additions and 57 deletions.
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
@@ -378,6 +378,12 @@ repos:
files: ^dev/breeze/src/airflow_breeze/utils/docker_command_utils\.py$|^scripts/ci/docker_compose/local\.yml$
pass_filenames: false
additional_dependencies: ['rich>=12.4.4']
- id: check-common-sql-dependency-make-serializable
name: Check dependency of SQL Providers with '_make_serializable'
entry: ./scripts/ci/pre_commit/pre_commit_check_common_sql_dependency.py
language: python
files: ^airflow/providers/.*/hooks/.*\.py$
additional_dependencies: ['rich>=12.4.4', 'pyyaml', 'packaging']
- id: update-providers-dependencies
name: Update cross-dependencies for providers packages
entry: ./scripts/ci/pre_commit/pre_commit_update_providers_dependencies.py
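The new pre-commit hook guards against a provider overriding `_make_serializable` while still allowing an older `common.sql` that lacks the base method. As a rough sketch only (the real logic lives in `pre_commit_check_common_sql_dependency.py`; the function names and the textual scan here are assumptions), such a check could look like this:

from pathlib import Path

import yaml
from packaging.version import Version

MIN_COMMON_SQL_VERSION = Version("1.8.1")


def overrides_make_serializable(hook_file: Path) -> bool:
    # Crude textual scan; the actual script may inspect the source more precisely.
    return "def _make_serializable" in hook_file.read_text()


def pins_recent_common_sql(provider_yaml: Path) -> bool:
    # provider.yaml declares dependencies as a list of pip-style specifiers.
    dependencies = yaml.safe_load(provider_yaml.read_text()).get("dependencies", [])
    for dep in dependencies:
        if dep.startswith("apache-airflow-providers-common-sql>="):
            return Version(dep.split(">=", 1)[1]) >= MIN_COMMON_SQL_VERSION
    return False


def check_hook_file(hook_file: Path) -> bool:
    # airflow/providers/<name>/hooks/<hook>.py -> airflow/providers/<name>/provider.yaml
    provider_yaml = hook_file.parents[1] / "provider.yaml"
    return not overrides_make_serializable(hook_file) or pins_recent_common_sql(provider_yaml)
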
2 changes: 2 additions & 0 deletions STATIC_CODE_CHECKS.rst
@@ -167,6 +167,8 @@ require Breeze Docker image to be built locally.
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-cncf-k8s-only-for-executors | Check cncf.kubernetes imports used for executors only | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-common-sql-dependency-make-serializable | Check dependency of SQL Providers with '_make_serializable' | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-core-deprecation-classes | Verify usage of Airflow deprecation classes in core | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-daysago-import-from-utils | Make sure days_ago is imported from airflow.utils.dates | |
8 changes: 8 additions & 0 deletions airflow/providers/common/sql/CHANGELOG.rst
@@ -25,6 +25,14 @@
Changelog
---------

1.8.1
.....

Misc
~~~~

* ``Add '_make_serializable' method which other SQL hooks can override when the result from the cursor is not JSON-serializable (#32319)``

1.8.0
.....

21 changes: 20 additions & 1 deletion airflow/providers/common/sql/hooks/sql.py
@@ -138,6 +138,11 @@ class DbApiHook(BaseForDbApiHook):
"""
Abstract base class for SQL hooks.

When subclassing, maintainers can override the `_make_serializable` method: it transforms
the result of the handler method (typically `cursor.fetchall()`) into JSON-serializable
objects. Most of the time, the underlying SQL library already returns tuples from its
cursor, and `_make_serializable` can be left as-is.
:param schema: Optional DB schema that overrides the schema specified in the connection. Make sure that
if you change the schema parameter value in the constructor of the derived Hook, such change
should be done before calling the ``DBApiHook.__init__()``.
@@ -403,7 +408,7 @@ def run(
self._run_command(cur, sql_statement, parameters)

if handler is not None:
result = handler(cur)
result = self._make_serializable(handler(cur))
if return_single_query_results(sql, return_last, split_statements):
_last_result = result
_last_description = cur.description
@@ -423,6 +428,20 @@ def run(
else:
return results

@staticmethod
def _make_serializable(result: Any) -> Any:
"""Ensure the data returned from an SQL command is JSON-serializable.
This method is intended to be overridden by subclasses of the `DbApiHook`. Its purpose is to
transform the result of an SQL command (typically returned by cursor methods) into a
JSON-serializable format.
If this method is not overridden, the result data is returned as-is.
If the output of the cursor is already JSON-serializable, this method
should be ignored.
"""
return result

def _run_command(self, cur, sql_statement, parameters):
"""Run a statement using an already open cursor."""
if self.log_sql:
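To make the new hook point concrete, here is a minimal sketch (not part of this commit; `MyDbHook` and the tuple conversion are illustrative assumptions) of a provider hook overriding `_make_serializable`:

from typing import Any

from airflow.providers.common.sql.hooks.sql import DbApiHook


class MyDbHook(DbApiHook):
    """Hypothetical hook whose driver returns rows that json.dumps() cannot encode."""

    @staticmethod
    def _make_serializable(result: Any) -> Any:
        # Convert each driver-specific row object into a plain tuple so that results
        # pushed to XCom (and hence through json.dumps) serialize cleanly.
        if result is not None:
            return [tuple(row) for row in result]
        return result

With such an override in place, `run(sql, handler=fetch_all_handler)` passes the fetched rows through `_make_serializable` before returning them, which is the pattern the databricks and odbc hooks adopt below.
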
1 change: 1 addition & 0 deletions airflow/providers/common/sql/provider.yaml
@@ -23,6 +23,7 @@ description: |
suspended: false
versions:
- 1.8.1
- 1.8.0
- 1.7.2
- 1.7.1
9 changes: 8 additions & 1 deletion airflow/providers/databricks/hooks/databricks_sql.py
@@ -222,7 +222,7 @@ def run(
with closing(conn.cursor()) as cur:
self._run_command(cur, sql_statement, parameters)
if handler is not None:
result = handler(cur)
result = self._make_serializable(handler(cur))
if return_single_query_results(sql, return_last, split_statements):
results = [result]
self.descriptions = [cur.description]
@@ -240,6 +240,13 @@ def run(
else:
return results

@staticmethod
def _make_serializable(result):
"""Transform the databricks Row objects into a JSON-serializable list of rows."""
if result is not None:
return [list(row) for row in result]
return result

def bulk_dump(self, table, tmp_file):
raise NotImplementedError()

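The effect of the Databricks override, sketched with a stand-in class (`databricks.sql.types.Row` itself is not constructed here):

import json


class Row:
    """Stand-in for databricks.sql.types.Row: iterable, but not JSON-encodable as-is."""

    def __init__(self, *values):
        self._values = values

    def __iter__(self):
        return iter(self._values)


rows = [Row(1, "alpha"), Row(2, "beta")]
# json.dumps(rows) would raise TypeError: Object of type Row is not JSON serializable.
serializable = [list(row) for row in rows]  # the same conversion as _make_serializable
print(json.dumps(serializable))  # [[1, "alpha"], [2, "beta"]]
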
10 changes: 2 additions & 8 deletions airflow/providers/databricks/operators/databricks_sql.py
@@ -30,15 +30,9 @@
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

if TYPE_CHECKING:
from databricks.sql.types import Row

from airflow.utils.context import Context


def make_serializable(val: Row):
return tuple(val)


class DatabricksSqlOperator(SQLExecuteQueryOperator):
"""
Executes SQL code in a Databricks SQL endpoint or a Databricks cluster.
@@ -129,7 +123,7 @@ def _should_run_output_processing(self) -> bool:

def _process_output(self, results: list[Any], descriptions: list[Sequence[Sequence] | None]) -> list[Any]:
if not self._output_path:
return list(zip(descriptions, [[make_serializable(row) for row in res] for res in results]))
return list(zip(descriptions, results))
if not self._output_format:
raise AirflowException("Output format should be specified!")
# Output to a file only the result of last query
@@ -162,7 +156,7 @@ def _process_output(self, results: list[Any], descriptions: list[Sequence[Sequen
file.write("\n")
else:
raise AirflowException(f"Unsupported output format: '{self._output_format}'")
return list(zip(descriptions, [[make_serializable(row) for row in res] for res in results]))
return list(zip(descriptions, results))


COPY_INTO_APPROVED_FORMATS = ["CSV", "JSON", "AVRO", "ORC", "PARQUET", "TEXT", "BINARYFILE"]
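With serialization now handled inside the hook, `_process_output` simply pairs each query's description with its already-serializable rows. Illustrative shapes only (the values are hypothetical):

descriptions = [[("id",), ("name",)]]    # one cursor description per query
results = [[[1, "alpha"], [2, "beta"]]]  # one row set per query, already serializable
output = list(zip(descriptions, results))
# [([("id",), ("name",)], [[1, "alpha"], [2, "beta"]])]
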
2 changes: 1 addition & 1 deletion airflow/providers/databricks/provider.yaml
@@ -56,7 +56,7 @@ versions:

dependencies:
- apache-airflow>=2.5.0
- apache-airflow-providers-common-sql>=1.5.0
- apache-airflow-providers-common-sql>=1.8.1
- requests>=2.27,<3
# The connector 2.9.0, released on Aug 10, 2023, has a bug: it does not properly declare
# urllib3, so it needs to be excluded. See https://github.com/databricks/databricks-sql-python/issues/190
13 changes: 12 additions & 1 deletion airflow/providers/odbc/hooks/odbc.py
@@ -17,7 +17,7 @@
"""This module contains ODBC hook."""
from __future__ import annotations

from typing import Any
from typing import Any, NamedTuple
from urllib.parse import quote_plus

import pyodbc
@@ -211,3 +211,14 @@ def get_sqlalchemy_connection(
engine = self.get_sqlalchemy_engine(engine_kwargs=engine_kwargs)
cnx = engine.connect(**(connect_kwargs or {}))
return cnx

@staticmethod
def _make_serializable(result: list[pyodbc.Row] | None) -> list[NamedTuple] | None:
"""Transform the pyodbc.Row objects returned from an SQL command into JSON-serializable NamedTuple."""
if result is not None:
columns: list[tuple[str, type]] = [col[:2] for col in result[0].cursor_description]
# The line below follows the NamedTuple docs, but mypy does not support dynamically
# instantiated NamedTuples and never will: https://github.com/python/mypy/issues/848
row_object = NamedTuple("Row", columns) # type: ignore[misc]
return [row_object(*row) for row in result]
return result
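
For illustration, the dynamic NamedTuple construction in isolation (a sketch; the `columns` metadata imitates the first two fields of pyodbc's `cursor_description`):

import json
from typing import NamedTuple

# pyodbc describes each column as (name, type_code, ...); only (name, type) is kept above.
columns = [("id", int), ("name", str)]

# Functional NamedTuple syntax, as used in the hook; mypy needs the same ignore comment.
RowTuple = NamedTuple("Row", columns)  # type: ignore[misc]

rows = [RowTuple(1, "alpha"), RowTuple(2, "beta")]
# NamedTuples keep attribute access (rows[0].name) yet serialize as plain JSON arrays.
print(json.dumps(rows))  # [[1, "alpha"], [2, "beta"]]

Unlike the Databricks variant, this conversion preserves column names through attribute access.
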
2 changes: 1 addition & 1 deletion airflow/providers/odbc/provider.yaml
@@ -42,7 +42,7 @@ versions:

dependencies:
- apache-airflow>=2.5.0
- apache-airflow-providers-common-sql>=1.3.1
- apache-airflow-providers-common-sql>=1.8.1
- pyodbc

integrations:
1 change: 1 addition & 0 deletions dev/breeze/src/airflow_breeze/pre_commit_ids.py
@@ -37,6 +37,7 @@
"check-builtin-literals",
"check-changelog-has-no-duplicates",
"check-cncf-k8s-only-for-executors",
"check-common-sql-dependency-make-serializable",
"check-core-deprecation-classes",
"check-daysago-import-from-utils",
"check-decorated-operator-implements-custom-name",
4 changes: 2 additions & 2 deletions generated/provider_dependencies.json
@@ -308,7 +308,7 @@
"databricks": {
"deps": [
"aiohttp>=3.6.3, <4",
"apache-airflow-providers-common-sql>=1.5.0",
"apache-airflow-providers-common-sql>=1.8.1",
"apache-airflow>=2.5.0",
"databricks-sql-connector>=2.0.0, <3.0.0, !=2.9.0",
"requests>=2.27,<3"
@@ -659,7 +659,7 @@
},
"odbc": {
"deps": [
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow-providers-common-sql>=1.8.1",
"apache-airflow>=2.5.0",
"pyodbc"
],
75 changes: 75 additions & 0 deletions images/breeze/output-commands-hash.txt
@@ -0,0 +1,75 @@
# This file is automatically generated by pre-commit. If you have a conflict with this file,
# please do not resolve it manually; run `breeze setup regenerate-command-images` instead.
# This command should fix the conflict and regenerate the help images you have a conflict with.
main:ffb1a766b791beaf5f8a983587db870f
build-docs:2e9882744f219e56726548ce2d13c3f5
ci:find-backtracking-candidates:17fe56b867a745e5032a08dfcd3f73ee
ci:fix-ownership:3e5a73533cc96045e72cb258783cfc96
ci:free-space:49af17b032039c05c41a7a8283f365cc
ci:get-workflow-info:8246038093359b9c3c110043419473e2
ci:resource-check:bfcca92f18a403ca630955074eb5e9ad
ci:selective-check:6657ed5d42affb7264b5efcc86f17a2a
ci:5315c29bd9f68725ef92e4db8aff5cda
ci-image:build:dd891a7e3c99131f7166ebbccd4f670b
ci-image:pull:f9248c6026da61fe0acdb5d8f37b20da
ci-image:verify:c90dc7e20fce2351eb89d8d1ebbd35e7
ci-image:973b722fdd947e21ff59f3bf9cfc6264
cleanup:8d92d453a6700f6d8cb11fb6a8b50461
compile-www-assets:0963f1409f0aa1e3b137cddd4cc52e87
down:4580f5b3b178ea00182694f134a751f3
exec:9d0fb86607526afb6b161115ae7bf9cc
k8s:build-k8s-image:b625255c3e8f3f794ee404f9a4476836
k8s:configure-cluster:9958c5aac726565ec043e850d56ec8f8
k8s:create-cluster:3e43f9da5e7c0bb67f3d868c9005515a
k8s:delete-cluster:5f580bb09b6456610bf1044321717673
k8s:deploy-airflow:f4b05b2101a4a029c9706ecd6fbf3c5c
k8s:k9s:892a7931e981ba01a21c0da72fac39bc
k8s:logs:f1a3fa2c5747d86ff712d1b0a06ff48b
k8s:run-complete-tests:5018013f47f6c60aae07eb35256eb240
k8s:setup-env:a34e94744ca4e0592371fe55478c3d54
k8s:shell:b872c01cedfd50b865d98ed85933fed7
k8s:status:6e711c24648c9bf42372e5b73cb2ac0f
k8s:tests:4fea1fee4cfbf15f313ffd9026219401
k8s:upload-k8s-image:46c5f1b042222047fda3f18f1ef75835
k8s:6994fe347c18bcc01d95fb721a3757d5
prod-image:build:20f84ddadc2fe4ae2723b7ccdde0197f
prod-image:pull:3817ef211b023b76df84ee1110ef64dd
prod-image:verify:bd2b78738a7c388dbad6076c41a9f906
prod-image:e9ecd759e51ebd926df3170b29d1d2dc
release-management:add-back-references:51960e2831d0e03a2b127d252929b843
release-management:create-minor-branch:a3834afc4aa5d1e98002c9e9e7a9931d
release-management:generate-constraints:01aef235b11e59ed7f10c970a5cdaba7
release-management:generate-issue-content-providers:cda108e7f2506c2816af8f2a6c24070c
release-management:generate-providers-metadata:d4e8e5cfaa024e3963af02d7a873048d
release-management:install-provider-packages:34c38aca17d23dbb454fe7a6bfd8e630
release-management:prepare-airflow-package:85d01c57e5b5ee0fb9e5f9d9706ed3b5
release-management:prepare-provider-documentation:eb861d68b8d72cd98dc8732fc5393796
release-management:prepare-provider-packages:908e2c826f7b4959dfd8bc693f3857a7
release-management:publish-docs:51ee9bf1268529513996a14bd5350c19
release-management:release-prod-images:cfbfe8b19fee91fd90718f98ef2fd078
release-management:start-rc-process:b27bd524dd3c89f50a747b60a7e892c1
release-management:start-release:419f48f6a4ff4457cb9de7ff496aebbe
release-management:update-constraints:02ec4b119150e3fdbac52026e94820ef
release-management:verify-provider-packages:96dce5644aad6b37080acf77b3d8de3a
release-management:59d956e45fccf55e47f16e33cfc5d04a
sbom:build-all-airflow-images:32f8acade299c2b112e986bae99846db
sbom:generate-providers-requirements:3926848718283cf2ef00310a0892e867
sbom:update-sbom-information:653be48be70b4b7ff5172d491aadc694
sbom:386048e0c00c0de30cf181eb9f3862ea
setup:autocomplete:fffcd49e102e09ccd69b3841a9e3ea8e
setup:check-all-params-in-groups:f9ca6bef11ed65e40f06d7cf261a4859
setup:config:53a0aeec6237da4d46bde68fafa29dc3
setup:regenerate-command-images:ea2fba3440bd4e84311a53abe6e8dc56
setup:self-upgrade:4af905a147fcd6670a0e33d3d369a94b
setup:version:be116d90a21c2afe01087f7609774e1e
setup:304a70e939d78427c749c24e8c0992df
shell:aa92fe60473e4b5d0f41b5b182c02468
start-airflow:f09871892c61bc889e6b56791115c923
static-checks:f39d698d5735f372c01d9f1d5719fd13
testing:db-tests:e08e3f30ddc34d95ae56de5222392b59
testing:docker-compose-tests:fd154a058082fcfda12eb877a9a89338
testing:helm-tests:98a9ba6631249762b1633b76a29f4461
testing:integration-tests:c7fde5144126a445201d7e353aa19ba5
testing:non-db-tests:ed916603036dd9979b1593c4d088eb40
testing:tests:4ad1723c7b2b6d2d7d249d42964ced92
testing:eae1e62ff40d5012388abd104461b88e
