Skip to content

Commit 10c6e33

Browse files
author
laminar
committed
Supports multiple input sources
Signed-off-by: laminar <fangtian@kubesphere.io>
1 parent 4137e46 commit 10c6e33

File tree

5 files changed

+378
-192
lines changed

5 files changed

+378
-192
lines changed

functionframeworks/frameworks.go

Lines changed: 93 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@ package functionframeworks
22

33
import (
44
"context"
5-
"encoding/json"
65
"errors"
76
"fmt"
8-
ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context"
9-
dapr "github.com/dapr/go-sdk/service/common"
10-
daprd "github.com/dapr/go-sdk/service/grpc"
117
"log"
128
"net/http"
139
"os"
1410
"runtime/debug"
1511
"strings"
1612

1713
cloudevents "github.com/cloudevents/sdk-go/v2"
14+
dapr "github.com/dapr/go-sdk/service/common"
15+
daprd "github.com/dapr/go-sdk/service/grpc"
16+
17+
ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context"
1818
)
1919

2020
const (
@@ -32,8 +32,95 @@ func RegisterHTTPFunction(ctx context.Context, fn func(http.ResponseWriter, *htt
3232
return registerHTTPFunction("/", fn, handler)
3333
}
3434

35-
func RegisterOpenFunction(ctx context.Context, fn func(*ofctx.OpenFunctionContext, []byte) int) error {
36-
return registerOpenFunction(fn, handler)
35+
func RegisterOpenFunction(ctx context.Context, fn func(*ofctx.OpenFunctionContext, []byte) ofctx.Return) error {
36+
return func(f func(*ofctx.OpenFunctionContext, []byte) ofctx.Return) error {
37+
fctx, err := ofctx.GetOpenFunctionContext()
38+
if err != nil {
39+
return err
40+
}
41+
42+
if fctx.Runtime == ofctx.OpenFuncAsync {
43+
openFuncAsyncServHandler, err = daprd.NewService(fmt.Sprintf(":%s", fctx.Port))
44+
if err != nil {
45+
return err
46+
}
47+
} else {
48+
return errors.New("cannot use non-OpenFuncAsync runtime for OpenFunction registration")
49+
}
50+
51+
var funcErr error
52+
53+
// Serving function with inputs
54+
if !fctx.InputsIsEmpty() {
55+
for name, input := range fctx.Inputs {
56+
switch input.Type {
57+
case ofctx.OpenFuncBinding:
58+
input.Uri = input.Component
59+
funcErr = openFuncAsyncServHandler.AddBindingInvocationHandler(input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) {
60+
currentContext := fctx
61+
currentContext.Event.InputName = name
62+
currentContext.Event.BindingEvent = in
63+
ret := f(currentContext, in.Data)
64+
switch ret.Code {
65+
case ofctx.Success:
66+
return ret.Data, nil
67+
case ofctx.InternalError:
68+
return nil, errors.New(fmt.Sprint(ret.Error))
69+
default:
70+
return nil, nil
71+
}
72+
})
73+
case ofctx.OpenFuncTopic:
74+
sub := &dapr.Subscription{
75+
PubsubName: input.Component,
76+
Topic: input.Uri,
77+
}
78+
funcErr = openFuncAsyncServHandler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) {
79+
currentContext := fctx
80+
currentContext.Event.InputName = name
81+
currentContext.Event.TopicEvent = e
82+
ret := f(currentContext, e.Data.([]byte))
83+
switch ret.Code {
84+
case ofctx.Success:
85+
return false, nil
86+
case ofctx.InternalError:
87+
err = errors.New(fmt.Sprint(ret.Error))
88+
if retry, ok := ret.Metadata["retry"]; ok {
89+
if strings.EqualFold(retry, "true") {
90+
return true, err
91+
} else if strings.EqualFold(retry, "false") {
92+
return false, err
93+
} else {
94+
return false, err
95+
}
96+
}
97+
return false, err
98+
default:
99+
return false, nil
100+
}
101+
})
102+
default:
103+
return fmt.Errorf("invalid input type: %s", input.Type)
104+
}
105+
if funcErr != nil {
106+
return err
107+
}
108+
}
109+
// Serving function without inputs
110+
} else {
111+
ret := fn(fctx, []byte{})
112+
switch ret.Code {
113+
case ofctx.Success:
114+
return nil
115+
case ofctx.InternalError:
116+
err = errors.New(fmt.Sprint(ret.Error))
117+
return err
118+
default:
119+
return nil
120+
}
121+
}
122+
return nil
123+
}(fn)
37124
}
38125

39126
func RegisterCloudEventFunction(ctx context.Context, fn func(context.Context, cloudevents.Event) error) error {
@@ -48,84 +135,6 @@ func registerHTTPFunction(path string, fn func(http.ResponseWriter, *http.Reques
48135
return nil
49136
}
50137

51-
func registerOpenFunction(fn func(*ofctx.OpenFunctionContext, []byte) int, h *http.ServeMux) error {
52-
ctx, err := ofctx.GetOpenFunctionContext()
53-
if err != nil {
54-
return err
55-
}
56-
57-
if ctx.Runtime == ofctx.OpenFuncAsync {
58-
openFuncAsyncServHandler, err = daprd.NewService(fmt.Sprintf(":%s", ctx.Port))
59-
if err != nil {
60-
return err
61-
}
62-
} else {
63-
return errors.New(fmt.Sprint("Cannot use non-OpenFuncAsync runtime for function registration."))
64-
}
65-
66-
// Serving function with inputs
67-
if !ctx.InputIsEmpty() {
68-
inType := ctx.Input.Params["type"]
69-
switch ofctx.ResourceType(inType) {
70-
case ofctx.OpenFuncBinding:
71-
if ctx.Input.Uri == "" {
72-
ctx.Input.Uri = ctx.Input.Name
73-
}
74-
err = openFuncAsyncServHandler.AddBindingInvocationHandler(ctx.Input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) {
75-
code := fn(ctx, in.Data)
76-
if code == 200 {
77-
return nil, nil
78-
} else {
79-
return nil, errors.New("error")
80-
}
81-
})
82-
case ofctx.OpenFuncTopic:
83-
sub := &dapr.Subscription{
84-
PubsubName: ctx.Input.Name,
85-
Topic: ctx.Input.Uri,
86-
}
87-
err = openFuncAsyncServHandler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) {
88-
in, err := json.Marshal(e.Data)
89-
if err != nil {
90-
return true, err
91-
}
92-
code := fn(ctx, in)
93-
if code == 200 {
94-
return false, nil
95-
} else {
96-
return true, errors.New("error")
97-
}
98-
})
99-
case ofctx.OpenFuncService:
100-
if ctx.Input.Uri == "" {
101-
ctx.Input.Uri = ctx.Input.Name
102-
}
103-
err = openFuncAsyncServHandler.AddServiceInvocationHandler(ctx.Input.Uri, func(c context.Context, in *dapr.InvocationEvent) (out *dapr.Content, err error) {
104-
code := fn(ctx, in.Data)
105-
if code == 200 {
106-
return nil, nil
107-
} else {
108-
return nil, errors.New("error")
109-
}
110-
})
111-
default:
112-
return fmt.Errorf("invalid input type: %s", inType)
113-
}
114-
if err != nil {
115-
return err
116-
}
117-
// Serving function without inputs
118-
} else {
119-
code := fn(ctx, []byte{})
120-
if code == 200 {
121-
return nil
122-
} else {
123-
return errors.New("error")
124-
}
125-
}
126-
return nil
127-
}
128-
129138
func registerCloudEventFunction(ctx context.Context, fn func(context.Context, cloudevents.Event) error, h *http.ServeMux) error {
130139
p, err := cloudevents.NewHTTP()
131140
if err != nil {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ go 1.15
44

55
require (
66
github.com/cloudevents/sdk-go/v2 v2.4.1
7-
github.com/dapr/go-sdk v1.1.0
7+
github.com/dapr/go-sdk v1.2.0
88
)

0 commit comments

Comments
 (0)