diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts index 38c5b91793299..5fdd16c30c5ab 100644 --- a/pulsar-broker/build.gradle.kts +++ b/pulsar-broker/build.gradle.kts @@ -132,6 +132,7 @@ dependencies { testImplementation(libs.jetty.ee8.proxy) testImplementation(libs.jetty.websocket.jetty.client) testImplementation(libs.opentelemetry.sdk.testing) + testImplementation(libs.oxia.testcontainers) testRuntimeOnly(libs.avro.protobuf) { exclude(group = "com.google.protobuf") } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java index 51a8ed20c47ff..ef0f8947b50ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java @@ -169,6 +169,16 @@ private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) { } } + // Propagate the controller-broker URL so V5 clients can connect to the right broker + // for scalable-topic subscribe. Without this the client falls back to its configured + // service URL, which on a multi-broker cluster is rarely the controller leader. + if (response.controllerBrokerUrl() != null) { + dag.setControllerBrokerUrl(response.controllerBrokerUrl()); + } + if (response.controllerBrokerUrlTls() != null) { + dag.setControllerBrokerUrlTls(response.controllerBrokerUrlTls()); + } + return dag; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java index b63ee847e3484..8b1805cd31517 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java @@ -21,10 +21,8 @@ import io.github.merlimat.slog.Logger; import java.time.Duration; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; @@ -233,9 +231,11 @@ public CompletableFuture splitSegment(long segmentId) { SegmentInfo parent = currentLayout.getAllSegments().get(segmentId); String parentTopicName = toSegmentPersistentName(parent); - // Step 1: Discover subscriptions on the parent segment, then create child - // segment topics with those subscriptions (routed to owning brokers via admin API) - return discoverSubscriptions(parentTopicName) + // Step 1: Read the scalable topic's subscriptions from metadata (the single source + // of truth — segment topics may live on different brokers, but the subscription set + // is tracked here), then create child segment topics with those subscriptions + // already provisioned (the create call routes to each segment's owning broker). + return resources.listSubscriptionsAsync(topicName) .thenCompose(parentSubs -> { var subList = new java.util.ArrayList<>(parentSubs); return createSegmentTopic(child1, subList) @@ -277,13 +277,10 @@ public CompletableFuture mergeSegments(long segmentId1, long segm String parent1Topic = toSegmentPersistentName(parent1); String parent2Topic = toSegmentPersistentName(parent2); - // Step 1: Discover subscriptions from both parents (union), then create merged segment - return discoverSubscriptions(parent1Topic) - .thenCombine(discoverSubscriptions(parent2Topic), (subs1, subs2) -> { - Set allSubs = new LinkedHashSet<>(subs1); - allSubs.addAll(subs2); - return allSubs; - }) + // Step 1: Read the scalable topic's subscriptions from metadata (single source of + // truth, see splitSegment), then create the merged segment topic with those + // subscriptions provisioned. + return resources.listSubscriptionsAsync(topicName) .thenCompose(parentSubs -> createSegmentTopic(merged, new java.util.ArrayList<>(parentSubs))) // Step 2: Terminate both parent segment topics @@ -582,29 +579,6 @@ private CompletableFuture createSegmentTopic(SegmentInfo segment, java.uti } } - /** - * Discover all subscription names on a segment topic. Works whether the topic is - * on this broker or a remote one by using the admin client. - */ - private CompletableFuture> discoverSubscriptions(String segmentTopicName) { - // Try local first (avoids RPC if the segment is on this broker) - return brokerService.getTopicIfExists(segmentTopicName) - .thenCompose(optTopic -> { - if (optTopic.isPresent()) { - return CompletableFuture.completedFuture( - new LinkedHashSet<>(optTopic.get().getSubscriptions().keySet())); - } - // Topic is on a remote broker — use admin client - try { - return brokerService.getPulsar().getAdminClient() - .topics().getSubscriptionsAsync(segmentTopicName) - .thenApply(LinkedHashSet::new); - } catch (PulsarServerException e) { - return CompletableFuture.failedFuture(e); - } - }); - } - private CompletableFuture notifySubscriptions(SegmentLayout layout) { CompletableFuture[] futures = subscriptions.values().stream() .map(coordinator -> coordinator.onLayoutChange(layout)) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarBaseTest.java new file mode 100644 index 0000000000000..85ae589099369 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarBaseTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import lombok.CustomLog; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; + +/** + * Base class for tests that need a shared multi-broker cluster across test classes. + * + *

Companion to {@link SharedPulsarBaseTest}. Use this when a test specifically depends on + * behavior that only manifests across brokers — namespace ownership transfer, controller-leader + * failover, segment placement on different brokers, V5 client reconnect to a different broker. + * For everything else, prefer the single-broker {@link SharedPulsarBaseTest}: it's faster, has + * fewer moving parts, and is sufficient for most coverage. + * + *

Each test method gets a fresh namespace under {@link SharedMultiBrokerPulsarCluster#TENANT_NAME} + * (created in {@link #setupSharedMultiBrokerTest()} and force-deleted in + * {@link #cleanupSharedMultiBrokerTest()}). The cluster itself is JVM-wide and reused across + * every test class that extends this — see {@link SharedMultiBrokerPulsarCluster}. + * + *

Subclasses get: + *

    + *
  • {@link #admin} / {@link #pulsarClient} — handles aimed at broker 0; lookups against any + * broker correctly redirect to topic owners, so most tests should just use these.
  • + *
  • {@link #brokers} / {@link #admins} / {@link #clients} — full per-broker lists, in start + * order, for tests that need to address a specific broker (e.g. asserting topic + * ownership, killing a specific broker).
  • + *
  • {@link #newTopicName()} — generates a unique topic in the test namespace.
  • + *
+ */ +@CustomLog +public abstract class SharedMultiBrokerPulsarBaseTest { + + /** All brokers in the shared cluster, in start order. */ + protected List brokers; + /** Per-broker admin handles, aligned with {@link #brokers}. */ + protected List admins; + /** Per-broker client handles, aligned with {@link #brokers}. */ + protected List clients; + + /** Convenience: broker 0's admin. */ + protected PulsarAdmin admin; + /** Convenience: broker 0's client. */ + protected PulsarClient pulsarClient; + + private final List namespaces = new ArrayList<>(); + + /** Returns the unique namespace assigned to the current test method. */ + protected String getNamespace() { + return namespaces.get(0); + } + + /** Returns the broker service URL for broker {@code index}. */ + protected String getBrokerServiceUrl(int index) { + return brokers.get(index).getBrokerServiceUrl(); + } + + /** Returns the web service URL for broker {@code index}. */ + protected String getWebServiceUrl(int index) { + return brokers.get(index).getWebServiceAddress(); + } + + /** + * Creates a new {@link PulsarClient} connected to broker {@code index}. Callers are + * responsible for closing the returned client. + */ + protected PulsarClient newPulsarClient(int index) throws PulsarClientException { + return PulsarClient.builder().serviceUrl(brokers.get(index).getBrokerServiceUrl()).build(); + } + + /** + * Initializes (lazily) the shared cluster singleton and wires the per-class fields. Called + * once per test class. + */ + @BeforeClass(alwaysRun = true) + public void setupSharedMultiBrokerCluster() throws Exception { + SharedMultiBrokerPulsarCluster cluster = SharedMultiBrokerPulsarCluster.get(); + brokers = cluster.getBrokers(); + admins = cluster.getAdmins(); + clients = cluster.getClients(); + admin = cluster.getAdmin(); + pulsarClient = cluster.getClient(); + } + + /** Creates a unique namespace for the current test method. */ + @BeforeMethod(alwaysRun = true) + public void setupSharedMultiBrokerTest() throws Exception { + createNewNamespace(); + } + + /** Force-deletes all namespaces created during the test method. */ + @AfterMethod(alwaysRun = true) + public void cleanupSharedMultiBrokerTest() throws Exception { + for (String ns : namespaces) { + try { + admin.namespaces().deleteNamespace(ns, true); + log.info().attr("testNamespace", ns).log("Deleted test namespace"); + } catch (Exception e) { + log.warn().attr("deleteNamespace", ns).exceptionMessage(e).log("Failed to delete namespace"); + } + } + namespaces.clear(); + } + + /** Creates a new namespace under the shared tenant and registers it for cleanup. */ + protected String createNewNamespace() throws Exception { + String nsName = "test-" + UUID.randomUUID().toString().substring(0, 8); + String ns = SharedMultiBrokerPulsarCluster.TENANT_NAME + "/" + nsName; + admin.namespaces().createNamespace(ns, Set.of(SharedMultiBrokerPulsarCluster.CLUSTER_NAME)); + namespaces.add(ns); + log.info().attr("testNamespace", ns).log("Created test namespace"); + return ns; + } + + /** Generates a unique persistent topic name within the current test namespace. */ + protected String newTopicName() { + return "persistent://" + getNamespace() + "/topic-" + UUID.randomUUID().toString().substring(0, 8); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarCluster.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarCluster.java new file mode 100644 index 0000000000000..fde85c06ddb79 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedMultiBrokerPulsarCluster.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import io.oxia.testcontainers.OxiaContainer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import lombok.CustomLog; +import org.apache.bookkeeper.common.allocator.PoolingPolicy; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.metadata.bookkeeper.BKCluster; +import org.apache.pulsar.tests.ThreadLeakDetectorListener; + +/** + * JVM-wide singleton that manages a lightweight multi-broker Pulsar cluster for integration tests. + * + *

Companion to {@link SharedPulsarCluster}, but spins up {@value #NUM_BROKERS} Pulsar brokers + * sharing a single bookie and an in-memory metadata store. Use this when a test specifically + * needs to exercise behavior that only manifests across brokers — namespace ownership transfer, + * controller-leader failover, segment placement on different brokers, V5 client reconnect to a + * different broker, etc. + * + *

The first broker's admin and client are exposed as the "primary" handles via + * {@link #getAdmin()} and {@link #getClient()}; per-broker handles are available via + * {@link #getBrokers()}, {@link #getAdmins()}, and {@link #getClients()}. Lookups against any + * broker correctly redirect to the broker that owns the requested topic, so most tests should + * just use the primary handles. + * + *

Lazy on first call to {@link #get()}; closed via JVM shutdown hook. + * + * @see SharedMultiBrokerPulsarBaseTest + */ +@CustomLog +public class SharedMultiBrokerPulsarCluster { + + public static final String CLUSTER_NAME = "multi-broker-test-cluster"; + public static final String TENANT_NAME = "multi-broker-test-tenant"; + + /** + * Number of brokers in the shared cluster. Three is the minimum that lets us exercise + * controller-leader failover (one leader + at least two followers) and segment placement + * across more than one broker. + */ + public static final int NUM_BROKERS = 3; + + private static volatile SharedMultiBrokerPulsarCluster instance; + + private OxiaContainer oxiaServer; + private String metadataStoreUrl; + private BKCluster bkCluster; + private final List brokers = new ArrayList<>(NUM_BROKERS); + private final List admins = new ArrayList<>(NUM_BROKERS); + private final List clients = new ArrayList<>(NUM_BROKERS); + + /** Returns the singleton instance, starting the cluster on first invocation. */ + public static SharedMultiBrokerPulsarCluster get() throws Exception { + if (instance == null) { + synchronized (SharedMultiBrokerPulsarCluster.class) { + if (instance == null) { + SharedMultiBrokerPulsarCluster cluster = new SharedMultiBrokerPulsarCluster(); + cluster.start(); + instance = cluster; + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + instance.close(); + } catch (Exception e) { + log.error().exception(e).log("Failed to close SharedMultiBrokerPulsarCluster"); + } + })); + } + } + } + return instance; + } + + /** All brokers in the cluster, in start order. */ + public List getBrokers() { + return Collections.unmodifiableList(brokers); + } + + /** Per-broker {@link PulsarAdmin} handles, in the same order as {@link #getBrokers()}. */ + public List getAdmins() { + return Collections.unmodifiableList(admins); + } + + /** Per-broker {@link PulsarClient} handles, in the same order as {@link #getBrokers()}. */ + public List getClients() { + return Collections.unmodifiableList(clients); + } + + /** Convenience: the first broker's admin. Lookups redirect to topic-owning brokers. */ + public PulsarAdmin getAdmin() { + return admins.get(0); + } + + /** Convenience: the first broker's client. Lookups redirect to topic-owning brokers. */ + public PulsarClient getClient() { + return clients.get(0); + } + + @SuppressWarnings("deprecation") + private void start() throws Exception { + log.info().attr("brokers", NUM_BROKERS).log("Starting SharedMultiBrokerPulsarCluster"); + + // Real Oxia server (not the in-memory metadata store). Per-topic leader election + // (used by the ScalableTopicController) relies on per-session ephemeral nodes, and + // the in-memory store treats every connection on the same JVM as the same session + // — so multiple brokers all "win" the same election simultaneously. Oxia gives each + // broker its own session and the proper ephemeral / CAS semantics. Container-based + // because oxia ships no in-process server; tests skip cleanly when Docker isn't + // available. + oxiaServer = new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME); + oxiaServer.start(); + metadataStoreUrl = "oxia://" + oxiaServer.getServiceAddress(); + + // Single shared bookie. Same minimal config as SharedPulsarCluster — write quorum stays + // at 1 across brokers because the bookie count is the limiting factor, not the brokers. + ServerConfiguration bkConf = new ServerConfiguration(); + bkConf.setProperty("dbStorage_writeCacheMaxSizeMb", 32); + bkConf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 4); + bkConf.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 4); + bkConf.setProperty("dbStorage_rocksDB_blockCacheSize", 4 * 1024 * 1024); + bkConf.setJournalSyncData(false); + bkConf.setJournalWriteData(false); + bkConf.setProperty("journalMaxGroupWaitMSec", 0L); + bkConf.setProperty("journalPreAllocSizeMB", 1); + bkConf.setFlushInterval(60000); + bkConf.setGcWaitTime(60000); + bkConf.setAllowLoopback(true); + bkConf.setAdvertisedAddress("127.0.0.1"); + bkConf.setAllowEphemeralPorts(true); + bkConf.setNumAddWorkerThreads(0); + bkConf.setNumReadWorkerThreads(0); + bkConf.setNumHighPriorityWorkerThreads(0); + bkConf.setNumJournalCallbackThreads(0); + bkConf.setServerNumIOThreads(1); + bkConf.setNumLongPollWorkerThreads(1); + bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap); + bkConf.setLedgerStorageClass("org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage"); + + bkCluster = BKCluster.builder() + .baseServerConfiguration(bkConf) + .metadataServiceUri(metadataStoreUrl) + .numBookies(1) + .clearOldData(true) + .build(); + + // Start NUM_BROKERS brokers. The first one provisions the cluster + tenant; the rest + // discover them through the shared metadata store. + for (int i = 0; i < NUM_BROKERS; i++) { + PulsarService broker = startBroker(i); + brokers.add(broker); + + PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(broker.getWebServiceAddress()) + .build(); + admins.add(admin); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(broker.getBrokerServiceUrl()) + .build(); + clients.add(client); + + if (i == 0) { + admin.clusters().createCluster(CLUSTER_NAME, + ClusterData.builder() + .serviceUrl(broker.getWebServiceAddress()) + .brokerServiceUrl(broker.getBrokerServiceUrl()) + .build()); + admin.tenants().createTenant(TENANT_NAME, + TenantInfo.builder() + .allowedClusters(Set.of(CLUSTER_NAME)) + .build()); + } + } + + log.info() + .attr("brokers", brokers.stream().map(PulsarService::getBrokerServiceUrl).toList()) + .log("SharedMultiBrokerPulsarCluster started"); + + // Reset thread-leak baseline so cluster threads aren't reported against the first test. + ThreadLeakDetectorListener.resetCapturedThreads(); + } + + private PulsarService startBroker(int index) throws Exception { + ServiceConfiguration config = new ServiceConfiguration(); + config.setMetadataStoreUrl(metadataStoreUrl); + config.setConfigurationMetadataStoreUrl(metadataStoreUrl); + config.setClusterName(CLUSTER_NAME); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setManagedLedgerDefaultEnsembleSize(1); + config.setManagedLedgerDefaultWriteQuorum(1); + config.setManagedLedgerDefaultAckQuorum(1); + // More bundles than brokers so the load balancer has room to spread ownership. + config.setDefaultNumberOfNamespaceBundles(NUM_BROKERS * 2); + config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + config.setNumExecutorThreadPoolSize(5); + config.setManagedLedgerCacheSizeMB(8); + config.setActiveConsumerFailoverDelayTimeMillis(0); + config.setAllowAutoTopicCreationType( + org.apache.pulsar.common.policies.data.TopicType.NON_PARTITIONED); + config.setBookkeeperNumberOfChannelsPerBookie(1); + config.setBookkeeperClientExposeStatsToPrometheus(false); + config.setDispatcherRetryBackoffInitialTimeInMs(0); + config.setDispatcherRetryBackoffMaxTimeInMs(0); + config.setForceDeleteNamespaceAllowed(true); + config.setForceDeleteTenantAllowed(true); + config.setBrokerDeleteInactiveTopicsEnabled(false); + config.setBrokerDeduplicationEnabled(true); + + // Reduce thread pool sizes — three brokers each spinning up the default counts is heavy. + config.setNumIOThreads(2); + config.setNumOrderedExecutorThreads(2); + config.setNumHttpServerThreads(4); + config.setBookkeeperClientNumWorkerThreads(2); + config.setBookkeeperClientNumIoThreads(2); + config.setNumCacheExecutorThreadPoolSize(1); + config.setManagedLedgerNumSchedulerThreads(2); + config.setTopicOrderedExecutorThreadNum(4); + + // Load balancer is what makes this multi-broker. Disable shedding to keep tests + // deterministic: bundles assigned at first lookup don't move around mid-test. + config.setLoadBalancerEnabled(true); + config.setLoadBalancerSheddingEnabled(false); + + log.info().attr("index", index).log("Starting broker"); + PulsarService broker = new PulsarService(config); + broker.start(); + log.info().attr("index", index) + .attr("broker", broker.getBrokerServiceUrl()) + .attr("web", broker.getWebServiceAddress()) + .log("Broker started"); + return broker; + } + + private void close() throws Exception { + log.info("Closing SharedMultiBrokerPulsarCluster"); + // Tear down in reverse order: clients first so they don't interfere with broker shutdown. + for (int i = clients.size() - 1; i >= 0; i--) { + try { + clients.get(i).close(); + } catch (Exception e) { + log.warn().attr("index", i).exceptionMessage(e).log("Failed to close client"); + } + } + for (int i = admins.size() - 1; i >= 0; i--) { + try { + admins.get(i).close(); + } catch (Exception e) { + log.warn().attr("index", i).exceptionMessage(e).log("Failed to close admin"); + } + } + for (int i = brokers.size() - 1; i >= 0; i--) { + try { + brokers.get(i).close(); + } catch (Exception e) { + log.warn().attr("index", i).exceptionMessage(e).log("Failed to close broker"); + } + } + if (bkCluster != null) { + bkCluster.close(); + } + if (oxiaServer != null) { + oxiaServer.close(); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerClientBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerClientBaseTest.java new file mode 100644 index 0000000000000..609da36a58ee9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerClientBaseTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import org.apache.pulsar.broker.service.SharedMultiBrokerPulsarBaseTest; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; + +/** + * Base class for V5 client end-to-end tests that need a multi-broker cluster. + * + *

Companion to {@link V5ClientBaseTest} (single-broker). Use this when a test specifically + * exercises broker-side coordination across brokers — controller-leader placement, + * admin-operation redirects to the controller-owning broker, V5 consumer reconnection to a + * different broker, etc. For non-multi-broker concerns prefer the single-broker variant: it's + * faster and has fewer moving parts. + * + *

Builds one V5 {@link PulsarClient} per broker in the shared cluster (initialized in + * {@code @BeforeClass}, closed in {@code @AfterClass}) so tests can drive operations through a + * specific broker and observe how the system routes them. {@link #v5Client} aliases the first + * client; {@link #v5Clients} exposes the full list aligned with {@link #brokers}. + */ +public abstract class V5MultiBrokerClientBaseTest extends SharedMultiBrokerPulsarBaseTest { + + /** V5 client connected to broker 0. Shorthand for {@code v5Clients.get(0)}. */ + protected PulsarClient v5Client; + + /** Per-broker V5 clients, in the same order as {@link #brokers}. */ + protected List v5Clients; + + private final List trackedClients = new ArrayList<>(); + + @BeforeClass(alwaysRun = true) + public void setupSharedV5MultiBrokerClients() throws Exception { + List built = new ArrayList<>(brokers.size()); + for (var broker : brokers) { + built.add(PulsarClient.builder() + .serviceUrl(broker.getBrokerServiceUrl()) + .build()); + } + v5Clients = Collections.unmodifiableList(built); + v5Client = v5Clients.get(0); + } + + @AfterClass(alwaysRun = true) + public void closeSharedV5MultiBrokerClients() throws Exception { + if (v5Clients != null) { + for (PulsarClient c : v5Clients) { + try { + c.close(); + } catch (Exception ignored) { + // best-effort cleanup + } + } + v5Clients = null; + v5Client = null; + } + } + + /** + * Build a fresh V5 client connected to the given broker. The returned client is + * registered for automatic close at the end of the current test method — useful for + * tests that want a dedicated client (e.g. to sever just its connection). + */ + protected PulsarClient newV5Client(int brokerIndex) throws Exception { + PulsarClient client = PulsarClient.builder() + .serviceUrl(brokers.get(brokerIndex).getBrokerServiceUrl()) + .build(); + trackedClients.add(client); + return client; + } + + @AfterMethod(alwaysRun = true) + public void closeTrackedV5Clients() { + for (int i = trackedClients.size() - 1; i >= 0; i--) { + try { + trackedClients.get(i).close(); + } catch (Exception ignored) { + // best-effort cleanup + } + } + trackedClients.clear(); + } + + /** + * Create a scalable topic in the current test namespace and return its + * {@code topic://...} name. + */ + protected String newScalableTopic(int numInitialSegments) throws Exception { + String name = "topic://" + getNamespace() + "/scalable-" + + UUID.randomUUID().toString().substring(0, 8); + admin.scalableTopics().createScalableTopic(name, numInitialSegments); + return name; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerScalableTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerScalableTopicTest.java new file mode 100644 index 0000000000000..b6e747ea634cb --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiBrokerScalableTopicTest.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import lombok.Cleanup; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +/** + * Coverage for V5 scalable topics across a multi-broker cluster. + * + *

Each scalable topic has a single {@link org.apache.pulsar.broker.service.scalable.ScalableTopicController} + * leader; the leader's brokerId is recorded in metadata. With three brokers and many topics + * the leader role spreads naturally across the cluster, so tests need to cope with admin + * operations and consumer sessions that target a non-owning broker. These tests assert that + * the routing layers do their job: + * + *

    + *
  • controller leadership distributes across brokers given enough topics;
  • + *
  • {@code admin.scalableTopics()} mutating calls (split / merge) follow the HTTP 307 + * redirect emitted by non-leader brokers and complete on the leader;
  • + *
  • V5 StreamConsumer attached through a non-owning broker still reaches the right + * controller via the lookup → controller-URL path and receives messages;
  • + *
  • V5 CheckpointConsumer with {@code consumerGroup(...)} (the only Checkpoint mode that + * runs through the controller) does the same.
  • + *
+ */ +public class V5MultiBrokerScalableTopicTest extends V5MultiBrokerClientBaseTest { + + /** + * Smoke: create a scalable topic, produce on broker 0's V5 client, consume on the last + * broker's V5 client. Lookups across brokers must converge on the segment owners. + */ + @Test + public void testProduceConsumeAcrossBrokers() throws Exception { + String topic = newScalableTopic(2); + + @Cleanup + Producer producer = v5Clients.get(0).newProducer(Schema.string()) + .topic(topic) + .create(); + + @Cleanup + QueueConsumer consumer = v5Clients.get(v5Clients.size() - 1) + .newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("smoke-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + int n = 30; + Set sent = new HashSet<>(); + for (int i = 0; i < n; i++) { + String v = "v-" + i; + producer.newMessage().key("k-" + i).value(v).send(); + sent.add(v); + } + + Set received = new HashSet<>(); + for (int i = 0; i < n; i++) { + Message msg = consumer.receive(Duration.ofSeconds(5)); + assertNotNull(msg, "missing message #" + i); + received.add(msg.value()); + consumer.acknowledge(msg.id()); + } + assertEquals(received, sent); + } + + /** + * Drive controller materialization through a different broker each iteration so the + * leader-election race naturally lands on different brokers. The first broker to call + * {@code getOrCreateController} for a topic wins (subsequent peers just observe the + * existing leader); cycling the "first" broker spreads leadership across the cluster. + */ + @Test + public void testControllerLeadershipDistributesAcrossBrokers() throws Exception { + int numTopics = brokers.size() * 4; + Set leaders = new HashSet<>(); + for (int i = 0; i < numTopics; i++) { + String topic = newScalableTopic(1); + int firstBroker = i % brokers.size(); + // Force this broker to materialize the controller first → it becomes leader. + brokers.get(firstBroker).getBrokerService().getScalableTopicService() + .getOrCreateController(TopicName.get(topic)) + .get(5, java.util.concurrent.TimeUnit.SECONDS); + String leader = findControllerLeader(topic); + assertNotNull(leader, "controller leader must be elected for " + topic); + leaders.add(leader); + } + assertEquals(leaders.size(), brokers.size(), + "expected controller leadership to spread across every broker, got " + leaders); + } + + /** + * Splitting a scalable topic must succeed when the request hits a non-leader broker: + * the admin layer redirects the call to the controller-leader broker via 307, the admin + * client follows the redirect, and the metadata reflects the split. + */ + @Test + public void testSplitFromNonLeaderBrokerRedirectsToOwner() throws Exception { + String topic = newScalableTopic(1); + + // Force the parent segment topic to load by producing a message — split's + // terminate step requires the segment to exist on its owning broker. + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + producer.newMessage().value("warm-up").send(); + + int leaderIndex = findControllerLeaderIndex(topic); + int nonLeaderIndex = (leaderIndex + 1) % brokers.size(); + + long activeId = singleActiveSegmentId(topic); + // Issue the split through a non-leader broker's admin. Without redirect this would + // fail with a "not the leader" error; with redirect the leader applies the split. + admins.get(nonLeaderIndex).scalableTopics().splitSegment(topic, activeId); + + Awaitility.await().untilAsserted(() -> { + int active = 0; + var meta = admin.scalableTopics().getMetadata(topic); + for (var seg : meta.getSegments().values()) { + if (seg.isActive()) { + active++; + } + } + assertEquals(active, 2, "split must produce 2 active children"); + }); + } + + /** + * Same as {@link #testSplitFromNonLeaderBrokerRedirectsToOwner()}, but for merge: + * prepare a topic with two adjacent active children (via a split), then merge them + * through a non-leader broker's admin and assert the merge completed. + */ + @Test + public void testMergeFromNonLeaderBrokerRedirectsToOwner() throws Exception { + String topic = newScalableTopic(1); + // Warm-up — same reason as split: the parent segment topic must be loaded + // on its owning broker before split/merge can terminate it. + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + producer.newMessage().value("warm-up").send(); + + long parentId = singleActiveSegmentId(topic); + admin.scalableTopics().splitSegment(topic, parentId); + Awaitility.await().untilAsserted(() -> { + int active = 0; + for (var seg : admin.scalableTopics().getMetadata(topic).getSegments().values()) { + if (seg.isActive()) { + active++; + } + } + assertEquals(active, 2); + }); + + var meta = admin.scalableTopics().getMetadata(topic); + long[] activeIds = new long[2]; + int idx = 0; + for (var seg : meta.getSegments().values()) { + if (seg.isActive()) { + activeIds[idx++] = seg.getSegmentId(); + } + } + + int leaderIndex = findControllerLeaderIndex(topic); + int nonLeaderIndex = (leaderIndex + 1) % brokers.size(); + admins.get(nonLeaderIndex).scalableTopics() + .mergeSegments(topic, activeIds[0], activeIds[1]); + + Awaitility.await().untilAsserted(() -> { + int active = 0; + for (var seg : admin.scalableTopics().getMetadata(topic).getSegments().values()) { + if (seg.isActive()) { + active++; + } + } + assertEquals(active, 1, "merge must collapse to a single active segment"); + }); + } + + /** + * A V5 StreamConsumer subscribed via a non-owning broker must still reach the controller + * leader through the DAG-watch lookup → controller-URL path. Two consumers sharing the + * subscription on different brokers should split segments via the controller and together + * deliver every message exactly once. + */ + @Test + public void testStreamConsumerControllerCoordinationAcrossBrokers() throws Exception { + String topic = newScalableTopic(4); + String subscription = "cross-broker-stream"; + + @Cleanup + Producer producer = v5Clients.get(0).newProducer(Schema.string()) + .topic(topic) + .create(); + + int leaderIndex = findControllerLeaderIndex(topic); + int nonLeaderA = (leaderIndex + 1) % brokers.size(); + int nonLeaderB = (leaderIndex + 2) % brokers.size(); + + @Cleanup + StreamConsumer a = v5Clients.get(nonLeaderA).newStreamConsumer(Schema.string()) + .topic(topic) + .subscriptionName(subscription) + .consumerName("alice") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + @Cleanup + StreamConsumer b = v5Clients.get(nonLeaderB).newStreamConsumer(Schema.string()) + .topic(topic) + .subscriptionName(subscription) + .consumerName("bob") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + int n = 80; + Set sent = new HashSet<>(); + for (int i = 0; i < n; i++) { + String v = "v-" + i; + producer.newMessage().key("k-" + i).value(v).send(); + sent.add(v); + } + + Set received = ConcurrentHashMap.newKeySet(); + Set aGot = ConcurrentHashMap.newKeySet(); + Set bGot = ConcurrentHashMap.newKeySet(); + Thread ta = drainStream(a, received, aGot); + Thread tb = drainStream(b, received, bGot); + ta.join(); + tb.join(); + + assertEquals(received, sent, "every message must be delivered exactly once across the group"); + Set overlap = new HashSet<>(aGot); + overlap.retainAll(bGot); + assertTrue(overlap.isEmpty(), "no message should be delivered to both consumers, overlap=" + overlap); + assertTrue(!aGot.isEmpty() && !bGot.isEmpty(), + "controller must split segments across both consumers (a=" + aGot.size() + + " b=" + bGot.size() + ")"); + } + + /** + * V5 CheckpointConsumer with {@code consumerGroup(...)} routes its session through the + * topic controller (same path as StreamConsumer). The same cross-broker test as above: + * two checkpoint consumers in a group, attached via non-owning brokers, must receive + * disjoint subsets that together cover every produced message. + */ + @Test + public void testCheckpointConsumerControllerCoordinationAcrossBrokers() throws Exception { + String topic = newScalableTopic(4); + String group = "cross-broker-checkpoint-group"; + + @Cleanup + Producer producer = v5Clients.get(0).newProducer(Schema.string()) + .topic(topic) + .create(); + + int leaderIndex = findControllerLeaderIndex(topic); + int nonLeaderA = (leaderIndex + 1) % brokers.size(); + int nonLeaderB = (leaderIndex + 2) % brokers.size(); + + @Cleanup + CheckpointConsumer a = v5Clients.get(nonLeaderA) + .newCheckpointConsumer(Schema.string()) + .topic(topic) + .consumerGroup(group) + .startPosition(Checkpoint.earliest()) + .create(); + @Cleanup + CheckpointConsumer b = v5Clients.get(nonLeaderB) + .newCheckpointConsumer(Schema.string()) + .topic(topic) + .consumerGroup(group) + .startPosition(Checkpoint.earliest()) + .create(); + + int n = 80; + Set sent = new HashSet<>(); + for (int i = 0; i < n; i++) { + String v = "v-" + i; + producer.newMessage().key("k-" + i).value(v).send(); + sent.add(v); + } + + Set received = ConcurrentHashMap.newKeySet(); + Set aGot = ConcurrentHashMap.newKeySet(); + Set bGot = ConcurrentHashMap.newKeySet(); + Thread ta = drainCheckpoint(a, received, aGot); + Thread tb = drainCheckpoint(b, received, bGot); + ta.join(); + tb.join(); + + assertEquals(received, sent, "every message must be delivered exactly once across the group"); + Set overlap = new HashSet<>(aGot); + overlap.retainAll(bGot); + assertTrue(overlap.isEmpty(), + "no message should be delivered to both checkpoint consumers, overlap=" + overlap); + assertTrue(!aGot.isEmpty() && !bGot.isEmpty(), + "controller must split segments across both consumers (a=" + aGot.size() + + " b=" + bGot.size() + ")"); + } + + // --- Helpers --- + + /** + * Returns the brokerId of the controller leader for {@code topic}. Forces the controller + * to materialize on every broker (so leader election runs), then waits until every + * broker's metadata store reflects the elected leader. The latter wait is what makes the + * subsequent V5 subscribe deterministic: the DAG-watch lookup from any broker reads the + * controller znode via its own metadata store, and we need that read to return the + * leader URL — not the empty fallback that pushes the client onto a non-leader broker. + */ + private String findControllerLeader(String topic) throws Exception { + TopicName tn = TopicName.get(topic); + // Step 1: force controller materialization + leader election on every broker. + for (PulsarService broker : brokers) { + broker.getBrokerService().getScalableTopicService().getOrCreateController(tn) + .get(5, java.util.concurrent.TimeUnit.SECONDS); + } + // Step 2: wait until each broker's metadata store sees the controller-lock znode. + // Without this, a lookup against a follower can return an empty controller URL — + // the watch hasn't propagated yet — and the client subscribes to the wrong broker. + Awaitility.await().untilAsserted(() -> { + for (PulsarService broker : brokers) { + var resources = broker.getPulsarResources().getScalableTopicResources(); + var optValue = resources.getStore().get(resources.controllerLockPath(tn)) + .get(5, java.util.concurrent.TimeUnit.SECONDS); + assertTrue(optValue.isPresent(), + "broker " + broker.getBrokerId() + + " must see controller lock for " + topic); + } + }); + var controller = brokers.get(0).getBrokerService().getScalableTopicService() + .getOrCreateController(tn).get(); + return controller.getLeaderBrokerId().get().orElseThrow(); + } + + private int findControllerLeaderIndex(String topic) throws Exception { + String leaderBrokerId = findControllerLeader(topic); + for (int i = 0; i < brokers.size(); i++) { + if (brokers.get(i).getBrokerId().equals(leaderBrokerId)) { + return i; + } + } + throw new AssertionError("controller leader '" + leaderBrokerId + + "' does not match any broker in cluster"); + } + + /** + * Returns the segment id of the (single) active segment of {@code topic}. Convenience + * for tests that work on a freshly-created scalable topic with one initial segment. + */ + private long singleActiveSegmentId(String topic) throws Exception { + var meta = admin.scalableTopics().getMetadata(topic); + long active = -1; + int count = 0; + for (var seg : meta.getSegments().values()) { + if (seg.isActive()) { + active = seg.getSegmentId(); + count++; + } + } + assertEquals(count, 1, "expected exactly one active segment"); + assertNotEquals(active, -1L); + return active; + } + + private Thread drainStream(StreamConsumer consumer, Set all, Set mine) { + Thread t = new Thread(() -> { + try { + MessageId last = null; + while (true) { + Message msg = consumer.receive(Duration.ofSeconds(1)); + if (msg == null) { + if (last != null) { + consumer.acknowledgeCumulative(last); + } + return; + } + all.add(msg.value()); + mine.add(msg.value()); + last = msg.id(); + } + } catch (Exception ignored) { + } + }, "stream-consumer-drainer"); + t.start(); + return t; + } + + private Thread drainCheckpoint(CheckpointConsumer consumer, + Set all, Set mine) { + Thread t = new Thread(() -> { + try { + while (true) { + Message msg = consumer.receive(Duration.ofSeconds(1)); + if (msg == null) { + return; + } + all.add(msg.value()); + mine.add(msg.value()); + } + } catch (Exception ignored) { + } + }, "checkpoint-consumer-drainer"); + t.start(); + return t; + } +}