[fix][broker] Fix non-batched null-value messages not removed during topic compaction #25817

Open
grishaf wants to merge 1 commit into apache:master from grishaf:fix/compaction-null-value-tombstone

grishaf commented May 19, 2026

Motivation

When a non-Java producer (C++, Python, Go) sends a null-value message (tombstone) on a compacted topic, the key is not removed during topic compaction. The tombstone is retained in the compacted view instead of being deleted.
Root cause:
`AbstractTwoPhaseCompactor.extractKeyAndSize()` computed payload size using `headersAndPayload.readableBytes()`, which returns the combined size of the serialized `MessageMetadata` + payload. For null-value messages the payload is empty, but the metadata is always present, so `readableBytes()` is always > 0 (e.g. 32 bytes of metadata). This prevents the tombstone path (`size <= 0` → `latestForKey.remove(key)`) from being reached in phase one of compaction.
The batch code path is not affected because it extracts per-message payload sizes from `SingleMessageMetadata.payloadSize`, which correctly returns 0 for null-value messages. The Java client always sets `numMessagesInBatch` in the metadata (even with `enableBatching(false)`), so all Java-produced messages go through the batch path, which is why this bug was never caught by existing tests.
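
The effect on phase one can be illustrated with a minimal sketch. It is not the compactor's code: `PhaseOneSketch`, `Entry`, and the `countMetadata` flag are hypothetical names, and the map only mimics the `size > 0` put / `size <= 0` remove rule described above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PhaseOneSketch {
    /** Hypothetical stand-in for a non-batched entry: key plus metadata and payload sizes in bytes. */
    static final class Entry {
        final String key;
        final int metadataBytes;
        final int payloadBytes;

        Entry(String key, int metadataBytes, int payloadBytes) {
            this.key = key;
            this.metadataBytes = metadataBytes;
            this.payloadBytes = payloadBytes;
        }
    }

    /**
     * Applies the rule described above: size > 0 records the entry as the latest
     * for its key, size <= 0 treats it as a tombstone and removes the key.
     * countMetadata = true reproduces the buggy size (metadata + payload).
     */
    static Map<String, Integer> latestForKey(Entry[] entries, boolean countMetadata) {
        Map<String, Integer> latest = new LinkedHashMap<>();
        for (int i = 0; i < entries.length; i++) {
            Entry e = entries[i];
            int size = countMetadata ? e.metadataBytes + e.payloadBytes : e.payloadBytes;
            if (size > 0) {
                latest.put(e.key, i);
            } else {
                latest.remove(e.key);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        Entry[] entries = {
            new Entry("k1", 32, 10),
            new Entry("k2", 32, 10),
            new Entry("k1", 32, 0), // tombstone for k1
        };
        System.out.println(latestForKey(entries, true).keySet());  // [k1, k2]
        System.out.println(latestForKey(entries, false).keySet()); // [k2]
    }
}
```

With the metadata counted, the tombstone's size is never 0, so `k1` survives; with the payload-only size the key is removed.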

Modifications

  • `AbstractTwoPhaseCompactor.extractKeyAndSize()`: Fixed to compute the correct payload-only size by using `Commands.skipMessageMetadata()` to skip past the metadata before reading `readableBytes()`. For compressed messages, `getUncompressedSize()` was already correct.
  • `EventTimeOrderCompactor.extractMessageCompactionData()`: Refactored to reuse `extractKeyAndSize()` instead of duplicating the (buggy) size calculation.
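
The size calculation in the first bullet can be sketched with `java.nio.ByteBuffer` standing in for Netty's `ByteBuf`. This is an illustration under assumptions, not the PR's code: the frame layout here is simplified to a 4-byte metadata length, the metadata, then the payload (an optional leading checksum is ignored), and `PayloadSizeSketch`, `frame`, `combinedSize`, and `payloadOnlySize` are hypothetical names.

```java
import java.nio.ByteBuffer;

final class PayloadSizeSketch {
    /** Builds a simplified frame: [4-byte metadata length][metadata][payload]. */
    static ByteBuffer frame(byte[] metadata, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + metadata.length + payload.length);
        buf.putInt(metadata.length).put(metadata).put(payload);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    /** Everything still readable, metadata included (the size the old code used). */
    static int combinedSize(ByteBuffer buf) {
        return buf.remaining();
    }

    /** Skips the length-prefixed metadata, then counts what is left. Advances buf's position. */
    static int payloadOnlySize(ByteBuffer buf) {
        int metadataSize = buf.getInt();
        buf.position(buf.position() + metadataSize);
        return buf.remaining();
    }

    public static void main(String[] args) {
        ByteBuffer tombstone = frame(new byte[32], new byte[0]);
        System.out.println(combinedSize(tombstone));    // 36: length prefix + 32 metadata bytes
        System.out.println(payloadOnlySize(tombstone)); // 0: empty payload
    }
}
```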

Verifying this change

This change added tests and can be verified as follows:

  • Added `CompactionTest.testNonBatchedMessageWithNullValue`: writes raw non-batch entries (no `numMessagesInBatch` in metadata, simulating C++/Python producers) directly to the managed ledger, triggers compaction, and verifies tombstoned keys are removed from the compacted view.

    | Test                               | Without fix                        | With fix |
    |------------------------------------|------------------------------------|----------|
    | testNonBatchedMessageWithNullValue | FAILED: expected [2] but found [4] | PASSED   |

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

…topic compaction

When a non-Java producer (C++, Python, Go) sends a null-value message
(tombstone) on a compacted topic, the key is not removed during topic
compaction. The tombstone is retained in the compacted view instead of
being deleted.

Root cause:
`AbstractTwoPhaseCompactor.extractKeyAndSize()` computes payload size
using `headersAndPayload.readableBytes()`, which returns the combined
size of the serialized MessageMetadata + payload. For null-value
messages the payload is empty, but the metadata is always present,
so readableBytes() is always > 0 (e.g. 32 bytes of metadata). This
prevents the tombstone path (`size <= 0 -> latestForKey.remove(key)`)
from being reached in phase one of compaction.

The batch code path is not affected because it extracts per-message
payload sizes from `SingleMessageMetadata.payloadSize`, which correctly
returns 0 for null-value messages. The Java client always sets
`numMessagesInBatch` in the metadata (even with `enableBatching(false)`),
so all Java-produced messages go through the batch path -- which is why
this bug was never caught by existing tests.

Fix:
Check `msgMetadata.hasNullValue() && msgMetadata.isNullValue()` in
`extractKeyAndSize()` and return size 0, so the compaction phase one
correctly removes the key from `latestForKey`.

Also refactored `EventTimeOrderCompactor.extractMessageCompactionData()`
to reuse `extractKeyAndSize()` instead of duplicating the size logic.

Verification (testNonBatchedMessageWithNullValue):
Writes raw non-batch entries (no numMessagesInBatch) to the managed
ledger, triggers compaction, and reads the compacted view.

| Test                                | Without fix                        | With fix |
|-------------------------------------|------------------------------------|----------|
| testNonBatchedMessageWithNullValue  | FAILED: expected [2] but found [4] | PASSED   |

Co-authored-by: Cursor <cursoragent@cursor.com>
Comment on lines +476 to +478
ByteBuf headersAndPayload = m.getHeadersAndPayload();
Commands.skipMessageMetadata(headersAndPayload);
payloadSize = headersAndPayload.readableBytes();
Member


It's better to call `ByteBuf.duplicate()` so that the original `headersAndPayload` buffer's `readerIndex` doesn't advance:

Suggested change
- ByteBuf headersAndPayload = m.getHeadersAndPayload();
- Commands.skipMessageMetadata(headersAndPayload);
- payloadSize = headersAndPayload.readableBytes();
+ ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
+ Commands.skipMessageMetadata(headersAndPayload);
+ payloadSize = headersAndPayload.readableBytes();
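
The reasoning behind this suggestion can be sketched with `java.nio.ByteBuffer`, whose `duplicate()` likewise shares content while keeping an independent position. This is an analogy under assumptions rather than the Pulsar code: `DuplicateSketch` and its methods are hypothetical, and the frame is simplified to a 4-byte metadata length, the metadata, then the payload.

```java
import java.nio.ByteBuffer;

final class DuplicateSketch {
    /** Builds a simplified frame: [4-byte metadata length][metadata][payload]. */
    static ByteBuffer frame(int metadataBytes, int payloadBytes) {
        ByteBuffer buf = ByteBuffer.allocate(4 + metadataBytes + payloadBytes);
        buf.putInt(metadataBytes);
        buf.put(new byte[metadataBytes]);
        buf.put(new byte[payloadBytes]);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    /** Skips the metadata on the caller's buffer, moving its read position. */
    static int payloadSizeAdvancing(ByteBuffer buf) {
        int metadataSize = buf.getInt();
        buf.position(buf.position() + metadataSize);
        return buf.remaining();
    }

    /** Same calculation on a duplicate, so the caller's read position is untouched. */
    static int payloadSizePreserving(ByteBuffer buf) {
        return payloadSizeAdvancing(buf.duplicate());
    }

    public static void main(String[] args) {
        ByteBuffer buf = frame(3, 5);
        System.out.println(payloadSizePreserving(buf) + " at position " + buf.position()); // 5 at position 0
        System.out.println(payloadSizeAdvancing(buf) + " at position " + buf.position());  // 5 at position 7
    }
}
```

Any later reader that expects the buffer to start at the metadata would still work after the duplicate-based call, but not after the advancing one.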
