Skip to content

Commit e4c4a35

Browse files
BewareMyPowersrinath-ctds
authored andcommitted
[improve][broker] Replace isServiceUnitActiveAsync with checkTopicNsOwnership (apache#24780)
(cherry picked from commit 46a76e9) (cherry picked from commit 05d88f7)
1 parent cd6d4a5 commit e4c4a35

File tree

7 files changed

+28
-76
lines changed

7 files changed

+28
-76
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@
4343
import java.util.concurrent.CompletableFuture;
4444
import java.util.concurrent.ConcurrentHashMap;
4545
import java.util.concurrent.CopyOnWriteArrayList;
46-
import java.util.concurrent.ExecutionException;
4746
import java.util.concurrent.TimeUnit;
48-
import java.util.concurrent.TimeoutException;
4947
import java.util.concurrent.atomic.AtomicInteger;
5048
import java.util.concurrent.atomic.AtomicReference;
5149
import java.util.function.Function;
@@ -1225,35 +1223,6 @@ public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId suName)
12251223
new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()));
12261224
}
12271225

1228-
/**
1229-
* @deprecated This method is only used in test now.
1230-
*/
1231-
@Deprecated
1232-
public boolean isServiceUnitActive(TopicName topicName) {
1233-
try {
1234-
return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
1235-
.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
1236-
} catch (InterruptedException | ExecutionException | TimeoutException e) {
1237-
LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e);
1238-
throw new RuntimeException(e);
1239-
}
1240-
}
1241-
1242-
public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
1243-
// TODO: Add unit tests cover it.
1244-
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
1245-
return getBundleAsync(topicName)
1246-
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
1247-
}
1248-
return getBundleAsync(topicName).thenCompose(bundle -> {
1249-
Optional<CompletableFuture<OwnedBundle>> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle);
1250-
if (optionalFuture.isEmpty()) {
1251-
return CompletableFuture.completedFuture(false);
1252-
}
1253-
return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive());
1254-
});
1255-
}
1256-
12571226
private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
12581227
// TODO: Add unit tests cover it.
12591228
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1704,38 +1704,29 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
17041704
CompletableFuture<Optional<Topic>> topicFuture,
17051705
Map<String, String> properties) {
17061706
TopicName topicName = TopicName.get(topic);
1707-
pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
1708-
.thenAccept(isActive -> {
1709-
if (isActive) {
1710-
CompletableFuture<Map<String, String>> propertiesFuture;
1711-
if (properties == null) {
1712-
//Read properties from storage when loading topic.
1713-
propertiesFuture = fetchTopicPropertiesAsync(topicName);
1714-
} else {
1715-
propertiesFuture = CompletableFuture.completedFuture(properties);
1716-
}
1717-
propertiesFuture.thenAccept(finalProperties ->
1718-
//TODO add topicName in properties?
1719-
createPersistentTopic0(topic, createIfMissing, topicFuture,
1720-
finalProperties)
1721-
).exceptionally(throwable -> {
1722-
log.warn("[{}] Read topic property failed", topic, throwable);
1723-
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
1724-
topicFuture.completeExceptionally(throwable);
1725-
return null;
1726-
});
1727-
} else {
1728-
// namespace is being unloaded
1729-
String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
1730-
log.warn(msg);
1731-
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
1732-
topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
1733-
}
1734-
}).exceptionally(ex -> {
1735-
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
1736-
topicFuture.completeExceptionally(ex);
1737-
return null;
1738-
});
1707+
checkTopicNsOwnership(topic).thenRun(() -> {
1708+
CompletableFuture<Map<String, String>> propertiesFuture;
1709+
if (properties == null) {
1710+
//Read properties from storage when loading topic.
1711+
propertiesFuture = fetchTopicPropertiesAsync(topicName);
1712+
} else {
1713+
propertiesFuture = CompletableFuture.completedFuture(properties);
1714+
}
1715+
propertiesFuture.thenAccept(finalProperties ->
1716+
//TODO add topicName in properties?
1717+
createPersistentTopic0(topic, createIfMissing, topicFuture,
1718+
finalProperties)
1719+
).exceptionally(throwable -> {
1720+
log.warn("[{}] Read topic property failed", topic, throwable);
1721+
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
1722+
topicFuture.completeExceptionally(throwable);
1723+
return null;
1724+
});
1725+
}).exceptionally(e -> {
1726+
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
1727+
topicFuture.completeExceptionally(e.getCause());
1728+
return null;
1729+
});
17391730
}
17401731

17411732
@VisibleForTesting

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
8080
import org.apache.pulsar.common.api.proto.ProtocolVersion;
8181
import org.apache.pulsar.common.naming.NamespaceBundle;
82-
import org.apache.pulsar.common.naming.TopicName;
8382
import org.apache.pulsar.common.util.netty.EventLoopUtil;
8483
import org.awaitility.Awaitility;
8584
import org.slf4j.Logger;
@@ -162,7 +161,6 @@ public void setup() throws Exception {
162161

163162
NamespaceService nsSvc = pulsarTestContext.getPulsarService().getNamespaceService();
164163
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
165-
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
166164
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
167165
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any());
168166

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.apache.pulsar.common.api.proto.CommandSubscribe;
5252
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
5353
import org.apache.pulsar.common.naming.NamespaceBundle;
54-
import org.apache.pulsar.common.naming.TopicName;
5554
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
5655
import org.slf4j.Logger;
5756
import org.slf4j.LoggerFactory;
@@ -103,7 +102,6 @@ public void setup(Method m) throws Exception {
103102
NamespaceService nsSvc = mock(NamespaceService.class);
104103
doReturn(nsSvc).when(pulsar).getNamespaceService();
105104
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
106-
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
107105
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
108106
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any());
109107

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,6 @@ public void setup() throws Exception {
225225
NamespaceBundle bundle = mock(NamespaceBundle.class);
226226
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
227227
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
228-
doReturn(true).when(nsSvc).isServiceUnitActive(any());
229-
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
230228
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
231229
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any());
232230

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,6 @@ public void setup() throws Exception {
231231
.getBundleAsync(any());
232232
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(), any());
233233
doReturn(true).when(namespaceService).isServiceUnitOwned(any());
234-
doReturn(true).when(namespaceService).isServiceUnitActive(any());
235-
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
236234
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
237235
NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL);
238236
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(
@@ -1601,8 +1599,8 @@ public void testProducerOnNotOwnedTopic() throws Exception {
16011599
setChannelConnected();
16021600

16031601
// Force the case where the broker doesn't own any topic
1604-
doReturn(CompletableFuture.completedFuture(false)).when(namespaceService)
1605-
.isServiceUnitActiveAsync(any(TopicName.class));
1602+
doReturn(CompletableFuture.failedFuture(new ServiceUnitNotReadyException("failed"))).when(brokerService)
1603+
.checkTopicNsOwnership(any(String.class));
16061604

16071605
// test PRODUCER failure case
16081606
ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */,

pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception {
245245
admin.topics().createNonPartitionedTopic(tpName);
246246
admin.namespaces().unload(ns);
247247

248-
// Inject an error when calling "NamespaceService.isServiceUnitActiveAsync".
248+
// Inject an error when loading the topic
249249
AtomicInteger failedTimes = new AtomicInteger();
250250
NamespaceService namespaceService = pulsar.getNamespaceService();
251251
doAnswer(invocation -> {
@@ -258,7 +258,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception {
258258
return CompletableFuture.failedFuture(new RuntimeException("mocked error"));
259259
}
260260
return invocation.callRealMethod();
261-
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
261+
}).when(namespaceService).checkBundleOwnership(any(TopicName.class), any());
262262

263263
// Verify: the consumer can create successfully eventually.
264264
Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe();
@@ -295,7 +295,7 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex
295295
pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
296296
}
297297
return invocation.callRealMethod();
298-
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
298+
}).when(namespaceService).checkBundleOwnership(any(TopicName.class), any());
299299

300300
// Verify: the consumer create failed due to pulsar does not allow to create topic automatically.
301301
try {

0 commit comments

Comments
 (0)