Skip to content

Commit

Permalink
[improve][broker] Add custom dynamic configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Aug 17, 2023
1 parent 0cb1c78 commit 1d99f18
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -2407,22 +2408,26 @@ private void handleDynamicConfigurationUpdates() {
return;
}
Field configField = configFieldWrapper.field;
Object newValue = FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener = configRegisteredListeners.get(configKey);
try {
Object existingValue = configField.get(pulsar.getConfiguration());
Consumer listener = configRegisteredListeners.get(configKey);
Object newValue = null;
Object existingValue;

try {
if (configField != null) {
newValue = FieldParser.value(data.get(configKey), configField);
existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
log.info("Successfully updated configuration {}/{}", configKey,
data.get(configKey));
if (listener != null && !existingValue.equals(newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
} else {
newValue = value;
existingValue = configFieldWrapper.customValue;
configFieldWrapper.customValue = String.valueOf(newValue);
}
} else {
log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
log.info("Successfully updated configuration {}/{}", configKey, data.get(configKey));
if (listener != null && !Objects.equals(existingValue, newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
}
});
});
Expand Down Expand Up @@ -2850,20 +2855,27 @@ public <T> void registerConfigurationListener(String configKey, Consumer<T> list

private void addDynamicConfigValidator(String key, Predicate<String> validator) {
validateConfigKey(key);
if (dynamicConfigurationMap.containsKey(key)) {
dynamicConfigurationMap.get(key).validator = validator;
}
dynamicConfigurationMap.get(key).validator = validator;
}

private void validateConfigKey(String key) {
try {
ServiceConfiguration.class.getDeclaredField(key);
} catch (Exception e) {
log.error("ServiceConfiguration key {} not found {}", key, e.getMessage());
throw new IllegalArgumentException("Invalid service config " + key, e);
if (!dynamicConfigurationMap.containsKey(key)) {
throw new IllegalArgumentException(key + " doesn't exits in the dynamicConfigurationMap");
}
}

/**
* Allows the third-party plugin to register a custom dynamic configuration.
*/
public void registerCustomDynamicConfiguration(String key, Predicate<String> validator) {
if (dynamicConfigurationMap.containsKey(key)) {
throw new IllegalArgumentException(key + " already exists in the dynamicConfigurationMap");
}
ConfigField configField = ConfigField.newCustomConfigField(null);
configField.validator = validator;
dynamicConfigurationMap.put(key, configField);
}

private void createDynamicConfigPathIfNotExist() {
try {
Optional<Map<String, String>> configCache =
Expand Down Expand Up @@ -3227,13 +3239,24 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC
}

private static class ConfigField {
// field holds the pulsar dynamic configuration.
final Field field;

// customValue holds the external dynamic configuration.
String customValue;

Predicate<String> validator;

public ConfigField(Field field) {
super();
this.field = field;
}

public static ConfigField newCustomConfigField(String customValue) {
ConfigField configField = new ConfigField(null);
configField.customValue = customValue;
return configField;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.apache.pulsar.broker.admin;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -46,18 +49,18 @@ public void cleanup() throws Exception {
}

@Test
public void TestGetAllDynamicConfigurations() throws Exception {
public void testGetAllDynamicConfigurations() throws Exception {
Map<String,String> configs = admin.brokers().getAllDynamicConfigurations();
assertNotNull(configs);
}

@Test
public void TestDeleteDynamicConfiguration() throws Exception {
public void testDeleteDynamicConfiguration() throws Exception {
admin.brokers().deleteDynamicConfiguration("dispatcherMinReadBatchSize");
}

@Test
public void TestDeleteInvalidDynamicConfiguration() {
public void testDeleteInvalidDynamicConfiguration() {
try {
admin.brokers().deleteDynamicConfiguration("errorName");
fail("exception should be thrown");
Expand All @@ -69,4 +72,29 @@ public void TestDeleteInvalidDynamicConfiguration() {
}
}
}

@Test
public void testAddCustomDynamicConfiguration() throws PulsarAdminException, InterruptedException {
String key = "my-broker-config-key-1";

// register
pulsar.getBrokerService().registerCustomDynamicConfiguration(key, null);
Map<String, String> allDynamicConfigurations = admin.brokers().getAllDynamicConfigurations();
assertThat(allDynamicConfigurations).doesNotContainKey(key);

// update with listener
AtomicReference<String> changeValue = new AtomicReference<>(null);
Consumer<String> stringConsumer = changeValue::set;
pulsar.getBrokerService().registerConfigurationListener(key, stringConsumer);
String newValue = "my-broker-config-value-1";
admin.brokers().updateDynamicConfiguration(key, newValue);
allDynamicConfigurations = admin.brokers().getAllDynamicConfigurations();
assertThat(allDynamicConfigurations).containsKey(key).containsValue(newValue);
assertThat(changeValue.get()).isEqualTo(newValue);

// delete
admin.brokers().deleteDynamicConfiguration(key);
allDynamicConfigurations = admin.brokers().getAllDynamicConfigurations();
assertThat(allDynamicConfigurations).doesNotContainKey(key);
}
}

0 comments on commit 1d99f18

Please sign in to comment.