From 6de82ec1aad3d0f7a4c007869a0d5a31fe4ad112 Mon Sep 17 00:00:00 2001 From: laminar Date: Tue, 1 Mar 2022 19:48:15 +0800 Subject: [PATCH] adjust innerEvent Signed-off-by: laminar --- context/context.go | 22 +++++++++++++++------- context/innerevent.go | 28 +++++++++++++++++----------- context/innerevent_test.go | 38 ++++++++++---------------------------- runtime/runtime.go | 14 +------------- 4 files changed, 43 insertions(+), 59 deletions(-) diff --git a/context/context.go b/context/context.go index 8fca7fe..3b2591c 100644 --- a/context/context.go +++ b/context/context.go @@ -339,8 +339,7 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error) var err error var output *Output var response *dapr.BindingEvent - var payload interface{} - var payloadBytes []byte + var payload []byte if v, ok := ctx.Outputs[outputName]; ok { output = v @@ -349,14 +348,12 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error) } payload = data - payloadBytes = data if traceable(output.ComponentType) { ie := NewInnerEvent(ctx) ie.MergeMetadata(ctx.GetInnerEvent()) ie.SetUserData(data) - payload = ie.GetCloudEvent() - payloadBytes = ie.GetCloudEventJSON() + payload = ie.GetCloudEventJSON() } switch output.GetType() { @@ -366,7 +363,7 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error) in := &dapr.InvokeBindingRequest{ Name: output.ComponentName, Operation: output.Operation, - Data: payloadBytes, + Data: payload, Metadata: output.Metadata, } response, err = ctx.daprClient.InvokeBinding(context.Background(), in) @@ -484,7 +481,7 @@ func (ctx *FunctionContext) SetEvent(inputName string, event interface{}) { ctx.setEvent(inputName, be, nil, nil, ie) case *common.TopicEvent: te := event.(*common.TopicEvent) - ie := convertEvent(ctx, inputName, te.Data) + ie := convertEvent(ctx, inputName, ConvertUserDataToBytes(te.Data)) ctx.setEvent(inputName, nil, te, nil, ie) case *cloudevents.Event: ce := event.(*cloudevents.Event) @@ -803,3 +800,14 @@ func getBuildingBlockType(componentType string) (ResourceType, error) { } return "", errors.New("invalid component type") } + +func ConvertUserDataToBytes(data interface{}) []byte { + if d, ok := data.([]byte); ok { + return d + } + if d, err := json.Marshal(data); err != nil { + return nil + } else { + return d + } +} diff --git a/context/innerevent.go b/context/innerevent.go index a535371..c21b76f 100644 --- a/context/innerevent.go +++ b/context/innerevent.go @@ -1,6 +1,7 @@ package context import ( + "encoding/base64" "encoding/json" "fmt" "sync" @@ -24,7 +25,7 @@ type InnerEvent interface { SetUserData(data interface{}) // GetUserData returns the userData in innerEventData. - GetUserData() interface{} + GetUserData() []byte // GetCloudEvent returns the cloudevent object in innerEvent. GetCloudEvent() cloudevents.Event @@ -50,7 +51,7 @@ type innerEvent struct { type innerEventData struct { Metadata map[string]string `json:"metadata,omitempty"` - UserData interface{} `json:"userData,omitempty"` + UserData []byte `json:"userData,omitempty"` } func NewInnerEvent(ctx RuntimeContext) InnerEvent { @@ -79,12 +80,13 @@ func (inner *innerEvent) GetMetadata() map[string]string { } func (inner *innerEvent) SetUserData(data interface{}) { + rawData := ConvertUserDataToBytes(data) inner.mu.Lock() defer func() { inner.save() inner.mu.Unlock() }() - inner.data.UserData = data + inner.data.UserData = rawData } func (inner *innerEvent) SetSubject(s string) { @@ -93,7 +95,7 @@ func (inner *innerEvent) SetSubject(s string) { inner.cloudevent.SetSubject(s) } -func (inner *innerEvent) GetUserData() interface{} { +func (inner *innerEvent) GetUserData() []byte { return inner.data.UserData } @@ -152,26 +154,33 @@ func (inner *innerEvent) Clone(event *cloudevents.Event) { inner.mu.Unlock() }() + var ud []byte inner.cloudevent = event d := &innerEventData{} if event.Data() != nil { if err := event.DataAs(d); err == nil { inner.data.Metadata = d.Metadata - inner.data.UserData = d.UserData + ud = d.UserData } else { - inner.data.UserData = event.Data() + ud = event.Data() } + if event.DataBase64 { + if rawUserData, err := base64.StdEncoding.DecodeString(string(ud)); err == nil { + inner.data.UserData = rawUserData + return + } + } + inner.data.UserData = ud } } func (inner *innerEvent) save() { if inner.cloudevent == nil || (inner.data != nil && len(inner.data.Metadata) > 0 && inner.data.UserData == nil) { - fmt.Println(inner.data.UserData) return } - if err := inner.cloudevent.SetData(cloudevents.ApplicationJSON, *inner.data); err != nil { + if err := inner.cloudevent.SetData(cloudevents.ApplicationJSON, ConvertUserDataToBytes(*inner.data)); err != nil { klog.Errorf("failed to set cloudevent data: %v\n", err) } } @@ -190,9 +199,6 @@ func convertEvent(ctx RuntimeContext, inputName string, data interface{}) InnerE inner.Clone(ce) return inner } - case cloudevents.Event: - inner.Clone(&data) - return inner default: inner.SetSubject(inputName) inner.SetUserData(data) diff --git a/context/innerevent_test.go b/context/innerevent_test.go index c8114a1..7fcb0b0 100644 --- a/context/innerevent_test.go +++ b/context/innerevent_test.go @@ -102,7 +102,7 @@ func TestInnerEvent(t *testing.T) { eventTest(t, ctx, te1, byteData) te2 := &common.TopicEvent{ - Data: ie.GetCloudEvent(), + Data: ie.GetCloudEventJSON(), } eventTest(t, ctx, te2, byteData) @@ -126,11 +126,10 @@ func TestInnerEvent(t *testing.T) { } func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byte) { - // receive test ctx.SetEvent("cron", event) ie := ctx.GetInnerEvent() - if !bytes.Equal(convertToByte(ie.GetUserData()), target) { + if !bytes.Equal(ie.GetUserData(), target) { t.Fatal("Error get user data in innerEvent") } ie.SetMetadata("k1", "v1") @@ -156,17 +155,13 @@ func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byt udInEvent := map[string]string{} if ie2.GetUserData() != nil { - ud := ie2.GetUserData() - switch ud := ud.(type) { - case []byte: - if err := json.Unmarshal(ud, &udInEvent); err != nil { - t.Fatal("Error unmarshal user data in inner event") - } - if v, exist := udInEvent["foo2"]; exist && v == "bar2" { - - } else { - t.Fatal("Error set inner event userdata") - } + if err := json.Unmarshal(ie2.GetUserData(), &udInEvent); err != nil { + t.Fatal("Error unmarshal user data in inner event") + } + if v, exist := udInEvent["foo2"]; exist && v == "bar2" { + + } else { + t.Fatal("Error set inner event userdata") } } else { t.Fatal("Error set inner event userdata") @@ -184,8 +179,7 @@ func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byt } ud := map[string]string{} - udByte, _ := json.Marshal(ieData.UserData) - if err := json.Unmarshal(udByte, &ud); err != nil { + if err := json.Unmarshal(ieData.UserData, &ud); err != nil { t.Fatal("Error unmarshal user data in inner event") } @@ -194,16 +188,4 @@ func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byt } else { t.Fatal("Error save inner event userdata") } - -} - -func convertToByte(data interface{}) []byte { - if d, ok := data.([]byte); ok { - return d - } - if d, err := json.Marshal(data); err != nil { - return nil - } else { - return d - } } diff --git a/runtime/runtime.go b/runtime/runtime.go index 9e3967f..852de58 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -2,7 +2,6 @@ package runtime import ( "context" - "encoding/json" "io/ioutil" "net/http" @@ -129,7 +128,7 @@ func (rm *RuntimeManager) FunctionRunWrapperWithHooks(fn interface{}) { userData := rm.FuncContext.GetInnerEvent().GetUserData() // pass user data to user function - out, err := function(functionContext, convertUserDataToBytes(userData)) + out, err := function(functionContext, userData) rm.FuncContext.WithOut(out.GetOut()) rm.FuncContext.WithError(err) @@ -152,14 +151,3 @@ func (rm *RuntimeManager) FunctionRunWrapperWithHooks(fn interface{}) { rm.ProcessPostHooks() } - -func convertUserDataToBytes(data interface{}) []byte { - if d, ok := data.([]byte); ok { - return d - } - if d, err := json.Marshal(data); err != nil { - return nil - } else { - return d - } -}