Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-300: Add custom dynamic configuration for plugins #20884

Merged
merged 9 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions pip/pip-300.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ the `pulsar-admin` to change the values of customized configuration, but the Pul

## In Scope

The goal of this PIP is to allow the `pulsar-admin` to update the values of customized configuration.
The goal of this PIP is to allow the `pulsar-admin` to update the values of customized configuration, and also use the
listener to listen the customized configuration changes
by `org.apache.pulsar.broker.service.BrokerService.registerConfigurationListener`.

# Detailed Design

Expand All @@ -31,12 +33,7 @@ public void registerCustomDynamicConfiguration(String key, Predicate<String> val
if (dynamicConfigurationMap.containsKey(key)) {
throw new IllegalArgumentException(key + " already exists in the dynamicConfigurationMap");
}
try {
ServiceConfiguration.class.getDeclaredField(key);
throw new IllegalArgumentException("Only register custom configuration");
} catch (Exception ignored) { {

}

ConfigField configField = ConfigField.newCustomConfigField(null);
configField.validator = validator;
dynamicConfigurationMap.put(key, configField);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2514,22 +2514,25 @@ private void handleDynamicConfigurationUpdates() {
return;
}
Field configField = configFieldWrapper.field;
Object newValue = FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener = configRegisteredListeners.get(configKey);
try {
Object existingValue = configField.get(pulsar.getConfiguration());
Consumer listener = configRegisteredListeners.get(configKey);
try {
final Object existingValue;
final Object newValue;
if (configField != null) {
newValue = FieldParser.value(data.get(configKey), configField);
existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
log.info("Successfully updated configuration {}/{}", configKey,
data.get(configKey));
if (listener != null && !existingValue.equals(newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
} else {
newValue = value;
existingValue = configFieldWrapper.customValue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if customValue equals null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The customValue defaults to null, when the config has been changed, this value will be updated, see configFieldWrapper.customValue = String.valueOf(newValue);.

configFieldWrapper.customValue = newValue == null ? null : String.valueOf(newValue);
}
} else {
log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
log.info("Successfully updated configuration {}/{}", configKey, data.get(configKey));
if (listener != null && !Objects.equals(existingValue, newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}", configKey, e);
}
});
});
Expand Down Expand Up @@ -2968,20 +2971,27 @@ public <T> void registerConfigurationListener(String configKey, Consumer<T> list

private void addDynamicConfigValidator(String key, Predicate<String> validator) {
validateConfigKey(key);
if (dynamicConfigurationMap.containsKey(key)) {
dynamicConfigurationMap.get(key).validator = validator;
}
dynamicConfigurationMap.get(key).validator = validator;
}

private void validateConfigKey(String key) {
try {
ServiceConfiguration.class.getDeclaredField(key);
} catch (Exception e) {
log.error("ServiceConfiguration key {} not found {}", key, e.getMessage());
throw new IllegalArgumentException("Invalid service config " + key, e);
if (!dynamicConfigurationMap.containsKey(key)) {
throw new IllegalArgumentException(key + " doesn't exits in the dynamicConfigurationMap");
}
}

/**
* Allows the third-party plugin to register a custom dynamic configuration.
*/
public void registerCustomDynamicConfiguration(String key, Predicate<String> validator) {
if (dynamicConfigurationMap.containsKey(key)) {
throw new IllegalArgumentException(key + " already exists in the dynamicConfigurationMap");
}
ConfigField configField = ConfigField.newCustomConfigField(null);
configField.validator = validator;
dynamicConfigurationMap.put(key, configField);
}

private void createDynamicConfigPathIfNotExist() {
try {
Optional<Map<String, String>> configCache =
Expand Down Expand Up @@ -3356,13 +3366,24 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC
}

private static class ConfigField {
// field holds the pulsar dynamic configuration.
final Field field;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you can't reuse this field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For external dynamic configuration, when using the field, users need to provide the instance to which the field belongs, I think this is complex, so the customValue has been added.

The purpose of this value is to compare with the old configuration value and notify the user.


// customValue holds the external dynamic configuration.
volatile String customValue;

Predicate<String> validator;

public ConfigField(Field field) {
super();
this.field = field;
}

public static ConfigField newCustomConfigField(String customValue) {
ConfigField configField = new ConfigField(null);
configField.customValue = customValue;
return configField;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.pulsar.broker.admin;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -69,4 +73,38 @@ public void TestDeleteInvalidDynamicConfiguration() {
}
}
}

@Test
public void testRegisterCustomDynamicConfiguration() throws PulsarAdminException {
String key = "my-broker-config-key-1";
String invalidValue = "invalid-value";

// register
pulsar.getBrokerService().registerCustomDynamicConfiguration(key, value -> !value.equals(invalidValue));
assertThrows(IllegalArgumentException.class,
() -> pulsar.getBrokerService().registerCustomDynamicConfiguration(key, null));
Map<String, String> allDynamicConfigurations = admin.brokers().getAllDynamicConfigurations();
assertThat(allDynamicConfigurations).doesNotContainKey(key);

// update with listener
AtomicReference<String> changeValue = new AtomicReference<>(null);
pulsar.getBrokerService().registerConfigurationListener(key, changeValue::set);
String newValue = "my-broker-config-value-1";
admin.brokers().updateDynamicConfiguration(key, newValue);
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
allDynamicConfigurations = admin.brokers().getAllDynamicConfigurations();
assertThat(allDynamicConfigurations.get(key)).isEqualTo(newValue);

Awaitility.await().untilAsserted(() -> {
assertThat(changeValue.get()).isEqualTo(newValue);
});

// update with invalid value
assertThrows(PulsarAdminException.PreconditionFailedException.class,
() -> admin.brokers().updateDynamicConfiguration(key, invalidValue));

// delete
admin.brokers().deleteDynamicConfiguration(key);
allDynamicConfigurations = admin.brokers().getAllDynamicConfigurations();
assertThat(allDynamicConfigurations).doesNotContainKey(key);
}
}
Loading