[feat] [test] PIP-468: V5 producer/consumer knob coverage + related client fixes#25588
Merged
merlimat merged 9 commits intoapache:masterfrom Apr 28, 2026
Merged
[feat] [test] PIP-468: V5 producer/consumer knob coverage + related client fixes#25588merlimat merged 9 commits intoapache:masterfrom
merlimat merged 9 commits intoapache:masterfrom
Conversation
ScalableQueueConsumer and ScalableStreamConsumer were creating fresh ConsumerConfigurationData per segment that only carried over the subscription name and consumer name. Every other builder knob — receiverQueueSize, ackTimeout, acknowledgmentGroupTime, maxAcknowledgmentGroupSize, negativeAckRedeliveryBackoff, ackTimeoutRedeliveryBackoff, deadLetterPolicy, readCompacted, replicateSubscriptionState, encryption — was silently dropped on the floor. Clone the user-facing ConsumerConfigurationData and only override the fields that must be per-segment (topicNames, subscription type) and the consumer name suffix. This makes the V5 consumer builders behave the way users expect: setting a knob on the V5 builder actually applies to the underlying per-segment v4 consumers. The same structural issue exists in ScalableTopicProducer and ScalableCheckpointConsumer; those will be fixed in follow-ups since they need a small builder refactor to retain the user-facing config.
V5's ProducerAccessMode uses SCREAMING_SNAKE_CASE (SHARED, EXCLUSIVE, EXCLUSIVE_WITH_FENCING, WAIT_FOR_EXCLUSIVE) but v4's enum uses PascalCase (Shared, Exclusive, ...). The previous valueOf(name()) call threw IllegalArgumentException for every value. Map explicitly via switch.
ScalableTopicProducer creates one v4 producer per active segment lazily on first send. Previously the per-segment ProducerConfigurationData was built from scratch and only carried sendTimeout / blockIfQueueFull / producerName, so every other builder knob (compression, batching, chunking, encryption, initialSequenceId, accessMode, properties, ...) was silently dropped on the floor. Clone the user-facing producerConf instead, mirroring the consumer-side fix from 025da64. Two related fixes folded in: - lastSequenceId() returned -1 until a segment producer existed; now it starts from the configured initialSequenceId so a caller that sets initialSequenceId(N) and immediately reads back gets N. - getOrCreateSegmentProducer wrapped v4 PulsarClientException in a bare RuntimeException via computeIfAbsent, so newMessage().send() callers saw an unchecked failure for producer-fenced / busy collisions. Unwrap and rethrow as the V5 PulsarClientException the contract promises.
Most V5 e2e tests want a single PulsarClient per test class rather than
one per test method (mirrors the v4 pulsarClient field on the parent
SharedPulsarBaseTest). Add a shared {@code v5Client} field initialized
in @BeforeClass and closed in @afterclass, and update newV5Client() to
remain available for tests that specifically need an isolated client.
One test per Schema.* factory: produce a sentinel value with the schema, consume it back via a V5 QueueConsumer using the same schema, assert the deserialized value equals what was sent. - Primitives: bytes, string, bool, int8 / int16 / int32 / int64, float32, float64. - Polymorphic: json(Pojo) and avro(Pojo) using a small public no-args Pojo with name/age. - protobuf: uses the existing TestMessage proto already generated for the broker's other schema tests. autoProduceBytes is intentionally skipped — it requires a pre- registered topic schema and belongs with the producer-knob tests. 12 tests, ~1.1 s of test work.
Five new e2e tests exercising the producer builder knobs end-to-end through the V5 scalable-topic pipeline: - V5MessageMetadataTest: eventTime, properties (single + batch), deliverAfter, deliverAt. - V5ProducerCompressionTest: roundtrip across all CompressionType values plus disabled / default. - V5ProducerBatchingTest: ofDefault, ofDisabled, tight delay, small batch size. - V5ProducerSequenceIdTest: lastSequenceId reflects sends and initialSequenceId; explicit per-message sequenceId honored; dedup of duplicate sequenceId. - V5ProducerAccessModeTest: SHARED multi-producer, EXCLUSIVE rejects a second send, WAIT_FOR_EXCLUSIVE succeeds after the first releases.
Three new e2e tests exercising the consumer builder knobs through the V5 scalable-topic pipeline: - V5SubscriptionInitialPositionTest: EARLIEST sees prior data and LATEST skips it, for both QueueConsumer and StreamConsumer. - V5AckTimeoutTest: an unacked message is redelivered after the configured ackTimeout. - V5DeadLetterPolicyTest: after maxRedeliverCount nacks, the broker forwards the message to the configured dead-letter topic.
…cess modes Per-segment v4 producers were created lazily on first send, so an EXCLUSIVE / EXCLUSIVE_WITH_FENCING / WAIT_FOR_EXCLUSIVE producer's exclusivity guarantee only kicked in for the segment hit by the next send — a second EXCLUSIVE producer could create() and even send to a different segment before the broker noticed the collision. Eagerly claim every active segment at create() time when the access mode requires exclusivity: - ScalableTopicProducer.eagerAttachInitialAsync() walks the active layout and synchronously creates v4 producers for each segment. Failures propagate so create() fails up front instead of deferring the collision to first send. - ProducerBuilderV5.createAsync wires this in via thenCompose, with a closeAsync on the partial producer if eager attach fails. - onLayoutChange runs a best-effort eager attach for newly-introduced segments after a split, so exclusivity continues to cover the topic as it scales out. (Best-effort because the callback can't propagate.) Update V5ProducerAccessModeTest to match: the second EXCLUSIVE producer now fails at create() rather than at first send, and the WAIT_FOR_EXCLUSIVE test asserts the second create() actually blocks until the first releases.
…licyTest Document the V5 limitation that prevents the DLQ topic from being a scalable topic: the source consumer's underlying v4 ConsumerImpl creates the DLQ producer via client.newProducer(...), which rejects topic:// scalable topic names. Routing the DLQ producer through V5's segment-bypass path is required before a scalable DLQ can be used here. Adjust the redelivery loop to match v4 DLQ semantics: the consumer forwards to DLQ only when redeliveryCount > maxRedeliverCount (strictly greater), so with maxRedeliverCount=2 the user sees three deliveries and the fourth is intercepted.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Round-out V5 client end-to-end coverage for producer / consumer builder knobs, plus the V5 client fixes those tests surfaced. Builds on PIP-468 (#25573, #25586) and is independent of #25587.
V5 client fixes:
ConsumerConfigurationDatato per-segment v4 consumers (was building a fresh config that dropped DLQ / negative-ack backoff / ackTimeout / subscriptionInitialPosition / etc.).ProducerConfigurationDatato per-segment v4 producers (same problem on the producer side — compression / batching / chunking / encryption / initialSequenceId / accessMode / properties were silently dropped).ProducerAccessModeenum from V5's SCREAMING_SNAKE_CASE to v4's PascalCase (the previousvalueOf(name())threwIllegalArgumentExceptionfor every value).create()time. Per-segment producers were lazy, so a second EXCLUSIVE producer couldcreate()before the broker noticed the collision;WAIT_FOR_EXCLUSIVEdid not actually block atcreate().lastSequenceId()now reflects the configuredinitialSequenceIdeven before any segment producer exists.getOrCreateSegmentProducerno longer wraps v4PulsarClientExceptionin a bareRuntimeException.V5ClientBaseTestwith a shared per-classv5Clientfield (mirrors the v4pulsarClientfield) to keep tests focused.New e2e tests (all under
pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/):V5SchemaRoundtripTest— bytes / string / bool / int8/16/32/64 / float32/64 / json / avro / protobuf / autoProduceBytesV5MessageMetadataTest— eventTime, properties (single + batch), deliverAfter, deliverAtV5ProducerCompressionTest— allCompressionTypevalues + disabled + defaultV5ProducerBatchingTest— default, disabled, tight delay, small batch sizeV5ProducerSequenceIdTest—lastSequenceId,initialSequenceId, explicit sequenceId, dedupV5ProducerAccessModeTest— SHARED multi-producer, EXCLUSIVE rejects second create(), WAIT_FOR_EXCLUSIVE blocks then succeeds after releaseV5SubscriptionInitialPositionTest— EARLIEST / LATEST for QueueConsumer + StreamConsumerV5AckTimeoutTest— unacked redelivery afterackTimeoutV5DeadLetterPolicyTest— message lands on DLQ aftermaxRedeliverCount. Documents a known V5 gap: the source consumer's underlying v4ConsumerImplcreates the DLQ producer viaclient.newProducer(...), which rejectstopic://scalable URIs, so the DLQ topic is currently a regular persistent topic.Motivation
PR #25586 covered a smoke test. The basics are tracked in #25587. This PR adds the missing knob-level coverage and lands the V5 fixes the tests surfaced — keeping #25587 untouched.
Test plan
./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5SchemaRoundtripTest"./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5MessageMetadataTest"./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5ProducerCompressionTest"./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5ProducerBatchingTest"./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5ProducerSequenceIdTest"./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5ProducerAccessModeTest"./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5SubscriptionInitialPositionTest"./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5AckTimeoutTest"./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5DeadLetterPolicyTest"Matching PR
area/test