Skip to content

Commit 99b250e

Browse files
committed
fix: solved the goroutine safety problem of funcContext and plugin during function processing
Signed-off-by: laminar <fangtian@kubesphere.io>
1 parent 0fb66d1 commit 99b250e

File tree

7 files changed

+167
-128
lines changed

7 files changed

+167
-128
lines changed

framework/framework.go

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,17 @@ func NewFramework() (*functionsFrameworkImpl, error) {
5555

5656
func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{}) error {
5757
if fnHTTP, ok := fn.(func(http.ResponseWriter, *http.Request) error); ok {
58-
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.processPreHooks, fwk.processPostHooks, fnHTTP); err != nil {
58+
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, fnHTTP); err != nil {
5959
klog.Errorf("failed to register function: %v", err)
6060
return err
6161
}
6262
} else if fnOpenFunction, ok := fn.(func(ofctx.Context, []byte) (ofctx.Out, error)); ok {
63-
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.processPreHooks, fwk.processPostHooks, fnOpenFunction); err != nil {
63+
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, fnOpenFunction); err != nil {
6464
klog.Errorf("failed to register function: %v", err)
6565
return err
6666
}
6767
} else if fnCloudEvent, ok := fn.(func(context.Context, cloudevents.Event) error); ok {
68-
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.processPreHooks, fwk.processPostHooks, fnCloudEvent); err != nil {
68+
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.prePlugins, fwk.postPlugins, fnCloudEvent); err != nil {
6969
klog.Errorf("failed to register function: %v", err)
7070
return err
7171
}
@@ -77,28 +77,6 @@ func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{})
7777
return nil
7878
}
7979

80-
func (fwk *functionsFrameworkImpl) processPreHooks() error {
81-
plugins := fwk.pluginMap
82-
for _, plg := range fwk.prePlugins {
83-
klog.Infof("exec pre hooks: %s of version %s", plg.Name(), plg.Version())
84-
if err := plg.ExecPreHook(fwk.funcContext, plugins); err != nil {
85-
klog.Warningf("failed to exec pre hooks %s: %s", plg.Name(), err.Error())
86-
}
87-
}
88-
return nil
89-
}
90-
91-
func (fwk *functionsFrameworkImpl) processPostHooks() error {
92-
plugins := fwk.pluginMap
93-
for _, plg := range fwk.postPlugins {
94-
klog.Infof("exec post hooks: %s of version %s", plg.Name(), plg.Version())
95-
if err := plg.ExecPostHook(fwk.funcContext, plugins); err != nil {
96-
klog.Warningf("failed to exec post hooks %s: %s", plg.Name(), err.Error())
97-
}
98-
}
99-
return nil
100-
}
101-
10280
func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
10381
err := fwk.runtime.Start(ctx)
10482
if err != nil {

plugin/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type Metadata interface {
1111

1212
type Plugin interface {
1313
Metadata
14+
Init() Plugin
1415
ExecPreHook(ctx ofctx.Context, plugins map[string]Plugin) error
1516
ExecPostHook(ctx ofctx.Context, plugins map[string]Plugin) error
1617
Get(fieldName string) (interface{}, bool)

plugin/plugin-example/plugin-example.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ func (p *PluginExample) Version() string {
3737
return Version
3838
}
3939

40+
func (p *PluginExample) Init() plugin.Plugin {
41+
return New()
42+
}
43+
4044
func (p *PluginExample) ExecPreHook(ctx ofctx.Context, plugins map[string]plugin.Plugin) error {
4145
r := preHookLogic(ctx.Ctx)
4246
p.stateA = 1

runtime/async/async.go

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"k8s.io/klog/v2"
1515

1616
ofctx "github.com/OpenFunction/functions-framework-go/context"
17+
"github.com/OpenFunction/functions-framework-go/plugin"
18+
"github.com/OpenFunction/functions-framework-go/runtime"
1719
)
1820

1921
type Runtime struct {
@@ -41,8 +43,8 @@ func (r *Runtime) Start(ctx context.Context) error {
4143

4244
func (r *Runtime) RegisterHTTPFunction(
4345
ctx ofctx.Context,
44-
processPreHooksFunc func() error,
45-
processPostHooksFunc func() error,
46+
prePlugins []plugin.Plugin,
47+
postPlugins []plugin.Plugin,
4648
fn func(http.ResponseWriter, *http.Request) error,
4749
) error {
4850
return errors.New("async runtime cannot register http function")
@@ -51,17 +53,17 @@ func (r *Runtime) RegisterHTTPFunction(
5153
func (r *Runtime) RegisterCloudEventFunction(
5254
ctx context.Context,
5355
funcContext ofctx.Context,
54-
processPreHooksFunc func() error,
55-
processPostHooksFunc func() error,
56+
prePlugins []plugin.Plugin,
57+
postPlugins []plugin.Plugin,
5658
fn func(context.Context, cloudevents.Event) error,
5759
) error {
5860
return errors.New("async runtime cannot register cloudevent function")
5961
}
6062

6163
func (r *Runtime) RegisterOpenFunction(
6264
ctx ofctx.Context,
63-
processPreHooksFunc func() error,
64-
processPostHooksFunc func() error,
65+
prePlugins []plugin.Plugin,
66+
postPlugins []plugin.Plugin,
6567
fn func(ofctx.Context, []byte) (ofctx.Out, error),
6668
) error {
6769
// Register the asynchronous functions (based on the Dapr runtime)
@@ -78,24 +80,21 @@ func (r *Runtime) RegisterOpenFunction(
7880
case ofctx.OpenFuncBinding:
7981
input.Uri = input.Component
8082
funcErr = r.handler.AddBindingInvocationHandler(input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) {
81-
ctx.EventMeta.InputName = name
82-
ctx.EventMeta.BindingEvent = in
83+
rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins)
84+
rm.FuncContext.EventMeta.InputName = name
85+
rm.FuncContext.EventMeta.BindingEvent = in
8386

84-
if err := processPreHooksFunc(); err != nil {
85-
// Just logging errors
86-
}
87+
rm.ProcessPreHooks()
8788

88-
ctx.Out, ctx.Error = f(ctx, in.Data)
89+
rm.FuncContext.Out, rm.FuncContext.Error = f(rm.FuncContext, in.Data)
8990

90-
if err := processPostHooksFunc(); err != nil {
91-
// Just logging errors
92-
}
91+
rm.ProcessPostHooks()
9392

94-
switch ctx.Out.Code {
93+
switch rm.FuncContext.Out.Code {
9594
case ofctx.Success:
96-
return ctx.Out.Data, nil
95+
return rm.FuncContext.Out.Data, nil
9796
case ofctx.InternalError:
98-
return nil, ctx.Out.Error
97+
return nil, rm.FuncContext.Out.Error
9998
default:
10099
return nil, nil
101100
}
@@ -106,25 +105,22 @@ func (r *Runtime) RegisterOpenFunction(
106105
Topic: input.Uri,
107106
}
108107
funcErr = r.handler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) {
109-
ctx.EventMeta.InputName = name
110-
ctx.EventMeta.TopicEvent = e
108+
rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins)
109+
rm.FuncContext.EventMeta.InputName = name
110+
rm.FuncContext.EventMeta.TopicEvent = e
111111

112-
if err := processPreHooksFunc(); err != nil {
113-
// Just logging errors
114-
}
112+
rm.ProcessPreHooks()
115113

116-
ctx.Out, ctx.Error = f(ctx, convertTopicEventToByte(e.Data))
114+
rm.FuncContext.Out, rm.FuncContext.Error = f(rm.FuncContext, convertTopicEventToByte(e.Data))
117115

118-
if err := processPostHooksFunc(); err != nil {
119-
// Just logging errors
120-
}
116+
rm.ProcessPostHooks()
121117

122-
switch ctx.Out.Code {
118+
switch rm.FuncContext.Out.Code {
123119
case ofctx.Success:
124120
return false, nil
125121
case ofctx.InternalError:
126-
err = ctx.Out.Error
127-
if retry, ok := ctx.Out.Metadata["retry"]; ok {
122+
err = rm.FuncContext.Out.Error
123+
if retry, ok := rm.FuncContext.Out.Metadata["retry"]; ok {
128124
if strings.EqualFold(retry, "true") {
129125
return true, err
130126
} else if strings.EqualFold(retry, "false") {
@@ -152,20 +148,18 @@ func (r *Runtime) RegisterOpenFunction(
152148
}
153149
// Serving function without inputs
154150
} else {
155-
if err := processPreHooksFunc(); err != nil {
156-
// Just logging errors
157-
}
151+
rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins)
152+
rm.ProcessPreHooks()
158153

159-
ctx.Out, ctx.Error = f(ctx, nil)
154+
rm.FuncContext.Out, rm.FuncContext.Error = f(rm.FuncContext, nil)
160155

161-
if err := processPostHooksFunc(); err != nil {
162-
// Just logging errors
163-
}
164-
switch ctx.Out.Code {
156+
rm.ProcessPostHooks()
157+
158+
switch rm.FuncContext.Out.Code {
165159
case ofctx.Success:
166160
return nil
167161
case ofctx.InternalError:
168-
return ctx.Out.Error
162+
return rm.FuncContext.Out.Error
169163
default:
170164
return nil
171165
}

runtime/interface.go

Lines changed: 0 additions & 33 deletions
This file was deleted.

runtime/knative/knative.go

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"k8s.io/klog/v2"
1414

1515
ofctx "github.com/OpenFunction/functions-framework-go/context"
16+
"github.com/OpenFunction/functions-framework-go/plugin"
17+
"github.com/OpenFunction/functions-framework-go/runtime"
1618
)
1719

1820
const (
@@ -48,34 +50,31 @@ func (r *Runtime) Start(ctx context.Context) error {
4850

4951
func (r *Runtime) RegisterOpenFunction(
5052
ctx ofctx.Context,
51-
processPreHooksFunc func() error,
52-
processPostHooksFunc func() error,
53+
prePlugins []plugin.Plugin,
54+
postPlugins []plugin.Plugin,
5355
fn func(ofctx.Context, []byte) (ofctx.Out, error),
5456
) error {
5557
// Register the synchronous function (based on Knaitve runtime)
5658
return func(f func(ofctx.Context, []byte) (ofctx.Out, error)) error {
5759
r.handler.HandleFunc(r.pattern, func(w http.ResponseWriter, r *http.Request) {
58-
ctx.SyncRequestMeta.ResponseWriter = w
59-
ctx.SyncRequestMeta.Request = r
60+
rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins)
61+
rm.FuncContext.SyncRequestMeta.ResponseWriter = w
62+
rm.FuncContext.SyncRequestMeta.Request = r
6063
defer RecoverPanicHTTP(w, "Function panic")
6164

62-
if err := processPreHooksFunc(); err != nil {
63-
// Just logging errors
64-
}
65+
rm.ProcessPreHooks()
6566

66-
ctx.Out, ctx.Error = f(ctx, convertRequestBodyToByte(r))
67+
rm.FuncContext.Out, rm.FuncContext.Error = f(rm.FuncContext, convertRequestBodyToByte(r))
6768

68-
if err := processPostHooksFunc; err != nil {
69-
// Just logging errors
70-
}
69+
rm.ProcessPostHooks()
7170

72-
switch ctx.Out.Code {
71+
switch rm.FuncContext.Out.Code {
7372
case ofctx.Success:
7473
w.Header().Set(functionStatusHeader, successStatus)
7574
return
7675
case ofctx.InternalError:
7776
w.Header().Set(functionStatusHeader, errorStatus)
78-
w.WriteHeader(int(ctx.Out.Code))
77+
w.WriteHeader(int(rm.FuncContext.Out.Code))
7978
return
8079
default:
8180
return
@@ -87,31 +86,31 @@ func (r *Runtime) RegisterOpenFunction(
8786

8887
func (r *Runtime) RegisterHTTPFunction(
8988
ctx ofctx.Context,
90-
processPreHooksFunc func() error,
91-
processPostHooksFunc func() error,
89+
prePlugins []plugin.Plugin,
90+
postPlugins []plugin.Plugin,
9291
fn func(http.ResponseWriter, *http.Request) error,
9392
) error {
9493
r.handler.HandleFunc(r.pattern, func(w http.ResponseWriter, r *http.Request) {
94+
rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins)
95+
rm.FuncContext.SyncRequestMeta.ResponseWriter = w
96+
rm.FuncContext.SyncRequestMeta.Request = r
9597
defer RecoverPanicHTTP(w, "Function panic")
9698

97-
if err := processPreHooksFunc(); err != nil {
98-
// Just logging errors
99-
}
99+
rm.ProcessPreHooks()
100100

101-
ctx.Error = fn(w, r)
101+
rm.FuncContext.Error = fn(w, r)
102+
103+
rm.ProcessPostHooks()
102104

103-
if err := processPostHooksFunc(); err != nil {
104-
// Just logging errors
105-
}
106105
})
107106
return nil
108107
}
109108

110109
func (r *Runtime) RegisterCloudEventFunction(
111110
ctx context.Context,
112111
funcContext ofctx.Context,
113-
processPreHooksFunc func() error,
114-
processPostHooksFunc func() error,
112+
prePlugins []plugin.Plugin,
113+
postPlugins []plugin.Plugin,
115114
fn func(context.Context, cloudevents.Event) error,
116115
) error {
117116
p, err := cloudevents.NewHTTP()
@@ -121,15 +120,14 @@ func (r *Runtime) RegisterCloudEventFunction(
121120
}
122121

123122
handleFn, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(ctx context.Context, ce cloudevents.Event) error {
124-
if err := processPreHooksFunc(); err != nil {
125-
// Just logging errors
126-
}
123+
rm := runtime.NewRuntimeManager(funcContext, prePlugins, postPlugins)
124+
rm.FuncContext.EventMeta.CloudEvent = &ce
125+
126+
rm.ProcessPreHooks()
127127

128-
funcContext.Error = fn(ctx, ce)
128+
rm.FuncContext.Error = fn(ctx, ce)
129129

130-
if err := processPostHooksFunc(); err != nil {
131-
// Just logging errors
132-
}
130+
rm.ProcessPostHooks()
133131

134132
return funcContext.Error
135133
})

0 commit comments

Comments
 (0)