[Connectors Python] Preemptively check bulk batch size against chunk_max_mem_size and perform pre-append flush #4012
- Implement logic to flush the current batch before adding a new document if it exceeds the defined size or memory limits. This ensures that bulk requests remain within the specified constraints.
- Adjust the handling of oversized documents to ensure they are sent individually when necessary, maintaining the integrity of batch processing.

This change improves the efficiency and reliability of document handling in the Elasticsearch Sink.

- Introduce new tests to validate the behavior of the Sink's batch processing, ensuring it correctly handles memory overflow, chunk size boundaries, and oversized documents.
- Implement helper functions to facilitate the creation of mock queues and sink instances for testing.

These enhancements improve the reliability and correctness of the Sink's document handling in various scenarios.

- Bump `idna` from 3.13 to 3.14
- Bump `propcache` from 0.4.1 to 0.5.2

These updates ensure that the project is using the latest versions of these dependencies, which may include important bug fixes and improvements.
@coderabbitai full review
Pull request overview
This PR updates the Elasticsearch Sink batching logic to preemptively flush bulks before adding a new document when the next append would exceed configured batch memory limits, aiming to prevent bulk requests from surpassing chunk_mem_size and triggering downstream 413/OOM issues.
Changes:
- Adjust `Sink._run` flush logic to evaluate thresholds before appending a new doc.
- Add async tests covering memory-based pre-flush behavior, chunk-size boundary flushing, oversized single-doc behavior, and trailing flush behavior.
- Refresh `NOTICE.txt` dependency entries for `idna` and `propcache`.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| app/connectors_service/connectors/es/sink.py | Moves flush condition to a pre-append check to keep bulk requests within chunk_mem_size. |
| app/connectors_service/tests/test_sink.py | Adds tests for the new preemptive flush behavior and related edge cases. |
| app/connectors_service/NOTICE.txt | Updates third-party notice versions for idna and propcache. |
| libs/connectors_sdk/NOTICE.txt | Updates third-party notice versions for idna and propcache. |
artem-shelkovnikov
left a comment
I don't really remember why we did not do it in the first place, I vaguely remember thinking about it and something stopped me from doing it.
I thought about this change throughout my day and could not find the reason not to merge it, but I'd love to hear a second opinion from somebody from @elastic/search-extract-and-transform just in case. It's a bit of a complicated topic.
…tively-check-the-batch-size # Conflicts: # app/connectors_service/NOTICE.txt # libs/connectors_sdk/NOTICE.txt
- Update the batch processing logic to preemptively check the prospective entry count before flushing, ensuring that the batch size does not exceed the defined `chunk_size` during mixed operations (delete, index, update).
- Introduce new helper functions for creating update and delete documents in tests.
- Add a test case to validate the behavior of the Sink when handling mixed operations, ensuring that batch sizes remain within limits.

These changes improve the reliability of document handling in the Elasticsearch Sink, particularly when dealing with varying operation types.

- Simplify batch dispatch logic by consolidating document creation helper functions for better readability and maintainability.
- Improve pre-append and post-append checks to ensure batches are dispatched correctly based on size and memory constraints, enhancing performance and reducing latency.
- Update tests to validate the new logic, ensuring that batch sizes remain within defined limits during mixed operations and that oversized documents are handled appropriately.

These changes optimize the document handling process in the Elasticsearch Sink, ensuring efficient and reliable batch processing.

- Introduce new helper functions for document ID extraction and queue management to improve test readability and maintainability.
- Update test cases to ensure proper flushing behavior before exceeding chunk size and memory limits, validating the handling of mixed operations and oversized documents.
- Rename test functions for clarity, reflecting their purpose more accurately.

These changes optimize the testing framework for the Elasticsearch Sink, ensuring robust validation of batch processing logic.
seanstory
left a comment
One nit, but lgtm. Great fix!
| self._logger.warning(f"Skip document {doc} as '_id' is missing.") | ||
| continue | ||
| # Flush before adding if this doc would overflow either cap. | ||
| # `_bulk_op` emits 1 entry for deletes and 2 for index/update, |
Yes, but, I don't think that's really what we want to be counting. The metadata object in an index/update is trivially small. Like:
{ "index" : { "_index" : "test", "_id" : "1" } }
I don't think we need to count that as part of the "chunk size".
I think we can do len(batch) + 1 > self.chunk_size...
And I think that's consistent with the old behavior where:
batch.extend(self._bulk_op(doc, operation))
is always adding 1 element to batch, even though doc and operation are technically two separate objects.
I respectfully disagree. First of all I would like to set the record straight in terms of vocabulary.
| Term | Meaning |
|---|---|
| doc/file | one source record |
| entry/line | one element in batch — _bulk_op produces 1 for delete (action line), 2 for index/update (action line + source line) |
| `len(batch)` | counts entries (since batch.extend(...) flattens) |
| `chunk_size` | historically len(batch) >= chunk_size, i.e. caps entries, not docs |
len(batch) + 1 > chunk_size doesn't fix the bug
chunk_size=4, sequence delete + index + index:
| Step | len before | Pre len+1 > 4? | After append | Post len >= 4? |
|---|---|---|---|---|
| delete (+1) | 0, skip | – | 1 | no |
| index (+2) | 1 | no | 3 | no |
| index (+2) | 3 | no | 5 | yes → ships 5-entry batch |
The post-check fires after the cap is already broken; it can dispatch but can't un-append. +1 undercounts every index/update by one — the pre-check has to predict the actual extend delta, which is 1 or 2.
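The table above can be reproduced with a small simulation. This is a minimal sketch, not the connector code: `bulk_op` and `run` are hypothetical stand-ins that only assume what the thread states, namely that `_bulk_op` yields 1 entry for a delete and 2 for an index/update, and that the batch is flushed by a pre-append check, a post-append check, and a trailing flush.

```python
OP_DELETE = "delete"
OP_INDEX = "index"


def bulk_op(doc_id, op):
    """Hypothetical stand-in for `_bulk_op`: 1 entry for delete, 2 otherwise."""
    action = {op: {"_id": doc_id}}
    if op == OP_DELETE:
        return [action]
    return [action, {"field": "value"}]  # action line + source line


def run(ops, chunk_size, predict):
    """Return the entry count of every dispatched batch.

    `predict(op)` is the growth the pre-append check assumes for `op`.
    """
    batch, shipped = [], []
    for i, op in enumerate(ops):
        # Pre-append check; skipped while the batch is empty.
        if batch and len(batch) + predict(op) > chunk_size:
            shipped.append(len(batch))
            batch = []
        batch.extend(bulk_op(str(i), op))
        # Post-append check: can dispatch, but cannot un-append.
        if len(batch) >= chunk_size:
            shipped.append(len(batch))
            batch = []
    if batch:  # trailing flush
        shipped.append(len(batch))
    return shipped


sequence = [OP_DELETE, OP_INDEX, OP_INDEX]
print(run(sequence, 4, lambda op: 1))                             # [5]
print(run(sequence, 4, lambda op: 1 if op == OP_DELETE else 2))   # [3, 2]
```

With the `+1` prediction the third step appends to a 3-entry batch and ships 5 entries; predicting the real delta flushes the 3-entry batch first and starts a new one.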
So the logs are confusing. elasticsearch.bulk.chunk_size is n, and user sees logs like:
Task 1 - Sending a batch of n+1 ops
Clear separation of source docs and elasticsearch entries
Judging from

> metadata object in an index/update is trivially small

> adding 1 element to batch, even though doc and operation are technically two separate objects

> I don't think we need to count that as part of the "chunk size".

I believe your actual proposal is that chunk_size should mean docs, not entries:
- It's a behavior change — existing operators tuned against entries; switching silently ~doubles batch size for index/update workloads.
- Right implementation is a dedicated doc counter, not a magic `+1`:
  docs_in_batch += 1 # after extend
  if docs_in_batch + 1 > self.chunk_size or ...: # pre
  if docs_in_batch >= self.chunk_size or ...: # post

  So it would be more understandable to the user.
- 1000 new docs result in a batch of 1000 docs, that is actually 2000 entries (operation + source).
- 1000 deleted docs result in a batch of 1000 entries (operation only)
That should be a part of a separate enhancement PR, not a bug fix.
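To make the size of that behavior change concrete, here is a rough sketch comparing the two meanings of `chunk_size`. Everything in it is illustrative: `entries_for` and `batches` are hypothetical helpers, only the pre-append check and the trailing flush are modelled, and memory limits are ignored.

```python
def entries_for(op):
    """Entries a single doc adds to the bulk body: 1 for delete, 2 otherwise."""
    return 1 if op == "delete" else 2


def batches(ops, chunk_size, count_docs):
    """Return the entry count of each dispatched batch.

    count_docs=False caps entries (the existing contract in this thread);
    count_docs=True caps source docs (the proposed reinterpretation).
    """
    shipped = []
    entries = docs = 0
    for op in ops:
        delta = entries_for(op)
        used = docs if count_docs else entries
        step = 1 if count_docs else delta
        # Flush first if this doc would push the chosen counter over the cap.
        if entries and used + step > chunk_size:
            shipped.append(entries)
            entries = docs = 0
        entries += delta
        docs += 1
    if entries:  # trailing flush
        shipped.append(entries)
    return shipped


index_docs = ["index"] * 1000
delete_docs = ["delete"] * 1000

print(batches(index_docs, 1000, count_docs=False))   # [1000, 1000]
print(batches(index_docs, 1000, count_docs=True))    # [2000]
print(batches(delete_docs, 1000, count_docs=True))   # [1000]
```

Under entry counting, 1000 index docs leave as two 1000-entry batches; under doc counting they leave as a single 2000-entry batch, which is the doubling mentioned above.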
Proposal
Keep entry-counting in this PR — it preserves the existing contract and only fixes the overshoot. entries = 1 if op == OP_DELETE else 2 isn't a magic number; it mirrors _bulk_op one line above, so both move together if it ever changes.
> metadata object in an index/update is trivially small

Actually the metadata line is essentially the same size for all three ops.

> I don't think we need to count that as part of the "chunk size"

Not counting metadata would stop us from counting delete operations, because it is the only part of the delete operation.

> even though doc and operation are technically two separate objects

operation is just an operation enum, it is not added separately, but that is a nitpick.
> counts entries (since batch.extend(...) flattens)

🤦 you're right, and this was the key bit that I missed. My brain read this as equivalent to append.

> I believe your actual proposal is that chunk_size should mean docs, not entries

Yep. And I'd always been under the impression that's what this was. And I expect that was the author's impression too. But you're right, even if this has long been a misunderstanding on my/our side, changing it now would be a behavior change. I agree with your conclusion, we should ignore my comment and leave this as you implemented it.
Thanks for pushing back. 🫡
…tively-check-the-batch-size
…chunk_max_mem_size and perform pre-append flush (#4012) (#4026) Backports the following commits to 9.3: - [Connectors Python] Preemptively check bulk batch size against chunk_max_mem_size and perform pre-append flush (#4012) Co-authored-by: Jan-Kazlouski-elastic <jan.kazlouski@elastic.co> Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
…chunk_max_mem_size and perform pre-append flush (#4012) (#4025) Backports the following commits to 9.4: - [Connectors Python] Preemptively check bulk batch size against chunk_max_mem_size and perform pre-append flush (#4012) Co-authored-by: Jan-Kazlouski-elastic <jan.kazlouski@elastic.co> Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
…ce-max-document-size-config Integrates upstream PR #4012 (preemptive `chunk_max_mem_size` flush in `Sink._run`) on top of the local `max_text_document_size` cap. Conflicts in `connectors/es/sink.py`: - `Sink._run` cap-check + pre-flush ordering: build `ops = self._bulk_op(doc, operation)` once, then run the text cap check (drop + `continue` if oversized text doc), then the upstream prospective pre-flush, then stats accounting, then `batch.extend(ops)` and the upstream post-append flush. Replaced upstream's inline `entries = 1 if OP_DELETE else 2` with `len(ops)` since the bulk op list is already built. Conflicts in `tests/test_sink.py`: - Both branches added a top-level `_make_sink` helper with incompatible signatures. Renamed the local cap-test factory to `_make_cap_sink` and kept upstream's generic `_make_sink(queue, *, chunk_size, chunk_mem_size)` unchanged so the new chunk-flush tests need no edits. - Local cap tests switched to upstream's `_queue_yielding` helper to drop the duplicate `_queue_with_items`. Co-authored-by: Cursor <cursoragent@cursor.com>
Closes https://github.com/elastic/search-team/issues/14453
`Sink._run` previously checked the batch thresholds after appending each new doc, meaning a batch could be dispatched only once it had already exceeded `chunk_mem_size`. For workloads with docs near the limit (e.g. five 1–2 MiB docs against a 5 MiB ceiling) this produced bulk requests larger than configured, risking `413`/OOM errors downstream.

This PR moves the threshold check to fire pre-emptively: before adding a doc, if the prospective batch size (`bulk_size + doc_size`) would exceed `chunk_mem_size`, or the batch would exceed `chunk_size` entries, the current batch is flushed first, then the new doc starts a fresh batch. An `if batch` guard ensures an oversized single doc is still dispatched on its own rather than producing an empty flush.
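The difference between the two orderings can be sketched on sizes alone. This is an illustration under stated assumptions, not the code in `sink.py`: `post_append` and `pre_append` are hypothetical functions, sizes are plain numbers standing in for MiB, and the `chunk_size` entry cap is left out.

```python
def post_append(sizes, cap):
    """Old ordering: append first, then flush once the cap is reached."""
    shipped, bulk = [], 0
    for size in sizes:
        bulk += size
        if bulk >= cap:
            shipped.append(bulk)
            bulk = 0
    if bulk:
        shipped.append(bulk)
    return shipped


def pre_append(sizes, cap):
    """New ordering: flush before appending if the doc would overflow the cap."""
    shipped, batch, bulk = [], [], 0
    for size in sizes:
        # `if batch` guard: never flush an empty batch, so an oversized
        # single doc still goes out on its own.
        if batch and bulk + size > cap:
            shipped.append(bulk)
            batch, bulk = [], 0
        batch.append(size)
        bulk += size
        if bulk >= cap:  # post-append check kept for exact-fit and oversized docs
            shipped.append(bulk)
            batch, bulk = [], 0
    if batch:  # trailing flush
        shipped.append(bulk)
    return shipped


docs = [2, 2, 2, 1, 1]          # five 1-2 MiB docs against a 5 MiB ceiling
print(post_append(docs, 5))     # [6, 2]
print(pre_append(docs, 5))      # [4, 4]
print(pre_append([1, 7, 1], 5)) # [1, 7, 1]
```

In the last call the 7 MiB doc still exceeds the cap, but it travels alone, which is the unavoidable case the guard is meant to preserve.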
Changes
- `connectors/es/sink.py`: relocate and tighten the flush condition in `Sink._run` so dispatched bulks stay at or below `chunk_mem_size`.
- `tests/test_sink.py`: add four async tests, including `chunk_size` boundary flushing.
- `NOTICE.txt`: refresh `idna` (3.13 → 3.14) and `propcache` (0.4.1 → 0.5.2) entries to match current pinned dependencies.
Testing
Testing was performed by running the functional tests twice, once without the fix (on main) and once with the fix. The only other change is the replacement of `get_mib_size` with `get_size` for more accurate values in the logs.

config.yml has `chunk_max_mem_size: 1 #MiB`
Command:
Before fix:
There are three batches dispatched above `1.0 MiB`, and none of them are single-doc batches, proving the size cap is being violated by the multi-doc batching path itself.

No such cases in fixed version:

There is exactly one batch dispatched above `1.0 MiB`, and it is a single-document batch: the unavoidable case the new code explicitly preserves via the `if batch:` guard.

Every other batch in the fixed log stays at or below ~1.0 MiB. The largest non-single-doc batches are values like `0.9718 MiB`, `0.9711 MiB`, `0.9125 MiB`, etc., all comfortably under the ceiling.

Checklists
Pre-Review Checklist
config.yml.example)
Release Note
[Optional] Fix Elasticsearch sink occasionally dispatching bulk requests larger than the configured `chunk_mem_size`, which could trigger `413 Request Entity Too Large` or memory pressure on the cluster. Batches are now flushed pre-emptively so any single bulk stays within the configured size and memory limits.