diff --git a/README.md b/README.md index 234cf570..dd859c0e 100644 --- a/README.md +++ b/README.md @@ -103,19 +103,7 @@ like `bin/example_basic.wasm`. You have two ways to start the function stream server. -### Option 1: Standalone Mode (for development and testing) - -Use this command to start the standalone server: - -```shell -bin/function-stream standalone -``` - -### Option 2: Server Mode (for production) - -First, start an Apache Pulsar service. See this [doc](https://pulsar.apache.org/docs/en/standalone/) for instructions. - -Then, use this command to start the server based on Apache Pulsar: +Use this command to start the function stream server: ```shell bin/function-stream server diff --git a/admin/client/docs/ModelFunction.md b/admin/client/docs/ModelFunction.md index 0f69b3f3..8306182f 100644 --- a/admin/client/docs/ModelFunction.md +++ b/admin/client/docs/ModelFunction.md @@ -9,14 +9,14 @@ Name | Type | Description | Notes **Namespace** | Pointer to **string** | | [optional] **Replicas** | **int32** | | **Runtime** | [**ModelRuntimeConfig**](ModelRuntimeConfig.md) | | -**Sink** | Pointer to [**ModelTubeConfig**](ModelTubeConfig.md) | | [optional] -**Source** | Pointer to [**[]ModelTubeConfig**](ModelTubeConfig.md) | | [optional] +**Sink** | [**ModelTubeConfig**](ModelTubeConfig.md) | | +**Source** | [**[]ModelTubeConfig**](ModelTubeConfig.md) | | ## Methods ### NewModelFunction -`func NewModelFunction(name string, replicas int32, runtime ModelRuntimeConfig, ) *ModelFunction` +`func NewModelFunction(name string, replicas int32, runtime ModelRuntimeConfig, sink ModelTubeConfig, source []ModelTubeConfig, ) *ModelFunction` NewModelFunction instantiates a new ModelFunction object This constructor will assign default values to properties that have it defined, @@ -160,11 +160,6 @@ and a boolean to check if the value has been set. SetSink sets Sink field to given value. -### HasSink - -`func (o *ModelFunction) HasSink() bool` - -HasSink returns a boolean if a field has been set. ### GetSource @@ -185,11 +180,6 @@ and a boolean to check if the value has been set. SetSource sets Source field to given value. -### HasSource - -`func (o *ModelFunction) HasSource() bool` - -HasSource returns a boolean if a field has been set. [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/admin/client/docs/ModelRuntimeConfig.md b/admin/client/docs/ModelRuntimeConfig.md index 44d2a0d9..5c40572d 100644 --- a/admin/client/docs/ModelRuntimeConfig.md +++ b/admin/client/docs/ModelRuntimeConfig.md @@ -5,13 +5,13 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **Config** | Pointer to **map[string]interface{}** | | [optional] -**Type** | Pointer to **string** | | [optional] +**Type** | **string** | | ## Methods ### NewModelRuntimeConfig -`func NewModelRuntimeConfig() *ModelRuntimeConfig` +`func NewModelRuntimeConfig(type_ string, ) *ModelRuntimeConfig` NewModelRuntimeConfig instantiates a new ModelRuntimeConfig object This constructor will assign default values to properties that have it defined, @@ -70,11 +70,6 @@ and a boolean to check if the value has been set. SetType sets Type field to given value. -### HasType - -`func (o *ModelRuntimeConfig) HasType() bool` - -HasType returns a boolean if a field has been set. [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/admin/client/docs/ModelTubeConfig.md b/admin/client/docs/ModelTubeConfig.md index 2cf86887..45b8a00b 100644 --- a/admin/client/docs/ModelTubeConfig.md +++ b/admin/client/docs/ModelTubeConfig.md @@ -5,13 +5,13 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **Config** | Pointer to **map[string]interface{}** | | [optional] -**Type** | Pointer to **string** | | [optional] +**Type** | **string** | | ## Methods ### NewModelTubeConfig -`func NewModelTubeConfig() *ModelTubeConfig` +`func NewModelTubeConfig(type_ string, ) *ModelTubeConfig` NewModelTubeConfig instantiates a new ModelTubeConfig object This constructor will assign default values to properties that have it defined, @@ -70,11 +70,6 @@ and a boolean to check if the value has been set. SetType sets Type field to given value. -### HasType - -`func (o *ModelTubeConfig) HasType() bool` - -HasType returns a boolean if a field has been set. [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/admin/client/model_model_function.go b/admin/client/model_model_function.go index 1d2b43a6..eb5f1dbc 100644 --- a/admin/client/model_model_function.go +++ b/admin/client/model_model_function.go @@ -26,8 +26,8 @@ type ModelFunction struct { Namespace *string `json:"namespace,omitempty"` Replicas int32 `json:"replicas"` Runtime ModelRuntimeConfig `json:"runtime"` - Sink *ModelTubeConfig `json:"sink,omitempty"` - Source []ModelTubeConfig `json:"source,omitempty"` + Sink ModelTubeConfig `json:"sink"` + Source []ModelTubeConfig `json:"source"` } type _ModelFunction ModelFunction @@ -36,11 +36,13 @@ type _ModelFunction ModelFunction // This constructor will assign default values to properties that have it defined, // and makes sure properties required by API are set, but the set of arguments // will change when the set of required properties is changed -func NewModelFunction(name string, replicas int32, runtime ModelRuntimeConfig) *ModelFunction { +func NewModelFunction(name string, replicas int32, runtime ModelRuntimeConfig, sink ModelTubeConfig, source []ModelTubeConfig) *ModelFunction { this := ModelFunction{} this.Name = name this.Replicas = replicas this.Runtime = runtime + this.Sink = sink + this.Source = source return &this } @@ -188,66 +190,50 @@ func (o *ModelFunction) SetRuntime(v ModelRuntimeConfig) { o.Runtime = v } -// GetSink returns the Sink field value if set, zero value otherwise. +// GetSink returns the Sink field value func (o *ModelFunction) GetSink() ModelTubeConfig { - if o == nil || IsNil(o.Sink) { + if o == nil { var ret ModelTubeConfig return ret } - return *o.Sink + + return o.Sink } -// GetSinkOk returns a tuple with the Sink field value if set, nil otherwise +// GetSinkOk returns a tuple with the Sink field value // and a boolean to check if the value has been set. func (o *ModelFunction) GetSinkOk() (*ModelTubeConfig, bool) { - if o == nil || IsNil(o.Sink) { + if o == nil { return nil, false } - return o.Sink, true -} - -// HasSink returns a boolean if a field has been set. -func (o *ModelFunction) HasSink() bool { - if o != nil && !IsNil(o.Sink) { - return true - } - - return false + return &o.Sink, true } -// SetSink gets a reference to the given ModelTubeConfig and assigns it to the Sink field. +// SetSink sets field value func (o *ModelFunction) SetSink(v ModelTubeConfig) { - o.Sink = &v + o.Sink = v } -// GetSource returns the Source field value if set, zero value otherwise. +// GetSource returns the Source field value func (o *ModelFunction) GetSource() []ModelTubeConfig { - if o == nil || IsNil(o.Source) { + if o == nil { var ret []ModelTubeConfig return ret } + return o.Source } -// GetSourceOk returns a tuple with the Source field value if set, nil otherwise +// GetSourceOk returns a tuple with the Source field value // and a boolean to check if the value has been set. func (o *ModelFunction) GetSourceOk() ([]ModelTubeConfig, bool) { - if o == nil || IsNil(o.Source) { + if o == nil { return nil, false } return o.Source, true } -// HasSource returns a boolean if a field has been set. -func (o *ModelFunction) HasSource() bool { - if o != nil && !IsNil(o.Source) { - return true - } - - return false -} - -// SetSource gets a reference to the given []ModelTubeConfig and assigns it to the Source field. +// SetSource sets field value func (o *ModelFunction) SetSource(v []ModelTubeConfig) { o.Source = v } @@ -271,12 +257,8 @@ func (o ModelFunction) ToMap() (map[string]interface{}, error) { } toSerialize["replicas"] = o.Replicas toSerialize["runtime"] = o.Runtime - if !IsNil(o.Sink) { - toSerialize["sink"] = o.Sink - } - if !IsNil(o.Source) { - toSerialize["source"] = o.Source - } + toSerialize["sink"] = o.Sink + toSerialize["source"] = o.Source return toSerialize, nil } @@ -288,6 +270,8 @@ func (o *ModelFunction) UnmarshalJSON(data []byte) (err error) { "name", "replicas", "runtime", + "sink", + "source", } allProperties := make(map[string]interface{}) diff --git a/admin/client/model_model_runtime_config.go b/admin/client/model_model_runtime_config.go index cbaeae5d..4298c0da 100644 --- a/admin/client/model_model_runtime_config.go +++ b/admin/client/model_model_runtime_config.go @@ -11,7 +11,9 @@ API version: 1.0.0 package adminclient import ( + "bytes" "encoding/json" + "fmt" ) // checks if the ModelRuntimeConfig type satisfies the MappedNullable interface at compile time @@ -20,15 +22,18 @@ var _ MappedNullable = &ModelRuntimeConfig{} // ModelRuntimeConfig struct for ModelRuntimeConfig type ModelRuntimeConfig struct { Config map[string]interface{} `json:"config,omitempty"` - Type *string `json:"type,omitempty"` + Type string `json:"type"` } +type _ModelRuntimeConfig ModelRuntimeConfig + // NewModelRuntimeConfig instantiates a new ModelRuntimeConfig object // This constructor will assign default values to properties that have it defined, // and makes sure properties required by API are set, but the set of arguments // will change when the set of required properties is changed -func NewModelRuntimeConfig() *ModelRuntimeConfig { +func NewModelRuntimeConfig(type_ string) *ModelRuntimeConfig { this := ModelRuntimeConfig{} + this.Type = type_ return &this } @@ -72,36 +77,28 @@ func (o *ModelRuntimeConfig) SetConfig(v map[string]interface{}) { o.Config = v } -// GetType returns the Type field value if set, zero value otherwise. +// GetType returns the Type field value func (o *ModelRuntimeConfig) GetType() string { - if o == nil || IsNil(o.Type) { + if o == nil { var ret string return ret } - return *o.Type + + return o.Type } -// GetTypeOk returns a tuple with the Type field value if set, nil otherwise +// GetTypeOk returns a tuple with the Type field value // and a boolean to check if the value has been set. func (o *ModelRuntimeConfig) GetTypeOk() (*string, bool) { - if o == nil || IsNil(o.Type) { + if o == nil { return nil, false } - return o.Type, true + return &o.Type, true } -// HasType returns a boolean if a field has been set. -func (o *ModelRuntimeConfig) HasType() bool { - if o != nil && !IsNil(o.Type) { - return true - } - - return false -} - -// SetType gets a reference to the given string and assigns it to the Type field. +// SetType sets field value func (o *ModelRuntimeConfig) SetType(v string) { - o.Type = &v + o.Type = v } func (o ModelRuntimeConfig) MarshalJSON() ([]byte, error) { @@ -117,12 +114,47 @@ func (o ModelRuntimeConfig) ToMap() (map[string]interface{}, error) { if !IsNil(o.Config) { toSerialize["config"] = o.Config } - if !IsNil(o.Type) { - toSerialize["type"] = o.Type - } + toSerialize["type"] = o.Type return toSerialize, nil } +func (o *ModelRuntimeConfig) UnmarshalJSON(data []byte) (err error) { + // This validates that all required properties are included in the JSON object + // by unmarshalling the object into a generic map with string keys and checking + // that every required field exists as a key in the generic map. + requiredProperties := []string{ + "type", + } + + allProperties := make(map[string]interface{}) + + err = json.Unmarshal(data, &allProperties) + + if err != nil { + return err + } + + for _, requiredProperty := range requiredProperties { + if _, exists := allProperties[requiredProperty]; !exists { + return fmt.Errorf("no value given for required property %v", requiredProperty) + } + } + + varModelRuntimeConfig := _ModelRuntimeConfig{} + + decoder := json.NewDecoder(bytes.NewReader(data)) + decoder.DisallowUnknownFields() + err = decoder.Decode(&varModelRuntimeConfig) + + if err != nil { + return err + } + + *o = ModelRuntimeConfig(varModelRuntimeConfig) + + return err +} + type NullableModelRuntimeConfig struct { value *ModelRuntimeConfig isSet bool diff --git a/admin/client/model_model_tube_config.go b/admin/client/model_model_tube_config.go index 72fae2f5..62fe85be 100644 --- a/admin/client/model_model_tube_config.go +++ b/admin/client/model_model_tube_config.go @@ -11,7 +11,9 @@ API version: 1.0.0 package adminclient import ( + "bytes" "encoding/json" + "fmt" ) // checks if the ModelTubeConfig type satisfies the MappedNullable interface at compile time @@ -20,15 +22,18 @@ var _ MappedNullable = &ModelTubeConfig{} // ModelTubeConfig struct for ModelTubeConfig type ModelTubeConfig struct { Config map[string]interface{} `json:"config,omitempty"` - Type *string `json:"type,omitempty"` + Type string `json:"type"` } +type _ModelTubeConfig ModelTubeConfig + // NewModelTubeConfig instantiates a new ModelTubeConfig object // This constructor will assign default values to properties that have it defined, // and makes sure properties required by API are set, but the set of arguments // will change when the set of required properties is changed -func NewModelTubeConfig() *ModelTubeConfig { +func NewModelTubeConfig(type_ string) *ModelTubeConfig { this := ModelTubeConfig{} + this.Type = type_ return &this } @@ -72,36 +77,28 @@ func (o *ModelTubeConfig) SetConfig(v map[string]interface{}) { o.Config = v } -// GetType returns the Type field value if set, zero value otherwise. +// GetType returns the Type field value func (o *ModelTubeConfig) GetType() string { - if o == nil || IsNil(o.Type) { + if o == nil { var ret string return ret } - return *o.Type + + return o.Type } -// GetTypeOk returns a tuple with the Type field value if set, nil otherwise +// GetTypeOk returns a tuple with the Type field value // and a boolean to check if the value has been set. func (o *ModelTubeConfig) GetTypeOk() (*string, bool) { - if o == nil || IsNil(o.Type) { + if o == nil { return nil, false } - return o.Type, true + return &o.Type, true } -// HasType returns a boolean if a field has been set. -func (o *ModelTubeConfig) HasType() bool { - if o != nil && !IsNil(o.Type) { - return true - } - - return false -} - -// SetType gets a reference to the given string and assigns it to the Type field. +// SetType sets field value func (o *ModelTubeConfig) SetType(v string) { - o.Type = &v + o.Type = v } func (o ModelTubeConfig) MarshalJSON() ([]byte, error) { @@ -117,12 +114,47 @@ func (o ModelTubeConfig) ToMap() (map[string]interface{}, error) { if !IsNil(o.Config) { toSerialize["config"] = o.Config } - if !IsNil(o.Type) { - toSerialize["type"] = o.Type - } + toSerialize["type"] = o.Type return toSerialize, nil } +func (o *ModelTubeConfig) UnmarshalJSON(data []byte) (err error) { + // This validates that all required properties are included in the JSON object + // by unmarshalling the object into a generic map with string keys and checking + // that every required field exists as a key in the generic map. + requiredProperties := []string{ + "type", + } + + allProperties := make(map[string]interface{}) + + err = json.Unmarshal(data, &allProperties) + + if err != nil { + return err + } + + for _, requiredProperty := range requiredProperties { + if _, exists := allProperties[requiredProperty]; !exists { + return fmt.Errorf("no value given for required property %v", requiredProperty) + } + } + + varModelTubeConfig := _ModelTubeConfig{} + + decoder := json.NewDecoder(bytes.NewReader(data)) + decoder.DisallowUnknownFields() + err = decoder.Decode(&varModelTubeConfig) + + if err != nil { + return err + } + + *o = ModelTubeConfig(varModelTubeConfig) + + return err +} + type NullableModelTubeConfig struct { value *ModelTubeConfig isSet bool diff --git a/admin/utils/utils.go b/admin/utils/utils.go index b08815b4..fbd5e604 100644 --- a/admin/utils/utils.go +++ b/admin/utils/utils.go @@ -22,21 +22,44 @@ import ( adminclient "github.com/functionstream/function-stream/admin/client" ) -func MakeQueueSourceTubeConfig(subName string, topics ...string) []adminclient.ModelTubeConfig { +const ( + PulsarQueue string = "pulsar" + MemoryQueue string = "memory" +) + +func MakeMemorySourceTubeConfig(topics ...string) []adminclient.ModelTubeConfig { + return MakeQueueSourceTubeConfig(MemoryQueue, "fs", topics...) +} + +func MakePulsarSourceTubeConfig(topics ...string) []adminclient.ModelTubeConfig { + return MakeQueueSourceTubeConfig(PulsarQueue, "fs", topics...) +} + +func MakeQueueSourceTubeConfig(queueType string, subName string, topics ...string) []adminclient.ModelTubeConfig { return []adminclient.ModelTubeConfig{ { + Type: queueType, Config: map[string]interface{}{ - "topicList": append([]string{}, topics...), - "subName": subName, + "inputs": append([]string{}, topics...), + "subscription-name": subName, }, }, } } -func MakeQueueSinkTubeConfig(topic string) *adminclient.ModelTubeConfig { +func MakeMemorySinkTubeConfig(topic string) *adminclient.ModelTubeConfig { + return MakeQueueSinkTubeConfig(MemoryQueue, topic) +} + +func MakePulsarSinkTubeConfig(topic string) *adminclient.ModelTubeConfig { + return MakeQueueSinkTubeConfig(PulsarQueue, topic) +} + +func MakeQueueSinkTubeConfig(queueType string, topic string) *adminclient.ModelTubeConfig { return &adminclient.ModelTubeConfig{ + Type: queueType, Config: map[string]interface{}{ - "topic": topic, + "output": topic, }, } } @@ -49,22 +72,19 @@ func GetInputTopics(f *adminclient.ModelFunction) ([]string, error) { if len(config) < 1 { return nil, fmt.Errorf("source config for function %s is empty", f.Name) } - if topicList, ok := config["topicList"].([]string); ok { + if topicList, ok := config["inputs"].([]string); ok { return topicList, nil } - return nil, fmt.Errorf("source config for function %s has no topicList", f.Name) + return nil, fmt.Errorf("source config for function %s has no input topics", f.Name) } func GetOutputTopic(f *adminclient.ModelFunction) (string, error) { - if f.Sink == nil { - return "", fmt.Errorf("function %s has no sink", f.Name) - } config := f.Sink.Config if len(config) < 1 { return "", fmt.Errorf("sink config for function %s is empty", f.Name) } - if topic, ok := config["topic"].(string); ok { + if topic, ok := config["output"].(string); ok { return topic, nil } - return "", fmt.Errorf("sink config for function %s has no topic", f.Name) + return "", fmt.Errorf("sink config for function %s has no output topic", f.Name) } diff --git a/apidocs.json b/apidocs.json index 05a9a44b..8c3d0b4d 100644 --- a/apidocs.json +++ b/apidocs.json @@ -337,6 +337,8 @@ "required": [ "name", "runtime", + "source", + "sink", "replicas" ], "properties": { @@ -371,6 +373,9 @@ } }, "model.RuntimeConfig": { + "required": [ + "type" + ], "properties": { "config": { "type": "object" @@ -381,6 +386,9 @@ } }, "model.TubeConfig": { + "required": [ + "type" + ], "properties": { "config": { "type": "object" diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index eadf0652..793171e2 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -18,6 +18,8 @@ package benchmark import ( "context" + "github.com/functionstream/function-stream/fs" + "github.com/functionstream/function-stream/fs/runtime/wazero" "math/rand" "os" "runtime/pprof" @@ -30,7 +32,6 @@ import ( adminclient "github.com/functionstream/function-stream/admin/client" adminutils "github.com/functionstream/function-stream/admin/utils" "github.com/functionstream/function-stream/common" - "github.com/functionstream/function-stream/fs" "github.com/functionstream/function-stream/fs/contube" "github.com/functionstream/function-stream/perf" "github.com/functionstream/function-stream/server" @@ -74,12 +75,13 @@ func BenchmarkStressForBasicFunc(b *testing.B) { RequestRate: 200000.0, Func: &adminclient.ModelFunction{ Runtime: adminclient.ModelRuntimeConfig{ + Type: common.WASMRuntime, Config: map[string]interface{}{ common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Source: adminutils.MakeQueueSourceTubeConfig("fs", inputTopic), - Sink: adminutils.MakeQueueSinkTubeConfig(outputTopic), + Source: adminutils.MakePulsarSourceTubeConfig(inputTopic), + Sink: *adminutils.MakePulsarSinkTubeConfig(outputTopic), Replicas: replicas, }, } @@ -112,7 +114,8 @@ func BenchmarkStressForBasicFunc(b *testing.B) { func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { memoryQueueFactory := contube.NewMemoryQueueFactory(context.Background()) - s, err := server.NewServer(server.WithFunctionManager(fs.WithDefaultTubeFactory(memoryQueueFactory))) + s, err := server.NewServer(server.WithFunctionManager(fs.WithTubeFactory(common.MemoryTubeType, memoryQueueFactory), + fs.WithRuntimeFactory(common.WASMRuntime, wazero.NewWazeroFunctionRuntimeFactory()))) if err != nil { b.Fatal(err) } @@ -131,12 +134,13 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { RequestRate: 200000.0, Func: &adminclient.ModelFunction{ Runtime: adminclient.ModelRuntimeConfig{ + Type: common.WASMRuntime, Config: map[string]interface{}{ common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Source: adminutils.MakeQueueSourceTubeConfig("fs", inputTopic), - Sink: adminutils.MakeQueueSinkTubeConfig(outputTopic), + Source: adminutils.MakeMemorySourceTubeConfig(inputTopic), + Sink: *adminutils.MakeMemorySinkTubeConfig(outputTopic), Replicas: replicas, }, QueueBuilder: func(ctx context.Context) (contube.TubeFactory, error) { diff --git a/cmd/client/create/cmd.go b/cmd/client/create/cmd.go index b0cb0751..37d84cfa 100644 --- a/cmd/client/create/cmd.go +++ b/cmd/client/create/cmd.go @@ -67,11 +67,13 @@ func exec(_ *cobra.Command, _ []string) { cli := adminclient.NewAPIClient(cfg) f := adminclient.ModelFunction{ Name: config.name, - Runtime: adminclient.ModelRuntimeConfig{Config: map[string]interface{}{ - fs_cmmon.RuntimeArchiveConfigKey: config.archive, - }}, - Source: utils.MakeQueueSourceTubeConfig("fs", config.inputs...), - Sink: utils.MakeQueueSinkTubeConfig(config.output), + Runtime: adminclient.ModelRuntimeConfig{ + Type: fs_cmmon.WASMRuntime, + Config: map[string]interface{}{ + fs_cmmon.RuntimeArchiveConfigKey: config.archive, + }}, + Source: utils.MakeMemorySourceTubeConfig(config.inputs...), + Sink: *utils.MakeMemorySinkTubeConfig(config.output), Replicas: config.replica, } diff --git a/cmd/main.go b/cmd/main.go index ceb6c601..7f56fdfe 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -23,7 +23,6 @@ import ( "github.com/functionstream/function-stream/cmd/client" "github.com/functionstream/function-stream/cmd/perf" "github.com/functionstream/function-stream/cmd/server" - "github.com/functionstream/function-stream/cmd/standalone" "github.com/spf13/cobra" ) @@ -39,7 +38,6 @@ func init() { rootCmd.AddCommand(server.Cmd) rootCmd.AddCommand(client.Cmd) rootCmd.AddCommand(perf.Cmd) - rootCmd.AddCommand(standalone.Cmd) } func main() { diff --git a/cmd/server/cmd.go b/cmd/server/cmd.go index decc5ce9..d8d69df2 100644 --- a/cmd/server/cmd.go +++ b/cmd/server/cmd.go @@ -65,7 +65,10 @@ func exec(*cobra.Command, []string) { return nil, err } } - s, err := server.NewServer(server.WithConfig(c)) + s, err := server.NewServer( + server.WithTubeFactoryBuilders(server.GetBuiltinTubeFactoryBuilder()), + server.WithRuntimeFactoryBuilders(server.GetBuiltinRuntimeFactoryBuilder()), + server.WithConfig(c)) if err != nil { return nil, err } diff --git a/cmd/standalone/cmd.go b/cmd/standalone/cmd.go deleted file mode 100644 index 90c04590..00000000 --- a/cmd/standalone/cmd.go +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2024 Function Stream Org. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package standalone - -import ( - "context" - "io" - - "github.com/functionstream/function-stream/common" - "github.com/functionstream/function-stream/server" - "github.com/spf13/cobra" -) - -var ( - Cmd = &cobra.Command{ - Use: "standalone", - Short: "Start a standalone server", - Long: `Start a standalone server`, - Run: exec, - } -) - -func exec(*cobra.Command, []string) { - common.RunProcess(func() (io.Closer, error) { - config, err := server.LoadConfigFromFile("conf/standalone.yaml") - if err != nil { - return nil, err - } - s, err := server.NewServer(server.WithConfig(config)) - if err != nil { - return nil, err - } - go s.Run(context.Background()) - return s, nil - }) -} diff --git a/common/model/function.go b/common/model/function.go index 883e9ce4..9568fad4 100644 --- a/common/model/function.go +++ b/common/model/function.go @@ -24,23 +24,23 @@ import ( ) type TubeConfig struct { - Type *string `json:"type,omitempty"` // Default to `default` + Type string `json:"type"` // Default to `default` Config contube.ConfigMap `json:"config,omitempty"` } type ConfigMap map[string]interface{} type RuntimeConfig struct { - Config ConfigMap `json:"config,omitempty" yaml:"config,omitempty"` - Type *string `json:"type,omitempty" yaml:"type,omitempty"` + Config ConfigMap `json:"config,omitempty"` + Type string `json:"type"` } type Function struct { Name string `json:"name"` Namespace string `json:"namespace,omitempty"` - Runtime *RuntimeConfig `json:"runtime"` - Sources []*TubeConfig `json:"source,omitempty"` - Sink *TubeConfig `json:"sink,omitempty"` + Runtime RuntimeConfig `json:"runtime"` + Sources []TubeConfig `json:"source"` + Sink TubeConfig `json:"sink"` Config map[string]string `json:"config,omitempty"` Replicas int32 `json:"replicas"` } @@ -55,15 +55,9 @@ func (f *Function) Validate() error { if strings.Contains(f.Namespace, "/") { return errors.New("namespace should not contain '/'") } - if f.Runtime == nil { - return errors.New("runtime should be configured") - } if len(f.Sources) == 0 { return errors.New("sources should be configured") } - if f.Sink == nil { - return errors.New("sink should be configured") - } if f.Replicas <= 0 { return errors.New("replicas should be greater than 0") } diff --git a/common/model/function_serde_test.go b/common/model/function_serde_test.go index 3ea3d58f..8909a46a 100644 --- a/common/model/function_serde_test.go +++ b/common/model/function_serde_test.go @@ -22,16 +22,15 @@ import ( "reflect" "testing" - "github.com/functionstream/function-stream/common" "gopkg.in/yaml.v3" ) func TestFunctionSerde(t *testing.T) { f := Function{ Name: "TestFunction", - Runtime: &RuntimeConfig{Type: common.OptionalStr("runtime"), Config: map[string]interface{}{"key": "value"}}, - Sources: []*TubeConfig{{Type: common.OptionalStr("source"), Config: map[string]interface{}{"key": "value"}}}, - Sink: &TubeConfig{Type: common.OptionalStr("sink"), Config: map[string]interface{}{"key": "value"}}, + Runtime: RuntimeConfig{Type: "runtime", Config: map[string]interface{}{"key": "value"}}, + Sources: []TubeConfig{{Type: "source", Config: map[string]interface{}{"key": "value"}}}, + Sink: TubeConfig{Type: "sink", Config: map[string]interface{}{"key": "value"}}, Config: map[string]string{"key": "value"}, Replicas: 2, } @@ -77,9 +76,9 @@ func TestFunctionSerde(t *testing.T) { func TestFunctionSerdeWithNil(t *testing.T) { f := Function{ Name: "TestFunction", - Runtime: nil, - Sources: nil, - Sink: nil, + Runtime: RuntimeConfig{Config: map[string]interface{}{}}, + Sources: []TubeConfig{}, + Sink: TubeConfig{Config: map[string]interface{}{}}, Config: map[string]string{"key": "value"}, Replicas: 2, } @@ -99,6 +98,10 @@ func TestFunctionSerdeWithNil(t *testing.T) { t.Fatal("JSON Deserialization error:", err) } + // TODO: We should override the MarshalJson for the Function + f2.Sink.Config = map[string]interface{}{} + f2.Runtime.Config = map[string]interface{}{} + if !reflect.DeepEqual(f, f2) { t.Error("JSON Deserialization does not match original") } @@ -111,9 +114,6 @@ func TestFunctionSerdeWithNil(t *testing.T) { fmt.Println(string(data)) - f.Sources = []*TubeConfig{} // The nil would be expected to be converted to a zero-length array for the YAML - // serialization - // YAML Deserialization err = yaml.Unmarshal(data, &f2) if err != nil { diff --git a/conf/function-stream.yaml b/conf/function-stream.yaml index 001c50dc..360f196a 100644 --- a/conf/function-stream.yaml +++ b/conf/function-stream.yaml @@ -13,17 +13,10 @@ # limitations under the License. listen_addr: ":7300" -tube_factory: +queue: + type: "pulsar" + config: + pulsar_url: "pulsar://localhost:6650" +tube-config: pulsar: - type: "pulsar" - config: - pulsar_url: "pulsar://localhost:6650" - memory: - Type: "memory" - default: - ref: "pulsar" -runtime_factory: - wasm: - type: "wasm" - default: - ref: "wasm" \ No newline at end of file + pulsar_url: "pulsar://localhost:6650" \ No newline at end of file diff --git a/conf/standalone.yaml b/conf/standalone.yaml deleted file mode 100644 index 24bfcb85..00000000 --- a/conf/standalone.yaml +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2024 Function Stream Org. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -listen_addr: ":7300" -tube_factory: - memory: - Type: "memory" - default: - ref: "memory" -runtime_factory: - wasm: - type: "wasm" - default: - ref: "wasm" \ No newline at end of file diff --git a/fs/contube/contube.go b/fs/contube/contube.go index 1ff7b91c..8c1e3a4d 100644 --- a/fs/contube/contube.go +++ b/fs/contube/contube.go @@ -19,7 +19,6 @@ package contube import ( "context" "encoding/json" - "fmt" "github.com/go-playground/validator/v10" "github.com/pkg/errors" ) @@ -36,63 +35,22 @@ type Record interface { } type SourceQueueConfig struct { - Topics []string - SubName string + Topics []string `json:"inputs" validate:"required"` + SubName string `json:"subscription-name" validate:"required"` } type SinkQueueConfig struct { - Topic string -} - -const ( - TopicKey = "topic" - TopicListKey = "topicList" - SubNameKey = "subName" -) - -func NewSourceQueueConfig(config ConfigMap) (*SourceQueueConfig, error) { - var result SourceQueueConfig - // The list value type should be considered as interface{} - if topics, ok := config[TopicListKey].([]interface{}); ok { - var topicStrList []string - for _, v := range topics { - if topicStr, ok := v.(string); ok { - topicStrList = append(topicStrList, v.(string)) - } else { - return nil, fmt.Errorf("invalid topic in the %s: %s", TopicListKey, topicStr) - } - } - result.Topics = topicStrList - } - if subName, ok := config[SubNameKey].(string); ok { - result.SubName = subName - } - return &result, nil + Topic string `json:"output" validate:"required"` } func (c *SourceQueueConfig) ToConfigMap() ConfigMap { - topicListInterface := make([]interface{}, len(c.Topics)) - for i, v := range c.Topics { - topicListInterface[i] = v - } - return ConfigMap{ - TopicListKey: topicListInterface, - SubNameKey: c.SubName, - } -} - -func NewSinkQueueConfig(config ConfigMap) *SinkQueueConfig { - var result SinkQueueConfig - if topic, ok := config[TopicKey].(string); ok { - result.Topic = topic - } - return &result + configMap, _ := ToConfigMap(c) + return configMap } func (c *SinkQueueConfig) ToConfigMap() ConfigMap { - return ConfigMap{ - TopicKey: c.Topic, - } + configMap, _ := ToConfigMap(c) + return configMap } type ConfigMap map[string]interface{} @@ -120,6 +78,18 @@ func (c ConfigMap) ToConfigStruct(v any) error { return validate.Struct(v) } +func ToConfigMap(v any) (ConfigMap, error) { + jsonData, err := json.Marshal(v) + if err != nil { + return nil, err + } + var result ConfigMap + if err := json.Unmarshal(jsonData, &result); err != nil { + return nil, err + } + return result, nil +} + type SourceTubeFactory interface { // NewSourceTube returns a new channel that can be used to receive events // The channel would be closed when the context is done diff --git a/fs/contube/memory.go b/fs/contube/memory.go index 176b0812..0d8dc4b3 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -78,8 +78,8 @@ func (f *MemoryQueueFactory) release(name string) { } func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) { - config, err := NewSourceQueueConfig(configMap) - if err != nil { + config := SourceQueueConfig{} + if err := configMap.ToConfigStruct(&config); err != nil { return nil, err } result := make(chan Record) @@ -115,7 +115,10 @@ func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap Config } func (f *MemoryQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error) { - config := NewSinkQueueConfig(configMap) + config := SinkQueueConfig{} + if err := configMap.ToConfigStruct(&config); err != nil { + return nil, err + } c := f.getOrCreateChan(config.Topic) wrapperC := make(chan Record) go func() { diff --git a/fs/contube/pulsar.go b/fs/contube/pulsar.go index 2e12700d..68545d85 100644 --- a/fs/contube/pulsar.go +++ b/fs/contube/pulsar.go @@ -38,7 +38,7 @@ func NewPulsarTubeFactoryConfig(configMap ConfigMap) (*PulsarTubeFactoryConfig, if pulsarURL, ok := configMap[PulsarURLKey].(string); ok { result.PulsarURL = pulsarURL } else { - return nil, errors.Errorf("Missing required field %s", PulsarURLKey) + result.PulsarURL = "pulsar://localhost:6650" } return &result, nil } @@ -55,16 +55,19 @@ type PulsarEventQueueFactory struct { } func (f *PulsarEventQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) { - config, err := NewSourceQueueConfig(configMap) - if err != nil { + config := SourceQueueConfig{} + if err := configMap.ToConfigStruct(&config); err != nil { return nil, err } - return f.newSourceChan(ctx, config) + return f.newSourceChan(ctx, &config) } func (f *PulsarEventQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error) { - config := NewSinkQueueConfig(configMap) - return f.newSinkChan(ctx, config) + config := SinkQueueConfig{} + if err := configMap.ToConfigStruct(&config); err != nil { + return nil, err + } + return f.newSinkChan(ctx, &config) } func NewPulsarEventQueueFactory(ctx context.Context, configMap ConfigMap) (TubeFactory, error) { diff --git a/fs/manager.go b/fs/manager.go index fa701ca2..b91ce0c4 100644 --- a/fs/manager.go +++ b/fs/manager.go @@ -74,6 +74,7 @@ type managerOptions struct { instanceFactory api.FunctionInstanceFactory stateStore api.StateStore dontUseDefaultStateStore bool + queueFactory contube.TubeFactory } type ManagerOption interface { @@ -92,9 +93,11 @@ func WithTubeFactory(name string, factory contube.TubeFactory) ManagerOption { return c, nil }) } - -func WithDefaultTubeFactory(factory contube.TubeFactory) ManagerOption { - return WithTubeFactory("default", factory) +func WithQueueFactory(factory contube.TubeFactory) ManagerOption { + return managerOptionFunc(func(c *managerOptions) (*managerOptions, error) { + c.queueFactory = factory + return c, nil + }) } func WithRuntimeFactory(name string, factory api.FunctionRuntimeFactory) ManagerOption { @@ -104,10 +107,6 @@ func WithRuntimeFactory(name string, factory api.FunctionRuntimeFactory) Manager }) } -func WithDefaultRuntimeFactory(factory api.FunctionRuntimeFactory) ManagerOption { - return WithRuntimeFactory("default", factory) -} - func WithInstanceFactory(factory api.FunctionInstanceFactory) ManagerOption { return managerOptionFunc(func(c *managerOptions) (*managerOptions, error) { c.instanceFactory = factory @@ -129,7 +128,6 @@ func NewFunctionManager(opts ...ManagerOption) (FunctionManager, error) { runtimeFactoryMap: make(map[string]api.FunctionRuntimeFactory), } options.runtimeFactoryMap["default"] = wazero.NewWazeroFunctionRuntimeFactory() - options.tubeFactoryMap["default"] = contube.NewMemoryQueueFactory(context.Background()) options.instanceFactory = NewDefaultInstanceFactory() for _, o := range opts { _, err := o.apply(options) @@ -163,26 +161,12 @@ func NewFunctionManager(opts ...ManagerOption) (FunctionManager, error) { } func (fm *functionManagerImpl) getTubeFactory(tubeConfig *model.TubeConfig) (contube.TubeFactory, error) { - get := func(t string) (contube.TubeFactory, error) { - factory, exist := fm.options.tubeFactoryMap[t] - if !exist { - fm.log.ErrorContext(context.Background(), "tube factory not found", "type", t) - return nil, common.ErrorTubeFactoryNotFound - } - return factory, nil - - } - if tubeConfig == nil || tubeConfig.Type == nil { - return get("default") - } - return get(*tubeConfig.Type) -} - -func (fm *functionManagerImpl) getRuntimeType(runtimeConfig *model.RuntimeConfig) string { - if runtimeConfig == nil || runtimeConfig.Type == nil { - return "default" + factory, exist := fm.options.tubeFactoryMap[tubeConfig.Type] + if !exist { + fm.log.ErrorContext(context.Background(), "tube factory not found", "type", tubeConfig.Type) + return nil, common.ErrorTubeFactoryNotFound } - return *runtimeConfig.Type + return factory, nil } func (fm *functionManagerImpl) getRuntimeFactory(t string) (api.FunctionRuntimeFactory, error) { @@ -211,7 +195,7 @@ func (fm *functionManagerImpl) StartFunction(f *model.Function) error { // TODO: fm.functionsLock.Unlock() funcCtx := fm.createFuncCtx() for i := int32(0); i < f.Replicas; i++ { - runtimeType := fm.getRuntimeType(f.Runtime) + runtimeType := f.Runtime.Type instance := fm.options.instanceFactory.NewFunctionInstance(f, funcCtx, i, slog.With( slog.String("name", f.Name), @@ -227,7 +211,7 @@ func (fm *functionManagerImpl) StartFunction(f *model.Function) error { // TODO: } var sources []<-chan contube.Record for _, t := range f.Sources { - sourceFactory, err := fm.getTubeFactory(t) + sourceFactory, err := fm.getTubeFactory(&t) if err != nil { return nil } @@ -237,7 +221,7 @@ func (fm *functionManagerImpl) StartFunction(f *model.Function) error { // TODO: } sources = append(sources, sourceChan) } - sinkFactory, err := fm.getTubeFactory(f.Sink) + sinkFactory, err := fm.getTubeFactory(&f.Sink) if err != nil { return nil } @@ -291,7 +275,11 @@ func (fm *functionManagerImpl) ListFunctions() (result []string) { func (fm *functionManagerImpl) ProduceEvent(name string, event contube.Record) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c, err := fm.options.tubeFactoryMap["default"].NewSinkTube(ctx, (&contube.SinkQueueConfig{Topic: name}).ToConfigMap()) + factory, ok := fm.options.tubeFactoryMap[common.MemoryTubeType] + if !ok { + return errors.New("memory tube factory not found") + } + c, err := factory.NewSinkTube(ctx, (&contube.SinkQueueConfig{Topic: name}).ToConfigMap()) if err != nil { return err } @@ -302,7 +290,11 @@ func (fm *functionManagerImpl) ProduceEvent(name string, event contube.Record) e func (fm *functionManagerImpl) ConsumeEvent(name string) (contube.Record, error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c, err := fm.options.tubeFactoryMap["default"].NewSourceTube(ctx, (&contube.SourceQueueConfig{ + factory, ok := fm.options.tubeFactoryMap[common.MemoryTubeType] + if !ok { + return nil, errors.New("memory tube factory not found") + } + c, err := factory.NewSourceTube(ctx, (&contube.SourceQueueConfig{ Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) if err != nil { return nil, err diff --git a/fs/runtime/grpc/grpc_func_test.go b/fs/runtime/grpc/grpc_func_test.go index 4c5636ee..1f810961 100644 --- a/fs/runtime/grpc/grpc_func_test.go +++ b/fs/runtime/grpc/grpc_func_test.go @@ -49,7 +49,7 @@ func TestFMWithGRPCRuntime(t *testing.T) { fm, err := fs.NewFunctionManager( fs.WithRuntimeFactory("grpc", fsService), - fs.WithDefaultTubeFactory(contube.NewMemoryQueueFactory(ctx)), + fs.WithTubeFactory("memory", contube.NewMemoryQueueFactory(ctx)), ) if err != nil { t.Fatal(err) @@ -59,21 +59,23 @@ func TestFMWithGRPCRuntime(t *testing.T) { outputTopic := "output" f := &model.Function{ Name: "test", - Runtime: &model.RuntimeConfig{ - Type: common.OptionalStr("grpc"), + Runtime: model.RuntimeConfig{ + Type: "grpc", Config: map[string]interface{}{ "addr": addr, }, }, - Sources: []*model.TubeConfig{ + Sources: []model.TubeConfig{ { + Type: common.MemoryTubeType, Config: (&contube.SourceQueueConfig{ Topics: []string{inputTopic}, SubName: "test", }).ToConfigMap(), }, }, - Sink: &model.TubeConfig{ + Sink: model.TubeConfig{ + Type: common.MemoryTubeType, Config: (&contube.SinkQueueConfig{ Topic: outputTopic, }).ToConfigMap(), diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go index 99ac4b67..1752c3f8 100644 --- a/fs/runtime/wazero/wazero_runtime.go +++ b/fs/runtime/wazero/wazero_runtime.go @@ -57,7 +57,7 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI wasi_snapshot_preview1.MustInstantiate(instance.Context(), r) - if instance.Definition().Runtime == nil || instance.Definition().Runtime.Config == nil { + if instance.Definition().Runtime.Config == nil { return nil, errors.New("No runtime config found") } path, exist := instance.Definition().Runtime.Config["archive"] diff --git a/functions/example-functions.yaml b/functions/example-functions.yaml index da32ab66..084375d2 100644 --- a/functions/example-functions.yaml +++ b/functions/example-functions.yaml @@ -20,10 +20,12 @@ runtime: archive: "bin/example_basic.wasm" sources: - config: - topicList: + inputs: - "input" - subName: "fs" + subscription-name: "fs" + type: "memory" sink: config: - topic: "output" + output: "output" + type: "memory" replicas: 1 \ No newline at end of file diff --git a/perf/perf.go b/perf/perf.go index 2cf288ff..a636ac14 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -94,8 +94,8 @@ func (p *perf) Run(ctx context.Context) { common.RuntimeArchiveConfigKey: "./bin/example_basic.wasm", }, }, - Source: utils.MakeQueueSourceTubeConfig("test-sub", "test-input-"+strconv.Itoa(rand.Int())), - Sink: utils.MakeQueueSinkTubeConfig("test-output-" + strconv.Itoa(rand.Int())), + Source: utils.MakeMemorySourceTubeConfig("test-input-" + strconv.Itoa(rand.Int())), + Sink: *utils.MakeMemorySinkTubeConfig("test-output-" + strconv.Itoa(rand.Int())), } } f.Name = name diff --git a/server/config.go b/server/config.go index 231ecab4..f2c2af5e 100644 --- a/server/config.go +++ b/server/config.go @@ -27,6 +27,7 @@ import ( ) type FactoryConfig struct { + // Deprecate Ref *string `mapstructure:"ref"` Type *string `mapstructure:"type"` Config *common.ConfigMap `mapstructure:"config"` @@ -37,64 +38,43 @@ type StateStoreConfig struct { Config *common.ConfigMap `mapstructure:"config"` } +type QueueConfig struct { + Type string `mapstructure:"type"` + Config common.ConfigMap `mapstructure:"config"` +} + type Config struct { // ListenAddr is the address that the function stream REST service will listen on. - ListenAddr string `mapstructure:"listen_addr"` + ListenAddr string `mapstructure:"listen-addr"` - // TubeFactory is the list of tube factories that the function stream server will use. - TubeFactory map[string]*FactoryConfig `mapstructure:"tube_factory"` + Queue QueueConfig `mapstructure:"queue"` - // RuntimeFactory is the list of runtime factories that the function stream server will use. - RuntimeFactory map[string]*FactoryConfig `mapstructure:"runtime_factory"` + TubeConfig map[string]common.ConfigMap `mapstructure:"tube-config"` + + RuntimeConfig map[string]common.ConfigMap `mapstructure:"runtime-config"` // StateStore is the configuration for the state store that the function stream server will use. // Optional - StateStore *StateStoreConfig `mapstructure:"state_store"` + StateStore *StateStoreConfig `mapstructure:"state-store"` // FunctionStore is the path to the function store - FunctionStore string `mapstructure:"function_store"` + FunctionStore string `mapstructure:"function-store"` - EnableTLS bool `mapstructure:"enable_tls"` - TLSCertFile string `mapstructure:"tls_cert_file"` - TLSKeyFile string `mapstructure:"tls_key_file"` + EnableTLS bool `mapstructure:"enable-tls"` + TLSCertFile string `mapstructure:"tls-cert-file"` + TLSKeyFile string `mapstructure:"tls-key-file"` } func init() { - viper.SetDefault("listen_addr", ":7300") - viper.SetDefault("function_store", "./functions") -} - -func preprocessFactoriesConfig(n string, m map[string]*FactoryConfig) error { - for name, factory := range m { - if ref := factory.Ref; ref != nil && *ref != "" { - referred, ok := m[strings.ToLower(*ref)] - if !ok { - return errors.Errorf("%s factory %s refers to non-existent factory %s", n, name, *ref) - } - if factory.Type == nil { - factory.Type = referred.Type - } - factory.Config = common.MergeConfig(referred.Config, factory.Config) - } - } - - for name, factory := range m { - if factory.Type == nil { - return errors.Errorf("%s factory %s has no type", n, name) - } - } - return nil + viper.SetDefault("listen-addr", ":7300") + viper.SetDefault("function-store", "./functions") } func (c *Config) preprocessConfig() error { if c.ListenAddr == "" { return errors.New("ListenAddr shouldn't be empty") } - err := preprocessFactoriesConfig("Tube", c.TubeFactory) - if err != nil { - return err - } - return preprocessFactoriesConfig("Runtime", c.RuntimeFactory) + return nil } func loadConfig() (*Config, error) { @@ -126,7 +106,9 @@ func LoadConfigFromEnv() (*Config, error) { value := parts[1] slog.Info("Loading environment variable", "key", key, "value", value) - viper.Set(strings.Replace(key, "__", ".", -1), value) + key = strings.Replace(key, "__", ".", -1) + key = strings.Replace(key, "_", "-", -1) + viper.Set(key, value) } } diff --git a/server/config_test.go b/server/config_test.go index f735cb56..488981fc 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -17,12 +17,11 @@ package server import ( - "os" - "testing" - "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "os" + "testing" ) func TestLoadConfigFromYaml(t *testing.T) { @@ -39,10 +38,8 @@ func TestLoadConfigFromJson(t *testing.T) { func TestLoadConfigFromEnv(t *testing.T) { assert.Nil(t, os.Setenv("FS_LISTEN_ADDR", ":17300")) - assert.Nil(t, os.Setenv("FS_TUBE_FACTORY__MY_PULSAR__TYPE", "pulsar")) - assert.Nil(t, os.Setenv("FS_TUBE_FACTORY__MY_PULSAR__CONFIG__PULSAR_URL", "pulsar://localhost:6651")) - assert.Nil(t, os.Setenv("FS_TUBE_FACTORY__MY_MEMORY__TYPE", "memory")) - assert.Nil(t, os.Setenv("FS_TUBE_FACTORY__DEFAULT__REF", "my_pulsar")) + assert.Nil(t, os.Setenv("FS_TUBE_CONFIG__MY_TUBE__KEY", "value")) + assert.Nil(t, os.Setenv("FS_RUNTIME_CONFIG__CUSTOM_RUNTIME__NAME", "test")) viper.AutomaticEnv() @@ -53,23 +50,9 @@ func TestLoadConfigFromEnv(t *testing.T) { func assertConfig(t *testing.T, c *Config) { assert.Equal(t, ":17300", c.ListenAddr) - require.Contains(t, c.TubeFactory, "my_pulsar") - assert.Equal(t, "pulsar", *c.TubeFactory["my_pulsar"].Type) - - if config := c.TubeFactory["my_pulsar"].Config; config != nil { - assert.Equal(t, "pulsar://localhost:6651", (*config)["pulsar_url"]) - } else { - t.Fatal("pulsar config is nil") - } - - require.Contains(t, c.TubeFactory, "my_memory") - assert.Equal(t, "memory", *c.TubeFactory["my_memory"].Type) + require.Contains(t, c.TubeConfig, "my-tube") + assert.Equal(t, "value", c.TubeConfig["my-tube"]["key"]) - require.Contains(t, c.TubeFactory, "default") - assert.Equal(t, "my_pulsar", *c.TubeFactory["default"].Ref) - if config := c.TubeFactory["default"].Config; config != nil { - assert.Equal(t, "pulsar://localhost:6651", (*config)["pulsar_url"]) - } else { - t.Fatal("pulsar config is nil") - } + require.Contains(t, c.RuntimeConfig, "custom-runtime") + assert.Equal(t, "test", c.RuntimeConfig["custom-runtime"]["name"]) } diff --git a/server/function_store_test.go b/server/function_store_test.go index 85cd5001..4f67c2a1 100644 --- a/server/function_store_test.go +++ b/server/function_store_test.go @@ -70,20 +70,23 @@ func newTestFunctionManagerImpl() fs.FunctionManager { func createTestFunction(name string) *model.Function { return &model.Function{ - Runtime: &model.RuntimeConfig{ + Runtime: model.RuntimeConfig{ + Type: common.WASMRuntime, Config: map[string]interface{}{ common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Sources: []*model.TubeConfig{ + Sources: []model.TubeConfig{ { + Type: common.MemoryTubeType, Config: (&contube.SourceQueueConfig{ Topics: []string{"input"}, SubName: "test", }).ToConfigMap(), }, }, - Sink: &model.TubeConfig{ + Sink: model.TubeConfig{ + Type: common.MemoryTubeType, Config: (&contube.SinkQueueConfig{ Topic: "output", }).ToConfigMap(), diff --git a/server/server.go b/server/server.go index 2638dbed..664d6b4f 100644 --- a/server/server.go +++ b/server/server.go @@ -36,13 +36,15 @@ import ( "github.com/functionstream/function-stream/fs/statestore" "github.com/go-openapi/spec" "github.com/pkg/errors" - "k8s.io/utils/set" ) var ( ErrUnsupportedTRuntimeType = errors.New("unsupported runtime type") ErrUnsupportedTubeType = errors.New("unsupported tube type") ErrUnsupportedStateStore = errors.New("unsupported state store") + + ErrCreateFactory = errors.New("error creating factory") + ErrUnsupportedQueueType = errors.New("unsupported queue type") ) type Server struct { @@ -58,16 +60,19 @@ type RuntimeLoaderType func(c *FactoryConfig) (api.FunctionRuntimeFactory, error type StateStoreLoaderType func(c *StateStoreConfig) (api.StateStore, error) type serverOptions struct { - httpListener net.Listener - managerOpts []fs.ManagerOption - httpTubeFact *contube.HttpTubeFactory - tubeLoader TubeLoaderType - runtimeLoader RuntimeLoaderType - stateStoreLoader StateStoreLoaderType - functionStore string - enableTls bool - tlsCertFile string - tlsKeyFile string + httpListener net.Listener + managerOpts []fs.ManagerOption + httpTubeFact *contube.HttpTubeFactory + stateStoreLoader StateStoreLoaderType + functionStore string + enableTls bool + tlsCertFile string + tlsKeyFile string + tubeFactoryBuilders map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error) + tubeConfig map[string]common.ConfigMap + runtimeFactoryBuilders map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) + runtimeConfig map[string]common.ConfigMap + queueConfig QueueConfig } type ServerOption interface { @@ -80,7 +85,11 @@ func (f serverOptionFunc) apply(c *serverOptions) (*serverOptions, error) { return f(c) } +// Low-level server configuration + // WithFunctionManager sets the function Manager for the server. +// Only for testing +// Deprecate func WithFunctionManager(opts ...fs.ManagerOption) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { o.managerOpts = append(o.managerOpts, opts...) @@ -106,91 +115,83 @@ func WithHttpTubeFactory(factory *contube.HttpTubeFactory) ServerOption { }) } -// WithTubeLoader sets the loader for the tube factory. -// This must be called before WithConfig. -func WithTubeLoader(loader TubeLoaderType) ServerOption { +func WithQueueConfig(config QueueConfig) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { - o.tubeLoader = loader + o.queueConfig = config return o, nil }) } -// WithRuntimeLoader sets the loader for the runtime factory. -// This must be called before WithConfig. -func WithRuntimeLoader(loader RuntimeLoaderType) ServerOption { +func WithTubeFactoryBuilder(name string, builder func(configMap common.ConfigMap) (contube.TubeFactory, error)) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { - o.runtimeLoader = loader + o.tubeFactoryBuilders[name] = builder return o, nil }) } -func WithStateStoreLoader(loader func(c *StateStoreConfig) (api.StateStore, error)) ServerOption { +func WithTubeFactoryBuilders(builder map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error)) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { - o.stateStoreLoader = loader + for n, b := range builder { + o.tubeFactoryBuilders[n] = b + } return o, nil }) } -func getRefFactory(m map[string]*FactoryConfig, name string, visited set.Set[string]) (string, error) { - if visited.Has(name) { - return "", errors.Errorf("circular reference of factory %s", name) - } - visited.Insert(name) - f, ok := m[name] - if !ok { - return "", errors.Errorf("tube factory %s not found", name) - } - if f.Ref != nil { - return getRefFactory(m, strings.ToLower(*f.Ref), visited) - } - return name, nil +func WithRuntimeFactoryBuilder(name string, builder func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error)) ServerOption { + return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { + o.runtimeFactoryBuilders[name] = builder + return o, nil + }) } -func initFactories[T any](m map[string]*FactoryConfig, newFactory func(c *FactoryConfig) (T, error), - setup func(n string, f T)) error { - factoryMap := make(map[string]T) - - for name := range m { - refName, err := getRefFactory(m, name, set.New[string]()) - if err != nil { - return err - } - if _, ok := factoryMap[refName]; !ok { - fc, exist := m[refName] - if !exist { - return errors.Errorf("factory %s not found, which the factory %s is pointed to", refName, name) - } - if fc.Type == nil { - return errors.Errorf("factory %s type is not set", refName) - } - f, err := newFactory(fc) - if err != nil { - return err - } - factoryMap[refName] = f +func WithRuntimeFactoryBuilders(builder map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error)) ServerOption { + return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { + for n, b := range builder { + o.runtimeFactoryBuilders[n] = b } - factoryMap[name] = factoryMap[refName] - setup(name, factoryMap[name]) + return o, nil + }) +} + +func WithStateStoreLoader(loader func(c *StateStoreConfig) (api.StateStore, error)) ServerOption { + return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { + o.stateStoreLoader = loader + return o, nil + }) +} + +func GetBuiltinTubeFactoryBuilder() map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error) { + return map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error){ + common.PulsarTubeType: func(configMap common.ConfigMap) (contube.TubeFactory, error) { + return contube.NewPulsarEventQueueFactory(context.Background(), contube.ConfigMap(configMap)) + }, + common.MemoryTubeType: func(configMap common.ConfigMap) (contube.TubeFactory, error) { + return contube.NewMemoryQueueFactory(context.Background()), nil + }, } - return nil } -func DefaultTubeLoader(c *FactoryConfig) (contube.TubeFactory, error) { - switch strings.ToLower(*c.Type) { - case common.PulsarTubeType: - return contube.NewPulsarEventQueueFactory(context.Background(), contube.ConfigMap(*c.Config)) - case common.MemoryTubeType: - return contube.NewMemoryQueueFactory(context.Background()), nil +func GetBuiltinRuntimeFactoryBuilder() map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { + return map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error){ + common.WASMRuntime: func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { + return wazero.NewWazeroFunctionRuntimeFactory(), nil + }, } - return nil, errors.WithMessagef(ErrUnsupportedTubeType, "unsupported tube type :%s", *c.Type) } -func DefaultRuntimeLoader(c *FactoryConfig) (api.FunctionRuntimeFactory, error) { - switch strings.ToLower(*c.Type) { - case common.WASMRuntime: - return wazero.NewWazeroFunctionRuntimeFactory(), nil +func setupFactories[T any](factoryBuilder map[string]func(configMap common.ConfigMap) (T, error), + config map[string]common.ConfigMap, +) (map[string]T, error) { + factories := make(map[string]T) + for name, builder := range factoryBuilder { + f, err := builder(config[name]) + if err != nil { + return nil, errors.WithMessagef(ErrCreateFactory, "error creating factory %s: %v", name, err) + } + factories[name] = f } - return nil, errors.WithMessagef(ErrUnsupportedTRuntimeType, "unsupported runtime type: %s", *c.Type) + return factories, nil } func DefaultStateStoreLoader(c *StateStoreConfig) (api.StateStore, error) { @@ -216,19 +217,9 @@ func WithConfig(config *Config) ServerOption { o.tlsCertFile = config.TLSCertFile o.tlsKeyFile = config.TLSKeyFile } - err = initFactories[contube.TubeFactory](config.TubeFactory, o.tubeLoader, func(n string, f contube.TubeFactory) { - o.managerOpts = append(o.managerOpts, fs.WithTubeFactory(n, f)) - }) - if err != nil { - return nil, err - } - err = initFactories[api.FunctionRuntimeFactory](config.RuntimeFactory, o.runtimeLoader, - func(n string, f api.FunctionRuntimeFactory) { - o.managerOpts = append(o.managerOpts, fs.WithRuntimeFactory(n, f)) - }) - if err != nil { - return nil, err - } + o.tubeConfig = config.TubeConfig + o.queueConfig = config.Queue + o.runtimeConfig = config.RuntimeConfig if config.StateStore != nil { stateStore, err := o.stateStoreLoader(config.StateStore) if err != nil { @@ -245,12 +236,13 @@ func NewServer(opts ...ServerOption) (*Server, error) { options := &serverOptions{} httpTubeFact := contube.NewHttpTubeFactory(context.Background()) options.managerOpts = []fs.ManagerOption{ - fs.WithDefaultTubeFactory(contube.NewMemoryQueueFactory(context.Background())), fs.WithTubeFactory("http", httpTubeFact), } options.httpTubeFact = httpTubeFact - options.tubeLoader = DefaultTubeLoader - options.runtimeLoader = DefaultRuntimeLoader + options.tubeFactoryBuilders = make(map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error)) + options.tubeConfig = make(map[string]common.ConfigMap) + options.runtimeFactoryBuilders = make(map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error)) + options.runtimeConfig = make(map[string]common.ConfigMap) options.stateStoreLoader = DefaultStateStoreLoader for _, o := range opts { if o == nil { @@ -261,6 +253,38 @@ func NewServer(opts ...ServerOption) (*Server, error) { return nil, err } } + + // Config Tube Factory + if tubeFactories, err := setupFactories(options.tubeFactoryBuilders, options.tubeConfig); err == nil { + for name, f := range tubeFactories { + options.managerOpts = append(options.managerOpts, fs.WithTubeFactory(name, f)) + } + } else { + return nil, err + } + + // Config Runtime Factory + if runtimeFactories, err := setupFactories(options.runtimeFactoryBuilders, options.runtimeConfig); err == nil { + for name, f := range runtimeFactories { + options.managerOpts = append(options.managerOpts, fs.WithRuntimeFactory(name, f)) + } + } else { + return nil, err + } + + // Config Queue Factory + if options.queueConfig.Type != "" { + queueFactoryBuilder, ok := options.tubeFactoryBuilders[options.queueConfig.Type] + if !ok { + return nil, errors.WithMessagef(ErrUnsupportedQueueType, "unsupported queue type %s", options.queueConfig.Type) + } + queueFactory, err := queueFactoryBuilder(options.queueConfig.Config) + if err != nil { + return nil, errors.WithMessagef(ErrCreateFactory, "error creating queue factory: %v", err) + } + options.managerOpts = append(options.managerOpts, fs.WithQueueFactory(queueFactory)) + } + manager, err := fs.NewFunctionManager(options.managerOpts...) if err != nil { return nil, err @@ -295,27 +319,18 @@ func NewServer(opts ...ServerOption) (*Server, error) { func NewDefaultServer() (*Server, error) { defaultConfig := &Config{ ListenAddr: ":7300", - TubeFactory: map[string]*FactoryConfig{ - "pulsar": { - Type: common.OptionalStr(common.PulsarTubeType), - Config: &common.ConfigMap{ - contube.PulsarURLKey: "pulsar://localhost:6650", - }, - }, - "default": { - Ref: common.OptionalStr("pulsar"), - }, + Queue: QueueConfig{ + Type: common.MemoryTubeType, + Config: common.ConfigMap{}, }, - RuntimeFactory: map[string]*FactoryConfig{ - "wasm": { - Type: common.OptionalStr(common.WASMRuntime), - }, - "default": { - Ref: common.OptionalStr("wasm"), + TubeConfig: map[string]common.ConfigMap{ + common.PulsarTubeType: { + contube.PulsarURLKey: "pulsar://localhost:6650", }, }, + RuntimeConfig: map[string]common.ConfigMap{}, } - return NewServer(WithConfig(defaultConfig)) + return NewServer(WithTubeFactoryBuilders(GetBuiltinTubeFactoryBuilder()), WithRuntimeFactoryBuilders(GetBuiltinRuntimeFactoryBuilder()), WithConfig(defaultConfig)) } func (s *Server) Run(context context.Context) { diff --git a/server/server_test.go b/server/server_test.go index d62c1ba3..e942c062 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -47,6 +47,8 @@ func startStandaloneSvr(t *testing.T, ctx context.Context, opts ...ServerOption) ln := getListener(t) defaultOpts := []ServerOption{ WithHttpListener(ln), + WithTubeFactoryBuilders(GetBuiltinTubeFactoryBuilder()), + WithRuntimeFactoryBuilders(GetBuiltinRuntimeFactoryBuilder()), } s, err := NewServer( append(defaultOpts, opts...)..., @@ -72,20 +74,23 @@ func TestStandaloneBasicFunction(t *testing.T) { outputTopic := "test-output-" + strconv.Itoa(rand.Int()) funcConf := &model.Function{ - Runtime: &model.RuntimeConfig{ + Runtime: model.RuntimeConfig{ + Type: common.WASMRuntime, Config: map[string]interface{}{ common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Sources: []*model.TubeConfig{ + Sources: []model.TubeConfig{ { + Type: common.MemoryTubeType, Config: (&contube.SourceQueueConfig{ Topics: []string{inputTopic}, SubName: "test", }).ToConfigMap(), }, }, - Sink: &model.TubeConfig{ + Sink: model.TubeConfig{ + Type: common.MemoryTubeType, Config: (&contube.SinkQueueConfig{ Topic: outputTopic, }).ToConfigMap(), @@ -136,18 +141,20 @@ func TestHttpTube(t *testing.T) { endpoint := "test-endpoint" funcConf := &model.Function{ - Runtime: &model.RuntimeConfig{ + Runtime: model.RuntimeConfig{ + Type: common.WASMRuntime, Config: map[string]interface{}{ common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Sources: []*model.TubeConfig{{ - Type: common.OptionalStr(common.HttpTubeType), + Sources: []model.TubeConfig{{ + Type: common.HttpTubeType, Config: map[string]interface{}{ contube.EndpointKey: endpoint, }, }}, - Sink: &model.TubeConfig{ + Sink: model.TubeConfig{ + Type: common.MemoryTubeType, Config: (&contube.SinkQueueConfig{ Topic: "output", }).ToConfigMap(), @@ -231,22 +238,26 @@ func (r *MockRuntime) Stop() { func TestStatefulFunction(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - s, httpAddr := startStandaloneSvr(t, ctx, WithFunctionManager(fs.WithDefaultRuntimeFactory(&MockRuntimeFactory{}))) + s, httpAddr := startStandaloneSvr(t, ctx, WithFunctionManager(fs.WithRuntimeFactory("mock", &MockRuntimeFactory{}))) input := "input" output := "output" funcConf := &model.Function{ - Name: "test-func", - Runtime: &model.RuntimeConfig{}, - Sources: []*model.TubeConfig{ + Name: "test-func", + Runtime: model.RuntimeConfig{ + Type: "mock", + }, + Sources: []model.TubeConfig{ { + Type: common.MemoryTubeType, Config: (&contube.SourceQueueConfig{ Topics: []string{input}, SubName: "test", }).ToConfigMap(), }, }, - Sink: &model.TubeConfig{ + Sink: model.TubeConfig{ + Type: common.MemoryTubeType, Config: (&contube.SinkQueueConfig{ Topic: "output", }).ToConfigMap(), diff --git a/tests/integration_test.go b/tests/integration_test.go index b788e749..f594809f 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -64,19 +64,13 @@ func TestBasicFunction(t *testing.T) { f := adminclient.ModelFunction{ Name: name, Runtime: adminclient.ModelRuntimeConfig{ + Type: common.WASMRuntime, Config: map[string]interface{}{ common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Source: []adminclient.ModelTubeConfig{ - { - Config: map[string]interface{}{ - "topicList": []string{inputTopic}, - "subName": "fs", - }, - }, - }, - Sink: utils.MakeQueueSinkTubeConfig(outputTopic), + Source: utils.MakePulsarSourceTubeConfig(inputTopic), + Sink: *utils.MakePulsarSinkTubeConfig(outputTopic), Replicas: 1, } diff --git a/tests/test_config.json b/tests/test_config.json index 8c89ab5d..3a84cc56 100644 --- a/tests/test_config.json +++ b/tests/test_config.json @@ -1,17 +1,13 @@ { - "listen_addr": ":17300", - "tube_factory": { - "my_pulsar": { - "type": "pulsar", - "config": { - "pulsar_url": "pulsar://localhost:6651" - } - }, - "my_memory": { - "type": "memory" - }, - "default": { - "ref": "my_pulsar" + "listen-addr": ":17300", + "tube-config": { + "my-tube": { + "key": "value" + } + }, + "runtime-config": { + "custom-runtime": { + "name": "test" } } } \ No newline at end of file diff --git a/tests/test_config.yaml b/tests/test_config.yaml index a9a0dd12..d86ba7e4 100644 --- a/tests/test_config.yaml +++ b/tests/test_config.yaml @@ -12,13 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -listen_addr: ":17300" -tube_factory: - my_pulsar: - type: "pulsar" - config: - pulsar_url: "pulsar://localhost:6651" - my_memory: - type: "memory" - default: - ref: "my_pulsar" \ No newline at end of file +listen-addr: ":17300" +tube-config: + my-tube: + key: "value" +runtime-config: + custom-runtime: + name: "test" \ No newline at end of file