Skip to content

perf: improve performance #124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,15 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# function-stream
35 changes: 34 additions & 1 deletion benchmark/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ package benchmark

import (
"context"
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/functionstream/functionstream/common"
"github.com/functionstream/functionstream/perf"
"github.com/functionstream/functionstream/restclient"
"github.com/functionstream/functionstream/server"
"io"
"log/slog"
"math/rand"
"os"
"runtime/pprof"
"strconv"
"testing"
"time"
)
Expand Down Expand Up @@ -50,9 +55,37 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
})
}()

inputTopic := "test-input-" + strconv.Itoa(rand.Int())
outputTopic := "test-output-" + strconv.Itoa(rand.Int())
cfg := &pulsaradmin.Config{}
admin, err := pulsaradmin.NewClient(cfg)
if err != nil {
panic(err)
}
replicas := int32(5)
createTopic := func(t string) {
tn, err := utils.GetTopicName(t)
if err != nil {
panic(err)
}
err = admin.Topics().Create(*tn, int(replicas))
if err != nil {
panic(err)
}

}
createTopic(inputTopic)
createTopic(outputTopic)

pConfig := perf.Config{
PulsarURL: "pulsar://localhost:6650",
RequestRate: 500.0,
RequestRate: 100000.0,
Func: &restclient.Function{
Archive: "./bin/example_basic.wasm",
Inputs: []string{inputTopic},
Output: outputTopic,
Replicas: &replicas,
},
}

b.ReportAllocs()
Expand Down
1 change: 1 addition & 0 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ import "errors"

var (
ErrorFunctionNotFound = errors.New("function not found")
ErrorFunctionExists = errors.New("function already exists")
)
11 changes: 6 additions & 5 deletions common/model/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package model

type Function struct {
Name string `json:"name"`
Archive string `json:"archive"`
Inputs []string `json:"inputs"`
Output string `json:"output"`
Config map[string]string `json:"config"`
Name string
Archive string
Inputs []string
Output string
Config map[string]string
Replicas int32
}
8 changes: 6 additions & 2 deletions examples/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ import (
)

type Person struct {
Name string `json:"name"`
Money int `json:"money"`
Name string `json:"name"`
Money int `json:"money"`
Expected int `json:"expected"`
}

func main() {
_, _ = fmt.Fprintln(os.Stderr, "Hello from Go!")
}

//export process
func process() {
dataBytes, err := io.ReadAll(os.Stdin)
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, "Failed to read data:", err)
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/functionstream/functionstream
go 1.21

require (
github.com/apache/pulsar-client-go v0.11.1
github.com/apache/pulsar-client-go v0.12.0
github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e
github.com/gorilla/mux v1.8.1
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -33,6 +33,8 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.5 // indirect
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/AthenZ/athenz v1.11.50 h1:mCyQhI32GHPpPde9NVChI46hpRjw+vX1Z4RN8GCDILE
github.com/AthenZ/athenz v1.11.50/go.mod h1:HfKWur/iDpTKNb2TVaKKy4mt+Qa0PnZpIOqcmR9/i+Q=
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/apache/pulsar-client-go v0.11.1 h1:WxLitlPG4Dz62BblGlx51wm0rw76eRefJsWdawI22QM=
github.com/apache/pulsar-client-go v0.11.1/go.mod h1:FoijqJwgjroSKptIWp1vvK1CXs8dXnQiL8I+MHOri4A=
github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I=
github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -63,6 +63,10 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand Down
56 changes: 38 additions & 18 deletions lib/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/functionstream/functionstream/common"
"github.com/functionstream/functionstream/common/model"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/api"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
Expand All @@ -32,19 +33,25 @@ import (
type FunctionInstance struct {
ctx context.Context
cancelFunc context.CancelFunc
definition model.Function
definition *model.Function
pc pulsar.Client
readyCh chan error
index int32
}

func NewFunctionInstance(definition model.Function, pc pulsar.Client) *FunctionInstance {
func NewFunctionInstance(definition *model.Function, pc pulsar.Client, index int32) *FunctionInstance {
ctx, cancelFunc := context.WithCancel(context.Background())
ctx.Value(logrus.Fields{
"function-name": definition.Name,
"function-index": index,
})
return &FunctionInstance{
ctx: ctx,
cancelFunc: cancelFunc,
definition: definition,
pc: pc,
readyCh: make(chan error),
index: index,
}
}

Expand All @@ -69,7 +76,7 @@ func (instance *FunctionInstance) Run() {
stdout := common.NewChanWriter()

config := wazero.NewModuleConfig().
WithStdout(stdout).WithStdin(stdin)
WithStdout(stdout).WithStdin(stdin).WithStderr(os.Stderr)

wasi_snapshot_preview1.MustInstantiate(instance.ctx, r)

Expand All @@ -82,6 +89,7 @@ func (instance *FunctionInstance) Run() {
consumer, err := instance.pc.Subscribe(pulsar.ConsumerOptions{
Topics: instance.definition.Inputs,
SubscriptionName: fmt.Sprintf("function-stream-%s", instance.definition.Name),
Type: pulsar.Failover,
})
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating consumer")
Expand All @@ -102,8 +110,6 @@ func (instance *FunctionInstance) Run() {
producer.Close()
}()

instance.readyCh <- nil

handleErr := func(ctx context.Context, err error, message string, args ...interface{}) {
if errors.Is(err, context.Canceled) {
slog.InfoContext(instance.ctx, "function instance has been stopped")
Expand All @@ -112,6 +118,24 @@ func (instance *FunctionInstance) Run() {
slog.ErrorContext(ctx, message, args...)
}

// Trigger the "_start" function, WASI's "main".
mod, err := r.InstantiateWithConfig(instance.ctx, wasmBytes, config)
if err != nil {
if exitErr, ok := err.(*sys.ExitError); ok && exitErr.ExitCode() != 0 {
handleErr(instance.ctx, err, "Function exit with code", "code", exitErr.ExitCode())
} else if !ok {
handleErr(instance.ctx, err, "Error instantiating function")
}
return
}
process := mod.ExportedFunction("process")
if process == nil {
instance.readyCh <- errors.New("No process function found")
return
}

instance.readyCh <- nil

for {
msg, err := consumer.Receive(instance.ctx)
if err != nil {
Expand All @@ -120,14 +144,9 @@ func (instance *FunctionInstance) Run() {
}
stdin.ResetBuffer(msg.Payload())

// Trigger the "_start" function, WASI's "main".
_, err = r.InstantiateWithConfig(instance.ctx, wasmBytes, config)
_, err = process.Call(instance.ctx)
if err != nil {
if exitErr, ok := err.(*sys.ExitError); ok && exitErr.ExitCode() != 0 {
handleErr(instance.ctx, err, "Function exit with code", "code", exitErr.ExitCode())
} else if !ok {
handleErr(instance.ctx, err, "Error instantiating function")
}
handleErr(instance.ctx, err, "Error calling process function")
return
}

Expand All @@ -139,16 +158,17 @@ func (instance *FunctionInstance) Run() {
handleErr(instance.ctx, err, "Error sending message", "error", err, "messageId", id)
return
}
err = consumer.Ack(msg)
if err != nil {
handleErr(instance.ctx, err, "Error acknowledging message", "error", err, "messageId", id)
return
}
})
}
}

func (instance *FunctionInstance) WaitForReady() error {
err := <-instance.readyCh
if err != nil {
slog.ErrorContext(instance.ctx, "Error starting function instance", err)
}
return err
func (instance *FunctionInstance) WaitForReady() chan error {
return instance.readyCh
}

func (instance *FunctionInstance) Stop() {
Expand Down
33 changes: 24 additions & 9 deletions lib/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/functionstream/functionstream/common"
"github.com/functionstream/functionstream/common/model"
"log/slog"
"sync"
)

type FunctionManager struct {
functions map[string]*FunctionInstance
functions map[string][]*FunctionInstance
functionsLock sync.Mutex
pc pulsar.Client
}
Expand All @@ -35,29 +36,43 @@ func NewFunctionManager() (*FunctionManager, error) {
return nil, err
}
return &FunctionManager{
functions: make(map[string]*FunctionInstance),
functions: make(map[string][]*FunctionInstance),
pc: pc,
}, nil
}

func (fm *FunctionManager) StartFunction(f model.Function) error {
func (fm *FunctionManager) StartFunction(f *model.Function) error {
fm.functionsLock.Lock()
defer fm.functionsLock.Unlock()
instance := NewFunctionInstance(f, fm.pc)
fm.functions[f.Name] = instance
go instance.Run()
return instance.WaitForReady()
if _, exist := fm.functions[f.Name]; exist {
fm.functionsLock.Unlock()
return common.ErrorFunctionExists
}
fm.functions[f.Name] = make([]*FunctionInstance, f.Replicas)
for i := int32(0); i < f.Replicas; i++ {
instance := NewFunctionInstance(f, fm.pc, i)
fm.functions[f.Name][i] = instance
go instance.Run()
if err := <-instance.WaitForReady(); err != nil {
if err != nil {
slog.ErrorContext(instance.ctx, "Error starting function instance", err)
}
fm.functionsLock.Unlock()
return err
}
}
return nil
}

func (fm *FunctionManager) DeleteFunction(name string) error {
fm.functionsLock.Lock()
instance, exist := fm.functions[name]
instances, exist := fm.functions[name]
if !exist {
return common.ErrorFunctionNotFound
}
delete(fm.functions, name)
fm.functionsLock.Unlock()
if instance != nil {
for _, instance := range instances {
instance.Stop()
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ components:
type: string
output:
type: string
replicas:
type: integer
config:
type: object
additionalProperties:
Expand Down
Loading