From 10c6e339cc939ba572916d42c997a6bb95e653ca Mon Sep 17 00:00:00 2001 From: laminar Date: Mon, 13 Sep 2021 10:13:57 +0800 Subject: [PATCH] Supports multiple input sources MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: laminarĀ  --- functionframeworks/frameworks.go | 177 +++++++++++++++-------------- go.mod | 2 +- go.sum | 165 +++++++++++++++++++++++++++ openfunction-context/context.go | 187 +++++++++++++++---------------- openfunction-context/types.go | 39 +++++-- 5 files changed, 378 insertions(+), 192 deletions(-) create mode 100644 go.sum diff --git a/functionframeworks/frameworks.go b/functionframeworks/frameworks.go index a9d0816..35930d5 100644 --- a/functionframeworks/frameworks.go +++ b/functionframeworks/frameworks.go @@ -2,12 +2,8 @@ package functionframeworks import ( "context" - "encoding/json" "errors" "fmt" - ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context" - dapr "github.com/dapr/go-sdk/service/common" - daprd "github.com/dapr/go-sdk/service/grpc" "log" "net/http" "os" @@ -15,6 +11,10 @@ import ( "strings" cloudevents "github.com/cloudevents/sdk-go/v2" + dapr "github.com/dapr/go-sdk/service/common" + daprd "github.com/dapr/go-sdk/service/grpc" + + ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context" ) const ( @@ -32,8 +32,95 @@ func RegisterHTTPFunction(ctx context.Context, fn func(http.ResponseWriter, *htt return registerHTTPFunction("/", fn, handler) } -func RegisterOpenFunction(ctx context.Context, fn func(*ofctx.OpenFunctionContext, []byte) int) error { - return registerOpenFunction(fn, handler) +func RegisterOpenFunction(ctx context.Context, fn func(*ofctx.OpenFunctionContext, []byte) ofctx.Return) error { + return func(f func(*ofctx.OpenFunctionContext, []byte) ofctx.Return) error { + fctx, err := ofctx.GetOpenFunctionContext() + if err != nil { + return err + } + + if fctx.Runtime == ofctx.OpenFuncAsync { + openFuncAsyncServHandler, err = daprd.NewService(fmt.Sprintf(":%s", fctx.Port)) + if err != nil { + return err + } + } else { + return errors.New("cannot use non-OpenFuncAsync runtime for OpenFunction registration") + } + + var funcErr error + + // Serving function with inputs + if !fctx.InputsIsEmpty() { + for name, input := range fctx.Inputs { + switch input.Type { + case ofctx.OpenFuncBinding: + input.Uri = input.Component + funcErr = openFuncAsyncServHandler.AddBindingInvocationHandler(input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) { + currentContext := fctx + currentContext.Event.InputName = name + currentContext.Event.BindingEvent = in + ret := f(currentContext, in.Data) + switch ret.Code { + case ofctx.Success: + return ret.Data, nil + case ofctx.InternalError: + return nil, errors.New(fmt.Sprint(ret.Error)) + default: + return nil, nil + } + }) + case ofctx.OpenFuncTopic: + sub := &dapr.Subscription{ + PubsubName: input.Component, + Topic: input.Uri, + } + funcErr = openFuncAsyncServHandler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) { + currentContext := fctx + currentContext.Event.InputName = name + currentContext.Event.TopicEvent = e + ret := f(currentContext, e.Data.([]byte)) + switch ret.Code { + case ofctx.Success: + return false, nil + case ofctx.InternalError: + err = errors.New(fmt.Sprint(ret.Error)) + if retry, ok := ret.Metadata["retry"]; ok { + if strings.EqualFold(retry, "true") { + return true, err + } else if strings.EqualFold(retry, "false") { + return false, err + } else { + return false, err + } + } + return false, err + default: + return false, nil + } + }) + default: + return fmt.Errorf("invalid input type: %s", input.Type) + } + if funcErr != nil { + return err + } + } + // Serving function without inputs + } else { + ret := fn(fctx, []byte{}) + switch ret.Code { + case ofctx.Success: + return nil + case ofctx.InternalError: + err = errors.New(fmt.Sprint(ret.Error)) + return err + default: + return nil + } + } + return nil + }(fn) } func RegisterCloudEventFunction(ctx context.Context, fn func(context.Context, cloudevents.Event) error) error { @@ -48,84 +135,6 @@ func registerHTTPFunction(path string, fn func(http.ResponseWriter, *http.Reques return nil } -func registerOpenFunction(fn func(*ofctx.OpenFunctionContext, []byte) int, h *http.ServeMux) error { - ctx, err := ofctx.GetOpenFunctionContext() - if err != nil { - return err - } - - if ctx.Runtime == ofctx.OpenFuncAsync { - openFuncAsyncServHandler, err = daprd.NewService(fmt.Sprintf(":%s", ctx.Port)) - if err != nil { - return err - } - } else { - return errors.New(fmt.Sprint("Cannot use non-OpenFuncAsync runtime for function registration.")) - } - - // Serving function with inputs - if !ctx.InputIsEmpty() { - inType := ctx.Input.Params["type"] - switch ofctx.ResourceType(inType) { - case ofctx.OpenFuncBinding: - if ctx.Input.Uri == "" { - ctx.Input.Uri = ctx.Input.Name - } - err = openFuncAsyncServHandler.AddBindingInvocationHandler(ctx.Input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) { - code := fn(ctx, in.Data) - if code == 200 { - return nil, nil - } else { - return nil, errors.New("error") - } - }) - case ofctx.OpenFuncTopic: - sub := &dapr.Subscription{ - PubsubName: ctx.Input.Name, - Topic: ctx.Input.Uri, - } - err = openFuncAsyncServHandler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) { - in, err := json.Marshal(e.Data) - if err != nil { - return true, err - } - code := fn(ctx, in) - if code == 200 { - return false, nil - } else { - return true, errors.New("error") - } - }) - case ofctx.OpenFuncService: - if ctx.Input.Uri == "" { - ctx.Input.Uri = ctx.Input.Name - } - err = openFuncAsyncServHandler.AddServiceInvocationHandler(ctx.Input.Uri, func(c context.Context, in *dapr.InvocationEvent) (out *dapr.Content, err error) { - code := fn(ctx, in.Data) - if code == 200 { - return nil, nil - } else { - return nil, errors.New("error") - } - }) - default: - return fmt.Errorf("invalid input type: %s", inType) - } - if err != nil { - return err - } - // Serving function without inputs - } else { - code := fn(ctx, []byte{}) - if code == 200 { - return nil - } else { - return errors.New("error") - } - } - return nil -} - func registerCloudEventFunction(ctx context.Context, fn func(context.Context, cloudevents.Event) error, h *http.ServeMux) error { p, err := cloudevents.NewHTTP() if err != nil { diff --git a/go.mod b/go.mod index e833036..32cfad3 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,5 @@ go 1.15 require ( github.com/cloudevents/sdk-go/v2 v2.4.1 - github.com/dapr/go-sdk v1.1.0 + github.com/dapr/go-sdk v1.2.0 ) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7e919c8 --- /dev/null +++ b/go.sum @@ -0,0 +1,165 @@ +cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ= +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cloudevents/sdk-go/v2 v2.4.1 h1:rZJoz9QVLbWQmnvLPDFEmv17Czu+CfSPwMO6lhJ72xQ= +github.com/cloudevents/sdk-go/v2 v2.4.1/go.mod h1:MZiMwmAh5tGj+fPFvtHv9hKurKqXtdB9haJYMJ/7GJY= +github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354 h1:9kRtNpqLHbZVO/NNxhHp2ymxFxsHOe3x2efJGn//Tas= +github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/creack/pty v1.1.9 h1:uDmaGzcdjhF4i/plgjmEsriH11Y0o7RKapEf/LDaM3w= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/dapr/go-sdk v1.1.0 h1:OfhGdSDMRaTlDmaTDlo1POW9FMFxdox/PFlapEdBug0= +github.com/dapr/go-sdk v1.1.0/go.mod h1:zyhsocIKv4pqQ2VtvWvf2CK1UhP7Z2OAOXgEpVxMgIs= +github.com/dapr/go-sdk v1.2.0 h1:9Uvw3AJlgfEPBDpaFg5zThe9dqv+ag+9AzFAiDZNYHo= +github.com/dapr/go-sdk v1.2.0/go.mod h1:zyhsocIKv4pqQ2VtvWvf2CK1UhP7Z2OAOXgEpVxMgIs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.7 h1:EARl0OvqMoxq/UMgMSCLnXzkaXbxzskluEBlMQCJPms= +github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= +github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= +github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4 h1:c2HOrn5iMezYjSlGPncknSEr/8x5LELb/ilJbXi9DEA= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201202213521-69691e467435 h1:25AvDqqB9PrNqj1FLf2/70I4W0L19qqoaFq3gjNwbKk= +golang.org/x/sys v0.0.0-20201202213521-69691e467435/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc= +golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20201204160425-06b3db808446 h1:65ppmIPdaZE+BO34gntwqexoTYr30IRNGmS0OGOHu3A= +google.golang.org/genproto v0.0.0-20201204160425-06b3db808446/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.34.0 h1:raiipEjMOIC/TO2AvyTxP25XFdLxNIBwzDh3FM3XztI= +google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc h1:/hemPrYIhOhy8zYrNj+069zDB68us2sMGsfkFJO0iZs= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/openfunction-context/context.go b/openfunction-context/context.go index 209266c..c7b4d10 100644 --- a/openfunction-context/context.go +++ b/openfunction-context/context.go @@ -1,35 +1,14 @@ package openfunctioncontext import ( - "bytes" "context" "encoding/json" "errors" "fmt" - dapr "github.com/dapr/go-sdk/client" - "io" - "net/http" "os" "reflect" - "time" -) - -const ( - ApplicationJson = "application/json" - HTTPTimeoutSecond = 60 -) -var ( - httpNormalResponse = map[int]string{ - 200: "Request successful", - 204: "Empty Response", - } - httpErrorResponse = map[int]string{ - 403: "Invocation forbidden by access control", - 404: "Not Found", - 400: "Method name not given", - 500: "Request failed", - } + dapr "github.com/dapr/go-sdk/client" ) // ContextInterface represents Dapr callback service @@ -40,13 +19,13 @@ type ContextInterface interface { func GetOpenFunctionContext() (*OpenFunctionContext, error) { ctx := &OpenFunctionContext{ - Input: Input{}, + Inputs: make(map[string]*Input), Outputs: make(map[string]*Output), } data := os.Getenv("FUNC_CONTEXT") if data == "" { - return nil, errors.New("FUNC_CONTEXT not found") + return nil, errors.New("env FUNC_CONTEXT not found") } err := json.Unmarshal([]byte(data), ctx) @@ -61,90 +40,118 @@ func GetOpenFunctionContext() (*OpenFunctionContext, error) { return nil, fmt.Errorf("invalid runtime: %s", ctx.Runtime) } - if !ctx.InputIsEmpty() { - if ctx.Runtime == OpenFuncAsync { - if _, ok := ctx.Input.Params["type"]; !ok { - return nil, errors.New("invalid input: missing type") + if ctx.Runtime == OpenFuncAsync { + if !ctx.InputsIsEmpty() { + for name, in := range ctx.Inputs { + switch in.Type { + case OpenFuncBinding, OpenFuncTopic: + break + default: + return nil, fmt.Errorf("invalid input type %s: %s", name, in.Type) + } } } - } - if !ctx.OutputIsEmpty() { - if ctx.Runtime == OpenFuncAsync { + if !ctx.OutputIsEmpty() { for name, out := range ctx.Outputs { - if _, ok := out.Params["type"]; !ok { - return nil, fmt.Errorf("invalid output %s: missing type", name) + switch out.Type { + case OpenFuncBinding, OpenFuncTopic: + break + default: + return nil, fmt.Errorf("invalid output type %s: %s", name, out.Type) } } } } + ctx.Event = &EventMetadata{} return ctx, nil } +func (ctx *OpenFunctionContext) Send(outputName string, data []byte) ([]byte, error) { + if ctx.OutputIsEmpty() { + return nil, errors.New("no output") + } + + var err error + var output *Output + var client dapr.Client + var response *dapr.BindingEvent + if v, ok := ctx.Outputs[outputName]; ok { + output = v + } else { + return nil, fmt.Errorf("output %s not found", outputName) + } + + if ctx.Runtime == OpenFuncAsync { + c, e := dapr.NewClient() + if e != nil { + panic(e) + } + client = c + switch output.Type { + case OpenFuncTopic: + err = client.PublishEvent(context.Background(), output.Component, output.Uri, data) + case OpenFuncBinding: + in := &dapr.InvokeBindingRequest{ + Name: output.Component, + Operation: output.Operation, + Data: data, + Metadata: output.Metadata, + } + response, err = client.InvokeBinding(context.Background(), in) + } + + } else { + err = errors.New("the SendTo need OpenFuncAsync runtime") + } + + if err != nil && client != nil { + client.Close() + return nil, err + } + + if response != nil { + return response.Data, nil + } + return nil, nil +} + func (ctx *OpenFunctionContext) SendTo(data []byte, outputName string) error { if ctx.OutputIsEmpty() { return errors.New("no output") } var err error - var op *Output + var output *Output var client dapr.Client - var method = "" if v, ok := ctx.Outputs[outputName]; ok { - op = v + output = v } else { return fmt.Errorf("output %s not found", outputName) } - if m, ok := op.Params["method"]; ok { - method = m - } - if ctx.Runtime == OpenFuncAsync { - c, err := dapr.NewClient() - if err != nil { - panic(err) + c, e := dapr.NewClient() + if e != nil { + panic(e) } client = c - outType := op.Params["type"] - switch ResourceType(outType) { + switch output.Type { case OpenFuncTopic: - err = client.PublishEvent(context.Background(), outputName, op.Uri, data) - case OpenFuncService: - if method != "" { - content := &dapr.DataContent{ - ContentType: "application/json", - Data: data, - } - _, err = client.InvokeMethodWithContent(context.Background(), outputName, op.Uri, method, content) - } else { - err = errors.New("output method is empty or invalid") - } + err = client.PublishEvent(context.Background(), output.Component, output.Uri, data) case OpenFuncBinding: - var metadata map[string]string - if md, ok := op.Params["metadata"]; ok { - err = json.Unmarshal([]byte(md), &metadata) - if err != nil { - break - } - } - in := &dapr.InvokeBindingRequest{ - Name: outputName, - Operation: op.Params["operation"], + Name: output.Component, + Operation: output.Operation, Data: data, - Metadata: metadata, + Metadata: output.Metadata, } - err = client.InvokeOutputBinding(context.Background(), in) + _, err = client.InvokeBinding(context.Background(), in) } } else { - if method != "" { - _, err = doHttpRequest(method, op.Uri, ApplicationJson, bytes.NewReader(data)) - } else { - err = errors.New("output method is empty or invalid") - } + err = errors.New("the SendTo need OpenFuncAsync runtime") } if err != nil && client != nil { @@ -155,9 +162,9 @@ func (ctx *OpenFunctionContext) SendTo(data []byte, outputName string) error { return nil } -func (ctx *OpenFunctionContext) InputIsEmpty() bool { - nilInput := Input{} - if reflect.DeepEqual(ctx.Input, nilInput) { +func (ctx *OpenFunctionContext) InputsIsEmpty() bool { + nilInputs := map[string]*Input{} + if reflect.DeepEqual(ctx.Inputs, nilInputs) { return true } return false @@ -171,28 +178,14 @@ func (ctx *OpenFunctionContext) OutputIsEmpty() bool { return false } -func doHttpRequest(method string, url string, contentType string, body io.Reader) (resp *http.Response, err error) { - ctx, cancel := context.WithTimeout(context.Background(), HTTPTimeoutSecond*time.Second) - defer cancel() - - req, err := http.NewRequest(method, url, body) - if err != nil { - return nil, err - } - - req.Header.Set("Content-Type", contentType) - rsp, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return nil, err - } - - if _, ok := httpNormalResponse[rsp.StatusCode]; ok { - return rsp, nil +func (ctx *OpenFunctionContext) ReturnWithSuccess() Return { + return Return{ + Code: Success, } +} - if rspText, ok := httpErrorResponse[rsp.StatusCode]; ok { - return rsp, errors.New(rspText) +func (ctx *OpenFunctionContext) ReturnWithInternalError() Return { + return Return{ + Code: InternalError, } - - return rsp, errors.New("unrecognized response code") } diff --git a/openfunction-context/types.go b/openfunction-context/types.go index 56611b3..0a2cb16 100644 --- a/openfunction-context/types.go +++ b/openfunction-context/types.go @@ -1,37 +1,56 @@ package openfunctioncontext +import "github.com/dapr/go-sdk/service/common" + const ( OpenFuncAsync Runtime = "OpenFuncAsync" Knative Runtime = "Knative" OpenFuncBinding ResourceType = "bindings" - OpenFuncService ResourceType = "invoke" OpenFuncTopic ResourceType = "pubsub" + Success ReturnCode = 200 + InternalError ReturnCode = 500 ) type OpenFunctionContext struct { Name string `json:"name"` Version string `json:"version"` RequestID string `json:"requestID,omitempty"` - Input Input `json:"input,omitempty"` + Inputs map[string]*Input `json:"inputs,omitempty"` Outputs map[string]*Output `json:"outputs,omitempty"` Runtime Runtime `json:"runtime"` Port string `json:"port,omitempty"` State interface{} `json:"state,omitempty"` + Event *EventMetadata `json:"event,omitempty"` +} + +type EventMetadata struct { + InputName string `json:"inputName,omitempty"` + BindingEvent *common.BindingEvent `json:"bindingEvent,omitempty"` + TopicEvent *common.TopicEvent `json:"topicEvent,omitempty"` } type Input struct { - Name string `json:"name"` - Uri string `json:"uri"` - Params map[string]string `json:"params,omitempty"` + Uri string `json:"uri,omitempty"` + Component string `json:"component,omitempty"` + Type ResourceType `json:"type"` + Metadata map[string]string `json:"metadata,omitempty"` } type Output struct { - Uri string `json:"uri"` - Params map[string]string `json:"params,omitempty"` + Uri string `json:"uri,omitempty"` + Component string `json:"component,omitempty"` + Type ResourceType `json:"type"` + Metadata map[string]string `json:"metadata,omitempty"` + Operation string `json:"operation,omitempty"` } type Runtime string - -type Protocol string - +type ReturnCode int type ResourceType string + +type Return struct { + Code ReturnCode `json:"code"` + Data []byte `json:"data,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` + Error string `json:"error,omitempty"` +}