Skip to content

Commit 0fb66d1

Browse files
authored
functions-framework refactoring (#22)
Signed-off-by: laminar <fangtian@kubesphere.io>
1 parent e154d75 commit 0fb66d1

File tree

14 files changed

+930
-628
lines changed

14 files changed

+930
-628
lines changed

context/context.go

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package context
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"os"
9+
"reflect"
10+
"strconv"
11+
"sync"
12+
13+
dapr "github.com/dapr/go-sdk/client"
14+
)
15+
16+
var (
17+
mu sync.RWMutex
18+
clientGRPCPort string
19+
)
20+
21+
func GetOpenFunctionContext() (*Context, error) {
22+
ctx := &Context{
23+
Inputs: make(map[string]*Input),
24+
Outputs: make(map[string]*Output),
25+
}
26+
27+
data := os.Getenv("FUNC_CONTEXT")
28+
if data == "" {
29+
return nil, errors.New("env FUNC_CONTEXT not found")
30+
}
31+
32+
err := json.Unmarshal([]byte(data), ctx)
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
switch ctx.Runtime {
38+
case Async, Knative:
39+
break
40+
default:
41+
return nil, fmt.Errorf("invalid runtime: %s", ctx.Runtime)
42+
}
43+
44+
ctx.EventMeta = &EventMetadata{}
45+
ctx.SyncRequestMeta = &SyncRequestMetadata{}
46+
47+
if !ctx.InputsIsEmpty() {
48+
for name, in := range ctx.Inputs {
49+
switch in.Type {
50+
case OpenFuncBinding, OpenFuncTopic:
51+
break
52+
default:
53+
return nil, fmt.Errorf("invalid input type %s: %s", name, in.Type)
54+
}
55+
}
56+
}
57+
58+
if !ctx.OutputIsEmpty() {
59+
for name, out := range ctx.Outputs {
60+
switch out.Type {
61+
case OpenFuncBinding, OpenFuncTopic:
62+
break
63+
default:
64+
return nil, fmt.Errorf("invalid output type %s: %s", name, out.Type)
65+
}
66+
}
67+
}
68+
69+
if ctx.Port == "" {
70+
ctx.Port = defaultPort
71+
} else {
72+
if _, err := strconv.Atoi(ctx.Port); err != nil {
73+
return nil, fmt.Errorf("error parsing port: %s", err.Error())
74+
}
75+
}
76+
77+
// When using self-hosted mode, configure the client port via env,
78+
// refer to https://docs.dapr.io/reference/environment/
79+
port := os.Getenv("DAPR_GRPC_PORT")
80+
if port == "" {
81+
clientGRPCPort = daprSidecarGRPCPort
82+
} else {
83+
clientGRPCPort = port
84+
}
85+
86+
return ctx, nil
87+
}
88+
89+
func (ctx *Context) Send(outputName string, data []byte) ([]byte, error) {
90+
if ctx.OutputIsEmpty() {
91+
return nil, errors.New("no output")
92+
}
93+
94+
var err error
95+
var output *Output
96+
var response *dapr.BindingEvent
97+
98+
client := ctx.GetDaprClient()
99+
100+
if v, ok := ctx.Outputs[outputName]; ok {
101+
output = v
102+
} else {
103+
return nil, fmt.Errorf("output %s not found", outputName)
104+
}
105+
106+
switch output.Type {
107+
case OpenFuncTopic:
108+
err = client.PublishEvent(context.Background(), output.Component, output.Uri, data)
109+
case OpenFuncBinding:
110+
in := &dapr.InvokeBindingRequest{
111+
Name: output.Component,
112+
Operation: output.Operation,
113+
Data: data,
114+
Metadata: output.Metadata,
115+
}
116+
response, err = client.InvokeBinding(context.Background(), in)
117+
}
118+
119+
if err != nil {
120+
return nil, err
121+
}
122+
123+
if response != nil {
124+
return response.Data, nil
125+
}
126+
return nil, nil
127+
}
128+
129+
func (ctx *Context) InputsIsEmpty() bool {
130+
nilInputs := map[string]*Input{}
131+
if reflect.DeepEqual(ctx.Inputs, nilInputs) {
132+
return true
133+
}
134+
return false
135+
}
136+
137+
func (ctx *Context) OutputIsEmpty() bool {
138+
nilOutputs := map[string]*Output{}
139+
if reflect.DeepEqual(ctx.Outputs, nilOutputs) {
140+
return true
141+
}
142+
return false
143+
}
144+
145+
func (ctx *Context) ReturnOnSuccess() Out {
146+
return Out{
147+
Code: Success,
148+
}
149+
}
150+
151+
func (ctx *Context) ReturnOnInternalError() Out {
152+
return Out{
153+
Code: InternalError,
154+
}
155+
}
156+
157+
func (ctx *Context) GetDaprClient() dapr.Client {
158+
return ctx.daprClient
159+
}
160+
161+
func InitDaprClientIfNil(ctx *Context) {
162+
if ctx.daprClient == nil {
163+
mu.Lock()
164+
defer mu.Unlock()
165+
c, e := dapr.NewClientWithPort(clientGRPCPort)
166+
if e != nil {
167+
panic(e)
168+
}
169+
ctx.daprClient = c
170+
}
171+
}
172+
173+
func DestroyDaprClient(ctx *Context) {
174+
if ctx.daprClient != nil {
175+
mu.Lock()
176+
defer mu.Unlock()
177+
ctx.daprClient.Close()
178+
ctx.daprClient = nil
179+
}
180+
}

context/types.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package context
2+
3+
import (
4+
"context"
5+
"net/http"
6+
7+
cloudevents "github.com/cloudevents/sdk-go/v2"
8+
dapr "github.com/dapr/go-sdk/client"
9+
"github.com/dapr/go-sdk/service/common"
10+
)
11+
12+
const (
13+
Async Runtime = "Async"
14+
Knative Runtime = "Knative"
15+
OpenFuncBinding ResourceType = "bindings"
16+
OpenFuncTopic ResourceType = "pubsub"
17+
Success ReturnCode = 200
18+
InternalError ReturnCode = 500
19+
defaultPort = "8080"
20+
daprSidecarGRPCPort = "50001"
21+
)
22+
23+
type Runtime string
24+
type ReturnCode int
25+
type ResourceType string
26+
27+
type Context struct {
28+
Name string `json:"name"`
29+
Version string `json:"version"`
30+
RequestID string `json:"requestID,omitempty"`
31+
Ctx context.Context `json:"ctx,omitempty"`
32+
Inputs map[string]*Input `json:"inputs,omitempty"`
33+
Outputs map[string]*Output `json:"outputs,omitempty"`
34+
Runtime Runtime `json:"runtime"`
35+
Port string `json:"port,omitempty"`
36+
State interface{} `json:"state,omitempty"`
37+
EventMeta *EventMetadata `json:"event,omitempty"`
38+
SyncRequestMeta *SyncRequestMetadata `json:"syncRequest,omitempty"`
39+
PrePlugins []string `json:"prePlugins,omitempty"`
40+
PostPlugins []string `json:"postPlugins,omitempty"`
41+
Out Out `json:"out,omitempty"`
42+
Error error `json:"error,omitempty"`
43+
HttpPattern string `json:"httpPattern,omitempty"`
44+
daprClient dapr.Client
45+
}
46+
47+
type EventMetadata struct {
48+
InputName string `json:"inputName,omitempty"`
49+
BindingEvent *common.BindingEvent `json:"bindingEvent,omitempty"`
50+
TopicEvent *common.TopicEvent `json:"topicEvent,omitempty"`
51+
CloudEvent *cloudevents.Event `json:"cloudEventnt,omitempty"`
52+
}
53+
54+
type SyncRequestMetadata struct {
55+
ResponseWriter http.ResponseWriter `json:"responseWriter,omitempty"`
56+
Request *http.Request `json:"request,omitempty"`
57+
}
58+
59+
type Input struct {
60+
Uri string `json:"uri,omitempty"`
61+
Component string `json:"component,omitempty"`
62+
Type ResourceType `json:"type"`
63+
Metadata map[string]string `json:"metadata,omitempty"`
64+
}
65+
66+
type Output struct {
67+
Uri string `json:"uri,omitempty"`
68+
Component string `json:"component,omitempty"`
69+
Type ResourceType `json:"type"`
70+
Metadata map[string]string `json:"metadata,omitempty"`
71+
Operation string `json:"operation,omitempty"`
72+
}
73+
74+
type Out struct {
75+
Code ReturnCode `json:"code"`
76+
Data []byte `json:"data,omitempty"`
77+
Metadata map[string]string `json:"metadata,omitempty"`
78+
Error error `json:"error,omitempty"`
79+
}

0 commit comments

Comments
 (0)