Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn] Allow unknown fields in connectors config #20116

Merged
merged 6 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,12 @@ validateConnectorConfig: false
# If it is set to true, you must ensure that it has been initialized by "bin/pulsar initialize-cluster-metadata" command.
initializedDlogMetadata: false

# Whether to ignore unknown properties when deserializing the connector configuration.
# After upgrading a connector to a new version with a new configuration, the new configuration may not be compatible with the old connector.
# In case of rollback, it's required to also rollback the connector configuration.
# Ignoring unknown fields makes possible to keep the new configuration and only rollback the connector.
ignoreUnknownConfigFields: false

###########################
# Arbitrary Configuration
###########################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class InstanceConfig {
private boolean exposePulsarAdminClientEnabled = false;
private int metricsPort;
private List<String> additionalJavaRuntimeArguments = Collections.emptyList();
private boolean ignoreUnknownConfigFields;

/**
* Get the string representation of {@link #getInstanceId()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@
package org.apache.pulsar.functions.instance;

import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.BeanDeserializer;
import com.google.common.annotations.VisibleForTesting;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
Expand All @@ -34,6 +41,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -59,6 +67,7 @@
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
Expand Down Expand Up @@ -94,6 +103,7 @@
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
Expand Down Expand Up @@ -855,10 +865,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
if (sourceSpec.getConfigs().isEmpty()) {
this.source.open(new HashMap<>(), contextImpl);
} else {
this.source.open(
ObjectMapperFactory.getMapper().reader().forType(new TypeReference<Map<String, Object>>() {
}).readValue(sourceSpec.getConfigs())
, contextImpl);
this.source.open(parseComponentConfig(sourceSpec.getConfigs()), contextImpl);
}
if (this.source instanceof PulsarSource) {
contextImpl.setInputConsumers(((PulsarSource) this.source).getInputConsumers());
Expand All @@ -870,6 +877,83 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException {
return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType);
}

static Map<String, Object> parseComponentConfig(String connectorConfigs,
InstanceConfig instanceConfig,
ClassLoader componentClassLoader,
org.apache.pulsar.functions.proto.Function
.FunctionDetails.ComponentType componentType)
throws IOException {
final Map<String, Object> config = ObjectMapperFactory
.getMapper()
.reader()
.forType(new TypeReference<Map<String, Object>>() {})
.readValue(connectorConfigs);
if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
final String configClassName;
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
} else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
} else {
return config;
}
if (configClassName != null) {

Class<?> configClass;
try {
configClass = Class.forName(configClassName,
true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Config class not found: " + configClassName, e);
}
final List<String> allFields = BeanPropertiesReader.getBeanProperties(configClass);

for (String s : config.keySet()) {
if (!allFields.contains(s)) {
log.error("Field '{}' not defined in the {} configuration {}, the field will be ignored",
s,
componentType,
configClass);
config.remove(s);
}
}
}
}
return config;
}

static final class BeanPropertiesReader {

private static final MapperBeanReader reader = new MapperBeanReader();

private static final class MapperBeanReader extends ObjectMapper {
@SneakyThrows
List<String> getBeanProperties(Class<?> valueType) {
final JsonParser parser = ObjectMapperFactory
.getMapper()
.getObjectMapper()
.createParser("");
DeserializationConfig config = getDeserializationConfig();
DeserializationContext ctxt = createDeserializationContext(parser, config);
BeanDeserializer deser = (BeanDeserializer)
_findRootDeserializer(ctxt, _typeFactory.constructType(valueType));
List<String> list = new ArrayList<>();
deser.properties().forEachRemaining(p -> list.add(p.getName()));
return list;
}
}

static List<String> getBeanProperties(Class<?> valueType) {
return reader.getBeanProperties(valueType);
}
}


private void setupOutput(ContextImpl contextImpl) throws Exception {

Expand Down Expand Up @@ -940,9 +1024,8 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec,
contextImpl.toString());
}
this.sink.open(ObjectMapperFactory.getMapper().reader().forType(
new TypeReference<Map<String, Object>>() {
}).readValue(sinkSpec.getConfigs()), contextImpl);
final Map<String, Object> config = parseComponentConfig(sinkSpec.getConfigs());
this.sink.open(config, contextImpl);
}
} catch (Exception e) {
log.error("Sink open produced uncaught exception: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,38 @@
*/
package org.apache.pulsar.functions.instance;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SinkSpecOrBuilder;
import org.apache.pulsar.functions.proto.Function.SourceSpecOrBuilder;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class JavaInstanceRunnableTest {
Expand Down Expand Up @@ -159,7 +166,7 @@ public void testFunctionResultNull() throws Exception {

@NotNull
private JavaInstanceRunnable getJavaInstanceRunnable(boolean autoAck,
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) throws Exception {
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) throws Exception {
FunctionDetails functionDetails = FunctionDetails.newBuilder()
.setAutoAck(autoAck)
.setProcessingGuarantees(processingGuarantees).build();
Expand All @@ -184,23 +191,90 @@ public void testStatsManagerNull() throws Exception {

@Test
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
SinkSpecOrBuilder sinkSpec = mock(SinkSpecOrBuilder.class);
when(sinkSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sinkSpec.getConfigs(), new TypeReference<Map<String, Object>>() {
});
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
"{\"ttl\": 9223372036854775807}",
new InstanceConfig(),
null,
FunctionDetails.ComponentType.SINK
);
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
}

@Test
public void testSourceConfigParsingPreservesOriginalType() throws Exception {
SourceSpecOrBuilder sourceSpec = mock(SourceSpecOrBuilder.class);
when(sourceSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sourceSpec.getConfigs(), new TypeReference<Map<String, Object>>() {
});
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
"{\"ttl\": 9223372036854775807}",
new InstanceConfig(),
null,
FunctionDetails.ComponentType.SOURCE
);
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
}


public static class ConnectorTestConfig1 {
public String field1;
}

@DataProvider(name = "configIgnoreUnknownFields")
public static Object[][] configIgnoreUnknownFields() {
return new Object[][]{
{false, FunctionDetails.ComponentType.SINK},
{true, FunctionDetails.ComponentType.SINK},
{false, FunctionDetails.ComponentType.SOURCE},
{true, FunctionDetails.ComponentType.SOURCE}
};
}

@Test(dataProvider = "configIgnoreUnknownFields")
public void testSinkConfigIgnoreUnknownFields(boolean ignoreUnknownConfigFields,
FunctionDetails.ComponentType type) throws Exception {
NarClassLoader narClassLoader = mock(NarClassLoader.class);
final ConnectorDefinition connectorDefinition = new ConnectorDefinition();
if (type == FunctionDetails.ComponentType.SINK) {
connectorDefinition.setSinkConfigClass(ConnectorTestConfig1.class.getName());
} else {
connectorDefinition.setSourceConfigClass(ConnectorTestConfig1.class.getName());
}
when(narClassLoader.getServiceDefinition(any())).thenReturn(ObjectMapperFactory
.getMapper().writer().writeValueAsString(connectorDefinition));
final InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);

final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
"{\"field1\": \"value\", \"field2\": \"value2\"}",
instanceConfig,
narClassLoader,
type
);
if (ignoreUnknownConfigFields) {
Assert.assertEquals(parsedConfig.size(), 1);
Assert.assertEquals(parsedConfig.get("field1"), "value");
} else {
Assert.assertEquals(parsedConfig.size(), 2);
Assert.assertEquals(parsedConfig.get("field1"), "value");
Assert.assertEquals(parsedConfig.get("field2"), "value2");
}
}

public static class ConnectorTestConfig2 {
public static int constantField = 1;
public String field1;
private long withGetter;
@JsonIgnore
private ConnectorTestConfig1 ignore;

public long getWithGetter() {
return withGetter;
}
}

@Test
public void testBeanPropertiesReader() throws Exception {
final List<String> beanProperties = JavaInstanceRunnable.BeanPropertiesReader
.getBeanProperties(ConnectorTestConfig2.class);
Assert.assertEquals(new TreeSet<>(beanProperties), new TreeSet<>(Arrays.asList("field1", "withGetter")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ public class JavaInstanceStarter implements AutoCloseable {
+ "exposed to function context, default is disabled.", required = false)
public Boolean exposePulsarAdminClientEnabled = false;

@Parameter(names = "--ignore_unknown_config_fields",
description = "Whether to ignore unknown properties when deserializing the connector configuration.",
required = false)
public Boolean ignoreUnknownConfigFields = false;


private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
Expand Down Expand Up @@ -177,6 +183,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL
instanceConfig.setClusterName(clusterName);
instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests);
instanceConfig.setExposePulsarAdminClientEnabled(exposePulsarAdminClientEnabled);
instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);
Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,14 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
args.add("--metrics_port");
args.add(String.valueOf(instanceConfig.getMetricsPort()));

// only the Java instance supports --pending_async_requests right now.
// params supported only by the Java instance runtime.
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
args.add("--pending_async_requests");
args.add(String.valueOf(instanceConfig.getMaxPendingAsyncRequests()));

if (instanceConfig.isIgnoreUnknownConfigFields()) {
args.add("--ignore_unknown_config_fields");
}
}

// state storage configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,17 @@ public String getFunctionAuthProviderClassName() {
)
private List<String> additionalJavaRuntimeArguments = new ArrayList<>();

@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Whether to ignore unknown properties when deserializing the connector configuration. "
+ "After upgrading a connector to a new version with a new configuration, "
+ "the new configuration may not be compatible with the old connector. "
+ "In case of rollback, it's required to also rollback the connector configuration. "
+ "Ignoring unknown fields makes possible to keep the new configuration and "
+ "only rollback the connector."
)
private boolean ignoreUnknownConfigFields = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This configuration applies to the instances of the functions/connectors, and not to the function worker (that already ignores unknown fields)

what about 'functionsIgnoreUnknownConfigFields' ? (maybe we can do better, but connectors are actually functions)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that and I agree. But there are other properties that are meant to be used by the java runner like
maxPendingAsyncRequests, exposeAdminClientEnabled, additionalJavaRuntimeArguments so I'd prefer to stick with this convention


public String getFunctionMetadataTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu
if (workerConfig.getAdditionalJavaRuntimeArguments() != null) {
instanceConfig.setAdditionalJavaRuntimeArguments(workerConfig.getAdditionalJavaRuntimeArguments());
}
instanceConfig.setIgnoreUnknownConfigFields(workerConfig.isIgnoreUnknownConfigFields());
return instanceConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
#
name: alluxio
description: Writes data into Alluxio
sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
sinkConfigClass: org.apache.pulsar.io.alluxio.sink.AlluxioSinkConfig
Loading
Loading