Add ClickHouse Provider#67080
Conversation
Introduces the package structure for the new ClickHouse provider: pyproject.toml, provider.yaml, namespace packages, and test skeletons.
Implements ClickHouseHook (extending DbApiHook) via clickhouse-connect: - ClickHouseConnection DB-API 2.0 adapter (cursor, commit/rollback no-ops) - Connection-form widgets and UI field behaviour for the Airflow UI - session_settings and client_kwargs merge (extra JSON + constructor args) - bulk_insert_rows() for efficient columnar inserts - get_uri() for SQLAlchemy-compatible connection strings
95 tests covering connection building, session_settings and client_kwargs merge logic, database override, UI widgets, bulk_insert_rows, and get_uri.
Connection type reference, operator how-to guide, changelog, and integration logo for the ClickHouse provider.
Demonstrates ClickHouseHook and SQLExecuteQueryOperator usage: create table, bulk insert, read rows, and drop table.
Previous uv.lock was regenerated with a local uv version that produced a different format. Restore to upstream format with only the clickhouse-connect entries added.
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
|
|
@koletzilla @joe-clickhouse would you mind reviewing that as well? |
|
thanks for adding Clickhouse provider @BentsiLeviav |
joe-clickhouse
left a comment
There was a problem hiding this comment.
Hi @BentsiLeviav. Looks pretty good! From a clickhouse-connect perspective I have few comments. In short, the scheme name needs updating and I think the bulk insert should be changed to use an insert context or just the regular client insert method which will automatically stream.
| host = conn.host or "localhost" | ||
| port = int(conn.port) if conn.port else 8123 | ||
| database = self.database or conn.schema or "default" | ||
| scheme = "clickhousedbs" if bool(extra.get("secure", False)) else "clickhousedb" |
There was a problem hiding this comment.
clickhousedbs isn't a registered scheme. clickhouse-connect only registers clickhousedb (and clickhousedb+connect) as a SQLAlchemy dialect. The way you'll want to use with a TLS connection is a single scheme with secure as a query parameter, which the dbapi Connection.__init__ forwards via generic_args -> create_client. e.g. clickhousedb://user:pw@host:port/db?secure=true&verify=true.
It's probably worth wiring the other tuning params like connect_timeout, send_receive_timeout, compress, etc. through the query string the same way, otherwise SQLAlchemy-path users silently lose the settings change ability that DB-API-path users get.
There was a problem hiding this comment.
Thanks! pushed a fix. LMK if it is ok now
| try: | ||
| for i in range(0, len(rows), commit_every): | ||
| batch = rows[i : i + commit_every] | ||
| client.insert(table, batch, column_names=column_names) |
There was a problem hiding this comment.
client.insert() already streams data to ClickHouse in adaptive ~2MB blocks as a single Transfer-Encoding: chunked HTTP request. Batching at the Python layer here doesn't help and could actually even hurt because each client.insert() call without a reusable context issues a DESCRIBE TABLE to resolve column types even when column_names is supplied column_names is only used to filter/order the describe result.
I'd recommend either:
- Just call
client.insert(table, rows, column_names=column_names)once and let the driver handle block-level streaming internally. - If batching is genuinely needed (e.g. memory pressure on extremely large inputs), build the context once and reuse it like:
ctx = client.create_insert_context(table, column_names=column_names)
for i in range(0, len(rows), commit_every):
client.insert(data=rows[i:i+commit_every], context=ctx)Side note, I think commit_every is a misnomer as inserts are not transactional. batch_size might be a more appropriate term.
There was a problem hiding this comment.
Thanks for the input.
Took up your recommendation and did the following:
- renamed
commit_everytobatch_size - Change the default value of it to None, so if it is not provided, we will have a single insert.
- In case it is provided, create the context once before the insertion loop.
- updated the tests to verify all these
| @patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection") | ||
| def test_get_uri_secure_uses_clickhousedbs_scheme(self, mock_get_connection): | ||
| """secure=True in extra must produce the clickhousedbs:// (HTTPS) scheme.""" | ||
| conn = Connection( | ||
| conn_id="ch_secure", | ||
| conn_type="clickhouse", | ||
| host="secure-host", | ||
| port=8443, | ||
| login="user", | ||
| password="pass", | ||
| schema="db", | ||
| extra=json.dumps({"secure": True}), | ||
| ) | ||
| mock_get_connection.return_value = conn | ||
| uri = ClickHouseHook(clickhouse_conn_id="ch_secure").get_uri() | ||
| assert uri == "clickhousedbs://user:pass@secure-host:8443/db" |
There was a problem hiding this comment.
This test can be deleted/reconfigured in light of the other comment explaining how clickhousedbs isn't a valid scheme.
There was a problem hiding this comment.
I renamed the test to test_get_uri_secure_adds_query_param and updated the assertion to assert uri == "clickhousedb://user:pass@secure-host:8443/db?secure=true"
…ning The clickhouse-connect library only registers the clickhousedb:// SQLAlchemy dialect; clickhousedbs:// was never a valid scheme and would fail at engine creation. TLS is now enabled via ?secure=true, and tuning params (connect_timeout, send_receive_timeout, compress, verify) are forwarded as query-string arguments so SQLAlchemy-path users get the same settings as DB-API-path users. Tests updated accordingly.
joe-clickhouse
left a comment
There was a problem hiding this comment.
Hi @BentsiLeviav. The changes you made look good! I did notice one last thing that i've left a comment about in the code related to passing arbitrary kwargs to the client from the Connection extra level.
| extra: dict[str, Any] = conn.extra_dejson | ||
|
|
||
| # Merge client_kwargs: extra values are the base, constructor values override. | ||
| raw_client_kwargs = extra.get("client_kwargs") |
There was a problem hiding this comment.
I'm not an Airflow expert, but I think this might expose low-level clickhouse-connect client options at the Connection extra level, which may be too broad for an Airflow provider.
From a clickhouse-connect perspective, arbitrary client kwargs are useful when the caller owns the Python code. So I think ClickHouseHook(client_kwargs=...) is reasonable at the Dag author level. But for Connection extras, the provider should probably only expose a finite set of reviewed and documented fields like host, port, username, password, database, secure, verify, timeouts, compression, etc.
It looks like _HOOK_MANAGED_KWARGS prevents overriding hook-owned fields, which is good, but it still allows any other clickhouse_connect.get_client() kwarg through. That means a Connection configuration user can configure low-level transport and security behavior on behalf of any Dag that uses the connection.
Long story short, I think we should keep arbitrary client_kwargs as a hook constructor argument only, and promote individual kwargs to Connection extras when the provider intentionally supports and documents them. This seems more consistent with Airflow's guidance to allowlist Connection extras rather than forwarding arbitrary kwargs into underlying libraries, but I'll defer to the Airflow maintainers on the provider policy.
Reference on Connection configuration users:
Description
Adds a new apache-airflow-providers-clickhouse provider that integrates Airflow with ClickHouse via the HTTP interface using the
clickhouse-connectlibrary.Scope of this implementation
ClickHouseHook- the core integration, extendingDbApiHookso all standardSQLExecuteQueryOperatorfeatures work out of the box (templating, handler, split_statements, etc.)bulk_insert_rows()for more performant inserts using clickhouse-connect's native insert pathget_uri()for SQLAlchemy-compatible connection strings (clickhousedb:///clickhousedbs://)Implementation decisions
DB-API 2.0adapter (ClickHouseConnection): clickhouse-connect doesn't expose a DB-API connection natively - we wrap its Client in a thin adapter soDbApiHook.run()works unmodified.commit()and
rollback()are intentional no-ops since ClickHouse has no transactions.session_settingsandclient_kwargscan be set at the connection level (via the extra JSON field) and overridden at the task level (via hook constructor arguments), with the constructor taking precedence on conflicts.apache-airflow/<version> apache-airflow-providers-clickhouse/<version>in the HTTP User-Agent (system.query_log), making queries traceable back to their Airflow source. Users can append a custom label via the client_name extra field.SQLExecuteQueryOperatorfromcommon.sqlcovers all standard SQL use cases.File structure (generated with Claude)
provider.yamlconn-fieldsschema used to generate the connection formpyproject.tomlclickhouse-connect >=0.7.0,common-sql >=1.32.0) — auto-generated from the Breeze templatesrc/.../hooks/clickhouse.pyClickHouseHook(extendsDbApiHook) andClickHouseConnection(thin DB-API 2.0 adapter wrapping theclickhouse-connectclient)src/.../get_provider_info.pyprovider.yamlby the Breeze release tooling — do not edit manuallysrc/airflow/__init__.py,src/airflow/providers/__init__.pyairflow.providersimplicit namespacesrc/.../clickhouse/__init__.py__version__ = "1.0.0") with minimum Airflow version guard — auto-generateddocs/connections/clickhouse.rstdocs/operators/clickhouse.rstSQLExecuteQueryOperatorandClickHouseHookdirectly, includingsession_settingsandbulk_insert_rowsexamplesdocs/index.rst,docs/conf.py,docs/changelog.rst,docs/security.rstdocs/integration-logos/ClickHouse.pngtests/unit/clickhouse/hooks/test_clickhouse.pytests/system/clickhouse/example_clickhouse.py.github/boring-cyborg.ymlprovider:clickhouselabel rule for automatic PR labellingscripts/ci/docker-compose/remove-sources.yml,tests-sources.yml