Skip to content

Commit

Permalink
Standardize on input/output terminology for Pulsar Functions (#1378)
Browse files Browse the repository at this point in the history
* fix usage of 'sink' terminology

* fix usages of 'source'

* fix additional usages
  • Loading branch information
lucperkins authored and merlimat committed Mar 15, 2018
1 parent cad2039 commit dc34ab6
Show file tree
Hide file tree
Showing 19 changed files with 243 additions and 243 deletions.
4 changes: 2 additions & 2 deletions pulsar-client-cpp/python/functions/context.py
Expand Up @@ -100,8 +100,8 @@ def publish(self, topic_name, message):
pass pass


@abstractmethod @abstractmethod
def get_sink_topic(self): def get_output_topic(self):
'''Returns the sink topic of function''' '''Returns the output topic of function'''
pass pass


@abstractmethod @abstractmethod
Expand Down
Expand Up @@ -170,13 +170,13 @@ public void testLocalRunnerCmdYaml() throws Exception {
@Test @Test
public void testCreateFunction() throws Exception { public void testCreateFunction() throws Exception {
String fnName = TEST_NAME + "-function"; String fnName = TEST_NAME + "-function";
String sourceTopicName = TEST_NAME + "-source-topic"; String inputTopicName = TEST_NAME + "-input-topic";
String sinkTopicName = TEST_NAME + "-sink-topic"; String outputTopicName = TEST_NAME + "-output-topic";
cmd.run(new String[] { cmd.run(new String[] {
"create", "create",
"--name", fnName, "--name", fnName,
"--inputs", sourceTopicName, "--inputs", inputTopicName,
"--output", sinkTopicName, "--output", outputTopicName,
"--jar", "SomeJar.jar", "--jar", "SomeJar.jar",
"--tenant", "sample", "--tenant", "sample",
"--namespace", "ns1", "--namespace", "ns1",
Expand All @@ -185,8 +185,8 @@ public void testCreateFunction() throws Exception {


CreateFunction creater = cmd.getCreater(); CreateFunction creater = cmd.getCreater();
assertEquals(fnName, creater.getFunctionName()); assertEquals(fnName, creater.getFunctionName());
assertEquals(sourceTopicName, creater.getInputs()); assertEquals(inputTopicName, creater.getInputs());
assertEquals(sinkTopicName, creater.getOutput()); assertEquals(outputTopicName, creater.getOutput());


verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString()); verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());


Expand All @@ -195,13 +195,13 @@ public void testCreateFunction() throws Exception {
@Test @Test
public void testCreateWithoutTenant() throws Exception { public void testCreateWithoutTenant() throws Exception {
String fnName = TEST_NAME + "-function"; String fnName = TEST_NAME + "-function";
String sourceTopicName = "persistent://tenant/standalone/namespace/source-topic"; String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
String sinkTopicName = "persistent://tenant/standalone/namespace/sink-topic"; String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
cmd.run(new String[] { cmd.run(new String[] {
"create", "create",
"--name", fnName, "--name", fnName,
"--inputs", sourceTopicName, "--inputs", inputTopicName,
"--output", sinkTopicName, "--output", outputTopicName,
"--jar", "SomeJar.jar", "--jar", "SomeJar.jar",
"--namespace", "ns1", "--namespace", "ns1",
"--className", DummyFunction.class.getName(), "--className", DummyFunction.class.getName(),
Expand All @@ -215,13 +215,13 @@ public void testCreateWithoutTenant() throws Exception {
@Test @Test
public void testCreateWithoutNamespace() throws Exception { public void testCreateWithoutNamespace() throws Exception {
String fnName = TEST_NAME + "-function"; String fnName = TEST_NAME + "-function";
String sourceTopicName = "persistent://tenant/standalone/namespace/source-topic"; String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
String sinkTopicName = "persistent://tenant/standalone/namespace/sink-topic"; String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
cmd.run(new String[] { cmd.run(new String[] {
"create", "create",
"--name", fnName, "--name", fnName,
"--inputs", sourceTopicName, "--inputs", inputTopicName,
"--output", sinkTopicName, "--output", outputTopicName,
"--jar", "SomeJar.jar", "--jar", "SomeJar.jar",
"--className", DummyFunction.class.getName(), "--className", DummyFunction.class.getName(),
}); });
Expand All @@ -234,12 +234,12 @@ public void testCreateWithoutNamespace() throws Exception {


@Test @Test
public void testCreateWithoutFunctionName() throws Exception { public void testCreateWithoutFunctionName() throws Exception {
String sourceTopicName = TEST_NAME + "-source-topic"; String inputTopicName = TEST_NAME + "-input-topic";
String sinkTopicName = TEST_NAME + "-sink-topic"; String outputTopicName = TEST_NAME + "-output-topic";
cmd.run(new String[] { cmd.run(new String[] {
"create", "create",
"--inputs", sourceTopicName, "--inputs", inputTopicName,
"--output", sinkTopicName, "--output", outputTopicName,
"--jar", "SomeJar.jar", "--jar", "SomeJar.jar",
"--tenant", "sample", "--tenant", "sample",
"--namespace", "ns1", "--namespace", "ns1",
Expand All @@ -252,19 +252,19 @@ public void testCreateWithoutFunctionName() throws Exception {
} }


@Test @Test
public void testCreateWithoutSinkTopic() throws Exception { public void testCreateWithoutOutputTopic() throws Exception {
String sourceTopicName = TEST_NAME + "-source-topic"; String inputTopicName = TEST_NAME + "-input-topic";
cmd.run(new String[] { cmd.run(new String[] {
"create", "create",
"--inputs", sourceTopicName, "--inputs", inputTopicName,
"--jar", "SomeJar.jar", "--jar", "SomeJar.jar",
"--tenant", "sample", "--tenant", "sample",
"--namespace", "ns1", "--namespace", "ns1",
"--className", DummyFunction.class.getName(), "--className", DummyFunction.class.getName(),
}); });


CreateFunction creater = cmd.getCreater(); CreateFunction creater = cmd.getCreater();
assertEquals(sourceTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput()); assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput());
verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString()); verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
} }


Expand Down Expand Up @@ -313,16 +313,16 @@ public void testDeleteFunction() throws Exception {
@Test @Test
public void testUpdateFunction() throws Exception { public void testUpdateFunction() throws Exception {
String fnName = TEST_NAME + "-function"; String fnName = TEST_NAME + "-function";
String sourceTopicName = TEST_NAME + "-source-topic"; String inputTopicName = TEST_NAME + "-input-topic";
String sinkTopicName = TEST_NAME + "-sink-topic"; String outputTopicName = TEST_NAME + "-output-topic";






cmd.run(new String[] { cmd.run(new String[] {
"update", "update",
"--name", fnName, "--name", fnName,
"--inputs", sourceTopicName, "--inputs", inputTopicName,
"--output", sinkTopicName, "--output", outputTopicName,
"--jar", "SomeJar.jar", "--jar", "SomeJar.jar",
"--tenant", "sample", "--tenant", "sample",
"--namespace", "ns1", "--namespace", "ns1",
Expand All @@ -331,8 +331,8 @@ public void testUpdateFunction() throws Exception {


UpdateFunction updater = cmd.getUpdater(); UpdateFunction updater = cmd.getUpdater();
assertEquals(fnName, updater.getFunctionName()); assertEquals(fnName, updater.getFunctionName());
assertEquals(sourceTopicName, updater.getInputs()); assertEquals(inputTopicName, updater.getInputs());
assertEquals(sinkTopicName, updater.getOutput()); assertEquals(outputTopicName, updater.getOutput());


verify(functions, times(1)).updateFunction(any(FunctionConfig.class), anyString()); verify(functions, times(1)).updateFunction(any(FunctionConfig.class), anyString());
} }
Expand Down
Expand Up @@ -44,16 +44,16 @@ public interface Context {
String getTopicName(); String getTopicName();


/** /**
* Get a list of all source topics * Get a list of all input topics
* @return a list of all source topics * @return a list of all input topics
*/ */
Collection<String> getSourceTopics(); Collection<String> getInputTopics();


/** /**
* Get sink topic of function * Get the output topic of the function
* @return sink topic name * @return output topic name
*/ */
String getSinkTopic(); String getOutputTopic();


/** /**
* Get output Serde class * Get output Serde class
Expand Down
Expand Up @@ -86,21 +86,21 @@ public void update(double value) {
private ProducerConfiguration producerConfiguration; private ProducerConfiguration producerConfiguration;
private PulsarClient pulsarClient; private PulsarClient pulsarClient;
private ClassLoader classLoader; private ClassLoader classLoader;
private Map<String, Consumer> sourceConsumers; private Map<String, Consumer> inputConsumers;
@Getter @Getter
@Setter @Setter
private StateContextImpl stateContext; private StateContextImpl stateContext;


public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
ClassLoader classLoader, Map<String, Consumer> sourceConsumers) { ClassLoader classLoader, Map<String, Consumer> inputConsumers) {
this.config = config; this.config = config;
this.logger = logger; this.logger = logger;
this.pulsarClient = client; this.pulsarClient = client;
this.classLoader = classLoader; this.classLoader = classLoader;
this.accumulatedMetrics = new ConcurrentHashMap<>(); this.accumulatedMetrics = new ConcurrentHashMap<>();
this.publishProducers = new HashMap<>(); this.publishProducers = new HashMap<>();
this.publishSerializers = new HashMap<>(); this.publishSerializers = new HashMap<>();
this.sourceConsumers = sourceConsumers; this.inputConsumers = inputConsumers;
producerConfiguration = new ProducerConfiguration(); producerConfiguration = new ProducerConfiguration();
producerConfiguration.setBlockIfQueueFull(true); producerConfiguration.setBlockIfQueueFull(true);
producerConfiguration.setBatchingEnabled(true); producerConfiguration.setBatchingEnabled(true);
Expand All @@ -124,12 +124,12 @@ public String getTopicName() {
} }


@Override @Override
public Collection<String> getSourceTopics() { public Collection<String> getInputTopics() {
return sourceConsumers.keySet(); return inputConsumers.keySet();
} }


@Override @Override
public String getSinkTopic() { public String getOutputTopic() {
return config.getFunctionConfig().getOutput(); return config.getFunctionConfig().getOutput();
} }


Expand Down Expand Up @@ -236,7 +236,7 @@ public <O> CompletableFuture<Void> publish(String topicName, O object, String se


@Override @Override
public CompletableFuture<Void> ack(byte[] messageId, String topic) { public CompletableFuture<Void> ack(byte[] messageId, String topic) {
if (!sourceConsumers.containsKey(topic)) { if (!inputConsumers.containsKey(topic)) {
throw new RuntimeException("No such input topic " + topic); throw new RuntimeException("No such input topic " + topic);
} }


Expand All @@ -246,7 +246,7 @@ public CompletableFuture<Void> ack(byte[] messageId, String topic) {
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Invalid message id to ack", e); throw new RuntimeException("Invalid message id to ack", e);
} }
return sourceConsumers.get(topic).acknowledgeAsync(actualMessageId); return inputConsumers.get(topic).acknowledgeAsync(actualMessageId);
} }


@Override @Override
Expand Down
Expand Up @@ -48,11 +48,11 @@ public class JavaInstance implements AutoCloseable {
public JavaInstance(InstanceConfig config, Object userClassObject, public JavaInstance(InstanceConfig config, Object userClassObject,
ClassLoader clsLoader, ClassLoader clsLoader,
PulsarClient pulsarClient, PulsarClient pulsarClient,
Map<String, Consumer> sourceConsumers) { Map<String, Consumer> inputConsumers) {
// TODO: cache logger instances by functions? // TODO: cache logger instances by functions?
Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionConfig().getName()); Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionConfig().getName());


this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, sourceConsumers); this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, inputConsumers);


// create the functions // create the functions
if (userClassObject instanceof Function) { if (userClassObject instanceof Function) {
Expand Down

0 comments on commit dc34ab6

Please sign in to comment.