Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@ dist
.tern-port

.idea
.run
bin/
.DS_Store
16 changes: 16 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

.PHONY: build
.PHONY: license
build:
go build -v -o bin/function-stream ./cmd

Expand All @@ -25,3 +38,6 @@ gen_rest_client:
--global-property apiDocs,apis,models,supportingFiles
rm -r restclient/go.mod restclient/go.sum restclient/.travis.yml restclient/.openapi-generator-ignore \
restclient/git_push.sh restclient/.openapi-generator restclient/api restclient/test

license:
./license-checker/license-checker.sh
75 changes: 73 additions & 2 deletions benchmark/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/functionstream/functionstream/common"
"github.com/functionstream/functionstream/lib"
"github.com/functionstream/functionstream/perf"
"github.com/functionstream/functionstream/restclient"
"github.com/functionstream/functionstream/server"
Expand Down Expand Up @@ -77,9 +78,9 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
createTopic(inputTopic)
createTopic(outputTopic)

pConfig := perf.Config{
pConfig := &perf.Config{
PulsarURL: "pulsar://localhost:6650",
RequestRate: 100000.0,
RequestRate: 200000.0,
Func: &restclient.Function{
Archive: "./bin/example_basic.wasm",
Inputs: []string{inputTopic},
Expand Down Expand Up @@ -116,3 +117,73 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
b.Fatal(err)
}
}

func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
prepareEnv()

memoryQueueFactory := lib.NewMemoryQueueFactory()

svrConf := &lib.Config{
QueueBuilder: func(ctx context.Context, config *lib.Config) (lib.EventQueueFactory, error) {
return memoryQueueFactory, nil
},
}

fm, err := lib.NewFunctionManager(svrConf)
if err != nil {
b.Fatal(err)
}
s := server.NewWithFM(fm)
go func() {
common.RunProcess(func() (io.Closer, error) {
go s.Run()
return s, nil
})
}()

inputTopic := "test-input-" + strconv.Itoa(rand.Int())
outputTopic := "test-output-" + strconv.Itoa(rand.Int())

replicas := int32(15)

pConfig := &perf.Config{
RequestRate: 200000.0,
Func: &restclient.Function{
Archive: "./bin/example_basic.wasm",
Inputs: []string{inputTopic},
Output: outputTopic,
Replicas: &replicas,
},
QueueBuilder: func(ctx context.Context, c *lib.Config) (lib.EventQueueFactory, error) {
return memoryQueueFactory, nil
},
}

b.ReportAllocs()

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
defer cancel()

profile := "BenchmarkStressForBasicFunc.pprof"
file, err := os.Create(profile)
if err != nil {
b.Fatal(err)
}
defer func() {
_ = file.Close()
}()

err = pprof.StartCPUProfile(file)
if err != nil {
b.Fatal(err)
}

perf.New(pConfig).Run(ctx)

pprof.StopCPUProfile()

err = s.Close()
if err != nil {
b.Fatal(err)
}
}
2 changes: 1 addition & 1 deletion cmd/perf/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (
Run: exec,
}

config = perf.Config{}
config = &perf.Config{}
)

func init() {
Expand Down
27 changes: 27 additions & 0 deletions lib/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package lib

import (
"context"
)

type QueueBuilder func(ctx context.Context, config *Config) (EventQueueFactory, error)

type Config struct {
ListenAddr string
PulsarURL string
QueueBuilder QueueBuilder
}
65 changes: 65 additions & 0 deletions lib/event_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package lib

import (
"context"
)

type Event interface {
GetPayload() []byte
Ack()
}

type SourceQueueConfig struct {
Topics []string
SubName string
}

type SinkQueueConfig struct {
Topic string
}

//type EventQueueFactory func(ctx context.Context, config *QueueConfig, function *model.Function) (EventQueue, error)

type EventQueueFactory interface {
NewSourceChan(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error)
NewSinkChan(ctx context.Context, config *SinkQueueConfig) (chan<- Event, error)
}

type EventQueue interface {
GetSendChan() (chan<- Event, error)
GetRecvChan() (<-chan Event, error)
}

type AckableEvent struct {
payload []byte
ackFunc func()
}

func NewAckableEvent(payload []byte, ackFunc func()) *AckableEvent {
return &AckableEvent{
payload: payload,
ackFunc: ackFunc,
}
}

func (e *AckableEvent) GetPayload() []byte {
return e.payload
}

func (e *AckableEvent) Ack() {
e.ackFunc()
}
90 changes: 28 additions & 62 deletions lib/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package lib
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/functionstream/functionstream/common"
"github.com/functionstream/functionstream/common/model"
"github.com/pkg/errors"
Expand All @@ -31,27 +30,27 @@ import (
)

type FunctionInstance struct {
ctx context.Context
cancelFunc context.CancelFunc
definition *model.Function
pc pulsar.Client
readyCh chan error
index int32
ctx context.Context
cancelFunc context.CancelFunc
definition *model.Function
queueFactory EventQueueFactory
readyCh chan error
index int32
}

func NewFunctionInstance(definition *model.Function, pc pulsar.Client, index int32) *FunctionInstance {
func NewFunctionInstance(definition *model.Function, queueFactory EventQueueFactory, index int32) *FunctionInstance {
ctx, cancelFunc := context.WithCancel(context.Background())
ctx.Value(logrus.Fields{
"function-name": definition.Name,
"function-index": index,
})
return &FunctionInstance{
ctx: ctx,
cancelFunc: cancelFunc,
definition: definition,
pc: pc,
readyCh: make(chan error),
index: index,
ctx: ctx,
cancelFunc: cancelFunc,
definition: definition,
queueFactory: queueFactory,
readyCh: make(chan error),
index: index,
}
}

Expand Down Expand Up @@ -86,30 +85,6 @@ func (instance *FunctionInstance) Run() {
return
}

consumer, err := instance.pc.Subscribe(pulsar.ConsumerOptions{
Topics: instance.definition.Inputs,
SubscriptionName: fmt.Sprintf("function-stream-%s", instance.definition.Name),
Type: pulsar.Failover,
})
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating consumer")
return
}
defer func() {
consumer.Close()
}()

producer, err := instance.pc.CreateProducer(pulsar.ProducerOptions{
Topic: instance.definition.Output,
})
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating producer")
return
}
defer func() {
producer.Close()
}()

handleErr := func(ctx context.Context, err error, message string, args ...interface{}) {
if errors.Is(err, context.Canceled) {
slog.InfoContext(instance.ctx, "function instance has been stopped")
Expand All @@ -134,40 +109,31 @@ func (instance *FunctionInstance) Run() {
return
}

instance.readyCh <- nil

for {
msg, err := consumer.Receive(instance.ctx)
if err != nil {
handleErr(instance.ctx, err, "Error receiving message")
return
}
stdin.ResetBuffer(msg.Payload())
sourceChan, err := instance.queueFactory.NewSourceChan(instance.ctx, &SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)})
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating source event queue")
return
}
sinkChan, err := instance.queueFactory.NewSinkChan(instance.ctx, &SinkQueueConfig{Topic: instance.definition.Output})
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating sink event queue")
return
}

instance.readyCh <- nil
for e := range sourceChan {
stdin.ResetBuffer(e.GetPayload())
_, err = process.Call(instance.ctx)
if err != nil {
handleErr(instance.ctx, err, "Error calling process function")
return
}

output := stdout.GetAndReset()
producer.SendAsync(instance.ctx, &pulsar.ProducerMessage{
Payload: output,
}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
if err != nil {
handleErr(instance.ctx, err, "Error sending message", "error", err, "messageId", id)
return
}
err = consumer.Ack(msg)
if err != nil {
handleErr(instance.ctx, err, "Error acknowledging message", "error", err, "messageId", id)
return
}
})
sinkChan <- NewAckableEvent(output, e.Ack)
}
}

func (instance *FunctionInstance) WaitForReady() chan error {
func (instance *FunctionInstance) WaitForReady() <-chan error {
return instance.readyCh
}

Expand Down
Loading