Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][io] Support configuration secret interpolation #20901

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pulsar-functions/instance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<environmentVariables>
<TEST_JAVA_INSTANCE_PARSE_ENV_VAR>some-configuration</TEST_JAVA_INSTANCE_PARSE_ENV_VAR>
</environmentVariables>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,11 +862,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
Thread.currentThread().setContextClassLoader(this.componentClassLoader);
}
try {
if (sourceSpec.getConfigs().isEmpty()) {
this.source.open(new HashMap<>(), contextImpl);
} else {
this.source.open(parseComponentConfig(sourceSpec.getConfigs()), contextImpl);
}
this.source.open(augmentAndFilterConnectorConfig(sourceSpec.getConfigs()), contextImpl);
if (this.source instanceof PulsarSource) {
contextImpl.setInputConsumers(((PulsarSource) this.source).getInputConsumers());
}
Expand All @@ -877,31 +873,60 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException {
return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType);

/**
* Recursively interpolate configured secrets into the config map by calling
* {@link SecretsProvider#interpolateSecretForValue(String)}.
* @param secretsProvider - the secrets provider that will convert secret's values into config values.
* @param configs - the connector configuration map, which will be mutated.
*/
private static void interpolateSecretsIntoConfigs(SecretsProvider secretsProvider,
Map<String, Object> configs) {
for (Map.Entry<String, Object> entry : configs.entrySet()) {
Object value = entry.getValue();
if (value instanceof String) {
String updatedValue = secretsProvider.interpolateSecretForValue((String) value);
if (updatedValue != null) {
entry.setValue(updatedValue);
}
} else if (value instanceof Map) {
interpolateSecretsIntoConfigs(secretsProvider, (Map<String, Object>) value);
}
}
}

private Map<String, Object> augmentAndFilterConnectorConfig(String connectorConfigs) throws IOException {
return augmentAndFilterConnectorConfig(connectorConfigs, instanceConfig, secretsProvider,
componentClassLoader, componentType);
}

static Map<String, Object> parseComponentConfig(String connectorConfigs,
InstanceConfig instanceConfig,
ClassLoader componentClassLoader,
org.apache.pulsar.functions.proto.Function
static Map<String, Object> augmentAndFilterConnectorConfig(String connectorConfigs,
InstanceConfig instanceConfig,
SecretsProvider secretsProvider,
ClassLoader componentClassLoader,
org.apache.pulsar.functions.proto.Function
.FunctionDetails.ComponentType componentType)
throws IOException {
final Map<String, Object> config = ObjectMapperFactory
final Map<String, Object> config = connectorConfigs.isEmpty() ? new HashMap<>() : ObjectMapperFactory
.getMapper()
.reader()
.forType(new TypeReference<Map<String, Object>>() {})
.readValue(connectorConfigs);
if (componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK
&& componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
return config;
}

interpolateSecretsIntoConfigs(secretsProvider, config);

if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
final String configClassName;
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
} else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
} else {
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
} else {
return config;
}
if (configClassName != null) {

Expand Down Expand Up @@ -1014,19 +1039,11 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
Thread.currentThread().setContextClassLoader(this.componentClassLoader);
}
try {
if (sinkSpec.getConfigs().isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("Opening Sink with empty hashmap with contextImpl: {} ", contextImpl.toString());
}
this.sink.open(new HashMap<>(), contextImpl);
} else {
if (log.isDebugEnabled()) {
log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec,
contextImpl.toString());
}
final Map<String, Object> config = parseComponentConfig(sinkSpec.getConfigs());
this.sink.open(config, contextImpl);
if (log.isDebugEnabled()) {
log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec.getConfigs(),
contextImpl.toString());
}
this.sink.open(augmentAndFilterConnectorConfig(sinkSpec.getConfigs()), contextImpl);
} catch (Exception e) {
log.error("Sink open produced uncaught exception: ", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -191,9 +192,10 @@ public void testStatsManagerNull() throws Exception {

@Test
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
"{\"ttl\": 9223372036854775807}",
new InstanceConfig(),
new EnvironmentBasedSecretsProvider(),
null,
FunctionDetails.ComponentType.SINK
);
Expand All @@ -203,16 +205,69 @@ public void testSinkConfigParsingPreservesOriginalType() throws Exception {

@Test
public void testSourceConfigParsingPreservesOriginalType() throws Exception {
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
"{\"ttl\": 9223372036854775807}",
new InstanceConfig(),
new EnvironmentBasedSecretsProvider(),
null,
FunctionDetails.ComponentType.SOURCE
);
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
}

@DataProvider(name = "component")
public Object[][] component() {
return new Object[][]{
// Schema: component type, whether to map in secrets
{ FunctionDetails.ComponentType.SINK },
{ FunctionDetails.ComponentType.SOURCE },
{ FunctionDetails.ComponentType.FUNCTION },
{ FunctionDetails.ComponentType.UNKNOWN },
};
}

@Test(dataProvider = "component")
public void testEmptyStringInput(FunctionDetails.ComponentType componentType) throws Exception {
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
"",
new InstanceConfig(),
new EnvironmentBasedSecretsProvider(),
null,
componentType
);
Assert.assertEquals(parsedConfig.size(), 0);
}

// Environment variables are set in the pom.xml file
@Test(dataProvider = "component")
public void testInterpolatingEnvironmentVariables(FunctionDetails.ComponentType componentType) throws Exception {
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
"""
{
"key": {
"key1": "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}",
"key2": "${unset-env-var}"
},
"key3": "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}"
}
""",
new InstanceConfig(),
new EnvironmentBasedSecretsProvider(),
null,
componentType
);
if ((componentType == FunctionDetails.ComponentType.SOURCE
|| componentType == FunctionDetails.ComponentType.SINK)) {
Assert.assertEquals(((Map) parsedConfig.get("key")).get("key1"), "some-configuration");
Assert.assertEquals(((Map) parsedConfig.get("key")).get("key2"), "${unset-env-var}");
Assert.assertEquals(parsedConfig.get("key3"), "some-configuration");
} else {
Assert.assertEquals(((Map) parsedConfig.get("key")).get("key1"), "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}");
Assert.assertEquals(((Map) parsedConfig.get("key")).get("key2"), "${unset-env-var}");
Assert.assertEquals(parsedConfig.get("key3"), "${TEST_JAVA_INSTANCE_PARSE_ENV_VAR}");
}
}

public static class ConnectorTestConfig1 {
public String field1;
Expand Down Expand Up @@ -243,9 +298,10 @@ public void testSinkConfigIgnoreUnknownFields(boolean ignoreUnknownConfigFields,
final InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);

final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
final Map<String, Object> parsedConfig = JavaInstanceRunnable.augmentAndFilterConnectorConfig(
"{\"field1\": \"value\", \"field2\": \"value2\"}",
instanceConfig,
new EnvironmentBasedSecretsProvider(),
narClassLoader,
type
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@
*/
package org.apache.pulsar.functions.secretsprovider;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* This defines a very simple Secrets Provider that looks up environment variable
* thats named the same as secretName and fetches it.
*/
public class EnvironmentBasedSecretsProvider implements SecretsProvider {

/**
* Pattern to match ${secretName} in the value.
*/
private static final Pattern interpolationPattern = Pattern.compile("\\$\\{(.+?)}");

/**
* Fetches a secret.
*
Expand All @@ -33,4 +41,15 @@ public class EnvironmentBasedSecretsProvider implements SecretsProvider {
public String provideSecret(String secretName, Object pathToSecret) {
return System.getenv(secretName);
}

@Override
public String interpolateSecretForValue(String value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add unit tests to the method separately.

Matcher m = interpolationPattern.matcher(value);
if (m.matches()) {
String secretName = m.group(1);
// If the secret doesn't exist, we return null and don't override the current value.
return provideSecret(secretName, null);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,16 @@ default void init(Map<String, String> config) {}
* @return The actual secret
*/
String provideSecret(String secretName, Object pathToSecret);

/**
* If the passed value is formatted as a reference to a secret, as defined by the implementation, return the
* referenced secret. If the value is not formatted as a secret reference or the referenced secret does not exist,
* return null.
*
* @param value a config value that may be formatted as a reference to a secret
* @return the materialized secret. Otherwise, null.
*/
default String interpolateSecretForValue(String value) {
return null;
}
}
Loading