diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index ba4756647a55f..fbdbd611d50c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -834,9 +834,7 @@ private void updatePublishDispatcher(Optional optPolicies) { } // attach the resource-group level rate limiters, if set - String rgName = policies != null && policies.resource_group_name != null - ? policies.resource_group_name - : null; + String rgName = policies.resource_group_name; if (rgName != null) { final ResourceGroup resourceGroup = brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index e61597e2d139f..67cc46d95fada 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -133,7 +133,7 @@ public boolean tryAcquire(int numbers, long bytes) { } @Override - public void close() throws Exception { + public void close() { rateLimitFunction.apply(); replaceLimiters(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java index 397887978b2b5..931f35cfa1bd4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java @@ -71,4 +71,9 @@ public interface PublishRateLimiter extends AutoCloseable { * @param bytes */ boolean tryAcquire(int numbers, long bytes); + + /** + * Close the limiter. + */ + void close(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java index 81c4b82317f83..72c8132128e19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -63,7 +63,7 @@ public boolean tryAcquire(int numbers, long bytes) { } @Override - public void close() throws Exception { + public void close() { // No-op } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java index 0e1200edc31cb..f1646684b82cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -110,7 +110,7 @@ public boolean tryAcquire(int numbers, long bytes) { } @Override - public void close() throws Exception { + public void close() { // no-op } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index c6ea96179d6f8..1dbe95fb15a30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -462,11 +462,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); if (topicPublishRateLimiter != null) { - try { - topicPublishRateLimiter.close(); - } catch (Exception e) { - log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); - } + topicPublishRateLimiter.close(); } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); if (this.resourceGroupPublishLimiter != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 5505c940af65d..6bcfa36320c39 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1270,11 +1270,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); if (topicPublishRateLimiter != null) { - try { - topicPublishRateLimiter.close(); - } catch (Exception e) { - log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); - } + topicPublishRateLimiter.close(); } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); if (this.resourceGroupPublishLimiter != null) {