diff --git a/.gitignore b/.gitignore index f5984c87..6326197c 100644 --- a/.gitignore +++ b/.gitignore @@ -104,5 +104,6 @@ dist .tern-port .idea +.run bin/ .DS_Store diff --git a/Makefile b/Makefile index 7269364d..ee079a40 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,17 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + .PHONY: build +.PHONY: license build: go build -v -o bin/function-stream ./cmd @@ -25,3 +38,6 @@ gen_rest_client: --global-property apiDocs,apis,models,supportingFiles rm -r restclient/go.mod restclient/go.sum restclient/.travis.yml restclient/.openapi-generator-ignore \ restclient/git_push.sh restclient/.openapi-generator restclient/api restclient/test + +license: + ./license-checker/license-checker.sh diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index b3810850..3beb1711 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -19,6 +19,7 @@ import ( "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/functionstream/functionstream/common" + "github.com/functionstream/functionstream/lib" "github.com/functionstream/functionstream/perf" "github.com/functionstream/functionstream/restclient" "github.com/functionstream/functionstream/server" @@ -77,9 +78,9 @@ func BenchmarkStressForBasicFunc(b *testing.B) { createTopic(inputTopic) createTopic(outputTopic) - pConfig := perf.Config{ + pConfig := &perf.Config{ PulsarURL: "pulsar://localhost:6650", - RequestRate: 100000.0, + RequestRate: 200000.0, Func: &restclient.Function{ Archive: "./bin/example_basic.wasm", Inputs: []string{inputTopic}, @@ -116,3 +117,73 @@ func BenchmarkStressForBasicFunc(b *testing.B) { b.Fatal(err) } } + +func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { + prepareEnv() + + memoryQueueFactory := lib.NewMemoryQueueFactory() + + svrConf := &lib.Config{ + QueueBuilder: func(ctx context.Context, config *lib.Config) (lib.EventQueueFactory, error) { + return memoryQueueFactory, nil + }, + } + + fm, err := lib.NewFunctionManager(svrConf) + if err != nil { + b.Fatal(err) + } + s := server.NewWithFM(fm) + go func() { + common.RunProcess(func() (io.Closer, error) { + go s.Run() + return s, nil + }) + }() + + inputTopic := "test-input-" + strconv.Itoa(rand.Int()) + outputTopic := "test-output-" + strconv.Itoa(rand.Int()) + + replicas := int32(15) + + pConfig := &perf.Config{ + RequestRate: 200000.0, + Func: &restclient.Function{ + Archive: "./bin/example_basic.wasm", + Inputs: []string{inputTopic}, + Output: outputTopic, + Replicas: &replicas, + }, + QueueBuilder: func(ctx context.Context, c *lib.Config) (lib.EventQueueFactory, error) { + return memoryQueueFactory, nil + }, + } + + b.ReportAllocs() + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) + defer cancel() + + profile := "BenchmarkStressForBasicFunc.pprof" + file, err := os.Create(profile) + if err != nil { + b.Fatal(err) + } + defer func() { + _ = file.Close() + }() + + err = pprof.StartCPUProfile(file) + if err != nil { + b.Fatal(err) + } + + perf.New(pConfig).Run(ctx) + + pprof.StopCPUProfile() + + err = s.Close() + if err != nil { + b.Fatal(err) + } +} diff --git a/cmd/perf/cmd.go b/cmd/perf/cmd.go index 701cb7d9..bc3e1fe5 100644 --- a/cmd/perf/cmd.go +++ b/cmd/perf/cmd.go @@ -30,7 +30,7 @@ var ( Run: exec, } - config = perf.Config{} + config = &perf.Config{} ) func init() { diff --git a/lib/config.go b/lib/config.go new file mode 100644 index 00000000..7d82094e --- /dev/null +++ b/lib/config.go @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lib + +import ( + "context" +) + +type QueueBuilder func(ctx context.Context, config *Config) (EventQueueFactory, error) + +type Config struct { + ListenAddr string + PulsarURL string + QueueBuilder QueueBuilder +} diff --git a/lib/event_queue.go b/lib/event_queue.go new file mode 100644 index 00000000..898dc57c --- /dev/null +++ b/lib/event_queue.go @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lib + +import ( + "context" +) + +type Event interface { + GetPayload() []byte + Ack() +} + +type SourceQueueConfig struct { + Topics []string + SubName string +} + +type SinkQueueConfig struct { + Topic string +} + +//type EventQueueFactory func(ctx context.Context, config *QueueConfig, function *model.Function) (EventQueue, error) + +type EventQueueFactory interface { + NewSourceChan(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error) + NewSinkChan(ctx context.Context, config *SinkQueueConfig) (chan<- Event, error) +} + +type EventQueue interface { + GetSendChan() (chan<- Event, error) + GetRecvChan() (<-chan Event, error) +} + +type AckableEvent struct { + payload []byte + ackFunc func() +} + +func NewAckableEvent(payload []byte, ackFunc func()) *AckableEvent { + return &AckableEvent{ + payload: payload, + ackFunc: ackFunc, + } +} + +func (e *AckableEvent) GetPayload() []byte { + return e.payload +} + +func (e *AckableEvent) Ack() { + e.ackFunc() +} diff --git a/lib/instance.go b/lib/instance.go index d3603aab..3a915448 100644 --- a/lib/instance.go +++ b/lib/instance.go @@ -17,7 +17,6 @@ package lib import ( "context" "fmt" - "github.com/apache/pulsar-client-go/pulsar" "github.com/functionstream/functionstream/common" "github.com/functionstream/functionstream/common/model" "github.com/pkg/errors" @@ -31,27 +30,27 @@ import ( ) type FunctionInstance struct { - ctx context.Context - cancelFunc context.CancelFunc - definition *model.Function - pc pulsar.Client - readyCh chan error - index int32 + ctx context.Context + cancelFunc context.CancelFunc + definition *model.Function + queueFactory EventQueueFactory + readyCh chan error + index int32 } -func NewFunctionInstance(definition *model.Function, pc pulsar.Client, index int32) *FunctionInstance { +func NewFunctionInstance(definition *model.Function, queueFactory EventQueueFactory, index int32) *FunctionInstance { ctx, cancelFunc := context.WithCancel(context.Background()) ctx.Value(logrus.Fields{ "function-name": definition.Name, "function-index": index, }) return &FunctionInstance{ - ctx: ctx, - cancelFunc: cancelFunc, - definition: definition, - pc: pc, - readyCh: make(chan error), - index: index, + ctx: ctx, + cancelFunc: cancelFunc, + definition: definition, + queueFactory: queueFactory, + readyCh: make(chan error), + index: index, } } @@ -86,30 +85,6 @@ func (instance *FunctionInstance) Run() { return } - consumer, err := instance.pc.Subscribe(pulsar.ConsumerOptions{ - Topics: instance.definition.Inputs, - SubscriptionName: fmt.Sprintf("function-stream-%s", instance.definition.Name), - Type: pulsar.Failover, - }) - if err != nil { - instance.readyCh <- errors.Wrap(err, "Error creating consumer") - return - } - defer func() { - consumer.Close() - }() - - producer, err := instance.pc.CreateProducer(pulsar.ProducerOptions{ - Topic: instance.definition.Output, - }) - if err != nil { - instance.readyCh <- errors.Wrap(err, "Error creating producer") - return - } - defer func() { - producer.Close() - }() - handleErr := func(ctx context.Context, err error, message string, args ...interface{}) { if errors.Is(err, context.Canceled) { slog.InfoContext(instance.ctx, "function instance has been stopped") @@ -134,40 +109,31 @@ func (instance *FunctionInstance) Run() { return } - instance.readyCh <- nil - - for { - msg, err := consumer.Receive(instance.ctx) - if err != nil { - handleErr(instance.ctx, err, "Error receiving message") - return - } - stdin.ResetBuffer(msg.Payload()) + sourceChan, err := instance.queueFactory.NewSourceChan(instance.ctx, &SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}) + if err != nil { + instance.readyCh <- errors.Wrap(err, "Error creating source event queue") + return + } + sinkChan, err := instance.queueFactory.NewSinkChan(instance.ctx, &SinkQueueConfig{Topic: instance.definition.Output}) + if err != nil { + instance.readyCh <- errors.Wrap(err, "Error creating sink event queue") + return + } + instance.readyCh <- nil + for e := range sourceChan { + stdin.ResetBuffer(e.GetPayload()) _, err = process.Call(instance.ctx) if err != nil { handleErr(instance.ctx, err, "Error calling process function") return } - output := stdout.GetAndReset() - producer.SendAsync(instance.ctx, &pulsar.ProducerMessage{ - Payload: output, - }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { - if err != nil { - handleErr(instance.ctx, err, "Error sending message", "error", err, "messageId", id) - return - } - err = consumer.Ack(msg) - if err != nil { - handleErr(instance.ctx, err, "Error acknowledging message", "error", err, "messageId", id) - return - } - }) + sinkChan <- NewAckableEvent(output, e.Ack) } } -func (instance *FunctionInstance) WaitForReady() chan error { +func (instance *FunctionInstance) WaitForReady() <-chan error { return instance.readyCh } diff --git a/lib/manager.go b/lib/manager.go index 8fdce08a..52b12193 100644 --- a/lib/manager.go +++ b/lib/manager.go @@ -15,7 +15,7 @@ package lib import ( - "github.com/apache/pulsar-client-go/pulsar" + "context" "github.com/functionstream/functionstream/common" "github.com/functionstream/functionstream/common/model" "log/slog" @@ -23,21 +23,19 @@ import ( ) type FunctionManager struct { - functions map[string][]*FunctionInstance - functionsLock sync.Mutex - pc pulsar.Client + functions map[string][]*FunctionInstance + functionsLock sync.Mutex + eventQueueFactory EventQueueFactory } -func NewFunctionManager() (*FunctionManager, error) { - pc, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: common.GetConfig().PulsarURL, - }) +func NewFunctionManager(config *Config) (*FunctionManager, error) { + eventQueueFactory, err := config.QueueBuilder(context.Background(), config) if err != nil { return nil, err } return &FunctionManager{ - functions: make(map[string][]*FunctionInstance), - pc: pc, + functions: make(map[string][]*FunctionInstance), + eventQueueFactory: eventQueueFactory, }, nil } @@ -50,7 +48,7 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error { } fm.functions[f.Name] = make([]*FunctionInstance, f.Replicas) for i := int32(0); i < f.Replicas; i++ { - instance := NewFunctionInstance(f, fm.pc, i) + instance := NewFunctionInstance(f, fm.eventQueueFactory, i) fm.functions[f.Name][i] = instance go instance.Run() if err := <-instance.WaitForReady(); err != nil { diff --git a/lib/memory_queue.go b/lib/memory_queue.go new file mode 100644 index 00000000..de6b988a --- /dev/null +++ b/lib/memory_queue.go @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lib + +import ( + "context" + "sync" +) + +type MemoryQueueFactory struct { + mu sync.Mutex + queues map[string]chan Event +} + +func NewMemoryQueueFactory() EventQueueFactory { + return &MemoryQueueFactory{ + queues: make(map[string]chan Event), + } +} + +func (f *MemoryQueueFactory) getOrCreateChan(name string) chan Event { + if queue, ok := f.queues[name]; ok { + return queue + } + f.mu.Lock() + defer f.mu.Unlock() + c := make(chan Event) + f.queues[name] = c + return c +} + +func (f *MemoryQueueFactory) NewSourceChan(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error) { + result := make(chan Event) + for _, topic := range config.Topics { + t := topic + go func() { + c := f.getOrCreateChan(t) + for { + select { + case <-ctx.Done(): + return + case event := <-c: + result <- event + } + } + }() + } + return result, nil +} + +func (f *MemoryQueueFactory) NewSinkChan(ctx context.Context, config *SinkQueueConfig) (chan<- Event, error) { + return f.getOrCreateChan(config.Topic), nil +} diff --git a/lib/pulsar_queue.go b/lib/pulsar_queue.go new file mode 100644 index 00000000..55300a48 --- /dev/null +++ b/lib/pulsar_queue.go @@ -0,0 +1,105 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lib + +import ( + "context" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pkg/errors" + "log/slog" +) + +type PulsarEventQueueFactory struct { + newSourceChan func(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error) + newSinkChan func(ctx context.Context, config *SinkQueueConfig) (chan<- Event, error) +} + +func (f *PulsarEventQueueFactory) NewSourceChan(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error) { + return f.newSourceChan(ctx, config) +} + +func (f *PulsarEventQueueFactory) NewSinkChan(ctx context.Context, config *SinkQueueConfig) (chan<- Event, error) { + return f.newSinkChan(ctx, config) +} + +func NewPulsarEventQueueFactory(ctx context.Context, config *Config) (EventQueueFactory, error) { + pc, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: config.PulsarURL, + }) + if err != nil { + return nil, err + } + go func() { + <-ctx.Done() + pc.Close() + }() + handleErr := func(ctx context.Context, err error, message string, args ...interface{}) { + if errors.Is(err, context.Canceled) { + slog.InfoContext(ctx, "function instance has been stopped") + return + } + slog.ErrorContext(ctx, message, args...) + } + return &PulsarEventQueueFactory{ + newSourceChan: func(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error) { + c := make(chan Event) + consumer, err := pc.Subscribe(pulsar.ConsumerOptions{ + Topics: config.Topics, + SubscriptionName: config.SubName, + Type: pulsar.Failover, + }) + if err != nil { + return nil, errors.Wrap(err, "Error creating consumer") + } + go func() { + defer consumer.Close() + for msg := range consumer.Chan() { + c <- NewAckableEvent(msg.Payload(), func() { + err := consumer.Ack(msg) + if err != nil { + handleErr(ctx, err, "Error acknowledging message", "error", err) + return + } + }) + } + }() + return c, nil + }, + newSinkChan: func(ctx context.Context, config *SinkQueueConfig) (chan<- Event, error) { + c := make(chan Event) + producer, err := pc.CreateProducer(pulsar.ProducerOptions{ + Topic: config.Topic, + }) + if err != nil { + return nil, errors.Wrap(err, "Error creating producer") + } + go func() { + defer producer.Close() + for e := range c { + producer.SendAsync(ctx, &pulsar.ProducerMessage{ + Payload: e.GetPayload(), + }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { + if err != nil { + handleErr(ctx, err, "Error sending message", "error", err, "messageId", id) + return + } + e.Ack() + }) + } + }() + return c, nil + }, + }, nil +} diff --git a/perf/perf.go b/perf/perf.go index 207a1d74..d7188112 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -18,8 +18,8 @@ import ( "context" "encoding/json" "fmt" - "github.com/apache/pulsar-client-go/pulsar" "github.com/bmizerany/perks/quantile" + "github.com/functionstream/functionstream/lib" "github.com/functionstream/functionstream/restclient" "golang.org/x/time/rate" "log/slog" @@ -31,9 +31,10 @@ import ( ) type Config struct { - PulsarURL string - RequestRate float64 - Func *restclient.Function + PulsarURL string + RequestRate float64 + Func *restclient.Function + QueueBuilder lib.QueueBuilder } type Perf interface { @@ -41,15 +42,24 @@ type Perf interface { } type perf struct { - config Config - producer pulsar.Producer - consumer pulsar.Consumer + config *Config + input chan<- lib.Event + output <-chan lib.Event + queueBuilder lib.QueueBuilder } -func New(config Config) Perf { - return &perf{ +func New(config *Config) Perf { + p := &perf{ config: config, } + if config.QueueBuilder == nil { + p.queueBuilder = func(ctx context.Context, c *lib.Config) (lib.EventQueueFactory, error) { + return lib.NewPulsarEventQueueFactory(ctx, c) + } + } else { + p.queueBuilder = config.QueueBuilder + } + return p } type Person struct { @@ -76,35 +86,37 @@ func (p *perf) Run(ctx context.Context) { } } - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: p.config.PulsarURL, - }) + config := &lib.Config{ + PulsarURL: p.config.PulsarURL, + } + + queueFactory, err := p.queueBuilder(context.Background(), config) if err != nil { slog.Error( - "Failed to create Pulsar client", + "Failed to create Event Queue Factory", slog.Any("error", err), ) os.Exit(1) } - p.consumer, err = client.Subscribe(pulsar.ConsumerOptions{ - Topic: f.Output, - SubscriptionName: "perf", + p.input, err = queueFactory.NewSinkChan(context.Background(), &lib.SinkQueueConfig{ + Topic: f.Inputs[0], }) if err != nil { slog.Error( - "Failed to create Pulsar Consumer", + "Failed to create Sink Perf Channel", slog.Any("error", err), ) os.Exit(1) } - p.producer, err = client.CreateProducer(pulsar.ProducerOptions{ - Topic: f.Inputs[0], + p.output, err = queueFactory.NewSourceChan(context.Background(), &lib.SourceQueueConfig{ + Topics: []string{f.Output}, + SubName: "perf", }) if err != nil { slog.Error( - "Failed to create Pulsar Producer", + "Failed to create Source Perf Channel", slog.Any("error", err), ) os.Exit(1) @@ -168,7 +180,6 @@ func (p *perf) Run(ctx context.Context) { func (p *perf) generateTraffic(ctx context.Context, latencyCh chan int64, failureCount *int64) { limiter := rate.NewLimiter(rate.Limit(p.config.RequestRate), int(p.config.RequestRate)) - msgCh := p.consumer.Chan() count := 0 next := make(chan bool, int(p.config.RequestRate)) for { @@ -189,20 +200,12 @@ func (p *perf) generateTraffic(ctx context.Context, latencyCh chan int64, failur os.Exit(1) } start := time.Now() - p.producer.SendAsync(ctx, &pulsar.ProducerMessage{ - Payload: jsonBytes, - }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { - if err != nil { - slog.Warn( - "Failed to produce message", - slog.Any("error", err), - ) - } - }) + p.input <- lib.NewAckableEvent(jsonBytes, func() {}) next <- true - m := <-msgCh + e := <-p.output latencyCh <- time.Since(start).Microseconds() - payload := m.Payload() + payload := e.GetPayload() + e.Ack() var out Person err = json.Unmarshal(payload, &out) if err != nil { diff --git a/common/config.go b/server/config_loader.go similarity index 58% rename from common/config.go rename to server/config_loader.go index ea030dc4..08b94ac5 100644 --- a/common/config.go +++ b/server/config_loader.go @@ -12,27 +12,37 @@ * limitations under the License. */ -package common +package server import ( + "context" "log/slog" "os" + + "github.com/functionstream/functionstream/lib" ) -type Config struct { - ListenAddr string - PulsarURL string -} +var loadedConfig *lib.Config -var loadedConfig *Config +const ( + PulsarQueueType = "pulsar" +) -func GetConfig() *Config { - if loadedConfig == nil { - loadedConfig = &Config{ - ListenAddr: getEnvWithDefault("PORT", ":7300"), - PulsarURL: getEnvWithDefault("PULSAR_URL", "pulsar://localhost:6650"), +func init() { + loadedConfig = &lib.Config{ + ListenAddr: getEnvWithDefault("PORT", ":7300"), + PulsarURL: getEnvWithDefault("PULSAR_URL", "pulsar://localhost:6650"), + } + queueType := getEnvWithDefault("QUEUE_TYPE", PulsarQueueType) + switch queueType { + case PulsarQueueType: + loadedConfig.QueueBuilder = func(ctx context.Context, c *lib.Config) (lib.EventQueueFactory, error) { + return lib.NewPulsarEventQueueFactory(ctx, c) } } +} + +func GetConfig() *lib.Config { return loadedConfig } diff --git a/server/server.go b/server/server.go index 88237527..1919fd70 100644 --- a/server/server.go +++ b/server/server.go @@ -33,7 +33,7 @@ type Server struct { } func New() *Server { - manager, err := lib.NewFunctionManager() + manager, err := lib.NewFunctionManager(GetConfig()) if err != nil { slog.Error("Error creating function manager", err) } @@ -42,6 +42,12 @@ func New() *Server { } } +func NewWithFM(fm *lib.FunctionManager) *Server { + return &Server{ + manager: fm, + } +} + func (s *Server) Run() { slog.Info("Hello, Function Stream!") err := s.startRESTHandlers() @@ -101,7 +107,7 @@ func (s *Server) startRESTHandlers() error { } }).Methods("DELETE") - return http.ListenAndServe(common.GetConfig().ListenAddr, r) + return http.ListenAndServe(GetConfig().ListenAddr, r) } func (s *Server) Close() error {