Skip to content

Commit

Permalink
adding sink spec (#1700)
Browse files Browse the repository at this point in the history
* adding sink spec

* addressing comments

* fixing proto numbering
  • Loading branch information
jerrypeng authored and sijie committed May 1, 2018
1 parent fd576ae commit 5721892
Show file tree
Hide file tree
Showing 17 changed files with 151 additions and 115 deletions.
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType;
Expand Down Expand Up @@ -553,6 +554,16 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
}
functionDetailsBuilder.setSource(sourceSpecBuilder);

// Setup sink
SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
if (functionConfig.getOutput() != null) {
sinkSpecBuilder.setTopic(functionConfig.getOutput());
}
if (functionConfig.getOutputSerdeClassName() != null) {
sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
}
functionDetailsBuilder.setSink(sinkSpecBuilder);

if (functionConfig.getTenant() != null) {
functionDetailsBuilder.setTenant(functionConfig.getTenant());
}
Expand All @@ -565,12 +576,6 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
if (functionConfig.getClassName() != null) {
functionDetailsBuilder.setClassName(functionConfig.getClassName());
}
if (functionConfig.getOutput() != null) {
functionDetailsBuilder.setOutput(functionConfig.getOutput());
}
if (functionConfig.getOutputSerdeClassName() != null) {
functionDetailsBuilder.setOutputSerdeClassName(functionConfig.getOutputSerdeClassName());
}
if (functionConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
}
Expand Down
Expand Up @@ -137,12 +137,12 @@ public Collection<String> getInputTopics() {

@Override
public String getOutputTopic() {
return config.getFunctionDetails().getOutput();
return config.getFunctionDetails().getSink().getTopic();
}

@Override
public String getOutputSerdeClassName() {
return config.getFunctionDetails().getOutputSerdeClassName();
return config.getFunctionDetails().getSink().getSerDeClassName();
}

@Override
Expand Down
Expand Up @@ -310,7 +310,7 @@ private void processResult(Record srcRecord,
throw result.getSystemException();
} else {
stats.incrementSuccessfullyProcessed(endTime - startTime);
if (result.getResult() != null && instanceConfig.getFunctionDetails().getOutput() != null) {
if (result.getResult() != null && instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
byte[] output;
try {
output = outputSerDe.serialize(result.getResult());
Expand Down Expand Up @@ -417,12 +417,12 @@ private static void addSystemMetrics(String metricName, double value, InstanceCo

private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) {
if (!Void.class.equals(typeArgs[1])) { // return type is not `Void.class`
if (instanceConfig.getFunctionDetails().getOutputSerdeClassName() == null
|| instanceConfig.getFunctionDetails().getOutputSerdeClassName().isEmpty()
|| instanceConfig.getFunctionDetails().getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) {
if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() == null
|| instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()
|| instanceConfig.getFunctionDetails().getSink().getSerDeClassName().equals(DefaultSerDe.class.getName())) {
outputSerDe = InstanceUtils.initializeDefaultSerDe(typeArgs[1]);
} else {
this.outputSerDe = InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getOutputSerdeClassName(), clsLoader, typeArgs[1]);
this.outputSerDe = InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getSink().getSerDeClassName(), clsLoader, typeArgs[1]);
}
Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
if (outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
Expand Down
Expand Up @@ -70,7 +70,7 @@ public void close() {
try {
producer.close();
} catch (PulsarClientException e) {
log.warn("Fail to close producer for processor {}", functionDetails.getOutput(), e);
log.warn("Fail to close producer for processor {}", functionDetails.getSink().getTopic(), e);
}
}
}
Expand Down
Expand Up @@ -72,7 +72,7 @@ public void close() {
try {
producer.close();
} catch (PulsarClientException e) {
log.warn("Fail to close producer for processor {}", functionDetails.getOutput(), e);
log.warn("Fail to close producer for processor {}", functionDetails.getSink().getTopic(), e);
}
}
}
Expand Down
Expand Up @@ -128,9 +128,9 @@ public void postReceiveMessage(Record record) {}

@Override
public void setupOutput(SerDe outputSerDe) throws Exception {
String outputTopic = functionDetails.getOutput();
String outputTopic = functionDetails.getSink().getTopic();
if (outputTopic != null
&& !functionDetails.getOutput().isEmpty()
&& !outputTopic.isEmpty()
&& outputSerDe != null) {
log.info("Starting producer for output topic {}", outputTopic);
initializeOutputProducer(outputTopic);
Expand Down
106 changes: 53 additions & 53 deletions pulsar-functions/instance/src/main/python/Function_pb2.py
Expand Up @@ -39,7 +39,7 @@
name='Function.proto',
package='proto',
syntax='proto3',
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xf9\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x1c\n\x14outputSerdeClassName\x18\x05 \x01(\t\x12\x0e\n\x06output\x18\x06 \x01(\t\x12\x10\n\x08logTopic\x18\x07 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x08 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12:\n\nuserConfig\x18\t \x03(\x0b\x32&.proto.FunctionDetails.UserConfigEntry\x12/\n\x07runtime\x18\n \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61utoAck\x18\x0b \x01(\x08\x12\x13\n\x0bparallelism\x18\x0c \x01(\x05\x12!\n\x06source\x18\r \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0e \x01(\x0b\x32\x0f.proto.SinkSpec\x1a\x31\n\x0fUserConfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\"\xf1\x01\n\nSourceSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03 \x01(\x0e\x32\x17.proto.SubscriptionType\x12M\n\x16topicsToSerDeClassName\x18\x04 \x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntry\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\".\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\".\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\"\xa1\x01\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02 \x01(\t*O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02*,\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xcb\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12:\n\nuserConfig\x18\x07 \x03(\x0b\x32&.proto.FunctionDetails.UserConfigEntry\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61utoAck\x18\t \x01(\x08\x12\x13\n\x0bparallelism\x18\n \x01(\x05\x12!\n\x06source\x18\x0b \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0c \x01(\x0b\x32\x0f.proto.SinkSpec\x1a\x31\n\x0fUserConfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\"\xf1\x01\n\nSourceSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03 \x01(\x0e\x32\x17.proto.SubscriptionType\x12M\n\x16topicsToSerDeClassName\x18\x04 \x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntry\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"U\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\r\n\x05topic\x18\x04 \x01(\t\x12\x16\n\x0eserDeClassName\x18\x05 \x01(\t\".\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\"\xa1\x01\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02 \x01(\t*O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02*,\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
)

_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
Expand All @@ -63,8 +63,8 @@
],
containing_type=None,
options=None,
serialized_start=1187,
serialized_end=1266,
serialized_start=1180,
serialized_end=1259,
)
_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)

Expand All @@ -86,8 +86,8 @@
],
containing_type=None,
options=None,
serialized_start=1268,
serialized_end=1312,
serialized_start=1261,
serialized_end=1305,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)

Expand Down Expand Up @@ -116,8 +116,8 @@
],
containing_type=None,
options=None,
serialized_start=500,
serialized_end=531,
serialized_start=454,
serialized_end=485,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)

Expand Down Expand Up @@ -155,8 +155,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=449,
serialized_end=498,
serialized_start=403,
serialized_end=452,
)

_FUNCTIONDETAILS = _descriptor.Descriptor(
Expand Down Expand Up @@ -195,71 +195,57 @@
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='outputSerdeClassName', full_name='proto.FunctionDetails.outputSerdeClassName', index=4,
name='logTopic', full_name='proto.FunctionDetails.logTopic', index=4,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='output', full_name='proto.FunctionDetails.output', index=5,
number=6, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='logTopic', full_name='proto.FunctionDetails.logTopic', index=6,
number=7, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='processingGuarantees', full_name='proto.FunctionDetails.processingGuarantees', index=7,
number=8, type=14, cpp_type=8, label=1,
name='processingGuarantees', full_name='proto.FunctionDetails.processingGuarantees', index=5,
number=6, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='userConfig', full_name='proto.FunctionDetails.userConfig', index=8,
number=9, type=11, cpp_type=10, label=3,
name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6,
number=7, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='runtime', full_name='proto.FunctionDetails.runtime', index=9,
number=10, type=14, cpp_type=8, label=1,
name='runtime', full_name='proto.FunctionDetails.runtime', index=7,
number=8, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='autoAck', full_name='proto.FunctionDetails.autoAck', index=10,
number=11, type=8, cpp_type=7, label=1,
name='autoAck', full_name='proto.FunctionDetails.autoAck', index=8,
number=9, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='parallelism', full_name='proto.FunctionDetails.parallelism', index=11,
number=12, type=5, cpp_type=1, label=1,
name='parallelism', full_name='proto.FunctionDetails.parallelism', index=9,
number=10, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='source', full_name='proto.FunctionDetails.source', index=12,
number=13, type=11, cpp_type=10, label=1,
name='source', full_name='proto.FunctionDetails.source', index=10,
number=11, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='sink', full_name='proto.FunctionDetails.sink', index=13,
number=14, type=11, cpp_type=10, label=1,
name='sink', full_name='proto.FunctionDetails.sink', index=11,
number=12, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
Expand All @@ -278,7 +264,7 @@
oneofs=[
],
serialized_start=26,
serialized_end=531,
serialized_end=485,
)


Expand Down Expand Up @@ -315,8 +301,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=714,
serialized_end=775,
serialized_start=668,
serialized_end=729,
)

_SOURCESPEC = _descriptor.Descriptor(
Expand Down Expand Up @@ -366,8 +352,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=534,
serialized_end=775,
serialized_start=488,
serialized_end=729,
)


Expand All @@ -392,6 +378,20 @@
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='topic', full_name='proto.SinkSpec.topic', index=2,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=3,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
Expand All @@ -404,8 +404,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=777,
serialized_end=823,
serialized_start=731,
serialized_end=816,
)


Expand Down Expand Up @@ -435,8 +435,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=825,
serialized_end=871,
serialized_start=818,
serialized_end=864,
)


Expand Down Expand Up @@ -487,8 +487,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=874,
serialized_end=1035,
serialized_start=867,
serialized_end=1028,
)


Expand Down Expand Up @@ -525,8 +525,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1037,
serialized_end=1118,
serialized_start=1030,
serialized_end=1111,
)


Expand Down Expand Up @@ -563,8 +563,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1120,
serialized_end=1185,
serialized_start=1113,
serialized_end=1178,
)

_FUNCTIONDETAILS_USERCONFIGENTRY.containing_type = _FUNCTIONDETAILS
Expand Down

0 comments on commit 5721892

Please sign in to comment.