Skip to content

Commit

Permalink
Integrate Functions with Schema (#2397)
Browse files Browse the repository at this point in the history
* Integrate functions and io with schema registry

* Added missing license headers

* Renamed topicSchema to inputSpecs

* Fixed comments

* Fixed cli arg docs

* Fixed schema arguments

* Fixed errors after merge

* Fixed instance parameters

* Fixed tests and addressed comments

* Fixed PulsarSourceTest after merge

* Took feedback and made changes backwards compatible

* Fixed compilation issues

* Fixed bug

* fixed test compilation

* Fixed bug

* Fixed bug

* Fix pythn instance

* Fixed the way cmdsink populates sinkconfig

* Addressed feedback
  • Loading branch information
srkukarni committed Aug 21, 2018
1 parent 5980169 commit 7bff81e
Show file tree
Hide file tree
Showing 49 changed files with 1,545 additions and 1,049 deletions.
Expand Up @@ -57,7 +57,6 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.proto.Function;
Expand All @@ -69,6 +68,7 @@
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
Expand Down Expand Up @@ -246,7 +246,7 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {

/**
* Validates pulsar sink e2e functionality on functions.
*
*
* @throws Exception
*/
@Test(timeOut = 20000)
Expand Down Expand Up @@ -409,7 +409,7 @@ protected static FunctionDetails createSinkConfig(String jarFile, String tenant,
sourceSpecBuilder.setTypeClassName(typeArg.getName());
sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
sourceSpecBuilder.setSubscriptionName(subscriptionName);
sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, DefaultSerDe.class.getName());
sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, "");
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);

Expand Down Expand Up @@ -487,7 +487,7 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
// source spec classname should be empty so that the default pulsar source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, DefaultSerDe.class.getName());
sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, TopicSchema.DEFAULT_SERDE);
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);

Expand All @@ -507,7 +507,7 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
assertEquals(functionMetadata.getSink().getTypeClassName(), typeArgs[1].getName());

}

@Test(timeOut = 20000)
public void testFunctionStopAndRestartApi() throws Exception {

Expand Down
Expand Up @@ -320,11 +320,11 @@ public interface Functions {
*
*/
Set<String> getSinks() throws PulsarAdminException;

/**
* Get list of workers present under a cluster
* @return
* @throws PulsarAdminException
* @throws PulsarAdminException
*/
List<WorkerInfo> getCluster() throws PulsarAdminException;
}
Expand Up @@ -25,6 +25,9 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
* Schema definition for Strings encoded in UTF-8 format.
*/
public class StringSchema implements Schema<String> {
private final Charset charset;
private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
Expand Down
Expand Up @@ -41,7 +41,6 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.Reflections;
Expand Down Expand Up @@ -138,7 +137,6 @@ public void setup() throws Exception {
.thenReturn(true);
when(Reflections.classImplementsIface(anyString(), any())).thenReturn(true);
when(Reflections.createInstance(eq(DummyFunction.class.getName()), any(File.class))).thenReturn(new DummyFunction());
when(Reflections.createInstance(eq(DefaultSerDe.class.getName()), any(File.class))).thenReturn(new DefaultSerDe(String.class));
PowerMockito.stub(PowerMockito.method(Utils.class, "fileExists")).toReturn(true);
}

Expand Down
Expand Up @@ -75,13 +75,15 @@
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.ConsumerConfig;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
Expand Down Expand Up @@ -166,7 +168,7 @@ abstract class FunctionCommand extends BaseCommand {

@Parameter(names = "--name", description = "The function's name")
protected String functionName;

@Override
void processArguments() throws Exception {
super.processArguments();
Expand Down Expand Up @@ -228,14 +230,16 @@ abstract class FunctionDetailsCommand extends BaseCommand {
description = "Path to the main Python file for the function (if the function is written in Python)",
listConverter = StringConverter.class)
protected String pyFile;
@Parameter(names = "--inputs", description = "The function's input topic or topics (multiple topics can be specified as a comma-separated list)")
@Parameter(names = { "-i",
"--inputs" }, description = "The function's input topic or topics (multiple topics can be specified as a comma-separated list)")
protected String inputs;
// for backwards compatibility purposes
@Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)", hidden = true)
protected String DEPRECATED_topicsPattern;
@Parameter(names = "--topics-pattern", description = "The topic pattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)")
protected String topicsPattern;
@Parameter(names = "--output", description = "The function's output topic (use skipOutput flag to skip output topic)")

@Parameter(names = {"-o", "--output"}, description = "The function's output topic (use skipOutput flag to skip output topic)")
protected String output;
@Parameter(names = "--skip-output", description = "Skip publishing function output to output topic")
protected boolean skipOutput;
Expand All @@ -244,6 +248,10 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected String DEPRECATED_logTopic;
@Parameter(names = "--log-topic", description = "The topic to which the function's logs are produced")
protected String logTopic;

@Parameter(names = {"-st", "--schema-type"}, description = "The builtin schema type or custom schema class name to be used for messages output by the function")
protected String schemaType = "";

// for backwards compatibility purposes
@Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)", hidden = true)
protected String DEPRECATED_customSerdeInputString;
Expand Down Expand Up @@ -319,6 +327,7 @@ private void mergeArgs() {
if (!StringUtils.isBlank(DEPRECATED_logTopic)) logTopic = DEPRECATED_logTopic;
if (!StringUtils.isBlank(DEPRECATED_outputSerdeClassName)) outputSerdeClassName = DEPRECATED_outputSerdeClassName;
if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) customSerdeInputString = DEPRECATED_customSerdeInputString;

if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = DEPRECATED_fnConfigFile;
if (DEPRECATED_processingGuarantees != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = DEPRECATED_processingGuarantees;
if (!StringUtils.isBlank(DEPRECATED_userConfigString)) userConfigString = DEPRECATED_userConfigString;
Expand Down Expand Up @@ -382,23 +391,21 @@ void processArguments() throws Exception {
if (null != outputSerdeClassName) {
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
}

if (null != schemaType) {
functionConfig.setOutputSchemaType(schemaType);
}
if (null != processingGuarantees) {
functionConfig.setProcessingGuarantees(processingGuarantees);
}

functionConfig.setRetainOrdering(retainOrdering);

if (null != userConfigString) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type);
functionConfig.setUserConfig(userConfigMap);
}
if (functionConfig.getInputs() == null) {
functionConfig.setInputs(new LinkedList<>());
}
if (functionConfig.getCustomSerdeInputs() == null) {
functionConfig.setCustomSerdeInputs(new HashMap<>());
}
if (functionConfig.getUserConfig() == null) {
functionConfig.setUserConfig(new HashMap<>());
}
Expand Down Expand Up @@ -467,7 +474,7 @@ void processArguments() throws Exception {
}

protected void validateFunctionConfigs(FunctionConfig functionConfig) {

if (isBlank(functionConfig.getOutput()) && !functionConfig.isSkipOutput()) {
throw new ParameterException(
"output topic is not present (pass skipOutput flag to skip publish output on topic)");
Expand Down Expand Up @@ -589,17 +596,6 @@ private void inferMissingNamespace(FunctionConfig functionConfig) {
functionConfig.setNamespace(DEFAULT_NAMESPACE);
}

private String getUniqueInput(FunctionConfig functionConfig) {
if (functionConfig.getInputs().size() + functionConfig.getCustomSerdeInputs().size() != 1) {
throw new IllegalArgumentException();
}
if (functionConfig.getInputs().size() == 1) {
return functionConfig.getInputs().iterator().next();
} else {
return functionConfig.getCustomSerdeInputs().keySet().iterator().next();
}
}

protected FunctionDetails convert(FunctionConfig functionConfig)
throws IOException {

Expand All @@ -622,12 +618,49 @@ protected FunctionDetails convert(FunctionConfig functionConfig)

// Setup source
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
Map<String, String> topicToSerDeClassNameMap = new HashMap<>();
topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs());
functionConfig.getInputs().forEach(v -> topicToSerDeClassNameMap.put(v, ""));
sourceSpecBuilder.putAllTopicsToSerDeClassName(topicToSerDeClassNameMap);
if (StringUtils.isNotBlank(functionConfig.getTopicsPattern())) {
sourceSpecBuilder.setTopicsPattern(functionConfig.getTopicsPattern());
if (functionConfig.getInputs() != null) {
functionConfig.getInputs().forEach((topicName -> {
sourceSpecBuilder.putInputSpecs(topicName,
ConsumerSpec.newBuilder()
.setIsRegexPattern(false)
.build());
}));
}
if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) {
sourceSpecBuilder.putInputSpecs(functionConfig.getTopicsPattern(),
ConsumerSpec.newBuilder()
.setIsRegexPattern(true)
.build());
}
if (functionConfig.getCustomSerdeInputs() != null) {
functionConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) -> {
sourceSpecBuilder.putInputSpecs(topicName,
ConsumerSpec.newBuilder()
.setSerdeClassName(serdeClassName)
.setIsRegexPattern(false)
.build());
});
}
if (functionConfig.getCustomSchemaInputs() != null) {
functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> {
sourceSpecBuilder.putInputSpecs(topicName,
ConsumerSpec.newBuilder()
.setSchemaType(schemaType)
.setIsRegexPattern(false)
.build());
});
}
if (functionConfig.getInputSpecs() != null) {
functionConfig.getInputSpecs().forEach((topicName, consumerConf) -> {
ConsumerSpec.Builder bldr = ConsumerSpec.newBuilder()
.setIsRegexPattern(consumerConf.isRegexPattern());
if (!StringUtils.isBlank(consumerConf.getSchemaType())) {
bldr.setSchemaType(consumerConf.getSchemaType());
} else if (!StringUtils.isBlank(consumerConf.getSerdeClassName())) {
bldr.setSerdeClassName(consumerConf.getSerdeClassName());
}
sourceSpecBuilder.putInputSpecs(topicName, bldr.build());
});
}

// Set subscription type based on ordering and EFFECTIVELY_ONCE semantics
Expand All @@ -636,7 +669,7 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
? SubscriptionType.FAILOVER
: SubscriptionType.SHARED;
sourceSpecBuilder.setSubscriptionType(subType);

if (typeArgs != null) {
sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
}
Expand All @@ -650,9 +683,13 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
if (functionConfig.getOutput() != null) {
sinkSpecBuilder.setTopic(functionConfig.getOutput());
}
if (functionConfig.getOutputSerdeClassName() != null) {
if (!StringUtils.isBlank(functionConfig.getOutputSerdeClassName())) {
sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
}
if (!StringUtils.isBlank(functionConfig.getOutputSchemaType())) {
sinkSpecBuilder.setSchemaType(functionConfig.getOutputSchemaType());
}

if (typeArgs != null) {
sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
}
Expand Down Expand Up @@ -836,10 +873,10 @@ void runCmd() throws Exception {

@Parameters(commandDescription = "Restart function instance")
class RestartFunction extends FunctionCommand {

@Parameter(names = "--instance-id", description = "The function instanceId (restart all instances if instance-id is not provided")
protected String instanceId;

@Override
void runCmd() throws Exception {
if (isNotBlank(instanceId)) {
Expand Down Expand Up @@ -1191,6 +1228,7 @@ protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.F
if (serviceUrl == null) {
serviceUrl = DEFAULT_SERVICE_URL;
}

try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null,
null)) {
List<RuntimeSpawner> spawners = new LinkedList<>();
Expand Down

0 comments on commit 7bff81e

Please sign in to comment.