Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/apiserver/route_get_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package apiserver

import (
"context"
"fmt"
"net/http/httptest"
"testing"

Expand Down Expand Up @@ -48,8 +49,8 @@ func TestGetNamespaceInvalid(t *testing.T) {
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

mgr.On("Orchestrator", "BAD").Return(nil, nil)
mgr.On("Orchestrator", mock.Anything, "BAD").Return(nil, fmt.Errorf("pop"))
r.ServeHTTP(res, req)

assert.Equal(t, 404, res.Result().StatusCode)
assert.Equal(t, 500, res.Result().StatusCode)
}
19 changes: 8 additions & 11 deletions internal/apiserver/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -210,9 +210,9 @@ func (as *apiServer) swaggerGenerator(routes []*ffapi.Route, apiBaseURL string)
func (as *apiServer) contractSwaggerGenerator(mgr namespace.Manager, apiBaseURL string) func(req *http.Request) (*openapi3.T, error) {
return func(req *http.Request) (*openapi3.T, error) {
vars := mux.Vars(req)
or := mgr.Orchestrator(vars["ns"])
if or == nil {
return nil, i18n.NewError(req.Context(), coremsgs.MsgNamespaceDoesNotExist)
or, err := mgr.Orchestrator(req.Context(), vars["ns"])
if err != nil {
return nil, err
}
cm := or.Contracts()
api, err := cm.GetContractAPI(req.Context(), apiBaseURL, vars["apiName"])
Expand All @@ -235,19 +235,16 @@ func (as *apiServer) contractSwaggerGenerator(mgr namespace.Manager, apiBaseURL
func getOrchestrator(ctx context.Context, mgr namespace.Manager, tag string, r *ffapi.APIRequest) (or orchestrator.Orchestrator, err error) {
switch tag {
case routeTagDefaultNamespace:
or = mgr.Orchestrator(config.GetString(coreconfig.NamespacesDefault))
return mgr.Orchestrator(ctx, config.GetString(coreconfig.NamespacesDefault))
case routeTagNonDefaultNamespace:
vars := mux.Vars(r.Req)
if ns, ok := vars["ns"]; ok {
or = mgr.Orchestrator(ns)
return mgr.Orchestrator(ctx, ns)
}
default:
case routeTagGlobal:
return nil, nil
}
if or == nil {
return nil, i18n.NewError(ctx, coremsgs.MsgNamespaceDoesNotExist)
}
return or, nil
return nil, i18n.NewError(ctx, coremsgs.MsgMissingNamespace)
}

func (as *apiServer) routeHandler(hf *ffapi.HandlerFactory, mgr namespace.Manager, apiBaseURL string, route *ffapi.Route) http.HandlerFunc {
Expand Down
15 changes: 12 additions & 3 deletions internal/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/httpserver"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly/internal/coreconfig"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/mocks/apiservermocks"
"github.com/hyperledger/firefly/mocks/contractmocks"
Expand All @@ -56,6 +57,9 @@ func newTestServer() (*namespacemocks.Manager, *orchestratormocks.Orchestrator,
mgr.On("Orchestrator", "default").Return(o).Maybe()
mgr.On("Orchestrator", "mynamespace").Return(o).Maybe()
mgr.On("Orchestrator", "ns1").Return(o).Maybe()
mgr.On("Orchestrator", mock.Anything, "default").Return(o, nil).Maybe()
mgr.On("Orchestrator", mock.Anything, "mynamespace").Return(o, nil).Maybe()
mgr.On("Orchestrator", mock.Anything, "ns1").Return(o, nil).Maybe()
as := &apiServer{
apiTimeout: 5 * time.Second,
maxFilterLimit: 100,
Expand Down Expand Up @@ -369,7 +373,7 @@ func TestContractAPISwaggerJSONBadNamespace(t *testing.T) {
s := httptest.NewServer(r)
defer s.Close()

mgr.On("Orchestrator", "BAD").Return(nil)
mgr.On("Orchestrator", mock.Anything, "BAD").Return(nil, i18n.NewError(context.Background(), coremsgs.MsgUnknownNamespace))

res, err := http.Get(fmt.Sprintf("http://%s/api/v1/namespaces/BAD/apis/my-api/api/swagger.json", s.Listener.Addr()))
assert.NoError(t, err)
Expand All @@ -395,7 +399,7 @@ func TestJSONBadNamespace(t *testing.T) {
s := httptest.NewServer(r)
defer s.Close()

mgr.On("Orchestrator", "BAD").Return(nil)
mgr.On("Orchestrator", mock.Anything, "BAD").Return(nil, i18n.NewError(context.Background(), coremsgs.MsgUnknownNamespace))

var b bytes.Buffer
req := httptest.NewRequest("GET", "/api/v1/namespaces/BAD/apis", &b)
Expand All @@ -413,7 +417,7 @@ func TestFormDataBadNamespace(t *testing.T) {
s := httptest.NewServer(r)
defer s.Close()

mgr.On("Orchestrator", "BAD").Return(nil)
mgr.On("Orchestrator", mock.Anything, "BAD").Return(nil, i18n.NewError(context.Background(), coremsgs.MsgUnknownNamespace))

var b bytes.Buffer
w := multipart.NewWriter(&b)
Expand Down Expand Up @@ -471,3 +475,8 @@ func TestFormDataDisabledRoute(t *testing.T) {

assert.Equal(t, 400, res.Result().StatusCode)
}

func TestGetOrchestratorMissingTag(t *testing.T) {
_, err := getOrchestrator(context.Background(), &namespacemocks.Manager{}, "", nil)
assert.Regexp(t, "FF10436", err)
}
11 changes: 7 additions & 4 deletions internal/apiserver/spi_routes.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -20,11 +20,14 @@ import "github.com/hyperledger/firefly-common/pkg/ffapi"

// The Service Provider Interface (SPI) allows external microservices (such as the FireFly Transaction Manager)
// to act as augmented components to the core.
var spiRoutes = []*ffapi.Route{
var spiRoutes = append(globalRoutes([]*ffapi.Route{
spiGetNamespaceByName,
spiGetNamespaces,
spiGetOpByID,
spiGetOps,
spiPatchOpByID,
spiPostReset,
}
}),
namespacedRoutes([]*ffapi.Route{
spiGetOps,
})...,
)
6 changes: 5 additions & 1 deletion internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -271,4 +271,8 @@ var (
MsgIdempotencyKeyDuplicateMessage = ffe("FF10430", "Idempotency key '%s' already used for message '%s'", 409)
MsgIdempotencyKeyDuplicateTransaction = ffe("FF10431", "Idempotency key '%s' already used for transaction '%s'", 409)
MsgNonIdempotencyKeyConflictTxInsert = ffe("FF10432", "Conflict on insert of transaction '%s'. No existing transaction matching idempotency key '%s' found", 409)
MsgErrorNameMustBeSet = ffe("FF10433", "The name of the error must be set", 400)
MsgContractErrorsResolveError = ffe("FF10434", "Unable to resolve contract errors: %s", 400)
MsgUnknownNamespace = ffe("FF10435", "Unknown namespace '%s'", 404)
MsgMissingNamespace = ffe("FF10436", "Missing namespace in request", 400)
)
20 changes: 8 additions & 12 deletions internal/events/webhooks/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,18 +211,19 @@ func (wh *WebHooks) attemptRequest(sub *core.Subscription, event *core.EventDeli

if req.method == http.MethodPost || req.method == http.MethodPatch || req.method == http.MethodPut {
switch {
case !withData:
// We are just sending the event itself
req.r.SetBody(event)
case req.body != nil:
// We might have been told to extract a body from the first data record
req.r.SetBody(req.body)
case len(allData) > 1:
// We've got an array of data to POST
req.r.SetBody(allData)
default:
// Otherwise just send the first object directly
case len(allData) == 1:
// Just send the first object directly
req.r.SetBody(firstData)
default:
// Just send the event itself
req.r.SetBody(event)

}
}

Expand Down Expand Up @@ -292,7 +293,7 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription
log.L(wh.ctx).Tracef("Webhook response: %s", string(b))

// Emit the response
if reply {
if reply && event.Message != nil {
txType := fftypes.FFEnum(strings.ToLower(sub.Options.TransportOptions().GetString("replytx")))
if req != nil && req.replyTx != "" {
txType = fftypes.FFEnum(strings.ToLower(req.replyTx))
Expand Down Expand Up @@ -333,13 +334,8 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription
}

func (wh *WebHooks) DeliveryRequest(connID string, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) error {
if event.Message == nil && sub.Options.WithData != nil && *sub.Options.WithData {
log.L(wh.ctx).Debugf("Webhook withData=true subscription called with non-message event '%s'", event.ID)
return nil
}

reply := sub.Options.TransportOptions().GetBool("reply")
if reply && event.Message.Header.CID != nil {
if reply && event.Message != nil && event.Message.Header.CID != nil {
// We cowardly refuse to dispatch a message that is itself a reply, as it's hard for users to
// avoid loops - and there's no way for us to detect here if a user has configured correctly
// to avoid a loop.
Expand Down
3 changes: 3 additions & 0 deletions internal/events/webhooks/webhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ func TestWebhookFailFastAsk(t *testing.T) {
func TestDeliveryRequestNilMessage(t *testing.T) {
wh, cancel := newTestWebHooks(t)
defer cancel()
mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks)
mcb.On("DeliveryResponse", mock.Anything, mock.Anything).Return("", &core.EventDelivery{})

yes := true
sub := &core.Subscription{
Expand All @@ -729,6 +731,7 @@ func TestDeliveryRequestNilMessage(t *testing.T) {

err := wh.DeliveryRequest(mock.Anything, sub, event, nil)
assert.NoError(t, err)
mcb.AssertExpectations(t)
}

func TestDeliveryRequestReplyToReply(t *testing.T) {
Expand Down
41 changes: 28 additions & 13 deletions internal/namespace/manager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -77,7 +77,8 @@ type Manager interface {
WaitStop()
Reset(ctx context.Context)

Orchestrator(ns string) orchestrator.Orchestrator
Orchestrator(ctx context.Context, ns string) (orchestrator.Orchestrator, error)
MustOrchestrator(ns string) orchestrator.Orchestrator
SPIEvents() spievents.Manager
GetNamespaces(ctx context.Context) ([]*core.Namespace, error)
GetOperationByNamespacedID(ctx context.Context, nsOpID string) (*core.Operation, error)
Expand Down Expand Up @@ -1039,13 +1040,22 @@ func (nm *namespaceManager) SPIEvents() spievents.Manager {
return nm.adminEvents
}

func (nm *namespaceManager) Orchestrator(ns string) orchestrator.Orchestrator {
func (nm *namespaceManager) Orchestrator(ctx context.Context, ns string) (orchestrator.Orchestrator, error) {
nm.nsMux.Lock()
defer nm.nsMux.Unlock()
if namespace, ok := nm.namespaces[ns]; ok {
return namespace.orchestrator
if namespace, ok := nm.namespaces[ns]; ok && namespace != nil {
return namespace.orchestrator, nil
}
return nil
return nil, i18n.NewError(ctx, coremsgs.MsgUnknownNamespace, ns)
}

// MustOrchestrator must only be called by code that is absolutely sure the orchestrator exists
func (nm *namespaceManager) MustOrchestrator(ns string) orchestrator.Orchestrator {
or, err := nm.Orchestrator(context.Background(), ns)
if err != nil {
panic(err)
}
return or
}

func (nm *namespaceManager) GetNamespaces(ctx context.Context) ([]*core.Namespace, error) {
Expand All @@ -1063,9 +1073,9 @@ func (nm *namespaceManager) GetOperationByNamespacedID(ctx context.Context, nsOp
if err != nil {
return nil, err
}
or := nm.Orchestrator(ns)
if or == nil {
return nil, i18n.NewError(ctx, coremsgs.Msg404NotFound)
or, err := nm.Orchestrator(ctx, ns)
if err != nil {
return nil, err
}
return or.GetOperationByID(ctx, u.String())
}
Expand All @@ -1075,10 +1085,11 @@ func (nm *namespaceManager) ResolveOperationByNamespacedID(ctx context.Context,
if err != nil {
return err
}
or := nm.Orchestrator(ns)
if or == nil {
return i18n.NewError(ctx, coremsgs.Msg404NotFound)
or, err := nm.Orchestrator(ctx, ns)
if err != nil {
return err
}

return or.Operations().ResolveOperationByID(ctx, u, op)
}

Expand Down Expand Up @@ -1133,5 +1144,9 @@ func (nm *namespaceManager) getAuthPlugin(ctx context.Context) (plugins map[stri
}

func (nm *namespaceManager) Authorize(ctx context.Context, authReq *fftypes.AuthReq) error {
return nm.Orchestrator(authReq.Namespace).Authorize(ctx, authReq)
or, err := nm.Orchestrator(ctx, authReq.Namespace)
if err != nil {
return err
}
return or.Authorize(ctx, authReq)
}
Loading