[feat] PIP-468: Multi-broker shared cluster + V5 cross-broker tests#25633
Merged
merlimat merged 2 commits intoapache:masterfrom Apr 30, 2026
Merged
[feat] PIP-468: Multi-broker shared cluster + V5 cross-broker tests#25633merlimat merged 2 commits intoapache:masterfrom
merlimat merged 2 commits intoapache:masterfrom
Conversation
… tests Add a JVM-wide multi-broker test cluster mirroring SharedPulsarCluster, plus V5 cross-broker coverage for scalable topics. Three brokers share a single bookie + a real test ZK server (the in-memory metadata store collapses ephemeral sessions across instances, so leader election doesn't work correctly across multiple brokers in the same JVM). Infrastructure: - SharedMultiBrokerPulsarCluster — singleton with NUM_BROKERS=3, lazy start, shutdown-hook teardown. Exposes per-broker PulsarService/PulsarAdmin/ PulsarClient lists plus broker-0 conveniences. - SharedMultiBrokerPulsarBaseTest — v4 base wrapping the cluster, fresh namespace per test method. - V5MultiBrokerClientBaseTest — V5 layer on top, with one V5 PulsarClient per broker. Tests (V5MultiBrokerScalableTopicTest): - testProduceConsumeAcrossBrokers — produce on broker 0's V5 client, consume on the last broker's; lookup must route to segment owners. - testControllerLeadershipDistributesAcrossBrokers — cycle which broker materializes each new topic's controller; assert leadership lands on every broker. - testStreamConsumerControllerCoordinationAcrossBrokers — alice + bob attached via two non-leader brokers must reach the controller via the lookup → controller-URL path; segments split across the group. - testCheckpointConsumerControllerCoordinationAcrossBrokers — same with consumerGroup(...). Broker fix: - DagWatchSession.buildDagProto was setting controllerBrokerUrl on the response object but never propagating it into the ScalableTopicDAG proto. Without this V5 clients always saw a null controller URL and fell back to their configured service URL — on multi-broker that's rarely the controller leader, so consumer subscribe lands on the wrong broker. Single-broker tests didn't catch it because the fallback URL was always the controller. Deferred (TODO in test file): split/merge from non-leader broker. The redirect itself works, but the controller's discoverSubscriptions calls admin.topics().getSubscriptionsAsync(segment://...) which the v4 Topics REST endpoint doesn't route. Needs a separate broker change.
…t/merge tests - Replace TestZKServer with OxiaContainer for the shared multi-broker cluster. Per-session ephemeral semantics still work (each broker has its own session), and Oxia is the simpler default; tests skip cleanly when Docker isn't available. - Drop ScalableTopicController.discoverSubscriptions and stop probing segment topics for their subscription set. The scalable topic's own metadata (resources.listSubscriptionsAsync) is the source of truth — segments are storage layout, not authoritative subscription state. This also fixes a multi-broker bug where the controller fell back to admin.topics().getSubscriptionsAsync(segment://...), which the v4 Topics REST endpoint doesn't route. - Re-enable the previously-deferred split/merge cross-broker tests (testSplitFromNonLeaderBrokerRedirectsToOwner / testMergeFromNonLeaderBrokerRedirectsToOwner). They produce a single warm-up message before splitting so the parent segment topic is loaded on its owning broker — split's terminate step requires the topic to exist there.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Add a JVM-wide multi-broker test cluster mirroring
SharedPulsarCluster, plusV5 cross-broker coverage for scalable topics. Three brokers share a single
bookie + a real Oxia metadata store (the in-memory store collapses ephemeral
sessions across instances, so leader election doesn't work correctly across
multiple brokers in the same JVM).
Infrastructure
SharedMultiBrokerPulsarCluster— JVM-wide singleton withNUM_BROKERS = 3,lazy start, shutdown-hook teardown. Backed by
OxiaContainer(testcontainers).Exposes per-broker
PulsarService/PulsarAdmin/PulsarClientlists plusbroker-0 conveniences.
SharedMultiBrokerPulsarBaseTest— v4 base wrapping the cluster with a freshnamespace per test method.
V5MultiBrokerClientBaseTest— V5 layer on top, with one V5PulsarClientper broker.
Tests (
V5MultiBrokerScalableTopicTest)testProduceConsumeAcrossBrokers— produce on broker 0's V5 client, consumeon the last broker's; lookups must route to segment owners.
testControllerLeadershipDistributesAcrossBrokers— cycle which brokermaterializes each new topic's controller; assert leadership lands on every
broker.
testStreamConsumerControllerCoordinationAcrossBrokers— alice + bob attachedvia two non-leader brokers must reach the controller via the lookup →
controller-URL path; segments split across the group.
testCheckpointConsumerControllerCoordinationAcrossBrokers— same withconsumerGroup(...).testSplitFromNonLeaderBrokerRedirectsToOwner— split request to a non-leaderbroker is 307-redirected to the controller leader.
testMergeFromNonLeaderBrokerRedirectsToOwner— same for merge.Broker fixes (uncovered while building the tests)
DagWatchSession.buildDagProtowas settingcontrollerBrokerUrlon theresponse object but never propagating it into the
ScalableTopicDAGproto.Without this V5 clients always saw a null controller URL and fell back to
their configured service URL — on multi-broker that's rarely the controller
leader, so consumer subscribe lands on the wrong broker. Single-broker tests
didn't catch it because the fallback URL was always the controller.
ScalableTopicControllerpreviously discovered subscriptions for split / mergeby querying segment topics through
admin.topics().getSubscriptionsAsync( segment://...), which the v4 Topics REST endpoint doesn't route. Replacedwith
resources.listSubscriptionsAsync(topicName)— the scalable-topicmetadata is the single source of truth for subscriptions; segments are storage
layout. Single-broker tests didn't catch it because the segment topic was
always locally cached and short-circuited the admin path.
Matching PR(s) in forked repositories