[feat] PIP-468: Implement V5 client with scalable topic producer and consumer types#25573
Merged
merlimat merged 2 commits intoapache:masterfrom Apr 24, 2026
Merged
[feat] PIP-468: Implement V5 client with scalable topic producer and consumer types#25573merlimat merged 2 commits intoapache:masterfrom
merlimat merged 2 commits intoapache:masterfrom
Conversation
lhotari
approved these changes
Apr 24, 2026
Member
lhotari
left a comment
There was a problem hiding this comment.
LGTM as a starting point. Just one comment about possible improvement to explain the MessageIdV5 in the javadoc.
…r types Add pulsar-client-v5 module implementing the V5 client API. Includes ScalableTopicProducer with segment routing, ScalableStreamConsumer, ScalableQueueConsumer, and ScalableCheckpointConsumer. Implement DagWatchClient for receiving DAG layout updates from broker. Add PulsarClientV5, builder implementations, schema/auth/crypto adapters, and ServiceLoader-based provider. Extend PulsarClientImpl with scalableTopicLookup support.
…r paths The V4 client's isScalableDomain() helper only matched topic://, so the internal segment:// domain leaked through the rejection check in subscribeAsync(), createProducerAsync(), and createReaderAsync(). That let segment:// topics flow into the normal connect/lookup path, making testRejectScalableDomainOnConsumer (and its producer/reader siblings) produce a connection-time exception instead of the expected InvalidTopicNameException. Update isScalableDomain() to also match TopicName.isSegment(), and fix the comment to reflect the new behavior.
This was referenced Apr 27, 2026
Merged
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Fourth (and final for this series) PR in PIP-468. Adds the
pulsar-client-v5module that implements the V5 client API (PIP-466) forscalable topics.
Core pieces
PulsarClientV5+PulsarClientBuilderV5— wraps the existingv4
PulsarClientImpland adds the scalable-topic routing layer on top.PulsarClientProviderV5— ServiceLoader SPI implementation for theV5 API.
ScalableTopicProducer— key-hashing segment routing, round-robinfor unkeyed messages, per-segment underlying v4 producers.
ScalableStreamConsumer— ordered, cumulative-ack consumer thatreceives segment assignments from the controller leader and drives v4
per-segment consumers.
ScalableQueueConsumer— controller-bypassing consumer thatattaches to every segment and drains them with the receive-loop fix
from the earlier iterations.
ScalableCheckpointConsumer— unmanaged connector-style consumerthat can resume from a caller-supplied
Checkpointacross allsegments.
DagWatchClient— client-side handle for the DAG watch session;implements the
DagWatchSessioncallback from the protocol PR.MessageIdV5— V5 message-id wrapping a v4 id with a per-segmentposition vector for cumulative ack.
CheckpointV5— caller-durable checkpoint includingEARLIEST/LATESTsentinels and timestamp variants.ClientSegmentLayout— client-side view of the DAG built from aScalableTopicDAGprotobuf.SegmentRouter— 16-bit Murmur3 key-hash routing and round-robinfor empty keys.
AuthenticationAdapter,CryptoKeyReaderAdapter,SchemaAdapter— bridge V5 API types to the underlying v4implementations.
V5Utils— small helpers (e.g.asScalableTopicNamewhich rewrapsa parsed name with the
topic://domain).Supporting v4 change
PulsarClientImpl.createSegmentProducerAsync— internal entry pointfor the V5 client to create producers on
segment://topics,bypassing the scalable-domain guard that's meant for user-facing
producer creation.
Tests
New unit test classes (48 tests)
V5UtilsTest(5 tests) —asScalableTopicNameon bare name /short form / persistent:// / topic:// inputs; also pins the
caching behavior for the topic:// fast path.
SegmentRouterTest(10 tests) — key-routing determinism andcorrectness (hash lands inside the picked segment's range),
single-segment, empty-segments error, uncovered-hash error,
round-robin cycling + wrap, hash 16-bit bound, string/bytes agree.
MessageIdV5Test(15 tests) — constructor + null-rejection,immutable position vector, equals/hashCode (position vector is
diagnostic, not identity), compareTo by segment then v4 id, foreign-
type rejection, byte-array roundtrip with and without position
vector, malformed input handling, EARLIEST/LATEST sentinels,
toString.ClientSegmentLayoutTest(7 tests) —fromProtofilters activesegments, sorts by hash-range start, builds segment topic names,
collects broker URLs, propagates the controller URL (plus TLS
variant), and the returned collections are immutable.
CheckpointV5Test(11 tests) — regular checkpoint roundtripwith multi-segment positions, empty positions, EARLIEST/LATEST
sentinel roundtrip (sentinel identity preserved, 1-byte compact
wire), timestamp variant roundtrip + wire shape, null / empty /
unknown-type input rejection, immutable positions map.
Already exercised by prior PRs in this stack
PulsarClientImplTest.createSegmentProducerAsync*— on [feat] PIP-468: Add scalable topics and segments admin APIs with CLI #25565 (thesmall v4 change we added here is also exercised there).
[feat] PIP-468: Add ScalableTopicController and broker infrastructure #25559.
Test plan
./gradlew :pulsar-client-v5:test --tests "org.apache.pulsar.client.impl.v5.*"— 48/48 new tests pass.(
pulsar-client-v5,pulsar-client-original,pulsar-broker,pulsar-common,pulsar-client-admin-api).