T track(T closeable) {
+ v5Resources.add(closeable);
+ return closeable;
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void closeV5Resources() {
+ // Close in reverse order: consumers/producers before the client they belong to.
+ for (int i = v5Resources.size() - 1; i >= 0; i--) {
+ AutoCloseable c = v5Resources.get(i);
+ try {
+ c.close();
+ } catch (Exception ignored) {
+ // Best-effort cleanup; tests have already asserted what they care about.
+ }
+ }
+ v5Resources.clear();
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SmokeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SmokeTest.java
new file mode 100644
index 0000000000000..f613671f33434
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SmokeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Smoke test: end-to-end verification that the longest V5 path works against a real broker.
+ *
+ * Exercises: admin {@code createScalableTopic} → {@code DagWatchClient} session → segment
+ * lookup → per-segment v4 producer creation → wire-format send → segment v4 consumer attach →
+ * receive → ack on the V5 {@link QueueConsumer}.
+ */
+public class V5SmokeTest extends V5ClientBaseTest {
+
+ @Test
+ public void testProduceAndConsumeOneMessageOnSingleSegmentTopic() throws Exception {
+ String topic = newScalableTopic(1);
+ PulsarClient client = newV5Client();
+
+ Producer producer = client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ track(producer);
+
+ QueueConsumer consumer = client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("smoke-sub")
+ .subscribe();
+ track(consumer);
+
+ MessageId sentId = producer.newMessage().value("hello-pulsar-v5").send();
+ assertNotNull(sentId, "producer must return a message id");
+
+ Message received = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(received, "consumer must receive within timeout");
+ assertEquals(received.value(), "hello-pulsar-v5");
+ consumer.acknowledge(received.id());
+ }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index d7b205f79e26b..534f885663dce 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -316,14 +316,14 @@ private CompletableFuture> createSegmentReaderAsync(ActiveSegment segm
PulsarClientImpl v4Client = client.v4Client();
org.apache.pulsar.client.api.MessageId startMsgId = resolveStartPosition(segment.segmentId());
- var builder = v4Client.newReader(v4Schema)
- .topic(segment.segmentTopicName())
- .startMessageId(startMsgId);
+ var segConf = new org.apache.pulsar.client.impl.conf.ReaderConfigurationData();
+ segConf.getTopicNames().add(segment.segmentTopicName());
+ segConf.setStartMessageId(startMsgId);
if (consumerName != null) {
- builder.readerName(consumerName + "-seg-" + segment.segmentId());
+ segConf.setReaderName(consumerName + "-seg-" + segment.segmentId());
}
- return builder.createAsync()
+ return v4Client.createSegmentReaderAsync(segConf, v4Schema)
.thenApply(reader -> {
startReadLoop(reader, segment.segmentId());
return reader;
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 28b6ba8af6bf7..4a1dae8bcdfd4 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -317,14 +317,14 @@ private CompletableFuture subscribeSegments(ClientSegmentLayout layout) {
private CompletableFuture> createSegmentConsumerAsync(
ActiveSegment segment) {
PulsarClientImpl v4Client = client.v4Client();
- var builder = v4Client.newConsumer(v4Schema)
- .topic(segment.segmentTopicName())
- .subscriptionName(subscriptionName)
- .subscriptionType(SubscriptionType.Shared);
+ var segConf = new org.apache.pulsar.client.impl.conf.ConsumerConfigurationData();
+ segConf.getTopicNames().add(segment.segmentTopicName());
+ segConf.setSubscriptionName(subscriptionName);
+ segConf.setSubscriptionType(SubscriptionType.Shared);
if (consumerConf.getConsumerName() != null) {
- builder.consumerName(consumerConf.getConsumerName() + "-seg-" + segment.segmentId());
+ segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" + segment.segmentId());
}
- return builder.subscribeAsync()
+ return v4Client.subscribeSegmentAsync(segConf, v4Schema)
.thenApply(consumer -> {
startReceiveLoop(consumer, segment.segmentId());
return consumer;
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index 2642e0f4d57ab..5137ce0462f61 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -305,14 +305,14 @@ private CompletableFuture subscribeSegments(ClientSegmentLayout layout) {
private CompletableFuture> createSegmentConsumerAsync(
ActiveSegment segment) {
PulsarClientImpl v4Client = client.v4Client();
- var builder = v4Client.newConsumer(v4Schema)
- .topic(segment.segmentTopicName())
- .subscriptionName(subscriptionName)
- .subscriptionType(SubscriptionType.Exclusive);
+ var segConf = new org.apache.pulsar.client.impl.conf.ConsumerConfigurationData();
+ segConf.getTopicNames().add(segment.segmentTopicName());
+ segConf.setSubscriptionName(subscriptionName);
+ segConf.setSubscriptionType(SubscriptionType.Exclusive);
if (consumerConf.getConsumerName() != null) {
- builder.consumerName(consumerConf.getConsumerName() + "-seg-" + segment.segmentId());
+ segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" + segment.segmentId());
}
- return builder.subscribeAsync()
+ return v4Client.subscribeSegmentAsync(segConf, v4Schema)
.thenApply(consumer -> {
startReceiveLoop(consumer, segment.segmentId());
return consumer;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index ca3343fd3aea7..baf6198420e13 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -502,6 +502,70 @@ public CompletableFuture> createSegmentProducerAsync(
return createProducerAsync(topic, conf, schema, null);
}
+ /**
+ * Create a reader against a segment topic bypassing the scalable domain check.
+ * This is intended for internal use by the V5 {@code CheckpointConsumer} to read each
+ * segment's underlying {@code segment://} topic.
+ */
+ public CompletableFuture> createSegmentReaderAsync(ReaderConfigurationData conf,
+ Schema schema) {
+ if (state.get() != State.Open) {
+ return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
+ }
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException("Reader configuration undefined"));
+ }
+ if (conf.getTopicNames().size() != 1) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException(
+ "createSegmentReaderAsync requires exactly one topic, got "
+ + conf.getTopicNames().size()));
+ }
+ String topic = conf.getTopicName();
+ if (!TopicName.isValid(topic)) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
+ }
+ if (conf.getStartMessageId() == null) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
+ }
+ return preProcessSchemaBeforeSubscribe(this, schema, topic)
+ .thenCompose(schemaClone -> createSingleTopicReaderAsync(conf, schemaClone));
+ }
+
+ /**
+ * Subscribe to a segment topic bypassing the scalable domain check.
+ * This is intended for internal use by the V5 client to subscribe to per-segment v4
+ * topics for the {@code segment://} backing topics it owns.
+ */
+ public CompletableFuture> subscribeSegmentAsync(ConsumerConfigurationData conf,
+ Schema schema) {
+ if (state.get() != State.Open) {
+ return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
+ }
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
+ }
+ if (conf.getTopicNames().size() != 1) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException(
+ "subscribeSegmentAsync requires exactly one topic, got " + conf.getTopicNames().size()));
+ }
+ String topic = conf.getSingleTopic();
+ if (!TopicName.isValid(topic)) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
+ }
+ if (isBlank(conf.getSubscriptionName())) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
+ }
+ return singleTopicSubscribeAsync(conf, schema, null);
+ }
+
/**
* Reject {@code topic://} (PIP-460 scalable topics) and {@code segment://} (the internal
* backing-topic domain used by V5 scalable topics). Users on the V4 SDK must switch to the