fix(elasticsearch): pin compatible-with at the transport layer to keep ES 8 servers working#66065
Conversation
…p ES 8 servers working Since apache#64070 the provider depends on elasticsearch>=8.10,<10. A default install resolves to an elasticsearch>=9 Python client, which always negotiates 'compatible-with=9' on every request. Elasticsearch 8.x servers reject that with HTTP 400 media_type_header_exception, breaking remote task log ingestion and both ElasticsearchSQLHook and ElasticsearchPythonHook against ES 8 clusters. Add a [elasticsearch] es_compat_with config option that, when set to a major version string ('7'/'8'/'9'), wraps the client's transport perform_request so every outbound request carries 'Accept: application/vnd.elasticsearch+json; compatible-with=<major>' (and the matching '+x-ndjson' form for bulk so streaming bodies still parse). The wrap is applied at every Elasticsearch client construction site in the provider: - ElasticsearchTaskHandler (log/es_task_handler.py) - ElasticsearchRemoteLogIO (log/es_task_handler.py) - ESConnection (hooks/elasticsearch.py) - ElasticsearchPythonHook (hooks/elasticsearch.py) When the option is unset, behavior is unchanged. Tests assert against what the transport actually sends, not the in-memory state of the client object. Setting client._headers (which is what client.options(headers=...) does) is not enough because elasticsearch-py re-applies its own per-API-method content-negotiation headers right before the request is sent — only the transport layer sees the final headers. Closes: apache#66063 Supersedes: apache#66064
…impler ct check, correct test assertion - idempotent guard: skip second wrap by checking transport.__dict__ for an existing instance attribute. Repeated apply_compat_with calls (e.g. hook reuse paths) are now true no-ops. - content-type check simplified: '"ndjson" in ct' already matches both 'ndjson' and '+x-ndjson' so the redundant 'x-ndjson' branch is dropped. - unset-case test was using 'transport.perform_request is original' which would fail even when nothing was wrapped, because attribute access on a bound method produces a fresh wrapper object every time. Switched to inspecting transport.__dict__ for the 'perform_request' key, which precisely tracks whether the helper installed an instance override. - new test_apply_compat_with_is_idempotent asserts the guard above.
The previous wrapper unconditionally overwrote the entire `Accept` header to `application/vnd.elasticsearch+json; compatible-with=<major>` whenever one was present. That is too aggressive: elasticsearch-py emits non-JSON `Accept` values for several APIs that still need to flow through the same transport. Notably: - `client.cat.help()` sends `Accept: text/plain`. - All other `client.cat.*` endpoints send `Accept: text/plain,application/json`. - Search-MVT endpoints send `Accept: application/vnd.mapbox-vector-tile`. After the previous wrap every one of those calls went on the wire as plain `application/vnd.elasticsearch+json; compatible-with=<N>`, silently turning cat responses into JSON for any operator using `ElasticsearchPythonHook.get_conn()` to call cat APIs. Mirror upstream's own `mimetype_header_to_compat` instead: only `application/(json|x-ndjson|vnd.mapbox-vector-tile)` parts of the header get the `compatible-with=<configured>` suffix, anything else is left verbatim. The regex also matches the already-rewritten `application/vnd.elasticsearch+<x>; compatible-with=<N>` form that elasticsearch-py 9.x ships before the transport sees the request, so the configured major actually replaces the client default major on the wire (verified with a Transport spy against elasticsearch-py 9.3.0). Two adjacent hardenings while we are in here: - Strip whitespace from the config value and reject anything that is not a positive integer string with `AirflowConfigException` at construction time, so a typo like `es_compat_with = 'v8'` fails fast in the worker startup log instead of returning a 400 storm per request. - Walk header keys case-insensitively, so a future `elastic_transport` that forwards PascalCase `Accept` / `Content-Type` keys cannot silently bypass the rewrite. Tests: add wire-level cases for cat APIs (`text/plain` preserved, `text/plain,application/...` partial rewrite), PascalCase headers, whitespace stripping, non-numeric major rejection, and a direct `conf.get -> None` branch (the existing parametrize folds into the provider yaml default `""` via `conf_vars`).
….sdk The rest of this module already routes airflow imports through the `common.compat.sdk` shim (`conf` lives there), and the shim explicitly exports `AirflowConfigException` so the same provider build can target both Airflow 2 (`airflow.exceptions`) and Airflow 3 (`airflow.sdk.exceptions`). Switch the new exception import to the same shim so we don't pin to `airflow.exceptions` and silently break the Airflow 3 import path.
CI on the latest `main` merge surfaced four failures, all mechanical and
fixed in this commit:
1. **Static checks** (ruff / ruff-format / autogen):
- `_compat.py` — D205 docstring rule wants the summary line on its own
line, both for the module docstring and for `apply_compat_with`'s
docstring. Reformatted both.
- `hooks/elasticsearch.py` — collapsed the multi-line
`apply_compat_with(Elasticsearch(...))` call into a single line
(now under the line-length cap thanks to the basic_auth tuple sitting
inside the existing parens).
- `tests/.../test__compat.py` — collapsed two over-wrapped expressions
(`captured.append({...})` in the spy, and the
`assert wire_capture[-1][...] == "..."` in
`test_apply_compat_with_strips_whitespace_in_config`).
- `get_provider_info.py` — the autogenerated mirror of `provider.yaml`
was missing the new `es_compat_with` config option entry. Added it
with the same description / version_added / type / example / default
as the yaml.
2. **MyPy providers** (`Cannot assign to a method [method-assign]`):
- `transport.perform_request = perform_request` (instance-level
assignment) is rejected by mypy because elastic_transport's
`Transport.perform_request` is bound at the class. Switched to
`setattr(transport, "perform_request", perform_request)`, which
mypy accepts and which preserves the exact same runtime behaviour
(the idempotency guard at the top of the function still inspects
`transport.__dict__["perform_request"]`, so repeat calls remain
no-ops).
3. **Non-DB tests core** and **Low dep tests core**
(`test_project_structure.py::test_providers_modules_should_have_tests`):
- The structure check expects the test file for source `_compat.py`
(note the leading underscore) to be named `test__compat.py` (two
underscores: `test_` + `_compat`). Renamed the file from
`test_compat.py` → `test__compat.py` via `git mv` so the rest of
git history follows.
Re-validated locally:
- `ruff check` and `ruff format --check` pass on all four files.
- mypy on `_compat.py` no longer reports the `method-assign` error
(only an unrelated `airflow.__version__` attr-defined error from
running mypy outside a real Airflow install — Airflow CI runs against
an installed Airflow so this does not surface there).
- Wire-level regression matrix re-run with elasticsearch-py 9.3.0 and
the `setattr` variant: cat.help `text/plain` preserved, cat.indices
partial rewrite preserved, search/bulk Accept and Content-Type
rewritten to compat=8, idempotency guard still triggers, bad values
rejected. 7/7 PASS.
|
cc @Subham-KRLX as you added elasticsearch 9 support. Can you review? |
|
This fixes the ES9 to ES8 regression. Before I can approve please validate that the elasticsearch es_compat_with option only accepts a major version string for example 7 8 or 9 and raise AirflowConfigException on invalid values. Do not construct the Elasticsearch client twice. Create a single instance and pass it to apply_compat_with. Make the transport wrapper accept args and kwargs and decorate it with functools.wraps so it is tolerant of different elastic_transport perform_request signatures. Add a short docs entry for es_compat_with with an airflow.cfg example and note that the transport level rewrite overrides per API header negotiation. Confirm whether a newsfragment is required. Address these and I will re check and approve. |
- Refactor apply_compat_with to use functools.wraps + *args, **kwargs so the wrapper survives future elastic_transport perform_request signature changes (new keyword-only params, reordered positionals) and preserves __name__/__doc__/__wrapped__ for introspection. - Extend the es_compat_with docs entry with explicit valid-value rules and a note that the fix is installed at the transport layer and therefore overrides elasticsearch-py's per-API-method header negotiation (constructor headers= does not work for this purpose).
Thanks for the review @Subham-KRLX — addressed below.
The airflow.cfg example was already there ([elasticsearch] es_compat_with = 8). Ready for another look when you have a moment. |
Used in providers/elasticsearch/docs/logging/index.rst to describe the fail-fast behavior when [elasticsearch] es_compat_with is set to an invalid value. The wordlist already contained 'misconfigured' but the noun form was missing, causing the --spellcheck-only docs build to fail.
|
@eladkal I reviewed the changes. es_compat_with accepts only numeric majors and raises AirflowConfigException. The transport wrapper uses functools.wraps accepts args and kwargs rewrites headers idempotently and the client is constructed once and wrapped in place. Tests check wire headers and docs were updated. Please confirm there are no other sites constructing two clients and I will approve. |
|
@Subham-KRLX since you created the original PR I defer to your judgment and confirmation that this PR fix the issue |
I checked the codebase and audited all client construction sites in the provider there are only four sites that instantiate the client and each does it exactly once before wrapping it in place so there is no double construction the fix is clean and works at the transport layer to bypass the client limitation and all tests pass. This definitely fixes the regression so we are good to go. |
|
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions. |
What does this PR do
Adds a new
[elasticsearch] es_compat_withconfig option that pins theHTTP
Accept/Content-Typecompatible-withparameter on everyrequest the provider makes, by wrapping
client.transport.perform_request.When set to a major version string (
"7","8","9"), every outboundrequest carries
Accept: application/vnd.elasticsearch+json; compatible-with=<major>(and the matching
+x-ndjsonform for bulk so multi-line bodies stillparse on the server). When unset, behavior is unchanged.
The wrap is applied at every
Elasticsearchclient construction site inthe provider:
ElasticsearchTaskHandler(log/es_task_handler.py)ElasticsearchRemoteLogIO(log/es_task_handler.py)ESConnection(hooks/elasticsearch.py)ElasticsearchPythonHook(hooks/elasticsearch.py)A new shared module
airflow.providers.elasticsearch._compatexposes thehelper.
Why
Since #64070 the provider declares
elasticsearch>=8.10,<10. A defaultinstall resolves to an
elasticsearch>=9Python client, which alwaysnegotiates
Accept: application/vnd.elasticsearch+json; compatible-with=9on every request. Elasticsearch 8.x servers reject that with HTTP 400
media_type_header_exception(Accept version must be either version 8 or 7, but found 9), breaking the entire remote task log path:The same regression hits
ElasticsearchSQLHookandElasticsearchPythonHookagainst ES 8 clusters.There is no operator-side workaround today: the existing
elasticsearch_configssection can passheaders=toElasticsearch.__init__, but elasticsearch-py re-applies its ownper-API-method content-negotiation headers right before the request goes
out — those override the constructor
headers. The compat header has tobe enforced at the transport layer.
Why not auto-detect the server version?
The first thing one would reach for is
client.info()to readversion.number. That does not work, becauseinfo()itself goesthrough the same transport with the same per-method
compatible-with=<client_major>header — so against an ES 8 server itfails with the same 400. Auto-detect would need a separate raw HTTP
probe (or a fix in elasticsearch-py to make compat negotiation
server-aware), which is out of scope for this regression fix. Operators
already know which major version their cluster is.
Why supersede #66064
#66064 attempted the same fix using
client.options(headers={...}). That approach pinsclient._headers(the default headers), but elasticsearch-py mergesper-API-method
Accept/Content-Typeover the default in_perform_request, then runsmimetype_header_to_compat, whichhardcodes
compatible-with=<client.__versionstr__.split(".")[0]>. So thewire still carried
compatible-with=9against an ES 8 server. Verifiedlocally with elasticsearch-py 9.3.0 by capturing
Transport.perform_requestcalls. The unit tests in #66064 onlyasserted against
client._headers, so they passed without catching theissue. This PR therefore (a) patches at the transport layer, where the
final headers actually live, and (b) writes wire-level tests that
intercept
Transport.perform_requestand assert on the captured headers— see
providers/elasticsearch/tests/unit/elasticsearch/test_compat.py.How was this tested
Wire-level unit tests that swap
elastic_transport.Transport.perform_requestfor a recording spy, drive
search/info/bulk, and assert on thecaptured
Accept/Content-Typeheaders:test_apply_compat_with_unset_does_not_wrap_transport: with theoption unset (both
""andNone) the helper returns the clientunchanged and
transport.perform_requestis the original.test_apply_compat_with_pins_compatible_with_8: withes_compat_with = "8", every captured call carriesAccept: application/vnd.elasticsearch+json; compatible-with=8,and
bulkkeeps+x-ndjsonfor theContent-Type.test_apply_compat_with_pins_compatible_with_7: same flow with"7"to assert the helper does not hardcode"8".Manually verified with elasticsearch-py 9.3.0 + a stand-alone
reproduction of
apply_compat_withagainstTransport.perform_requestspies — all three calls go on the wire with the pinned major.
Reproduction (without the fix)
server, with
write_to_es=True._bulk.After the fix, set
[elasticsearch] es_compat_with = "8"and the samedeployment writes logs successfully and the SQL/Python hooks work.
Notes on the wrap target
The wrap is installed on
client.transport.perform_request. That is thelowest stable seam between the high-level
Elasticsearchclient and theHTTP layer in the current
elastic_transport8.x line; the per-API-methodAccept/Content-Typeheaders are constructed by elasticsearch-py andthen handed to
Transport.perform_request, which is where they need tobe rewritten before the request is serialized. If
elastic_transportever changes that signature, a forward-compatible alternative is to
subclass
BaseClientand override the per-request header negotiation —happy to take that direction in a follow-up if the maintainers prefer.
The wrap is also idempotent: a second call to
apply_compat_withon thesame client is a no-op (guarded by inspecting
transport.__dict__["perform_request"]).Out of scope / follow-ups
compatible-withnegotiation server-aware.es_compat_withconfiguration —multi-cluster deployments are out of scope here.
Note for the release manager
provider.yamlrecordsversion_added: 6.5.4for the new option. If thenext release window is
6.6.0instead, please feel free to bump thatduring release prep.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.