Skip to content

Commit

Permalink
[improve][fn] Allow unknown fields in connectors config (apache#20116)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Apr 26, 2023
1 parent 0ec576b commit f7c0b3c
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 24 deletions.
6 changes: 6 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,12 @@ validateConnectorConfig: false
# If it is set to true, you must ensure that it has been initialized by "bin/pulsar initialize-cluster-metadata" command.
initializedDlogMetadata: false

# Whether to ignore unknown properties when deserializing the connector configuration.
# After upgrading a connector to a new version with a new configuration, the new configuration may not be compatible with the old connector.
# In case of rollback, it's required to also rollback the connector configuration.
# Ignoring unknown fields makes possible to keep the new configuration and only rollback the connector.
ignoreUnknownConfigFields: false

###########################
# Arbitrary Configuration
###########################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class InstanceConfig {
private boolean exposePulsarAdminClientEnabled = false;
private int metricsPort;
private List<String> additionalJavaRuntimeArguments = Collections.emptyList();
private boolean ignoreUnknownConfigFields;

/**
* Get the string representation of {@link #getInstanceId()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@
package org.apache.pulsar.functions.instance;

import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.BeanDeserializer;
import com.google.common.annotations.VisibleForTesting;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
Expand All @@ -34,6 +41,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -59,6 +67,7 @@
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
Expand Down Expand Up @@ -94,6 +103,7 @@
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
Expand Down Expand Up @@ -855,10 +865,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
if (sourceSpec.getConfigs().isEmpty()) {
this.source.open(new HashMap<>(), contextImpl);
} else {
this.source.open(
ObjectMapperFactory.getMapper().reader().forType(new TypeReference<Map<String, Object>>() {
}).readValue(sourceSpec.getConfigs())
, contextImpl);
this.source.open(parseComponentConfig(sourceSpec.getConfigs()), contextImpl);
}
if (this.source instanceof PulsarSource) {
contextImpl.setInputConsumers(((PulsarSource) this.source).getInputConsumers());
Expand All @@ -870,6 +877,83 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException {
return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType);
}

static Map<String, Object> parseComponentConfig(String connectorConfigs,
InstanceConfig instanceConfig,
ClassLoader componentClassLoader,
org.apache.pulsar.functions.proto.Function
.FunctionDetails.ComponentType componentType)
throws IOException {
final Map<String, Object> config = ObjectMapperFactory
.getMapper()
.reader()
.forType(new TypeReference<Map<String, Object>>() {})
.readValue(connectorConfigs);
if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
final String configClassName;
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
} else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
} else {
return config;
}
if (configClassName != null) {

Class<?> configClass;
try {
configClass = Class.forName(configClassName,
true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Config class not found: " + configClassName, e);
}
final List<String> allFields = BeanPropertiesReader.getBeanProperties(configClass);

for (String s : config.keySet()) {
if (!allFields.contains(s)) {
log.error("Field '{}' not defined in the {} configuration {}, the field will be ignored",
s,
componentType,
configClass);
config.remove(s);
}
}
}
}
return config;
}

static final class BeanPropertiesReader {

private static final MapperBeanReader reader = new MapperBeanReader();

private static final class MapperBeanReader extends ObjectMapper {
@SneakyThrows
List<String> getBeanProperties(Class<?> valueType) {
final JsonParser parser = ObjectMapperFactory
.getMapper()
.getObjectMapper()
.createParser("");
DeserializationConfig config = getDeserializationConfig();
DeserializationContext ctxt = createDeserializationContext(parser, config);
BeanDeserializer deser = (BeanDeserializer)
_findRootDeserializer(ctxt, _typeFactory.constructType(valueType));
List<String> list = new ArrayList<>();
deser.properties().forEachRemaining(p -> list.add(p.getName()));
return list;
}
}

static List<String> getBeanProperties(Class<?> valueType) {
return reader.getBeanProperties(valueType);
}
}


private void setupOutput(ContextImpl contextImpl) throws Exception {

Expand Down Expand Up @@ -940,9 +1024,8 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec,
contextImpl.toString());
}
this.sink.open(ObjectMapperFactory.getMapper().reader().forType(
new TypeReference<Map<String, Object>>() {
}).readValue(sinkSpec.getConfigs()), contextImpl);
final Map<String, Object> config = parseComponentConfig(sinkSpec.getConfigs());
this.sink.open(config, contextImpl);
}
} catch (Exception e) {
log.error("Sink open produced uncaught exception: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,38 @@
*/
package org.apache.pulsar.functions.instance;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SinkSpecOrBuilder;
import org.apache.pulsar.functions.proto.Function.SourceSpecOrBuilder;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class JavaInstanceRunnableTest {
Expand Down Expand Up @@ -159,7 +166,7 @@ public void testFunctionResultNull() throws Exception {

@NotNull
private JavaInstanceRunnable getJavaInstanceRunnable(boolean autoAck,
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) throws Exception {
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) throws Exception {
FunctionDetails functionDetails = FunctionDetails.newBuilder()
.setAutoAck(autoAck)
.setProcessingGuarantees(processingGuarantees).build();
Expand All @@ -184,23 +191,90 @@ public void testStatsManagerNull() throws Exception {

@Test
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
SinkSpecOrBuilder sinkSpec = mock(SinkSpecOrBuilder.class);
when(sinkSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sinkSpec.getConfigs(), new TypeReference<Map<String, Object>>() {
});
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
"{\"ttl\": 9223372036854775807}",
new InstanceConfig(),
null,
FunctionDetails.ComponentType.SINK
);
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
}

@Test
public void testSourceConfigParsingPreservesOriginalType() throws Exception {
SourceSpecOrBuilder sourceSpec = mock(SourceSpecOrBuilder.class);
when(sourceSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sourceSpec.getConfigs(), new TypeReference<Map<String, Object>>() {
});
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
"{\"ttl\": 9223372036854775807}",
new InstanceConfig(),
null,
FunctionDetails.ComponentType.SOURCE
);
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
}


public static class ConnectorTestConfig1 {
public String field1;
}

@DataProvider(name = "configIgnoreUnknownFields")
public static Object[][] configIgnoreUnknownFields() {
return new Object[][]{
{false, FunctionDetails.ComponentType.SINK},
{true, FunctionDetails.ComponentType.SINK},
{false, FunctionDetails.ComponentType.SOURCE},
{true, FunctionDetails.ComponentType.SOURCE}
};
}

@Test(dataProvider = "configIgnoreUnknownFields")
public void testSinkConfigIgnoreUnknownFields(boolean ignoreUnknownConfigFields,
FunctionDetails.ComponentType type) throws Exception {
NarClassLoader narClassLoader = mock(NarClassLoader.class);
final ConnectorDefinition connectorDefinition = new ConnectorDefinition();
if (type == FunctionDetails.ComponentType.SINK) {
connectorDefinition.setSinkConfigClass(ConnectorTestConfig1.class.getName());
} else {
connectorDefinition.setSourceConfigClass(ConnectorTestConfig1.class.getName());
}
when(narClassLoader.getServiceDefinition(any())).thenReturn(ObjectMapperFactory
.getMapper().writer().writeValueAsString(connectorDefinition));
final InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);

final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
"{\"field1\": \"value\", \"field2\": \"value2\"}",
instanceConfig,
narClassLoader,
type
);
if (ignoreUnknownConfigFields) {
Assert.assertEquals(parsedConfig.size(), 1);
Assert.assertEquals(parsedConfig.get("field1"), "value");
} else {
Assert.assertEquals(parsedConfig.size(), 2);
Assert.assertEquals(parsedConfig.get("field1"), "value");
Assert.assertEquals(parsedConfig.get("field2"), "value2");
}
}

public static class ConnectorTestConfig2 {
public static int constantField = 1;
public String field1;
private long withGetter;
@JsonIgnore
private ConnectorTestConfig1 ignore;

public long getWithGetter() {
return withGetter;
}
}

@Test
public void testBeanPropertiesReader() throws Exception {
final List<String> beanProperties = JavaInstanceRunnable.BeanPropertiesReader
.getBeanProperties(ConnectorTestConfig2.class);
Assert.assertEquals(new TreeSet<>(beanProperties), new TreeSet<>(Arrays.asList("field1", "withGetter")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ public class JavaInstanceStarter implements AutoCloseable {
+ "exposed to function context, default is disabled.", required = false)
public Boolean exposePulsarAdminClientEnabled = false;

@Parameter(names = "--ignore_unknown_config_fields",
description = "Whether to ignore unknown properties when deserializing the connector configuration.",
required = false)
public Boolean ignoreUnknownConfigFields = false;


private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
Expand Down Expand Up @@ -177,6 +183,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL
instanceConfig.setClusterName(clusterName);
instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests);
instanceConfig.setExposePulsarAdminClientEnabled(exposePulsarAdminClientEnabled);
instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);
Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,14 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
args.add("--metrics_port");
args.add(String.valueOf(instanceConfig.getMetricsPort()));

// only the Java instance supports --pending_async_requests right now.
// params supported only by the Java instance runtime.
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
args.add("--pending_async_requests");
args.add(String.valueOf(instanceConfig.getMaxPendingAsyncRequests()));

if (instanceConfig.isIgnoreUnknownConfigFields()) {
args.add("--ignore_unknown_config_fields");
}
}

// state storage configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,17 @@ public String getFunctionAuthProviderClassName() {
)
private List<String> additionalJavaRuntimeArguments = new ArrayList<>();

@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Whether to ignore unknown properties when deserializing the connector configuration. "
+ "After upgrading a connector to a new version with a new configuration, "
+ "the new configuration may not be compatible with the old connector. "
+ "In case of rollback, it's required to also rollback the connector configuration. "
+ "Ignoring unknown fields makes possible to keep the new configuration and "
+ "only rollback the connector."
)
private boolean ignoreUnknownConfigFields = false;

public String getFunctionMetadataTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu
if (workerConfig.getAdditionalJavaRuntimeArguments() != null) {
instanceConfig.setAdditionalJavaRuntimeArguments(workerConfig.getAdditionalJavaRuntimeArguments());
}
instanceConfig.setIgnoreUnknownConfigFields(workerConfig.isIgnoreUnknownConfigFields());
return instanceConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
#
name: alluxio
description: Writes data into Alluxio
sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
sinkConfigClass: org.apache.pulsar.io.alluxio.sink.AlluxioSinkConfig
Loading

0 comments on commit f7c0b3c

Please sign in to comment.