Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,7 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error)
var err error
var output *Output
var response *dapr.BindingEvent
var payload interface{}
var payloadBytes []byte
var payload []byte

if v, ok := ctx.Outputs[outputName]; ok {
output = v
Expand All @@ -349,14 +348,12 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error)
}

payload = data
payloadBytes = data

if traceable(output.ComponentType) {
ie := NewInnerEvent(ctx)
ie.MergeMetadata(ctx.GetInnerEvent())
ie.SetUserData(data)
payload = ie.GetCloudEvent()
payloadBytes = ie.GetCloudEventJSON()
payload = ie.GetCloudEventJSON()
}

switch output.GetType() {
Expand All @@ -366,7 +363,7 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error)
in := &dapr.InvokeBindingRequest{
Name: output.ComponentName,
Operation: output.Operation,
Data: payloadBytes,
Data: payload,
Metadata: output.Metadata,
}
response, err = ctx.daprClient.InvokeBinding(context.Background(), in)
Expand Down Expand Up @@ -484,7 +481,7 @@ func (ctx *FunctionContext) SetEvent(inputName string, event interface{}) {
ctx.setEvent(inputName, be, nil, nil, ie)
case *common.TopicEvent:
te := event.(*common.TopicEvent)
ie := convertEvent(ctx, inputName, te.Data)
ie := convertEvent(ctx, inputName, ConvertUserDataToBytes(te.Data))
ctx.setEvent(inputName, nil, te, nil, ie)
case *cloudevents.Event:
ce := event.(*cloudevents.Event)
Expand Down Expand Up @@ -803,3 +800,14 @@ func getBuildingBlockType(componentType string) (ResourceType, error) {
}
return "", errors.New("invalid component type")
}

func ConvertUserDataToBytes(data interface{}) []byte {
if d, ok := data.([]byte); ok {
return d
}
if d, err := json.Marshal(data); err != nil {
return nil
} else {
return d
}
}
28 changes: 17 additions & 11 deletions context/innerevent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package context

import (
"encoding/base64"
"encoding/json"
"fmt"
"sync"
Expand All @@ -24,7 +25,7 @@ type InnerEvent interface {
SetUserData(data interface{})

// GetUserData returns the userData in innerEventData.
GetUserData() interface{}
GetUserData() []byte

// GetCloudEvent returns the cloudevent object in innerEvent.
GetCloudEvent() cloudevents.Event
Expand All @@ -50,7 +51,7 @@ type innerEvent struct {

type innerEventData struct {
Metadata map[string]string `json:"metadata,omitempty"`
UserData interface{} `json:"userData,omitempty"`
UserData []byte `json:"userData,omitempty"`
}

func NewInnerEvent(ctx RuntimeContext) InnerEvent {
Expand Down Expand Up @@ -79,12 +80,13 @@ func (inner *innerEvent) GetMetadata() map[string]string {
}

func (inner *innerEvent) SetUserData(data interface{}) {
rawData := ConvertUserDataToBytes(data)
inner.mu.Lock()
defer func() {
inner.save()
inner.mu.Unlock()
}()
inner.data.UserData = data
inner.data.UserData = rawData
}

func (inner *innerEvent) SetSubject(s string) {
Expand All @@ -93,7 +95,7 @@ func (inner *innerEvent) SetSubject(s string) {
inner.cloudevent.SetSubject(s)
}

func (inner *innerEvent) GetUserData() interface{} {
func (inner *innerEvent) GetUserData() []byte {
return inner.data.UserData
}

Expand Down Expand Up @@ -152,26 +154,33 @@ func (inner *innerEvent) Clone(event *cloudevents.Event) {
inner.mu.Unlock()
}()

var ud []byte
inner.cloudevent = event

d := &innerEventData{}
if event.Data() != nil {
if err := event.DataAs(d); err == nil {
inner.data.Metadata = d.Metadata
inner.data.UserData = d.UserData
ud = d.UserData
} else {
inner.data.UserData = event.Data()
ud = event.Data()
}
if event.DataBase64 {
if rawUserData, err := base64.StdEncoding.DecodeString(string(ud)); err == nil {
inner.data.UserData = rawUserData
return
}
}
inner.data.UserData = ud
}
}

func (inner *innerEvent) save() {
if inner.cloudevent == nil || (inner.data != nil && len(inner.data.Metadata) > 0 && inner.data.UserData == nil) {
fmt.Println(inner.data.UserData)
return
}

if err := inner.cloudevent.SetData(cloudevents.ApplicationJSON, *inner.data); err != nil {
if err := inner.cloudevent.SetData(cloudevents.ApplicationJSON, ConvertUserDataToBytes(*inner.data)); err != nil {
klog.Errorf("failed to set cloudevent data: %v\n", err)
}
}
Expand All @@ -190,9 +199,6 @@ func convertEvent(ctx RuntimeContext, inputName string, data interface{}) InnerE
inner.Clone(ce)
return inner
}
case cloudevents.Event:
inner.Clone(&data)
return inner
default:
inner.SetSubject(inputName)
inner.SetUserData(data)
Expand Down
38 changes: 10 additions & 28 deletions context/innerevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestInnerEvent(t *testing.T) {
eventTest(t, ctx, te1, byteData)

te2 := &common.TopicEvent{
Data: ie.GetCloudEvent(),
Data: ie.GetCloudEventJSON(),
}
eventTest(t, ctx, te2, byteData)

Expand All @@ -126,11 +126,10 @@ func TestInnerEvent(t *testing.T) {
}

func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byte) {

// receive test
ctx.SetEvent("cron", event)
ie := ctx.GetInnerEvent()
if !bytes.Equal(convertToByte(ie.GetUserData()), target) {
if !bytes.Equal(ie.GetUserData(), target) {
t.Fatal("Error get user data in innerEvent")
}
ie.SetMetadata("k1", "v1")
Expand All @@ -156,17 +155,13 @@ func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byt

udInEvent := map[string]string{}
if ie2.GetUserData() != nil {
ud := ie2.GetUserData()
switch ud := ud.(type) {
case []byte:
if err := json.Unmarshal(ud, &udInEvent); err != nil {
t.Fatal("Error unmarshal user data in inner event")
}
if v, exist := udInEvent["foo2"]; exist && v == "bar2" {

} else {
t.Fatal("Error set inner event userdata")
}
if err := json.Unmarshal(ie2.GetUserData(), &udInEvent); err != nil {
t.Fatal("Error unmarshal user data in inner event")
}
if v, exist := udInEvent["foo2"]; exist && v == "bar2" {

} else {
t.Fatal("Error set inner event userdata")
}
} else {
t.Fatal("Error set inner event userdata")
Expand All @@ -184,8 +179,7 @@ func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byt
}

ud := map[string]string{}
udByte, _ := json.Marshal(ieData.UserData)
if err := json.Unmarshal(udByte, &ud); err != nil {
if err := json.Unmarshal(ieData.UserData, &ud); err != nil {
t.Fatal("Error unmarshal user data in inner event")
}

Expand All @@ -194,16 +188,4 @@ func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byt
} else {
t.Fatal("Error save inner event userdata")
}

}

func convertToByte(data interface{}) []byte {
if d, ok := data.([]byte); ok {
return d
}
if d, err := json.Marshal(data); err != nil {
return nil
} else {
return d
}
}
14 changes: 1 addition & 13 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package runtime

import (
"context"
"encoding/json"
"io/ioutil"
"net/http"

Expand Down Expand Up @@ -129,7 +128,7 @@ func (rm *RuntimeManager) FunctionRunWrapperWithHooks(fn interface{}) {
userData := rm.FuncContext.GetInnerEvent().GetUserData()

// pass user data to user function
out, err := function(functionContext, convertUserDataToBytes(userData))
out, err := function(functionContext, userData)

rm.FuncContext.WithOut(out.GetOut())
rm.FuncContext.WithError(err)
Expand All @@ -152,14 +151,3 @@ func (rm *RuntimeManager) FunctionRunWrapperWithHooks(fn interface{}) {

rm.ProcessPostHooks()
}

func convertUserDataToBytes(data interface{}) []byte {
if d, ok := data.([]byte); ok {
return d
}
if d, err := json.Marshal(data); err != nil {
return nil
} else {
return d
}
}