[fix][broker] PIP-468: skip deleted segments in checkpoint consumer #25719
Merged
Segment topics must only be created by the controller's explicit-create path, never auto-created on client connect, so a producer or consumer racing a controller-driven delete (post-prune retention GC, force-delete) can't silently re-create the topic and mask the deletion.

- BrokerService.isAllowAutoTopicCreationAsync now returns false for TopicDomain.segment.
- ScalableTopics.createScalableTopic materializes the initial segment(s) up front via the admin client (so multi-broker bundle ownership is honored) instead of relying on auto-create-on-publish.
- Segments.createSegment switched to getTopic(name, true) for explicit creation now that the auto-create policy forbids segments.
- Segments.deleteSegment now actually propagates ?force=true to deleteForcefully() (it was being silently dropped).
- ScalableCheckpointConsumer swallows TopicDoesNotExistException / NotFoundException at reader-create and during the read loop, dropping the segment from segmentReaders + lastReceivedPositions so the consumer keeps delivering from segments that still exist.
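The interaction between the auto-create gate and explicit creation can be illustrated with a minimal sketch. This is not the broker's code: TopicRegistrySketch, Domain, onClientConnect, and delete are hypothetical names, and the sketch assumes auto-creation is always allowed for non-segment domains, which in a real broker depends on configuration.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for the topic domain; only SEGMENT matters here. */
enum Domain { PERSISTENT, SEGMENT }

/** Hypothetical in-memory registry illustrating the auto-create gate. */
class TopicRegistrySketch {
    private final Set<String> topics = new HashSet<>();

    /** Segment topics are never auto-created (assumption: other domains always are). */
    static boolean isAllowAutoTopicCreation(Domain domain) {
        return domain != Domain.SEGMENT;
    }

    /** Looks a topic up, creating it first only when createIfMissing is true. */
    Optional<String> getTopic(String name, boolean createIfMissing) {
        if (createIfMissing) {
            topics.add(name);
        }
        return topics.contains(name) ? Optional.of(name) : Optional.empty();
    }

    /** Client connect path: creation is subject to the auto-create policy. */
    Optional<String> onClientConnect(String name, Domain domain) {
        return getTopic(name, isAllowAutoTopicCreation(domain));
    }

    /** Controller path: explicit creation, independent of the policy. */
    Optional<String> createSegment(String name) {
        return getTopic(name, true);
    }

    /** Controller-driven delete. */
    void delete(String name) {
        topics.remove(name);
    }
}
```

In this sketch, a client that connects to a segment topic after the controller deleted it gets an empty result instead of a freshly re-created topic, so the deletion stays visible.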
- ScalableTopics / Segments: replace fully-qualified java.util.{ArrayList,
Optional} with imports.
- ScalableTopicController.initialize: after winning leadership, call
ensureActiveSegmentsExist() which idempotently re-creates any active
segments whose backing topics are missing — recovering from a
createScalableTopic that committed metadata but failed to materialize
all initial segments, or an out-of-band force-delete of an active
segment. Sealed segments are intentionally not healed: a missing sealed
topic means data was retention-pruned, and the V5 checkpoint consumer
silently skips it.
- ScalableTopicControllerTest: add
testInitializeRecreatesMissingActiveSegments +
testInitializeDoesNotCreateSegmentsWhenNotLeader. Call clearInvocations()
in testSplitSegment / testMergeSegments so the new init-time
createSegmentAsync calls don't perturb their existing counts.
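The healing rule described above (leader only, active segments only, idempotent) can be sketched as follows. This is an illustration under stated assumptions, not the controller's implementation: SegmentInfo, the isLeader flag, and the set of existing topic names are hypothetical simplifications of the real metadata and broker calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical segment descriptor: a name plus a sealed/active flag. */
class SegmentInfo {
    final String name;
    final boolean sealed;

    SegmentInfo(String name, boolean sealed) {
        this.name = name;
        this.sealed = sealed;
    }
}

class EnsureActiveSegmentsSketch {
    /**
     * Re-creates missing backing topics for active segments and returns the
     * names it created. existingTopics stands in for the broker's topic state.
     */
    static List<String> ensureActiveSegmentsExist(
            boolean isLeader, List<SegmentInfo> segments, Set<String> existingTopics) {
        List<String> created = new ArrayList<>();
        if (!isLeader) {
            return created; // only the leader heals
        }
        for (SegmentInfo segment : segments) {
            if (segment.sealed) {
                continue; // a missing sealed topic means pruned data; do not heal
            }
            // Set.add returns false when the topic already exists, so a
            // second call creates nothing.
            if (existingTopics.add(segment.name)) {
                created.add(segment.name);
            }
        }
        return created;
    }
}
```

Calling the method twice with the same inputs creates the missing active segments on the first call and nothing on the second, which is the idempotence the commit message relies on.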
Summary
Segment topics must only be created by the controller's explicit-create path — never auto-created on client connect — so a producer or consumer racing a controller-driven delete (post-prune retention GC, force-delete) can't silently re-create the topic and mask the deletion. Plus, when a checkpoint consumer does observe a missing segment topic, it must silently skip it instead of failing.
- BrokerService.isAllowAutoTopicCreationAsync now returns false for TopicDomain.segment.
- ScalableTopics.createScalableTopic materializes the initial segment(s) up front via the admin client (so multi-broker bundle ownership is honored) instead of relying on auto-create-on-publish.
- Segments.createSegment switched to getTopic(name, true) for explicit creation now that the auto-create policy forbids segments.
- Segments.deleteSegment now actually propagates ?force=true to deleteForcefully() (it was being silently dropped).
- ScalableTopicController.initialize, after winning leadership, idempotently re-creates any active-segment backing topics that are missing. This recovers from a createScalableTopic that committed metadata but failed to materialize all initial segments, or from an out-of-band force-delete of an active segment. Sealed segments are intentionally not healed: a missing sealed topic means data was retention-pruned, and the V5 checkpoint consumer skips it.
- ScalableCheckpointConsumer swallows TopicDoesNotExistException / NotFoundException at reader-create and during the read loop, dropping the segment from segmentReaders + lastReceivedPositions so the consumer keeps delivering from segments that still exist.

Test plan
- V5CheckpointConsumerSkipDeletedSegmentTest: 2-segment topic, force-delete one, open consumer → must subscribe to the survivor and deliver its full backlog. Verified that the new "Segment backing topic deleted (retention expired); skipping" log path actually fires.
- ScalableTopicControllerTest.testInitializeRecreatesMissingActiveSegments + testInitializeDoesNotCreateSegmentsWhenNotLeader cover the recovery path.
- ScalableTopicControllerTest.testSplitSegment / testMergeSegments updated for the new init-time createSegmentAsync calls (clearInvocations).
- org.apache.pulsar.client.api.v5.* and org.apache.pulsar.broker.service.scalable.* tests pass.
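The skip behavior exercised by the first test-plan item can be sketched roughly as below. All types here are hypothetical stand-ins (SegmentTopicNotFoundException, SegmentReader, SegmentReaderFactory); only the map names segmentReaders and lastReceivedPositions come from the description, and the position bookkeeping is a simplified counter rather than the consumer's real position type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the client's topic-not-found errors. */
class SegmentTopicNotFoundException extends RuntimeException {
    SegmentTopicNotFoundException(String segment) {
        super("segment topic not found: " + segment);
    }
}

/** Hypothetical reader over one segment's backing topic. */
interface SegmentReader {
    /** Returns the next message, or null once the backlog is drained. */
    String readNext();
}

/** Hypothetical factory; open() throws if the backing topic is gone. */
interface SegmentReaderFactory {
    SegmentReader open(String segment);
}

class SkipDeletedSegmentsSketch {
    final Map<String, SegmentReader> segmentReaders = new LinkedHashMap<>();
    final Map<String, Long> lastReceivedPositions = new LinkedHashMap<>();

    /** Opens a reader per segment, skipping segments whose topic is missing. */
    void openReaders(List<String> segments, SegmentReaderFactory factory) {
        for (String segment : segments) {
            try {
                segmentReaders.put(segment, factory.open(segment));
                lastReceivedPositions.put(segment, -1L); // nothing received yet
            } catch (SegmentTopicNotFoundException e) {
                // Topic was deleted (e.g. retention-pruned): skip, do not fail.
            }
        }
    }

    /** Drains every surviving segment; drops a segment that vanishes mid-read. */
    List<String> drain() {
        List<String> delivered = new ArrayList<>();
        // Iterate over a copy so segments can be removed during the loop.
        for (String segment : new ArrayList<>(segmentReaders.keySet())) {
            SegmentReader reader = segmentReaders.get(segment);
            try {
                String message;
                while ((message = reader.readNext()) != null) {
                    delivered.add(message);
                    lastReceivedPositions.merge(segment, 1L, Long::sum);
                }
            } catch (SegmentTopicNotFoundException e) {
                segmentReaders.remove(segment);
                lastReceivedPositions.remove(segment);
            }
        }
        return delivered;
    }
}
```

The design point is that a missing segment removes only that segment's entries from both maps, so delivery from the remaining segments continues unaffected.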