feat(append): dynamically cap log-block buffer at per-task memory #18860
nsivabalan wants to merge 5 commits into
…-flush gate

HoodieAppendHandle.bufferRecord/bufferInsertAndUpdate adds the post-prepareRecord object, a clone of populatedRecord (HoodieAvroIndexedRecord wrapping a fully-deserialized IndexedRecord, many KB per record on Spark engines), to recordList. But the block-flush gate in flushToDiskIfRequired sized the *incoming* record, which on Spark engines is a compact UnsafeRow many times smaller. The estimate under-counted retained heap, so the gate `numberOfRecords >= maxBlockSize / averageRecordSize` fired far too late, recordList grew well past one block's worth of heap, and the subsequent HFileDataBlock serialization OOMed on metadata table writes.

For Avro-engine writes the incoming and post-prepare shapes are similar, so this change is effectively a no-op there.

Changes:
- writeToBuffer/bufferRecord/bufferInsertAndUpdate now return the buffered record (or null for delete/ignored/error paths).
- Call sites in doAppend / doWrite / write(Map) flip order: buffer first, then gate-check with the buffered record.
- flushToDiskIfRequired sizes the buffered record. Seeds the EWMA lazily on the first non-null buffered record (replacing the wrong eager seed in init() that ran before any prepareRecord conversion). Guards the gate against averageRecordSize == 0 to avoid div-by-zero on delete-only prefixes.
- sizeEstimator changed from final to settable via a @VisibleForTesting setSizeEstimator; other @VisibleForTesting getters added for the new test.

Behavioral note:
- estimatedNumberOfBytesWritten (consumed by canWrite to roll log file groups) now reflects the JOL size of the buffered Avro record, a strict overestimate of on-disk bytes. Log file groups roll slightly earlier on Spark engines as a result; the existing hoodie.logfile.to.parquet.compression.ratio knob retunes if needed.

Tests: adds TestHoodieAppendHandle with 6 unit tests covering:
- estimator sees buffered, not incoming, record
- lazy initial seed (no longer eagerly set from the wrong object in init())
- delete-only windows do not perturb the estimate or trigger flush
- EWMA blends 0.8/0.2 after the second sample
- gate fires when buffered records exceed maxBlockSize
- harness self-check
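The gate behavior this commit describes (lazy seed, 0.8/0.2 EWMA, zero guard) can be sketched roughly as follows. This is a minimal illustration, not the actual HoodieAppendHandle code: `FlushGateSketch` and `recordBuffered` are hypothetical names, the 0.8 weight is assumed to apply to the previous average, and a `null` size stands in for the delete/ignored/error paths that buffer nothing.

```java
/**
 * Illustrative stand-in for the flush-gate bookkeeping described above.
 * Hypothetical class; not the actual HoodieAppendHandle implementation.
 */
public class FlushGateSketch {
  private final long maxBlockSize;
  private long averageRecordSize = 0L; // 0 means "no buffered record sized yet"
  private long numberOfRecords = 0L;

  public FlushGateSketch(long maxBlockSize) {
    this.maxBlockSize = maxBlockSize;
  }

  /**
   * Call after buffering. Pass the estimated heap size of the buffered record,
   * or null when nothing was buffered (delete/ignored/error paths).
   * Returns true when the current block should be flushed.
   */
  public boolean recordBuffered(Long bufferedRecordSize) {
    if (bufferedRecordSize != null) {
      numberOfRecords++;
      if (averageRecordSize == 0L) {
        // Lazy seed: the first buffered record sets the estimate.
        averageRecordSize = bufferedRecordSize;
      } else {
        // Assumed weighting: 0.8 on the running average, 0.2 on the new sample.
        averageRecordSize = (long) (averageRecordSize * 0.8 + bufferedRecordSize * 0.2);
      }
    }
    if (averageRecordSize == 0L) {
      // Nothing sized yet (e.g. a delete-only prefix): skip the division entirely.
      return false;
    }
    if (numberOfRecords >= maxBlockSize / averageRecordSize) {
      numberOfRecords = 0L; // the caller would write out the block here
      return true;
    }
    return false;
  }

  public long getAverageRecordSize() {
    return averageRecordSize;
  }
}
```

Passing the size of the buffered record rather than the incoming one is the whole point of the fix; the sketch only makes the ordering (buffer first, then gate-check) explicit.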
The previous commit framed the root cause as Spark engine UnsafeRow vs Avro IndexedRecord, but the writer path is Avro-typed on both engines. The actual under-count comes from comparing the incoming record (payload still in compact/deflated wire form) against the buffered record (post-prepareRecord clone with a fully-materialized Avro IndexedRecord plus prepended meta-fields).

Re-words three comments:
- init() seed comment
- writeToBuffer Javadoc
- flushToDiskIfRequired Javadoc

No code change; behavior identical.
…k memory

HoodieAppendHandle's in-memory record buffer is bounded only by hoodie.logfile.data.block.max.size (default 256MB). When operators bump that knob, or executors are sized smaller than the configured block size, the buffer can exceed actual per-task heap and OOM. The prior PR in this stack (apache#18843) made averageRecordSize trustworthy by sizing the post-prepareRecord buffered record; with that in hand we can also lower the block budget itself when the executor cannot afford the configured ceiling.

The flush gate now uses

  effectiveBlockSize = min(maxBlockSize, dynamicCeiling)

where dynamicCeiling derives from the same engine properties IOUtils already exposes for merge/compaction:

  (executorMemory * (1 - spark.memory.fraction) / executorTaskNum) * hoodie.memory.logfile.append.fraction

floored at HoodieMemoryConfig.MIN_MEMORY_FOR_LOG_APPEND_BUFFER_IN_BYTES (16MB, smaller than the 100MB spillable-map floor because many append handles can be active concurrently in a single task).

When the engine does not expose memory/cores (Flink's TaskContextSupplier returns Option.empty() unconditionally; Spark also returns empty if SparkEnv is absent), the new sibling IOUtils helper returns Option.empty() and effectiveBlockSize collapses to maxBlockSize, so there is no behavior change.

Changes:
- HoodieMemoryConfig: new MAX_MEMORY_FRACTION_FOR_LOG_APPEND (hoodie.memory.logfile.append.fraction, default 0.6) + MIN_MEMORY_FOR_LOG_APPEND_BUFFER_IN_BYTES constant (16MB).
- IOUtils: new getMaxMemoryAllowedForLogAppend(supplier, fraction, minFloor): Option<Long>. Distinct from getMaxMemoryAllowedForMerge in two ways: returns Option (lets callers fall back to a static cap rather than the spillable-map 1GB default) and accepts the floor as a parameter. Honors EngineProperty.SINGLE_TASK_CORES like the existing helper. Existing merge/compaction callers untouched.
- HoodieAppendHandle: maxBlockSize moves to constructor init alongside a new effectiveBlockSize field; both flush-gate references in flushToDiskIfRequired consult effectiveBlockSize. INFO log line when the dynamic cap is below the configured ceiling. @VisibleForTesting getters for effectiveBlockSize and maxBlockSize.

Behavioral notes:
- canWrite / estimatedNumberOfBytesWritten still use maxBlockSize indirectly; this change is scoped to the flush gate only. Log-file-group rolling thresholds are unchanged. Lowering those is a follow-up if warranted.
- Small executors will see smaller, more numerous log blocks: an intentional tradeoff vs. OOM. Read-path compaction merges blocks, so on-disk file count is unaffected.
- Default fraction (0.6) matches merge/compaction defaults. Users who never set hoodie.memory.logfile.append.fraction get sensible behavior.

Tests:
- TestHoodieAppendHandle gains 5 new tests covering: no-engine-props fallback, dynamic-cap-wins, 16MB floor, configured-ceiling-wins, flush-gate-fires-early. Adds StubTaskContextSupplier mirroring Spark's three engine props. 12/12 tests pass.
- New TestIOUtils with 5 tests pinning the helper contract directly: empty-when-any-prop-missing, formula, parameterized floor honored (mirrors the 100MB-vs-16MB collision concern), SINGLE_TASK_CORES accounting. 5/5 tests pass.
- mvn checkstyle:check on both modules: 0 violations.
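The cap, floor, and fallback described in this commit can be sketched as below. It is an illustration under stated assumptions rather than the helper added to IOUtils: `EffectiveBlockSizeSketch`, `dynamicCeiling`, and `effectiveBlockSize` are hypothetical names, `java.util.Optional` stands in for Hudi's own `Option`, and the three engine properties are passed in directly instead of being read from a TaskContextSupplier.

```java
import java.util.Optional;

/** Hypothetical sketch of the dynamic ceiling; not the actual IOUtils helper. */
public class EffectiveBlockSizeSketch {

  /**
   * Returns the per-task ceiling in bytes, or empty when the engine does not
   * expose memory, memory fraction, or task slots.
   */
  public static Optional<Long> dynamicCeiling(
      Optional<Long> executorMemoryBytes,
      Optional<Double> sparkMemoryFraction,
      Optional<Integer> taskSlots,
      double appendFraction,
      long floorBytes) {
    if (!executorMemoryBytes.isPresent()
        || !sparkMemoryFraction.isPresent()
        || !taskSlots.isPresent()
        || taskSlots.get() <= 0) {
      return Optional.empty();
    }
    // User memory per task: executorMemory * (1 - spark.memory.fraction) / task slots.
    double userMemoryPerTask =
        executorMemoryBytes.get() * (1 - sparkMemoryFraction.get()) / taskSlots.get();
    long ceiling = (long) Math.floor(userMemoryPerTask * appendFraction);
    // Never go below the floor supplied by the caller.
    return Optional.of(Math.max(ceiling, floorBytes));
  }

  /** min(maxBlockSize, dynamicCeiling), falling back to maxBlockSize when no ceiling exists. */
  public static long effectiveBlockSize(long maxBlockSize, Optional<Long> dynamicCeiling) {
    return dynamicCeiling.map(c -> Math.min(maxBlockSize, c)).orElse(maxBlockSize);
  }
}
```

Returning an empty value instead of a default is what lets the caller keep the configured block size untouched on engines that expose no memory information.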
…fraction

Adds pinning tests for the new MAX_MEMORY_FRACTION_FOR_LOG_APPEND config and the MIN_MEMORY_FOR_LOG_APPEND_BUFFER_IN_BYTES constant:
- testLogAppendFractionDefault: default is 0.6, matching merge/compaction defaults so out-of-the-box behavior is consistent.
- testLogAppendFractionOverrideViaProperties: user-supplied value via Properties overrides the default.
- testLogAppendBufferFloorIs16MB: pins the 16MB floor constant value.

Also drops a stale orientation comment in HoodieAppendHandle.init() about "averageRecordSize is seeded lazily in flushToDiskIfRequired". The rationale already lives at the seed site (in the flushToDiskIfRequired Javadoc), so a reader landing fresh doesn't need the diff-oriented breadcrumb here. This comment was added by the parent stack PR; removing it here since the change is adjacent to the dynamic-cap code.
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds a dynamic per-task memory ceiling on HoodieAppendHandle's in-memory log-block buffer and fixes the sizing bug (where the pre-prepareRecord incoming record was sized instead of the post-prepareRecord buffered clone). The change is well-tested and well-documented, with thoughtful handling of the engine-properties-absent fallback path. One observation worth confirming inline about a downstream interaction with canWrite() sizing. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. One naming inconsistency in the new @VisibleForTesting accessors; otherwise the code is clean and readable.
@VisibleForTesting
void simulateBufferedRecordForTest(HoodieRecord bufferedRecord) {
  numberOfRecords++;
  flushToDiskIfRequired(bufferedRecord, false);
🤖 Have you considered the downstream effect on canWrite()? It computes parquetMaxFileSize >= estimatedNumberOfBytesWritten * logFileToParquetCompressionRatio, and estimatedNumberOfBytesWritten is now accumulated using the larger post-prepareRecord averageRecordSize (on Spark, the in-memory Avro graph vs. the prior UnsafeRow). That likely makes canWrite() return false earlier than before, so log files may roll more aggressively. Is that the intended trade-off, or does logFileToParquetCompressionRatio need re-tuning given the new sizing basis?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
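The interaction raised in this comment can be illustrated with a small sketch. Only the comparison `parquetMaxFileSize >= estimatedNumberOfBytesWritten * logFileToParquetCompressionRatio` comes from the comment; the class name, the `recordsUntilRoll` helper, and the assumption that the byte estimate grows by one average record size per record are hypothetical simplifications.

```java
/** Hypothetical illustration of the canWrite() check quoted in the review comment. */
public class CanWriteSketch {

  /** Mirrors the comparison quoted in the review comment. */
  public static boolean canWrite(
      long parquetMaxFileSize,
      long estimatedNumberOfBytesWritten,
      double logFileToParquetCompressionRatio) {
    return parquetMaxFileSize >= estimatedNumberOfBytesWritten * logFileToParquetCompressionRatio;
  }

  /**
   * Counts how many records are accepted before canWrite() turns false, assuming
   * (for illustration only) that the estimate grows by averageRecordSize per record.
   */
  public static long recordsUntilRoll(
      long parquetMaxFileSize, long averageRecordSize, double ratio) {
    if (averageRecordSize <= 0) {
      throw new IllegalArgumentException("averageRecordSize must be positive");
    }
    long records = 0L;
    long estimatedBytes = 0L;
    while (canWrite(parquetMaxFileSize, estimatedBytes, ratio)) {
      estimatedBytes += averageRecordSize;
      records++;
    }
    return records;
  }
}
```

With the same file-size limit and ratio, a larger per-record estimate reaches the limit after fewer records, which is the earlier-rolling effect the comment asks about.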
  this.sizeEstimator = sizeEstimator;
}

@VisibleForTesting
🤖 nit: every other @VisibleForTesting accessor in this block uses the ForTest suffix (getNumberOfRecordsForTest, getEffectiveBlockSizeForTest, etc.), but this one doesn't — could you rename it to getAverageRecordSizeForTest() for consistency? As-is it reads like a production-visible API.
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the updates! The latest changes are a small comment cleanup in HoodieAppendHandle.init() and a new TestHoodieMemoryConfig covering the default fraction, property override, and the 16MB floor constant — the assertions correctly pin the values defined in HoodieMemoryConfig. Of the prior findings: the canWrite() downstream-effect question was addressed in the PR description (explicitly called out as an intentional scope-limit with rolling thresholds left as a follow-up); the getAverageRecordSize() naming nit remains open but is minor and won't block. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
cc @yihua
…ting accessors

Renames in HoodieAppendHandle to address PR feedback on naming consistency (the @VisibleForTesting annotation already signals intent):
- getNumberOfRecordsForTest -> getNumberOfRecords
- getEstimatedBytesWrittenForTest -> getEstimatedNumberOfBytesWritten
- simulateBufferedRecordForTest -> simulateBufferedRecord
- getEffectiveBlockSizeForTest -> getEffectiveBlockSize
- getMaxBlockSizeForTest -> getMaxBlockSize

TestHoodieAppendHandle call sites updated accordingly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds a dynamic per-task memory ceiling on HoodieAppendHandle's in-memory log-block buffer and reroutes the flush gate to size the post-prepareRecord buffered record rather than the incoming pre-prepareRecord record. The fallback path (no engine memory/cores exposed) cleanly collapses to the configured maxBlockSize. No new issues flagged from this automated pass beyond the canWrite() downstream concern already raised in a prior round — a Hudi committer or PMC member can take it from here for a final review. A few naming and readability suggestions below — mostly around the test-only helper exposed on the production class and a redundant parameter name.
cc @yihua
 * so callers (e.g., {@code HoodieAppendHandle}) can use a smaller floor than the spillable-map
 * 100MB.
 */
public static Option<Long> getMaxMemoryAllowedForLogAppend(
🤖 nit: minFloor is redundant since a floor is already a minimum — could you rename to just floor or minBytes? Same applies to the docs above.
}

@VisibleForTesting
void simulateBufferedRecord(HoodieRecord bufferedRecord) {
🤖 nit: simulateBufferedRecord is purely a test fixture (increments numberOfRecords then calls the gate). Could you keep this logic inside the test's TestableAppendHandle subclass rather than the production class? The other @VisibleForTesting accessors here are simple getters/setters, but this one bakes test orchestration into the production API.
 * partition-mismatch failure). The returned reference is what {@link #flushToDiskIfRequired}
 * sizes; sizing the incoming record under-counts heap because the incoming payload is
 * typically still in its compact/deflated wire form, whereas the buffered record holds
 * the fully-deserialized Avro graph plus meta-fields.
🤖 nit: this javadoc (and the similar ones on bufferRecord/bufferInsertAndUpdate/flushToDiskIfRequired) is quite long and leans on internal implementation details (prepareRecord clones, deflated payloads, etc.). Could you trim to a 1-2 line contract + a brief "why" note? Future refactors will silently drift from these descriptions.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18860 +/- ##
============================================
- Coverage 68.92% 68.80% -0.12%
- Complexity 29076 29147 +71
============================================
Files 2509 2514 +5
Lines 139470 139958 +488
Branches 17117 17193 +76
============================================
+ Hits 96130 96302 +172
- Misses 35584 35884 +300
- Partials 7756 7772 +16
Change Logs
Adds a dynamic per-task memory ceiling on `HoodieAppendHandle`'s in-memory record buffer, applied as an upper bound on top of `hoodie.logfile.data.block.max.size`. Closes #18859.

The flush gate in `flushToDiskIfRequired` now uses `effectiveBlockSize = min(maxBlockSize, dynamicCeiling)`, where `dynamicCeiling = executorMemory * (1 - spark.memory.fraction) / task_slots * hoodie.memory.logfile.append.fraction`, floored at `HoodieMemoryConfig.MIN_MEMORY_FOR_LOG_APPEND_BUFFER_IN_BYTES` (16 MB, smaller than the 100 MB spillable-map floor because many append handles can be active concurrently in a single task).

When the engine does not expose memory/cores (Flink's `TaskContextSupplier.getProperty` returns `Option.empty()` unconditionally; Spark also returns empty if `SparkEnv` is absent), the new sibling `IOUtils` helper returns `Option.empty()` and `effectiveBlockSize` collapses to `maxBlockSize`, so there is no behavior change on those engines.

Stacked on #18843. That PR makes `averageRecordSize` accurate by sizing the buffered post-`prepareRecord` record; without that, the dynamic cap is computed against an under-counted estimate and the gate still misses. Please merge #18843 first.

New config
`hoodie.memory.logfile.append.fraction` (default `0.6`, advanced): fraction of per-task user memory available to the append-handle buffer. Default matches `hoodie.memory.merge.fraction` and `hoodie.memory.compaction.fraction`. Users who never set it get sensible behavior.

Implementation
- `HoodieMemoryConfig`: new `MAX_MEMORY_FRACTION_FOR_LOG_APPEND` ConfigProperty + `MIN_MEMORY_FOR_LOG_APPEND_BUFFER_IN_BYTES` constant (16 MB).
- `IOUtils`: new `getMaxMemoryAllowedForLogAppend(supplier, fraction, minFloor): Option<Long>`. Distinct from the existing `getMaxMemoryAllowedForMerge` in two ways: returns `Option` (lets callers fall back to a static cap rather than the spillable-map 1GB default when engine properties are absent), and accepts the floor as a parameter (so the 100 MB spillable-map floor and the smaller 16 MB log-append floor cannot collide on a shared constant). Honors `EngineProperty.SINGLE_TASK_CORES` like the existing helper. Existing merge/compaction callers untouched.
- `HoodieAppendHandle`: `maxBlockSize` moves to constructor init alongside a new `effectiveBlockSize` field; both `maxBlockSize` references in `flushToDiskIfRequired` swap to `effectiveBlockSize`. INFO log line when the dynamic cap is below the configured ceiling. `@VisibleForTesting` getters for `effectiveBlockSize` and `maxBlockSize`.

Behavior
- `prepareRecord` materializes the Avro graph.
- `canWrite` / `estimatedNumberOfBytesWritten` still use `maxBlockSize` indirectly to roll log-file groups; this change is intentionally scoped to the flush gate only. Lowering log-file-group rolling thresholds is a follow-up if warranted.

Impact
On Spark: a job running with `hoodie.logfile.data.block.max.size=256m` (default) on a 2 GB / 4 core executor with `spark.memory.fraction=0.6` and the default `hoodie.memory.logfile.append.fraction=0.6` will now cap the buffer at `2GB * 0.4 / 4 * 0.6 = 120 MB` instead of 256 MB. On a generous 16 GB / 4 core executor, the dynamic ceiling (~960 MB) exceeds the 256 MB configured ceiling, so the configured value wins. No regression on appropriately-sized executors.

On Flink: zero behavior change. The cap depends on engine-property exposure that Flink does not provide.
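The Spark figures quoted for the two executor sizes can be checked by substituting into the ceiling formula. This worked sketch assumes decimal units (1 GB = 1000 MB), which is what reproduces the quoted 120 MB and ~960 MB; with binary units (2 GiB = 2048 MiB) the first expression gives about 122.9 MiB instead.

$$
\text{dynamicCeiling}_{2\,\text{GB}} = \frac{2000\ \text{MB} \times (1 - 0.6)}{4} \times 0.6 = 200\ \text{MB} \times 0.6 = 120\ \text{MB} < 256\ \text{MB}
$$

$$
\text{dynamicCeiling}_{16\,\text{GB}} = \frac{16000\ \text{MB} \times (1 - 0.6)}{4} \times 0.6 = 1600\ \text{MB} \times 0.6 = 960\ \text{MB} > 256\ \text{MB}
$$

In the first case the dynamic ceiling is the smaller term, so the effective block size is 120 MB; in the second the configured 256 MB is the smaller term and stays in effect.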
Risk level (write none, low medium or high below)
low
Documentation Update
`hoodie.memory.logfile.append.fraction` will appear in the auto-generated config docs (via `markAdvanced()` + `withDocumentation(...)`).
Contributor's checklist
`TestHoodieAppendHandle` (5 new tests covering fallback, cap, floor, ceiling-wins, gate-fires-early) and new `TestIOUtils` (5 tests covering empty-when-any-prop-missing, formula, parameterized floor, `SINGLE_TASK_CORES` accounting).