Skip to content

Commit

Permalink
[Refactor] refactor some method to java8. (#10859)
Browse files Browse the repository at this point in the history
* Refactor refactor some method to java8.

* Fixed  checkstyle
  • Loading branch information
mattisonchao committed Jun 8, 2021
1 parent e72a0d8 commit 4989033
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -653,12 +653,9 @@ config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup
attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this);
Map<String, Object> vipAttributeMap = Maps.newHashMap();
vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, this.config.getStatusFilePath());
vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new Supplier<Boolean>() {
@Override
public Boolean get() {
// Ensure the VIP status is only visible when the broker is fully initialized
return state == State.Started;
}
vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, (Supplier<Boolean>) () -> {
// Ensure the VIP status is only visible when the broker is fully initialized
return state == State.Started;
});
this.webService.addRestResources("/",
VipStatus.class.getPackage().getName(), false, vipAttributeMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.pulsar.common.policies.data.RawBookieInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -67,14 +66,9 @@ public BookiesRackConfiguration getBookiesRackInfo() throws Exception {
validateSuperUserAccess();

return localZkCache().getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
new Deserializer<BookiesRackConfiguration>() {

@Override
public BookiesRackConfiguration deserialize(String key, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, BookiesRackConfiguration.class);
}

}).orElse(new BookiesRackConfiguration());
(key, content) ->
ObjectMapperFactory.getThreadLocal().readValue(content, BookiesRackConfiguration.class))
.orElse(new BookiesRackConfiguration());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2125,32 +2125,28 @@ private void updateDynamicServiceConfiguration() {
}));
// register a listener: it updates field value and triggers appropriate registered field-listener only if
// field's value has been changed so, registered doesn't have to update field value in ServiceConfiguration
dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
@SuppressWarnings("unchecked")
@Override
public void onUpdate(String path, Map<String, String> data, Stat stat) {
if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null) {
data.forEach((configKey, value) -> {
Field configField = dynamicConfigurationMap.get(configKey).field;
Object newValue = FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener = configRegisteredListeners.get(configKey);
try {
Object existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
log.info("Successfully updated configuration {}/{}", configKey,
data.get(configKey));
if (listener != null && !existingValue.equals(newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
dynamicConfigurationCache.registerListener((path, data, stat) -> {
if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null) {
data.forEach((configKey, value) -> {
Field configField = dynamicConfigurationMap.get(configKey).field;
Object newValue = FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener = configRegisteredListeners.get(configKey);
try {
Object existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
log.info("Successfully updated configuration {}/{}", configKey,
data.get(configKey));
if (listener != null && !existingValue.equals(newValue)) {
listener.accept(newValue);
}
} else {
log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
}
});
}
} else {
log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
}
});
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -151,7 +152,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
}

consumerList.add(consumer);
consumerList.sort((c1, c2) -> c1.getPriorityLevel() - c2.getPriorityLevel());
consumerList.sort(Comparator.comparingInt(Consumer::getPriorityLevel));
consumerSet.add(consumer);
}

Expand Down

0 comments on commit 4989033

Please sign in to comment.