diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index be85f545e62ba..991b8940292dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -653,12 +653,9 @@ config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this); Map vipAttributeMap = Maps.newHashMap(); vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, this.config.getStatusFilePath()); - vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new Supplier() { - @Override - public Boolean get() { - // Ensure the VIP status is only visible when the broker is fully initialized - return state == State.Started; - } + vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, (Supplier) () -> { + // Ensure the VIP status is only visible when the broker is fully initialized + return state == State.Started; }); this.webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, vipAttributeMap); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java index 8eae28205b97f..57e5e47435e3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java @@ -48,7 +48,6 @@ import org.apache.pulsar.common.policies.data.RawBookieInfo; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping; -import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,14 +66,9 @@ public BookiesRackConfiguration getBookiesRackInfo() throws Exception { validateSuperUserAccess(); return localZkCache().getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, - new Deserializer() { - - @Override - public BookiesRackConfiguration deserialize(String key, byte[] content) throws Exception { - return ObjectMapperFactory.getThreadLocal().readValue(content, BookiesRackConfiguration.class); - } - - }).orElse(new BookiesRackConfiguration()); + (key, content) -> + ObjectMapperFactory.getThreadLocal().readValue(content, BookiesRackConfiguration.class)) + .orElse(new BookiesRackConfiguration()); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f14d182c34f6e..2bc1d9cade06e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2125,32 +2125,28 @@ private void updateDynamicServiceConfiguration() { })); // register a listener: it updates field value and triggers appropriate registered field-listener only if // field's value has been changed so, registered doesn't have to update field value in ServiceConfiguration - dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener>() { - @SuppressWarnings("unchecked") - @Override - public void onUpdate(String path, Map data, Stat stat) { - if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null) { - data.forEach((configKey, value) -> { - Field configField = dynamicConfigurationMap.get(configKey).field; - Object newValue = FieldParser.value(data.get(configKey), configField); - if (configField != null) { - Consumer listener = configRegisteredListeners.get(configKey); - try { - Object existingValue = configField.get(pulsar.getConfiguration()); - configField.set(pulsar.getConfiguration(), newValue); - log.info("Successfully updated configuration {}/{}", configKey, - data.get(configKey)); - if (listener != null && !existingValue.equals(newValue)) { - listener.accept(newValue); - } - } catch (Exception e) { - log.error("Failed to update config {}/{}", configKey, newValue); + dynamicConfigurationCache.registerListener((path, data, stat) -> { + if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null) { + data.forEach((configKey, value) -> { + Field configField = dynamicConfigurationMap.get(configKey).field; + Object newValue = FieldParser.value(data.get(configKey), configField); + if (configField != null) { + Consumer listener = configRegisteredListeners.get(configKey); + try { + Object existingValue = configField.get(pulsar.getConfiguration()); + configField.set(pulsar.getConfiguration(), newValue); + log.info("Successfully updated configuration {}/{}", configKey, + data.get(configKey)); + if (listener != null && !existingValue.equals(newValue)) { + listener.accept(newValue); } - } else { - log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue); + } catch (Exception e) { + log.error("Failed to update config {}/{}", configKey, newValue); } - }); - } + } else { + log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue); + } + }); } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 3426355f8de79..90ad1e1e22fe2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Range; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.Set; @@ -151,7 +152,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce } consumerList.add(consumer); - consumerList.sort((c1, c2) -> c1.getPriorityLevel() - c2.getPriorityLevel()); + consumerList.sort(Comparator.comparingInt(Consumer::getPriorityLevel)); consumerSet.add(consumer); }