Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pulsar-broker/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ dependencies {
testImplementation(libs.jetty.ee8.proxy)
testImplementation(libs.jetty.websocket.jetty.client)
testImplementation(libs.opentelemetry.sdk.testing)
testImplementation(libs.oxia.testcontainers)
testRuntimeOnly(libs.avro.protobuf) {
exclude(group = "com.google.protobuf")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) {
}
}

// Propagate the controller-broker URL so V5 clients can connect to the right broker
// for scalable-topic subscribe. Without this the client falls back to its configured
// service URL, which on a multi-broker cluster is rarely the controller leader.
if (response.controllerBrokerUrl() != null) {
dag.setControllerBrokerUrl(response.controllerBrokerUrl());
}
if (response.controllerBrokerUrlTls() != null) {
dag.setControllerBrokerUrlTls(response.controllerBrokerUrlTls());
}

return dag;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import io.github.merlimat.slog.Logger;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
Expand Down Expand Up @@ -233,9 +231,11 @@ public CompletableFuture<SegmentLayout> splitSegment(long segmentId) {
SegmentInfo parent = currentLayout.getAllSegments().get(segmentId);
String parentTopicName = toSegmentPersistentName(parent);

// Step 1: Discover subscriptions on the parent segment, then create child
// segment topics with those subscriptions (routed to owning brokers via admin API)
return discoverSubscriptions(parentTopicName)
// Step 1: Read the scalable topic's subscriptions from metadata (the single source
// of truth — segment topics may live on different brokers, but the subscription set
// is tracked here), then create child segment topics with those subscriptions
// already provisioned (the create call routes to each segment's owning broker).
return resources.listSubscriptionsAsync(topicName)
.thenCompose(parentSubs -> {
var subList = new java.util.ArrayList<>(parentSubs);
return createSegmentTopic(child1, subList)
Expand Down Expand Up @@ -277,13 +277,10 @@ public CompletableFuture<SegmentLayout> mergeSegments(long segmentId1, long segm
String parent1Topic = toSegmentPersistentName(parent1);
String parent2Topic = toSegmentPersistentName(parent2);

// Step 1: Discover subscriptions from both parents (union), then create merged segment
return discoverSubscriptions(parent1Topic)
.thenCombine(discoverSubscriptions(parent2Topic), (subs1, subs2) -> {
Set<String> allSubs = new LinkedHashSet<>(subs1);
allSubs.addAll(subs2);
return allSubs;
})
// Step 1: Read the scalable topic's subscriptions from metadata (single source of
// truth, see splitSegment), then create the merged segment topic with those
// subscriptions provisioned.
return resources.listSubscriptionsAsync(topicName)
.thenCompose(parentSubs -> createSegmentTopic(merged, new java.util.ArrayList<>(parentSubs)))

// Step 2: Terminate both parent segment topics
Expand Down Expand Up @@ -582,29 +579,6 @@ private CompletableFuture<Void> createSegmentTopic(SegmentInfo segment, java.uti
}
}

/**
* Discover all subscription names on a segment topic. Works whether the topic is
* on this broker or a remote one by using the admin client.
*/
private CompletableFuture<Set<String>> discoverSubscriptions(String segmentTopicName) {
// Try local first (avoids RPC if the segment is on this broker)
return brokerService.getTopicIfExists(segmentTopicName)
.thenCompose(optTopic -> {
if (optTopic.isPresent()) {
return CompletableFuture.completedFuture(
new LinkedHashSet<>(optTopic.get().getSubscriptions().keySet()));
}
// Topic is on a remote broker — use admin client
try {
return brokerService.getPulsar().getAdminClient()
.topics().getSubscriptionsAsync(segmentTopicName)
.thenApply(LinkedHashSet::new);
} catch (PulsarServerException e) {
return CompletableFuture.failedFuture(e);
}
});
}

private CompletableFuture<Void> notifySubscriptions(SegmentLayout layout) {
CompletableFuture<?>[] futures = subscriptions.values().stream()
.map(coordinator -> coordinator.onLayoutChange(layout))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import lombok.CustomLog;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;

/**
* Base class for tests that need a shared multi-broker cluster across test classes.
*
* <p>Companion to {@link SharedPulsarBaseTest}. Use this when a test specifically depends on
* behavior that only manifests across brokers — namespace ownership transfer, controller-leader
* failover, segment placement on different brokers, V5 client reconnect to a different broker.
* For everything else, prefer the single-broker {@link SharedPulsarBaseTest}: it's faster, has
* fewer moving parts, and is sufficient for most coverage.
*
* <p>Each test method gets a fresh namespace under {@link SharedMultiBrokerPulsarCluster#TENANT_NAME}
* (created in {@link #setupSharedMultiBrokerTest()} and force-deleted in
* {@link #cleanupSharedMultiBrokerTest()}). The cluster itself is JVM-wide and reused across
* every test class that extends this — see {@link SharedMultiBrokerPulsarCluster}.
*
* <p>Subclasses get:
* <ul>
* <li>{@link #admin} / {@link #pulsarClient} — handles aimed at broker 0; lookups against any
* broker correctly redirect to topic owners, so most tests should just use these.</li>
* <li>{@link #brokers} / {@link #admins} / {@link #clients} — full per-broker lists, in start
* order, for tests that need to address a specific broker (e.g. asserting topic
* ownership, killing a specific broker).</li>
* <li>{@link #newTopicName()} — generates a unique topic in the test namespace.</li>
* </ul>
*/
@CustomLog
public abstract class SharedMultiBrokerPulsarBaseTest {

/** All brokers in the shared cluster, in start order. */
protected List<PulsarService> brokers;
/** Per-broker admin handles, aligned with {@link #brokers}. */
protected List<PulsarAdmin> admins;
/** Per-broker client handles, aligned with {@link #brokers}. */
protected List<PulsarClient> clients;

/** Convenience: broker 0's admin. */
protected PulsarAdmin admin;
/** Convenience: broker 0's client. */
protected PulsarClient pulsarClient;

private final List<String> namespaces = new ArrayList<>();

/** Returns the unique namespace assigned to the current test method. */
protected String getNamespace() {
return namespaces.get(0);
}

/** Returns the broker service URL for broker {@code index}. */
protected String getBrokerServiceUrl(int index) {
return brokers.get(index).getBrokerServiceUrl();
}

/** Returns the web service URL for broker {@code index}. */
protected String getWebServiceUrl(int index) {
return brokers.get(index).getWebServiceAddress();
}

/**
* Creates a new {@link PulsarClient} connected to broker {@code index}. Callers are
* responsible for closing the returned client.
*/
protected PulsarClient newPulsarClient(int index) throws PulsarClientException {
return PulsarClient.builder().serviceUrl(brokers.get(index).getBrokerServiceUrl()).build();
}

/**
* Initializes (lazily) the shared cluster singleton and wires the per-class fields. Called
* once per test class.
*/
@BeforeClass(alwaysRun = true)
public void setupSharedMultiBrokerCluster() throws Exception {
SharedMultiBrokerPulsarCluster cluster = SharedMultiBrokerPulsarCluster.get();
brokers = cluster.getBrokers();
admins = cluster.getAdmins();
clients = cluster.getClients();
admin = cluster.getAdmin();
pulsarClient = cluster.getClient();
}

/** Creates a unique namespace for the current test method. */
@BeforeMethod(alwaysRun = true)
public void setupSharedMultiBrokerTest() throws Exception {
createNewNamespace();
}

/** Force-deletes all namespaces created during the test method. */
@AfterMethod(alwaysRun = true)
public void cleanupSharedMultiBrokerTest() throws Exception {
for (String ns : namespaces) {
try {
admin.namespaces().deleteNamespace(ns, true);
log.info().attr("testNamespace", ns).log("Deleted test namespace");
} catch (Exception e) {
log.warn().attr("deleteNamespace", ns).exceptionMessage(e).log("Failed to delete namespace");
}
}
namespaces.clear();
}

/** Creates a new namespace under the shared tenant and registers it for cleanup. */
protected String createNewNamespace() throws Exception {
String nsName = "test-" + UUID.randomUUID().toString().substring(0, 8);
String ns = SharedMultiBrokerPulsarCluster.TENANT_NAME + "/" + nsName;
admin.namespaces().createNamespace(ns, Set.of(SharedMultiBrokerPulsarCluster.CLUSTER_NAME));
namespaces.add(ns);
log.info().attr("testNamespace", ns).log("Created test namespace");
return ns;
}

/** Generates a unique persistent topic name within the current test namespace. */
protected String newTopicName() {
return "persistent://" + getNamespace() + "/topic-" + UUID.randomUUID().toString().substring(0, 8);
}
}
Loading
Loading