[fix][client] Fix exclusive V5 scalable topic producer initial segment claim race condition #25778

Merged: merlimat merged 1 commit into apache:master from pdolif:fix-25678 on May 14, 2026
@pdolif pdolif commented May 14, 2026

Fixes #25678

Motivation

A V5 scalable topic producer in exclusive access mode claims all active segments when it is created. The test V5ProducerAccessModeTest.testExclusiveRejectsSecondProducer verifies that a second exclusive producer is rejected: it creates two exclusive producers sequentially and asserts that creating the second one fails. The test is currently flaky because, from the client's perspective, both producers are sometimes created successfully. The cause is a race condition between eagerAttachInitialAsync and onLayoutChange in ScalableTopicProducer.

For the second producer, ProducerBuilderV5 creates the DagWatchClient, starts it, and calls eagerAttachInitialAsync once it receives the initial layout. Concurrently, onLayoutChange may be invoked with the same initial layout. Both methods try to claim the active segments, so they race to be the first to create the segment producer(s).

eagerAttachInitialAsync:

for (var seg : activeSegments) {
    if (segmentProducers.containsKey(seg.segmentId())) {
        // If onLayoutChange already created the producer, the segment is skipped
        // here and the producer creation completes successfully.
        continue;
    }
    try {
        getOrCreateSegmentProducer(seg.segmentId());
    } catch (PulsarClientException e) {
        throw new java.util.concurrent.CompletionException(e);
    }
}

onLayoutChange:

for (var seg : newLayout.activeSegments()) {
    if (segmentProducers.containsKey(seg.segmentId())) {
        continue;
    }
    try {
        getOrCreateSegmentProducer(seg.segmentId());
    } catch (PulsarClientException e) {
        // If onLayoutChange creates the producer, errors are only logged and are
        // not visible to the producer creation flow.
        log.warn()...
    }
}

Modifications

  • Skip the active segment claim in onLayoutChange for the initial layout change. The initial claim is done by eagerAttachInitialAsync.
  • Never skip the getOrCreateSegmentProducer call in eagerAttachInitialAsync. This avoids issues if a second layout change is processed before eagerAttachInitialAsync has created the producer(s).
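
The two modifications can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the actual ScalableTopicProducer code: the class InitialClaimSketch, the ClaimRejectedException type (a stand-in for PulsarClientException), the brokerOwners map (a stand-in for broker-side exclusive ownership), the initialLayoutSeen flag, and the synchronous eagerAttachInitial method are all hypothetical. How the real implementation detects the initial layout change may differ.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified model of the two claim paths; not the real ScalableTopicProducer. */
public class InitialClaimSketch {
    /** Hypothetical stand-in for PulsarClientException when an exclusive claim is rejected. */
    static class ClaimRejectedException extends Exception {
        ClaimRejectedException(String message) {
            super(message);
        }
    }

    // Assumption: models broker state as segment id -> name of the exclusive owner.
    private final Map<Long, String> brokerOwners;
    private final String name;
    private final Map<Long, String> segmentProducers = new ConcurrentHashMap<>();
    // Assumption: a flag is enough to recognize the initial layout change.
    private final AtomicBoolean initialLayoutSeen = new AtomicBoolean(false);

    InitialClaimSketch(String name, Map<Long, String> brokerOwners) {
        this.name = name;
        this.brokerOwners = brokerOwners;
    }

    /** Returns the cached segment producer, or claims the segment and creates one. */
    String getOrCreateSegmentProducer(long segmentId) throws ClaimRejectedException {
        String existing = segmentProducers.get(segmentId);
        if (existing != null) {
            return existing;
        }
        String owner = brokerOwners.putIfAbsent(segmentId, name);
        if (owner != null && !owner.equals(name)) {
            throw new ClaimRejectedException("segment " + segmentId + " is owned by " + owner);
        }
        String producer = name + "-segment-" + segmentId;
        segmentProducers.put(segmentId, producer);
        return producer;
    }

    /** Initial claim: never skips a segment, so a rejected claim always fails creation. */
    void eagerAttachInitial(List<Long> activeSegments) {
        for (long seg : activeSegments) {
            try {
                getOrCreateSegmentProducer(seg);
            } catch (ClaimRejectedException e) {
                throw new CompletionException(e);
            }
        }
    }

    /** Layout listener: the initial change is skipped, later changes only log failures. */
    void onLayoutChange(List<Long> activeSegments) {
        if (initialLayoutSeen.compareAndSet(false, true)) {
            return; // the initial claim is left to eagerAttachInitial
        }
        for (long seg : activeSegments) {
            if (segmentProducers.containsKey(seg)) {
                continue;
            }
            try {
                getOrCreateSegmentProducer(seg);
            } catch (ClaimRejectedException e) {
                System.err.println("Failed to attach segment producer: " + e.getMessage());
            }
        }
    }

    int producerCount() {
        return segmentProducers.size();
    }
}
```

In this model, a second producer that shares the brokerOwners map with the first one always fails in eagerAttachInitial, regardless of whether its onLayoutChange ran first with the initial layout, because the listener no longer performs the initial claim and the initial claim no longer skips segments.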

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@merlimat merlimat merged commit 34e38f0 into apache:master May 14, 2026
44 checks passed
Development

Successfully merging this pull request may close these issues.

Flaky-test: V5ProducerAccessModeTest > testExclusiveRejectsSecondProducer
