Skip to content

Commit

Permalink
[improve][fn] Pass FunctionDetails to Go instance (apache#22350)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng authored and Technoboy- committed Apr 1, 2024
1 parent 9a43ab0 commit 99231d0
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pulsar-function-go/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type Conf struct {
UserConfig string `json:"userConfig" yaml:"userConfig"`
//metrics config
MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
// FunctionDetails
FunctionDetails string `json:"functionDetails" yaml:"functionDetails"`
}

var (
Expand Down
11 changes: 11 additions & 0 deletions pulsar-function-go/pf/instanceConf.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"time"

"github.com/apache/pulsar/pulsar-function-go/conf"
log "github.com/apache/pulsar/pulsar-function-go/logutil"
pb "github.com/apache/pulsar/pulsar-function-go/pb"
"google.golang.org/protobuf/encoding/protojson"
)

// This is the config passed to the Golang Instance. Contains all the information
Expand Down Expand Up @@ -122,6 +124,15 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
tlsAllowInsecure: cfg.TLSAllowInsecureConnection,
tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
}
// parse the raw function details and ignore the unmarshal error(fallback to original way)
if cfg.FunctionDetails != "" {
functionDetails := pb.FunctionDetails{}
if err := protojson.Unmarshal([]byte(cfg.FunctionDetails), &functionDetails); err != nil {
log.Errorf("Failed to unmarshal function details: %v", err)
} else {
instanceConf.funcDetails = functionDetails
}
}

if instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_EFFECTIVELY_ONCE {
panic("Go instance current not support EFFECTIVELY_ONCE processing guarantees.")
Expand Down
207 changes: 207 additions & 0 deletions pulsar-function-go/pf/instanceConf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package pf

import (
"fmt"
"testing"

cfg "github.com/apache/pulsar/pulsar-function-go/conf"
Expand Down Expand Up @@ -113,3 +114,209 @@ func TestInstanceConf_Fail(t *testing.T) {
newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 3})
}, "Should have a panic")
}

func TestInstanceConf_WithDetails(t *testing.T) {
cfg := &cfg.Conf{
FunctionDetails: `{"tenant":"public","namespace":"default","name":"test-function","className":"process",
"logTopic":"test-logs","userConfig":"{\"key1\":\"value1\"}","runtime":"GO","autoAck":true,"parallelism":1,
"source":{"configs":"{\"username\":\"admin\"}","typeClassName":"string","timeoutMs":"15000",
"subscriptionName":"test-subscription","inputSpecs":{"input":{"schemaType":"avro","receiverQueueSize":{"value":1000},
"schemaProperties":{"schema_prop1":"schema1"},"consumerProperties":{"consumer_prop1":"consumer1"},"cryptoSpec":
{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"SEND","consumerCryptoFailureAction":"CONSUME"}}}
,"negativeAckRedeliveryDelayMs":"15000"},"sink":{"configs":"{\"password\":\"admin\"}","topic":"test-output",
"typeClassName":"string","schemaType":"avro","producerSpec":{"maxPendingMessages":2000,"useThreadLocalProducers":true,
"cryptoSpec":{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"DISCARD"},
"batchBuilder":"DEFAULT"}},"resources":{"cpu":2.0,"ram":"1024","disk":"1024"},"packageUrl":"/path/to/package",
"retryDetails":{"maxMessageRetries":3,"deadLetterTopic":"test-dead-letter-topic"},"secretsMap":
"{\"secret1\":\"secret-value1\"}","runtimeFlags":"flags","componentType":"FUNCTION","customRuntimeOptions":"options",
"retainOrdering":true,"retainKeyOrdering":true,"subscriptionPosition":"EARLIEST"}`,
}
instanceConf := newInstanceConfWithConf(cfg)
assert.Equal(t, "public", instanceConf.funcDetails.Tenant)
assert.Equal(t, "default", instanceConf.funcDetails.Namespace)
assert.Equal(t, "test-function", instanceConf.funcDetails.Name)
assert.Equal(t, "process", instanceConf.funcDetails.ClassName)
assert.Equal(t, "test-logs", instanceConf.funcDetails.LogTopic)
assert.Equal(t, pb.ProcessingGuarantees_ATLEAST_ONCE, instanceConf.funcDetails.ProcessingGuarantees)
assert.Equal(t, `{"key1":"value1"}`, instanceConf.funcDetails.UserConfig)
assert.Equal(t, `{"secret1":"secret-value1"}`, instanceConf.funcDetails.SecretsMap)
assert.Equal(t, pb.FunctionDetails_GO, instanceConf.funcDetails.Runtime)

assert.Equal(t, true, instanceConf.funcDetails.AutoAck)
assert.Equal(t, int32(1), instanceConf.funcDetails.Parallelism)

sourceSpec := pb.SourceSpec{
TypeClassName: "string",
TimeoutMs: 15000,
Configs: `{"username":"admin"}`,
SubscriptionName: "test-subscription",
SubscriptionType: pb.SubscriptionType_SHARED,
NegativeAckRedeliveryDelayMs: 15000,
InputSpecs: map[string]*pb.ConsumerSpec{
"input": {
SchemaType: "avro",
SchemaProperties: map[string]string{
"schema_prop1": "schema1",
},
ConsumerProperties: map[string]string{
"consumer_prop1": "consumer1",
},
ReceiverQueueSize: &pb.ConsumerSpec_ReceiverQueueSize{
Value: 1000,
},
CryptoSpec: &pb.CryptoSpec{
CryptoKeyReaderClassName: "key-reader",
ProducerCryptoFailureAction: pb.CryptoSpec_SEND,
ConsumerCryptoFailureAction: pb.CryptoSpec_CONSUME,
},
},
},
}
assert.Equal(t, sourceSpec.String(), instanceConf.funcDetails.Source.String())

sinkSpec := pb.SinkSpec{
TypeClassName: "string",
Topic: "test-output",
Configs: `{"password":"admin"}`,
SchemaType: "avro",
ProducerSpec: &pb.ProducerSpec{
MaxPendingMessages: 2000,
UseThreadLocalProducers: true,
CryptoSpec: &pb.CryptoSpec{
CryptoKeyReaderClassName: "key-reader",
ProducerCryptoFailureAction: pb.CryptoSpec_DISCARD,
ConsumerCryptoFailureAction: pb.CryptoSpec_FAIL,
},
BatchBuilder: "DEFAULT",
},
}
assert.Equal(t, sinkSpec.String(), instanceConf.funcDetails.Sink.String())

resource := pb.Resources{
Cpu: 2.0,
Ram: 1024,
Disk: 1024,
}
assert.Equal(t, resource.String(), instanceConf.funcDetails.Resources.String())
assert.Equal(t, "/path/to/package", instanceConf.funcDetails.PackageUrl)

retryDetails := pb.RetryDetails{
MaxMessageRetries: 3,
DeadLetterTopic: "test-dead-letter-topic",
}
assert.Equal(t, retryDetails.String(), instanceConf.funcDetails.RetryDetails.String())

assert.Equal(t, "flags", instanceConf.funcDetails.RuntimeFlags)
assert.Equal(t, pb.FunctionDetails_FUNCTION, instanceConf.funcDetails.ComponentType)
assert.Equal(t, "options", instanceConf.funcDetails.CustomRuntimeOptions)
assert.Equal(t, "", instanceConf.funcDetails.Builtin)
assert.Equal(t, true, instanceConf.funcDetails.RetainOrdering)
assert.Equal(t, true, instanceConf.funcDetails.RetainKeyOrdering)
assert.Equal(t, pb.SubscriptionPosition_EARLIEST, instanceConf.funcDetails.SubscriptionPosition)
}

func TestInstanceConf_WithEmptyOrInvalidDetails(t *testing.T) {
testCases := []struct {
name string
details string
}{
{
name: "empty details",
details: "",
},
{
name: "invalid details",
details: "error",
},
}

for i, testCase := range testCases {

t.Run(fmt.Sprintf("testCase[%d] %s", i, testCase.name), func(t *testing.T) {
cfg := &cfg.Conf{
FunctionDetails: testCase.details,
Tenant: "public",
NameSpace: "default",
Name: "test-function",
LogTopic: "test-logs",
ProcessingGuarantees: 0,
UserConfig: `{"key1":"value1"}`,
SecretsMap: `{"secret1":"secret-value1"}`,
Runtime: 3,
AutoACK: true,
Parallelism: 1,
SubscriptionType: 1,
TimeoutMs: 15000,
SubscriptionName: "test-subscription",
CleanupSubscription: false,
SubscriptionPosition: 0,
SinkSpecTopic: "test-output",
SinkSchemaType: "avro",
Cpu: 2.0,
Ram: 1024,
Disk: 1024,
MaxMessageRetries: 3,
DeadLetterTopic: "test-dead-letter-topic",
SourceInputSpecs: map[string]string{
"input": `{"schemaType":"avro","receiverQueueSize":{"value":1000},"schemaProperties":
{"schema_prop1":"schema1"},"consumerProperties":{"consumer_prop1":"consumer1"}}`,
},
}
instanceConf := newInstanceConfWithConf(cfg)

assert.Equal(t, "public", instanceConf.funcDetails.Tenant)
assert.Equal(t, "default", instanceConf.funcDetails.Namespace)
assert.Equal(t, "test-function", instanceConf.funcDetails.Name)
assert.Equal(t, "test-logs", instanceConf.funcDetails.LogTopic)
assert.Equal(t, pb.ProcessingGuarantees_ATLEAST_ONCE, instanceConf.funcDetails.ProcessingGuarantees)
assert.Equal(t, `{"key1":"value1"}`, instanceConf.funcDetails.UserConfig)
assert.Equal(t, `{"secret1":"secret-value1"}`, instanceConf.funcDetails.SecretsMap)
assert.Equal(t, pb.FunctionDetails_GO, instanceConf.funcDetails.Runtime)

assert.Equal(t, true, instanceConf.funcDetails.AutoAck)
assert.Equal(t, int32(1), instanceConf.funcDetails.Parallelism)

sourceSpec := pb.SourceSpec{
SubscriptionType: pb.SubscriptionType_FAILOVER,
TimeoutMs: 15000,
SubscriptionName: "test-subscription",
CleanupSubscription: false,
SubscriptionPosition: pb.SubscriptionPosition_LATEST,
InputSpecs: map[string]*pb.ConsumerSpec{
"input": {
SchemaType: "avro",
SchemaProperties: map[string]string{
"schema_prop1": "schema1",
},
ConsumerProperties: map[string]string{
"consumer_prop1": "consumer1",
},
ReceiverQueueSize: &pb.ConsumerSpec_ReceiverQueueSize{
Value: 1000,
},
},
},
}
assert.Equal(t, sourceSpec.String(), instanceConf.funcDetails.Source.String())

sinkSpec := pb.SinkSpec{
Topic: "test-output",
SchemaType: "avro",
}
assert.Equal(t, sinkSpec.String(), instanceConf.funcDetails.Sink.String())

resource := pb.Resources{
Cpu: 2.0,
Ram: 1024,
Disk: 1024,
}
assert.Equal(t, resource.String(), instanceConf.funcDetails.Resources.String())

retryDetails := pb.RetryDetails{
MaxMessageRetries: 3,
DeadLetterTopic: "test-dead-letter-topic",
}
assert.Equal(t, retryDetails.String(), instanceConf.funcDetails.RetryDetails.String())
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,6 @@ public class GoInstanceConfig {
private String deadLetterTopic = "";

private int metricsPort;

private String functionDetails = "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
final List<String> args = new LinkedList<>();
GoInstanceConfig goInstanceConfig = new GoInstanceConfig();

// pass the raw functino details directly so that we don't need to assemble the `instanceConf.funcDetails`
// manually in Go instance
String functionDetails =
JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails());
goInstanceConfig.setFunctionDetails(functionDetails);

if (instanceConfig.getClusterName() != null) {
goInstanceConfig.setClusterName(instanceConfig.getClusterName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,14 @@ private void verifyGolangInstance(InstanceConfig config) throws Exception {
assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
assertEquals(goInstanceConfig.get("deadLetterTopic"), "");
assertEquals(goInstanceConfig.get("metricsPort"), 4331);
assertEquals(goInstanceConfig.get("functionDetails"), "{\"tenant\":\"tenant\",\"namespace\":\"namespace\","
+ "\"name\":\"container\",\"className\":\"org.apache.pulsar.functions.utils.functioncache"
+ ".AddFunction\",\"logTopic\":\"container-log\",\"runtime\":\"GO\",\"source\":{\"className\":\"org"
+ ".pulsar.pulsar.TestSource\",\"subscriptionType\":\"FAILOVER\",\"typeClassName\":\"java.lang"
+ ".String\",\"inputSpecs\":{\"test_src\":{}}},\"sink\":{\"className\":\"org.pulsar.pulsar"
+ ".TestSink\",\"topic\":\"container-output\",\"serDeClassName\":\"org.apache.pulsar.functions"
+ ".runtime.serde.Utf8Serializer\",\"typeClassName\":\"java.lang.String\"},\"resources\":{\"cpu\":1"
+ ".0,\"ram\":\"1000\",\"disk\":\"10000\"}}");

// check padding and xmx
V1Container containerSpec = container.getFunctionContainer(Collections.emptyList(), RESOURCES);
Expand Down

0 comments on commit 99231d0

Please sign in to comment.