Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 93 additions & 84 deletions functionframeworks/frameworks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ package functionframeworks

import (
"context"
"encoding/json"
"errors"
"fmt"
ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context"
dapr "github.com/dapr/go-sdk/service/common"
daprd "github.com/dapr/go-sdk/service/grpc"
"log"
"net/http"
"os"
"runtime/debug"
"strings"

cloudevents "github.com/cloudevents/sdk-go/v2"
dapr "github.com/dapr/go-sdk/service/common"
daprd "github.com/dapr/go-sdk/service/grpc"

ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context"
)

const (
Expand All @@ -32,8 +32,95 @@ func RegisterHTTPFunction(ctx context.Context, fn func(http.ResponseWriter, *htt
return registerHTTPFunction("/", fn, handler)
}

func RegisterOpenFunction(ctx context.Context, fn func(*ofctx.OpenFunctionContext, []byte) int) error {
return registerOpenFunction(fn, handler)
func RegisterOpenFunction(ctx context.Context, fn func(*ofctx.OpenFunctionContext, []byte) ofctx.Return) error {
return func(f func(*ofctx.OpenFunctionContext, []byte) ofctx.Return) error {
fctx, err := ofctx.GetOpenFunctionContext()
if err != nil {
return err
}

if fctx.Runtime == ofctx.OpenFuncAsync {
openFuncAsyncServHandler, err = daprd.NewService(fmt.Sprintf(":%s", fctx.Port))
if err != nil {
return err
}
} else {
return errors.New("cannot use non-OpenFuncAsync runtime for OpenFunction registration")
}

var funcErr error

// Serving function with inputs
if !fctx.InputsIsEmpty() {
for name, input := range fctx.Inputs {
switch input.Type {
case ofctx.OpenFuncBinding:
input.Uri = input.Component
funcErr = openFuncAsyncServHandler.AddBindingInvocationHandler(input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) {
currentContext := fctx
currentContext.Event.InputName = name
currentContext.Event.BindingEvent = in
ret := f(currentContext, in.Data)
switch ret.Code {
case ofctx.Success:
return ret.Data, nil
case ofctx.InternalError:
return nil, errors.New(fmt.Sprint(ret.Error))
default:
return nil, nil
}
})
case ofctx.OpenFuncTopic:
sub := &dapr.Subscription{
PubsubName: input.Component,
Topic: input.Uri,
}
funcErr = openFuncAsyncServHandler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) {
currentContext := fctx
currentContext.Event.InputName = name
currentContext.Event.TopicEvent = e
ret := f(currentContext, e.Data.([]byte))
switch ret.Code {
case ofctx.Success:
return false, nil
case ofctx.InternalError:
err = errors.New(fmt.Sprint(ret.Error))
if retry, ok := ret.Metadata["retry"]; ok {
if strings.EqualFold(retry, "true") {
return true, err
} else if strings.EqualFold(retry, "false") {
return false, err
} else {
return false, err
}
}
return false, err
default:
return false, nil
}
})
default:
return fmt.Errorf("invalid input type: %s", input.Type)
}
if funcErr != nil {
return err
}
}
// Serving function without inputs
} else {
ret := fn(fctx, []byte{})
switch ret.Code {
case ofctx.Success:
return nil
case ofctx.InternalError:
err = errors.New(fmt.Sprint(ret.Error))
return err
default:
return nil
}
}
return nil
}(fn)
}

func RegisterCloudEventFunction(ctx context.Context, fn func(context.Context, cloudevents.Event) error) error {
Expand All @@ -48,84 +135,6 @@ func registerHTTPFunction(path string, fn func(http.ResponseWriter, *http.Reques
return nil
}

func registerOpenFunction(fn func(*ofctx.OpenFunctionContext, []byte) int, h *http.ServeMux) error {
ctx, err := ofctx.GetOpenFunctionContext()
if err != nil {
return err
}

if ctx.Runtime == ofctx.OpenFuncAsync {
openFuncAsyncServHandler, err = daprd.NewService(fmt.Sprintf(":%s", ctx.Port))
if err != nil {
return err
}
} else {
return errors.New(fmt.Sprint("Cannot use non-OpenFuncAsync runtime for function registration."))
}

// Serving function with inputs
if !ctx.InputIsEmpty() {
inType := ctx.Input.Params["type"]
switch ofctx.ResourceType(inType) {
case ofctx.OpenFuncBinding:
if ctx.Input.Uri == "" {
ctx.Input.Uri = ctx.Input.Name
}
err = openFuncAsyncServHandler.AddBindingInvocationHandler(ctx.Input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) {
code := fn(ctx, in.Data)
if code == 200 {
return nil, nil
} else {
return nil, errors.New("error")
}
})
case ofctx.OpenFuncTopic:
sub := &dapr.Subscription{
PubsubName: ctx.Input.Name,
Topic: ctx.Input.Uri,
}
err = openFuncAsyncServHandler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) {
in, err := json.Marshal(e.Data)
if err != nil {
return true, err
}
code := fn(ctx, in)
if code == 200 {
return false, nil
} else {
return true, errors.New("error")
}
})
case ofctx.OpenFuncService:
if ctx.Input.Uri == "" {
ctx.Input.Uri = ctx.Input.Name
}
err = openFuncAsyncServHandler.AddServiceInvocationHandler(ctx.Input.Uri, func(c context.Context, in *dapr.InvocationEvent) (out *dapr.Content, err error) {
code := fn(ctx, in.Data)
if code == 200 {
return nil, nil
} else {
return nil, errors.New("error")
}
})
default:
return fmt.Errorf("invalid input type: %s", inType)
}
if err != nil {
return err
}
// Serving function without inputs
} else {
code := fn(ctx, []byte{})
if code == 200 {
return nil
} else {
return errors.New("error")
}
}
return nil
}

func registerCloudEventFunction(ctx context.Context, fn func(context.Context, cloudevents.Event) error, h *http.ServeMux) error {
p, err := cloudevents.NewHTTP()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ go 1.15

require (
github.com/cloudevents/sdk-go/v2 v2.4.1
github.com/dapr/go-sdk v1.1.0
github.com/dapr/go-sdk v1.2.0
)
Loading