diff --git a/Makefile b/Makefile index 6c867b9798..621fb69bea 100644 --- a/Makefile +++ b/Makefile @@ -24,46 +24,47 @@ ${MOCKERY}: ${LINT}: $(VGO) install github.com/golangci/golangci-lint/cmd/golangci-lint@latest + define makemock mocks: mocks-$(strip $(1))-$(strip $(2)) mocks-$(strip $(1))-$(strip $(2)): ${MOCKERY} ${MOCKERY} --case underscore --dir $(1) --name $(2) --outpkg $(3) --output mocks/$(strip $(3)) endef -$(eval $(call makemock, pkg/blockchain, Plugin, blockchainmocks)) -$(eval $(call makemock, pkg/blockchain, Callbacks, blockchainmocks)) -$(eval $(call makemock, pkg/database, Plugin, databasemocks)) -$(eval $(call makemock, pkg/database, Callbacks, databasemocks)) -$(eval $(call makemock, pkg/publicstorage, Plugin, publicstoragemocks)) -$(eval $(call makemock, pkg/publicstorage, Callbacks, publicstoragemocks)) -$(eval $(call makemock, pkg/events, Plugin, eventsmocks)) -$(eval $(call makemock, pkg/events, PluginAll, eventsmocks)) -$(eval $(call makemock, pkg/events, Callbacks, eventsmocks)) -$(eval $(call makemock, pkg/identity, Plugin, identitymocks)) -$(eval $(call makemock, pkg/identity, Callbacks, identitymocks)) -$(eval $(call makemock, pkg/dataexchange, Plugin, dataexchangemocks)) -$(eval $(call makemock, pkg/dataexchange, Callbacks, dataexchangemocks)) -$(eval $(call makemock, pkg/tokens, Plugin, tokenmocks)) -$(eval $(call makemock, pkg/tokens, Callbacks, tokenmocks)) -$(eval $(call makemock, pkg/wsclient, WSClient, wsmocks)) -$(eval $(call makemock, internal/identity, Manager, identitymanagermocks)) -$(eval $(call makemock, internal/batchpin, Submitter, batchpinmocks)) -$(eval $(call makemock, internal/sysmessaging, SystemEvents, sysmessagingmocks)) -$(eval $(call makemock, internal/sysmessaging, MessageSender, sysmessagingmocks)) -$(eval $(call makemock, internal/sysmessaging, LocalNodeInfo, sysmessagingmocks)) -$(eval $(call makemock, internal/syncasync, Bridge, syncasyncmocks)) -$(eval $(call makemock, internal/data, Manager, datamocks)) -$(eval $(call makemock, internal/batch, Manager, batchmocks)) -$(eval $(call makemock, internal/broadcast, Manager, broadcastmocks)) -$(eval $(call makemock, internal/privatemessaging, Manager, privatemessagingmocks)) -$(eval $(call makemock, internal/syshandlers, SystemHandlers, syshandlersmocks)) -$(eval $(call makemock, internal/events, EventManager, eventmocks)) -$(eval $(call makemock, internal/networkmap, Manager, networkmapmocks)) -$(eval $(call makemock, internal/assets, Manager, assetmocks)) -$(eval $(call makemock, internal/orchestrator, Orchestrator, orchestratormocks)) -$(eval $(call makemock, internal/apiserver, Server, apiservermocks)) -$(eval $(call makemock, internal/apiserver, IServer, apiservermocks)) -$(eval $(call makemock, internal/txcommon, Helper, txcommonmocks)) +$(eval $(call makemock, pkg/blockchain, Plugin, blockchainmocks)) +$(eval $(call makemock, pkg/blockchain, Callbacks, blockchainmocks)) +$(eval $(call makemock, pkg/database, Plugin, databasemocks)) +$(eval $(call makemock, pkg/database, Callbacks, databasemocks)) +$(eval $(call makemock, pkg/publicstorage, Plugin, publicstoragemocks)) +$(eval $(call makemock, pkg/publicstorage, Callbacks, publicstoragemocks)) +$(eval $(call makemock, pkg/events, Plugin, eventsmocks)) +$(eval $(call makemock, pkg/events, PluginAll, eventsmocks)) +$(eval $(call makemock, pkg/events, Callbacks, eventsmocks)) +$(eval $(call makemock, pkg/identity, Plugin, identitymocks)) +$(eval $(call makemock, pkg/identity, Callbacks, identitymocks)) +$(eval $(call makemock, pkg/dataexchange, Plugin, dataexchangemocks)) +$(eval $(call makemock, pkg/dataexchange, Callbacks, dataexchangemocks)) +$(eval $(call makemock, pkg/tokens, Plugin, tokenmocks)) +$(eval $(call makemock, pkg/tokens, Callbacks, tokenmocks)) +$(eval $(call makemock, pkg/wsclient, WSClient, wsmocks)) +$(eval $(call makemock, internal/identity, Manager, identitymanagermocks)) +$(eval $(call makemock, internal/batchpin, Submitter, batchpinmocks)) +$(eval $(call makemock, internal/sysmessaging, SystemEvents, sysmessagingmocks)) +$(eval $(call makemock, internal/sysmessaging, MessageSender, sysmessagingmocks)) +$(eval $(call makemock, internal/sysmessaging, LocalNodeInfo, sysmessagingmocks)) +$(eval $(call makemock, internal/syncasync, Bridge, syncasyncmocks)) +$(eval $(call makemock, internal/data, Manager, datamocks)) +$(eval $(call makemock, internal/batch, Manager, batchmocks)) +$(eval $(call makemock, internal/broadcast, Manager, broadcastmocks)) +$(eval $(call makemock, internal/privatemessaging, Manager, privatemessagingmocks)) +$(eval $(call makemock, internal/definitions, DefinitionHandlers, definitionsmocks)) +$(eval $(call makemock, internal/events, EventManager, eventmocks)) +$(eval $(call makemock, internal/networkmap, Manager, networkmapmocks)) +$(eval $(call makemock, internal/assets, Manager, assetmocks)) +$(eval $(call makemock, internal/orchestrator, Orchestrator, orchestratormocks)) +$(eval $(call makemock, internal/apiserver, Server, apiservermocks)) +$(eval $(call makemock, internal/apiserver, IServer, apiservermocks)) +$(eval $(call makemock, internal/txcommon, Helper, txcommonmocks)) firefly-nocgo: ${GOFILES} CGO_ENABLED=0 $(VGO) build -o ${BINARY_NAME}-nocgo -ldflags "-X main.buildDate=`date -u +\"%Y-%m-%dT%H:%M:%SZ\"` -X main.buildVersion=$(BUILD_VERSION)" -tags=prod -tags=prod -v diff --git a/internal/broadcast/datatype.go b/internal/broadcast/datatype.go index 52ace611ae..7246423b0f 100644 --- a/internal/broadcast/datatype.go +++ b/internal/broadcast/datatype.go @@ -43,7 +43,7 @@ func (bm *broadcastManager) BroadcastDatatype(ctx context.Context, ns string, da if err := bm.data.CheckDatatype(ctx, ns, datatype); err != nil { return nil, err } - msg, err := bm.BroadcastDefinitionAsNode(ctx, datatype, fftypes.SystemTagDefineDatatype, waitConfirm) + msg, err := bm.BroadcastDefinitionAsNode(ctx, ns, datatype, fftypes.SystemTagDefineDatatype, waitConfirm) if msg != nil { datatype.Message = msg.Header.ID } diff --git a/internal/broadcast/definition.go b/internal/broadcast/definition.go index 23aa669631..e478efe5eb 100644 --- a/internal/broadcast/definition.go +++ b/internal/broadcast/definition.go @@ -25,34 +25,34 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (bm *broadcastManager) BroadcastDefinitionAsNode(ctx context.Context, def fftypes.Definition, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) { - return bm.BroadcastDefinition(ctx, def, &fftypes.Identity{ /* resolve to node default */ }, tag, waitConfirm) +func (bm *broadcastManager) BroadcastDefinitionAsNode(ctx context.Context, ns string, def fftypes.Definition, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) { + return bm.BroadcastDefinition(ctx, ns, def, &fftypes.Identity{ /* resolve to node default */ }, tag, waitConfirm) } -func (bm *broadcastManager) BroadcastDefinition(ctx context.Context, def fftypes.Definition, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) { +func (bm *broadcastManager) BroadcastDefinition(ctx context.Context, ns string, def fftypes.Definition, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) { err = bm.identity.ResolveInputIdentity(ctx, signingIdentity) if err != nil { return nil, err } - return bm.broadcastDefinitionCommon(ctx, def, signingIdentity, tag, waitConfirm) + return bm.broadcastDefinitionCommon(ctx, ns, def, signingIdentity, tag, waitConfirm) } func (bm *broadcastManager) BroadcastRootOrgDefinition(ctx context.Context, def *fftypes.Organization, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) { signingIdentity.Author = bm.identity.OrgDID(def) - return bm.broadcastDefinitionCommon(ctx, def, signingIdentity, tag, waitConfirm) + return bm.broadcastDefinitionCommon(ctx, fftypes.SystemNamespace, def, signingIdentity, tag, waitConfirm) } -func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, def fftypes.Definition, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) { +func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, ns string, def fftypes.Definition, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) { // Serialize it into a data object, as a piece of data we can write to a message data := &fftypes.Data{ Validator: fftypes.ValidatorTypeSystemDefinition, ID: fftypes.NewUUID(), - Namespace: fftypes.SystemNamespace, + Namespace: ns, Created: fftypes.Now(), } data.Value, err = json.Marshal(&def) @@ -72,7 +72,7 @@ func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, def f in := &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ - Namespace: fftypes.SystemNamespace, + Namespace: ns, Type: fftypes.MessageTypeDefinition, Identity: *signingIdentity, Topics: fftypes.FFNameArray{def.Topic()}, @@ -88,7 +88,7 @@ func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, def f // Broadcast the message sender := broadcastSender{ mgr: bm, - namespace: fftypes.SystemNamespace, + namespace: ns, msg: in, resolved: true, } diff --git a/internal/broadcast/definition_test.go b/internal/broadcast/definition_test.go index c88e752175..057ef9e211 100644 --- a/internal/broadcast/definition_test.go +++ b/internal/broadcast/definition_test.go @@ -41,7 +41,28 @@ func TestBroadcastDefinitionAsNodeConfirm(t *testing.T) { mim.On("ResolveInputIdentity", mock.Anything, mock.Anything).Return(nil) msa.On("WaitForMessage", bm.ctx, "ff_system", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) - _, err := bm.BroadcastDefinitionAsNode(bm.ctx, &fftypes.Namespace{}, fftypes.SystemTagDefineNamespace, true) + _, err := bm.BroadcastDefinitionAsNode(bm.ctx, fftypes.SystemNamespace, &fftypes.Namespace{}, fftypes.SystemTagDefineNamespace, true) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) + msa.AssertExpectations(t) + mim.AssertExpectations(t) +} + +func TestBroadcastDatatypeDefinitionAsNodeConfirm(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + + mdi := bm.database.(*databasemocks.Plugin) + msa := bm.syncasync.(*syncasyncmocks.Bridge) + mim := bm.identity.(*identitymanagermocks.Manager) + ns := "customNamespace" + + mdi.On("UpsertData", mock.Anything, mock.Anything, database.UpsertOptimizationNew).Return(nil) + mim.On("ResolveInputIdentity", mock.Anything, mock.Anything).Return(nil) + msa.On("WaitForMessage", bm.ctx, ns, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) + + _, err := bm.BroadcastDefinitionAsNode(bm.ctx, ns, &fftypes.Datatype{}, fftypes.SystemTagDefineNamespace, true) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) @@ -57,7 +78,7 @@ func TestBroadcastDefinitionAsNodeUpsertFail(t *testing.T) { mdi.On("UpsertData", mock.Anything, mock.Anything, database.UpsertOptimizationNew).Return(fmt.Errorf("pop")) mim := bm.identity.(*identitymanagermocks.Manager) mim.On("ResolveInputIdentity", mock.Anything, mock.Anything).Return(nil) - _, err := bm.BroadcastDefinitionAsNode(bm.ctx, &fftypes.Namespace{}, fftypes.SystemTagDefineNamespace, false) + _, err := bm.BroadcastDefinitionAsNode(bm.ctx, fftypes.SystemNamespace, &fftypes.Namespace{}, fftypes.SystemTagDefineNamespace, false) assert.Regexp(t, "pop", err) } @@ -67,7 +88,7 @@ func TestBroadcastDefinitionBadIdentity(t *testing.T) { mim := bm.identity.(*identitymanagermocks.Manager) mim.On("ResolveInputIdentity", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - _, err := bm.BroadcastDefinition(bm.ctx, &fftypes.Namespace{}, &fftypes.Identity{ + _, err := bm.BroadcastDefinition(bm.ctx, fftypes.SystemNamespace, &fftypes.Namespace{}, &fftypes.Identity{ Author: "wrong", Key: "wrong", }, fftypes.SystemTagDefineNamespace, false) diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 9876e488ec..4430f21c82 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -42,8 +42,8 @@ type Manager interface { BroadcastDatatype(ctx context.Context, ns string, datatype *fftypes.Datatype, waitConfirm bool) (msg *fftypes.Message, err error) BroadcastNamespace(ctx context.Context, ns *fftypes.Namespace, waitConfirm bool) (msg *fftypes.Message, err error) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) - BroadcastDefinitionAsNode(ctx context.Context, def fftypes.Definition, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) - BroadcastDefinition(ctx context.Context, def fftypes.Definition, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) + BroadcastDefinitionAsNode(ctx context.Context, ns string, def fftypes.Definition, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) + BroadcastDefinition(ctx context.Context, ns string, def fftypes.Definition, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) BroadcastRootOrgDefinition(ctx context.Context, def *fftypes.Organization, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) BroadcastTokenPool(ctx context.Context, ns string, pool *fftypes.TokenPoolAnnouncement, waitConfirm bool) (msg *fftypes.Message, err error) Start() error diff --git a/internal/broadcast/namespace.go b/internal/broadcast/namespace.go index a0bcd4ac3d..6f1a8a3e65 100644 --- a/internal/broadcast/namespace.go +++ b/internal/broadcast/namespace.go @@ -31,7 +31,7 @@ func (bm *broadcastManager) BroadcastNamespace(ctx context.Context, ns *fftypes. if err := ns.Validate(ctx, false); err != nil { return nil, err } - msg, err := bm.BroadcastDefinitionAsNode(ctx, ns, fftypes.SystemTagDefineNamespace, waitConfirm) + msg, err := bm.BroadcastDefinitionAsNode(ctx, fftypes.SystemNamespace, ns, fftypes.SystemTagDefineNamespace, waitConfirm) if msg != nil { ns.Message = msg.Header.ID } diff --git a/internal/broadcast/tokenpool.go b/internal/broadcast/tokenpool.go index 355bd4bb54..51cc8dc75c 100644 --- a/internal/broadcast/tokenpool.go +++ b/internal/broadcast/tokenpool.go @@ -30,7 +30,7 @@ func (bm *broadcastManager) BroadcastTokenPool(ctx context.Context, ns string, p return nil, err } - msg, err = bm.BroadcastDefinitionAsNode(ctx, pool, fftypes.SystemTagDefinePool, waitConfirm) + msg, err = bm.BroadcastDefinitionAsNode(ctx, ns, pool, fftypes.SystemTagDefinePool, waitConfirm) if msg != nil { pool.Pool.Message = msg.Header.ID } diff --git a/internal/syshandlers/syshandler.go b/internal/definitions/definition_handler.go similarity index 61% rename from internal/syshandlers/syshandler.go rename to internal/definitions/definition_handler.go index 0061e56487..eaaac89505 100644 --- a/internal/syshandlers/syshandler.go +++ b/internal/definitions/definition_handler.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -31,8 +31,8 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -// SystemHandlers interface allows components to call broadcast/private messaging functions internally (without import cycles) -type SystemHandlers interface { +// DefinitionHandlers interface allows components to call broadcast/private messaging functions internally (without import cycles) +type DefinitionHandlers interface { privatemessaging.GroupManager HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) @@ -48,7 +48,7 @@ const ( ActionWait ) -type systemHandlers struct { +type definitionHandlers struct { database database.Plugin exchange dataexchange.Plugin data data.Manager @@ -58,8 +58,8 @@ type systemHandlers struct { txhelper txcommon.Helper } -func NewSystemHandlers(di database.Plugin, dx dataexchange.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager) SystemHandlers { - return &systemHandlers{ +func NewDefinitionHandlers(di database.Plugin, dx dataexchange.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager) DefinitionHandlers { + return &definitionHandlers{ database: di, exchange: dx, data: dm, @@ -70,40 +70,40 @@ func NewSystemHandlers(di database.Plugin, dx dataexchange.Plugin, dm data.Manag } } -func (sh *systemHandlers) GetGroupByID(ctx context.Context, id string) (*fftypes.Group, error) { - return sh.messaging.GetGroupByID(ctx, id) +func (dh *definitionHandlers) GetGroupByID(ctx context.Context, id string) (*fftypes.Group, error) { + return dh.messaging.GetGroupByID(ctx, id) } -func (sh *systemHandlers) GetGroups(ctx context.Context, filter database.AndFilter) ([]*fftypes.Group, *database.FilterResult, error) { - return sh.messaging.GetGroups(ctx, filter) +func (dh *definitionHandlers) GetGroups(ctx context.Context, filter database.AndFilter) ([]*fftypes.Group, *database.FilterResult, error) { + return dh.messaging.GetGroups(ctx, filter) } -func (sh *systemHandlers) ResolveInitGroup(ctx context.Context, msg *fftypes.Message) (*fftypes.Group, error) { - return sh.messaging.ResolveInitGroup(ctx, msg) +func (dh *definitionHandlers) ResolveInitGroup(ctx context.Context, msg *fftypes.Message) (*fftypes.Group, error) { + return dh.messaging.ResolveInitGroup(ctx, msg) } -func (sh *systemHandlers) EnsureLocalGroup(ctx context.Context, group *fftypes.Group) (ok bool, err error) { - return sh.messaging.EnsureLocalGroup(ctx, group) +func (dh *definitionHandlers) EnsureLocalGroup(ctx context.Context, group *fftypes.Group) (ok bool, err error) { + return dh.messaging.EnsureLocalGroup(ctx, group) } -func (sh *systemHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { +func (dh *definitionHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { l := log.L(ctx) l.Infof("Confirming system broadcast '%s' [%s]", msg.Header.Tag, msg.Header.ID) var valid bool var err error switch fftypes.SystemTag(msg.Header.Tag) { case fftypes.SystemTagDefineDatatype: - valid, err = sh.handleDatatypeBroadcast(ctx, msg, data) + valid, err = dh.handleDatatypeBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNamespace: - valid, err = sh.handleNamespaceBroadcast(ctx, msg, data) + valid, err = dh.handleNamespaceBroadcast(ctx, msg, data) case fftypes.SystemTagDefineOrganization: - valid, err = sh.handleOrganizationBroadcast(ctx, msg, data) + valid, err = dh.handleOrganizationBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNode: - valid, err = sh.handleNodeBroadcast(ctx, msg, data) + valid, err = dh.handleNodeBroadcast(ctx, msg, data) case fftypes.SystemTagDefinePool: - return sh.handleTokenPoolBroadcast(ctx, msg, data) + return dh.handleTokenPoolBroadcast(ctx, msg, data) default: - l.Warnf("Unknown topic '%s' for system broadcast ID '%s'", msg.Header.Tag, msg.Header.ID) + l.Warnf("Unknown SystemTag '%s' for definition ID '%s'", msg.Header.Tag, msg.Header.ID) return ActionReject, nil } switch { @@ -116,7 +116,7 @@ func (sh *systemHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftype } } -func (sh *systemHandlers) getSystemBroadcastPayload(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data, res fftypes.Definition) (valid bool) { +func (dh *definitionHandlers) getSystemBroadcastPayload(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data, res fftypes.Definition) (valid bool) { l := log.L(ctx) if len(data) != 1 { l.Warnf("Unable to process system broadcast %s - expecting 1 attachement, found %d", msg.Header.ID, len(data)) diff --git a/internal/syshandlers/syshandler_datatype.go b/internal/definitions/definition_handler_datatype.go similarity index 76% rename from internal/syshandlers/syshandler_datatype.go rename to internal/definitions/definition_handler_datatype.go index 89ca1c73d6..10e355b16c 100644 --- a/internal/syshandlers/syshandler_datatype.go +++ b/internal/definitions/definition_handler_datatype.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -23,11 +23,11 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (sh *systemHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { l := log.L(ctx) var dt fftypes.Datatype - valid = sh.getSystemBroadcastPayload(ctx, msg, data, &dt) + valid = dh.getSystemBroadcastPayload(ctx, msg, data, &dt) if !valid { return false, nil } @@ -37,12 +37,12 @@ func (sh *systemHandlers) handleDatatypeBroadcast(ctx context.Context, msg *ffty return false, nil } - if err = sh.data.CheckDatatype(ctx, dt.Namespace, &dt); err != nil { + if err = dh.data.CheckDatatype(ctx, dt.Namespace, &dt); err != nil { l.Warnf("Unable to process datatype broadcast %s - schema check: %s", msg.Header.ID, err) return false, nil } - existing, err := sh.database.GetDatatypeByName(ctx, dt.Namespace, dt.Name, dt.Version) + existing, err := dh.database.GetDatatypeByName(ctx, dt.Namespace, dt.Name, dt.Version) if err != nil { return false, err // We only return database errors } @@ -51,12 +51,12 @@ func (sh *systemHandlers) handleDatatypeBroadcast(ctx context.Context, msg *ffty return false, nil } - if err = sh.database.UpsertDatatype(ctx, &dt, false); err != nil { + if err = dh.database.UpsertDatatype(ctx, &dt, false); err != nil { return false, err } event := fftypes.NewEvent(fftypes.EventTypeDatatypeConfirmed, dt.Namespace, dt.ID) - if err = sh.database.InsertEvent(ctx, event); err != nil { + if err = dh.database.InsertEvent(ctx, event); err != nil { return false, err } diff --git a/internal/syshandlers/syshandler_datatype_test.go b/internal/definitions/definition_handler_datatype_test.go similarity index 79% rename from internal/syshandlers/syshandler_datatype_test.go rename to internal/definitions/definition_handler_datatype_test.go index 00e2e97270..2a53bf737f 100644 --- a/internal/syshandlers/syshandler_datatype_test.go +++ b/internal/definitions/definition_handler_datatype_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -29,8 +29,8 @@ import ( "github.com/stretchr/testify/mock" ) -func TestHandleSystemBroadcastDatatypeOk(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastDatatypeOk(t *testing.T) { + dh := newTestDefinitionHandlers(t) dt := &fftypes.Datatype{ ID: fftypes.NewUUID(), @@ -47,13 +47,13 @@ func TestHandleSystemBroadcastDatatypeOk(t *testing.T) { Value: fftypes.Byteable(b), } - mdm := sh.data.(*datamocks.Manager) + mdm := dh.data.(*datamocks.Manager) mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil) - mbi := sh.database.(*databasemocks.Plugin) + mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -65,8 +65,8 @@ func TestHandleSystemBroadcastDatatypeOk(t *testing.T) { mbi.AssertExpectations(t) } -func TestHandleSystemBroadcastDatatypeEventFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastDatatypeEventFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) dt := &fftypes.Datatype{ ID: fftypes.NewUUID(), @@ -83,13 +83,13 @@ func TestHandleSystemBroadcastDatatypeEventFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdm := sh.data.(*datamocks.Manager) + mdm := dh.data.(*datamocks.Manager) mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil) - mbi := sh.database.(*databasemocks.Plugin) + mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -101,8 +101,8 @@ func TestHandleSystemBroadcastDatatypeEventFail(t *testing.T) { mbi.AssertExpectations(t) } -func TestHandleSystemBroadcastDatatypeMissingID(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastDatatypeMissingID(t *testing.T) { + dh := newTestDefinitionHandlers(t) dt := &fftypes.Datatype{ Validator: fftypes.ValidatorTypeJSON, @@ -118,7 +118,7 @@ func TestHandleSystemBroadcastDatatypeMissingID(t *testing.T) { Value: fftypes.Byteable(b), } - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -127,8 +127,8 @@ func TestHandleSystemBroadcastDatatypeMissingID(t *testing.T) { assert.NoError(t, err) } -func TestHandleSystemBroadcastBadSchema(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastBadSchema(t *testing.T) { + dh := newTestDefinitionHandlers(t) dt := &fftypes.Datatype{ ID: fftypes.NewUUID(), @@ -145,9 +145,9 @@ func TestHandleSystemBroadcastBadSchema(t *testing.T) { Value: fftypes.Byteable(b), } - mdm := sh.data.(*datamocks.Manager) + mdm := dh.data.(*datamocks.Manager) mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -158,8 +158,8 @@ func TestHandleSystemBroadcastBadSchema(t *testing.T) { mdm.AssertExpectations(t) } -func TestHandleSystemBroadcastMissingData(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastMissingData(t *testing.T) { + dh := newTestDefinitionHandlers(t) dt := &fftypes.Datatype{ ID: fftypes.NewUUID(), @@ -171,7 +171,7 @@ func TestHandleSystemBroadcastMissingData(t *testing.T) { } dt.Hash = dt.Value.Hash() - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -180,8 +180,8 @@ func TestHandleSystemBroadcastMissingData(t *testing.T) { assert.NoError(t, err) } -func TestHandleSystemBroadcastDatatypeLookupFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastDatatypeLookupFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) dt := &fftypes.Datatype{ ID: fftypes.NewUUID(), @@ -198,11 +198,11 @@ func TestHandleSystemBroadcastDatatypeLookupFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdm := sh.data.(*datamocks.Manager) + mdm := dh.data.(*datamocks.Manager) mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil) - mbi := sh.database.(*databasemocks.Plugin) + mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: fftypes.SystemNamespace, Tag: string(fftypes.SystemTagDefineDatatype), @@ -215,8 +215,8 @@ func TestHandleSystemBroadcastDatatypeLookupFail(t *testing.T) { mbi.AssertExpectations(t) } -func TestHandleSystemBroadcastUpsertFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastUpsertFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) dt := &fftypes.Datatype{ ID: fftypes.NewUUID(), @@ -233,12 +233,12 @@ func TestHandleSystemBroadcastUpsertFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdm := sh.data.(*datamocks.Manager) + mdm := dh.data.(*datamocks.Manager) mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil) - mbi := sh.database.(*databasemocks.Plugin) + mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -250,8 +250,8 @@ func TestHandleSystemBroadcastUpsertFail(t *testing.T) { mbi.AssertExpectations(t) } -func TestHandleSystemBroadcastDatatypeDuplicate(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastDatatypeDuplicate(t *testing.T) { + dh := newTestDefinitionHandlers(t) dt := &fftypes.Datatype{ ID: fftypes.NewUUID(), @@ -268,11 +268,11 @@ func TestHandleSystemBroadcastDatatypeDuplicate(t *testing.T) { Value: fftypes.Byteable(b), } - mdm := sh.data.(*datamocks.Manager) + mdm := dh.data.(*datamocks.Manager) mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil) - mbi := sh.database.(*databasemocks.Plugin) + mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(dt, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, diff --git a/internal/syshandlers/syshandler_namespace.go b/internal/definitions/definition_handler_namespace.go similarity index 76% rename from internal/syshandlers/syshandler_namespace.go rename to internal/definitions/definition_handler_namespace.go index 186ea15d53..bb04c74c78 100644 --- a/internal/syshandlers/syshandler_namespace.go +++ b/internal/definitions/definition_handler_namespace.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -23,11 +23,11 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (sh *systemHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { l := log.L(ctx) var ns fftypes.Namespace - valid = sh.getSystemBroadcastPayload(ctx, msg, data, &ns) + valid = dh.getSystemBroadcastPayload(ctx, msg, data, &ns) if !valid { return false, nil } @@ -36,7 +36,7 @@ func (sh *systemHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fft return false, nil } - existing, err := sh.database.GetNamespace(ctx, ns.Name) + existing, err := dh.database.GetNamespace(ctx, ns.Name) if err != nil { return false, err // We only return database errors } @@ -46,17 +46,17 @@ func (sh *systemHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fft return false, nil } // Remove the local definition - if err = sh.database.DeleteNamespace(ctx, existing.ID); err != nil { + if err = dh.database.DeleteNamespace(ctx, existing.ID); err != nil { return false, err } } - if err = sh.database.UpsertNamespace(ctx, &ns, false); err != nil { + if err = dh.database.UpsertNamespace(ctx, &ns, false); err != nil { return false, err } event := fftypes.NewEvent(fftypes.EventTypeNamespaceConfirmed, ns.Name, ns.ID) - if err = sh.database.InsertEvent(ctx, event); err != nil { + if err = dh.database.InsertEvent(ctx, event); err != nil { return false, err } diff --git a/internal/syshandlers/syshandler_namespace_test.go b/internal/definitions/definition_handler_namespace_test.go similarity index 74% rename from internal/syshandlers/syshandler_namespace_test.go rename to internal/definitions/definition_handler_namespace_test.go index 0f0f757548..87c306b206 100644 --- a/internal/syshandlers/syshandler_namespace_test.go +++ b/internal/definitions/definition_handler_namespace_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -28,8 +28,8 @@ import ( "github.com/stretchr/testify/mock" ) -func TestHandleSystemBroadcastNSOk(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNSOk(t *testing.T) { + dh := newTestDefinitionHandlers(t) ns := &fftypes.Namespace{ ID: fftypes.NewUUID(), @@ -41,11 +41,11 @@ func TestHandleSystemBroadcastNSOk(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -56,8 +56,8 @@ func TestHandleSystemBroadcastNSOk(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNSEventFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNSEventFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) ns := &fftypes.Namespace{ ID: fftypes.NewUUID(), @@ -69,11 +69,11 @@ func TestHandleSystemBroadcastNSEventFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -84,8 +84,8 @@ func TestHandleSystemBroadcastNSEventFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNSUpsertFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNSUpsertFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) ns := &fftypes.Namespace{ ID: fftypes.NewUUID(), @@ -97,10 +97,10 @@ func TestHandleSystemBroadcastNSUpsertFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -111,10 +111,10 @@ func TestHandleSystemBroadcastNSUpsertFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNSMissingData(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNSMissingData(t *testing.T) { + dh := newTestDefinitionHandlers(t) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -123,8 +123,8 @@ func TestHandleSystemBroadcastNSMissingData(t *testing.T) { assert.NoError(t, err) } -func TestHandleSystemBroadcastNSBadID(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNSBadID(t *testing.T) { + dh := newTestDefinitionHandlers(t) ns := &fftypes.Namespace{} b, err := json.Marshal(&ns) @@ -133,7 +133,7 @@ func TestHandleSystemBroadcastNSBadID(t *testing.T) { Value: fftypes.Byteable(b), } - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -142,14 +142,14 @@ func TestHandleSystemBroadcastNSBadID(t *testing.T) { assert.NoError(t, err) } -func TestHandleSystemBroadcastNSBadData(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNSBadData(t *testing.T) { + dh := newTestDefinitionHandlers(t) data := &fftypes.Data{ Value: fftypes.Byteable(`!{json`), } - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -158,8 +158,8 @@ func TestHandleSystemBroadcastNSBadData(t *testing.T) { assert.NoError(t, err) } -func TestHandleSystemBroadcastDuplicate(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastDuplicate(t *testing.T) { + dh := newTestDefinitionHandlers(t) ns := &fftypes.Namespace{ ID: fftypes.NewUUID(), @@ -171,9 +171,9 @@ func TestHandleSystemBroadcastDuplicate(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(ns, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -184,8 +184,8 @@ func TestHandleSystemBroadcastDuplicate(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastDuplicateOverrideLocal(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastDuplicateOverrideLocal(t *testing.T) { + dh := newTestDefinitionHandlers(t) ns := &fftypes.Namespace{ ID: fftypes.NewUUID(), @@ -198,12 +198,12 @@ func TestHandleSystemBroadcastDuplicateOverrideLocal(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(ns, nil) mdi.On("DeleteNamespace", mock.Anything, mock.Anything).Return(nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -214,8 +214,8 @@ func TestHandleSystemBroadcastDuplicateOverrideLocal(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastDuplicateOverrideLocalFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastDuplicateOverrideLocalFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) ns := &fftypes.Namespace{ ID: fftypes.NewUUID(), @@ -228,10 +228,10 @@ func TestHandleSystemBroadcastDuplicateOverrideLocalFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(ns, nil) mdi.On("DeleteNamespace", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -242,8 +242,8 @@ func TestHandleSystemBroadcastDuplicateOverrideLocalFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastDupCheckFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastDupCheckFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) ns := &fftypes.Namespace{ ID: fftypes.NewUUID(), @@ -255,9 +255,9 @@ func TestHandleSystemBroadcastDupCheckFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, diff --git a/internal/syshandlers/syshandler_network_node.go b/internal/definitions/definition_handler_network_node.go similarity index 78% rename from internal/syshandlers/syshandler_network_node.go rename to internal/definitions/definition_handler_network_node.go index 83ec1a102d..d7f761a6a6 100644 --- a/internal/syshandlers/syshandler_network_node.go +++ b/internal/definitions/definition_handler_network_node.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -23,11 +23,11 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (sh *systemHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { l := log.L(ctx) var node fftypes.Node - valid = sh.getSystemBroadcastPayload(ctx, msg, data, &node) + valid = dh.getSystemBroadcastPayload(ctx, msg, data, &node) if !valid { return false, nil } @@ -37,7 +37,7 @@ func (sh *systemHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes. return false, nil } - owner, err := sh.database.GetOrganizationByIdentity(ctx, node.Owner) + owner, err := dh.database.GetOrganizationByIdentity(ctx, node.Owner) if err != nil { return false, err // We only return database errors } @@ -51,9 +51,9 @@ func (sh *systemHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes. return false, nil } - existing, err := sh.database.GetNode(ctx, node.Owner, node.Name) + existing, err := dh.database.GetNode(ctx, node.Owner, node.Name) if err == nil && existing == nil { - existing, err = sh.database.GetNodeByID(ctx, node.ID) + existing, err = dh.database.GetNodeByID(ctx, node.ID) } if err != nil { return false, err // We only return database errors @@ -66,12 +66,12 @@ func (sh *systemHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes. node.ID = nil // we keep the existing ID } - if err = sh.database.UpsertNode(ctx, &node, true); err != nil { + if err = dh.database.UpsertNode(ctx, &node, true); err != nil { return false, err } // Tell the data exchange about this node. Treat these errors like database errors - and return for retry processing - if err = sh.exchange.AddPeer(ctx, node.DX.Peer, node.DX.Endpoint); err != nil { + if err = dh.exchange.AddPeer(ctx, node.DX.Peer, node.DX.Endpoint); err != nil { return false, err } diff --git a/internal/syshandlers/syshandler_network_node_test.go b/internal/definitions/definition_handler_network_node_test.go similarity index 81% rename from internal/syshandlers/syshandler_network_node_test.go rename to internal/definitions/definition_handler_network_node_test.go index 868a88a1a2..d670c235ab 100644 --- a/internal/syshandlers/syshandler_network_node_test.go +++ b/internal/definitions/definition_handler_network_node_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -29,8 +29,8 @@ import ( "github.com/stretchr/testify/mock" ) -func TestHandleSystemBroadcastNodeOk(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeOk(t *testing.T) { + dh := newTestDefinitionHandlers(t) node := &fftypes.Node{ ID: fftypes.NewUUID(), @@ -48,14 +48,14 @@ func TestHandleSystemBroadcastNodeOk(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(nil, nil) mdi.On("GetNodeByID", mock.Anything, node.ID).Return(nil, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) - mdx := sh.exchange.(*dataexchangemocks.Plugin) + mdx := dh.exchange.(*dataexchangemocks.Plugin) mdx.On("AddPeer", mock.Anything, "peer1", node.DX.Endpoint).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -71,8 +71,8 @@ func TestHandleSystemBroadcastNodeOk(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNodeUpsertFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeUpsertFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) node := &fftypes.Node{ ID: fftypes.NewUUID(), @@ -90,12 +90,12 @@ func TestHandleSystemBroadcastNodeUpsertFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(nil, nil) mdi.On("GetNodeByID", mock.Anything, node.ID).Return(nil, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -111,8 +111,8 @@ func TestHandleSystemBroadcastNodeUpsertFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNodeAddPeerFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeAddPeerFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) node := &fftypes.Node{ ID: fftypes.NewUUID(), @@ -130,14 +130,14 @@ func TestHandleSystemBroadcastNodeAddPeerFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(nil, nil) mdi.On("GetNodeByID", mock.Anything, node.ID).Return(nil, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) - mdx := sh.exchange.(*dataexchangemocks.Plugin) + mdx := dh.exchange.(*dataexchangemocks.Plugin) mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -153,8 +153,8 @@ func TestHandleSystemBroadcastNodeAddPeerFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNodeDupMismatch(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeDupMismatch(t *testing.T) { + dh := newTestDefinitionHandlers(t) node := &fftypes.Node{ ID: fftypes.NewUUID(), @@ -172,10 +172,10 @@ func TestHandleSystemBroadcastNodeDupMismatch(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(&fftypes.Node{Owner: "0x99999"}, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -191,8 +191,8 @@ func TestHandleSystemBroadcastNodeDupMismatch(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNodeDupOK(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeDupOK(t *testing.T) { + dh := newTestDefinitionHandlers(t) node := &fftypes.Node{ ID: fftypes.NewUUID(), @@ -210,13 +210,13 @@ func TestHandleSystemBroadcastNodeDupOK(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(&fftypes.Node{Owner: "0x23456"}, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) - mdx := sh.exchange.(*dataexchangemocks.Plugin) + mdx := dh.exchange.(*dataexchangemocks.Plugin) mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -232,8 +232,8 @@ func TestHandleSystemBroadcastNodeDupOK(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNodeGetFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeGetFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) node := &fftypes.Node{ ID: fftypes.NewUUID(), @@ -251,10 +251,10 @@ func TestHandleSystemBroadcastNodeGetFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(nil, fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -270,8 +270,8 @@ func TestHandleSystemBroadcastNodeGetFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNodeBadAuthor(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeBadAuthor(t *testing.T) { + dh := newTestDefinitionHandlers(t) node := &fftypes.Node{ ID: fftypes.NewUUID(), @@ -289,9 +289,9 @@ func TestHandleSystemBroadcastNodeBadAuthor(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -307,8 +307,8 @@ func TestHandleSystemBroadcastNodeBadAuthor(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNodeGetOrgNotFound(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeGetOrgNotFound(t *testing.T) { + dh := newTestDefinitionHandlers(t) node := &fftypes.Node{ ID: fftypes.NewUUID(), @@ -326,9 +326,9 @@ func TestHandleSystemBroadcastNodeGetOrgNotFound(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -344,8 +344,8 @@ func TestHandleSystemBroadcastNodeGetOrgNotFound(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNodeGetOrgFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeGetOrgFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) node := &fftypes.Node{ ID: fftypes.NewUUID(), @@ -363,9 +363,9 @@ func TestHandleSystemBroadcastNodeGetOrgFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -381,8 +381,8 @@ func TestHandleSystemBroadcastNodeGetOrgFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastNodeValidateFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeValidateFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) node := &fftypes.Node{ ID: fftypes.NewUUID(), @@ -400,7 +400,7 @@ func TestHandleSystemBroadcastNodeValidateFail(t *testing.T) { Value: fftypes.Byteable(b), } - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -414,14 +414,14 @@ func TestHandleSystemBroadcastNodeValidateFail(t *testing.T) { assert.NoError(t, err) } -func TestHandleSystemBroadcastNodeUnmarshalFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastNodeUnmarshalFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) data := &fftypes.Data{ Value: fftypes.Byteable(`!json`), } - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ diff --git a/internal/syshandlers/syshandler_network_org.go b/internal/definitions/definition_handler_network_org.go similarity index 78% rename from internal/syshandlers/syshandler_network_org.go rename to internal/definitions/definition_handler_network_org.go index df819c103f..e8a6f68e71 100644 --- a/internal/syshandlers/syshandler_network_org.go +++ b/internal/definitions/definition_handler_network_org.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -23,11 +23,11 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (sh *systemHandlers) handleOrganizationBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { l := log.L(ctx) var org fftypes.Organization - valid = sh.getSystemBroadcastPayload(ctx, msg, data, &org) + valid = dh.getSystemBroadcastPayload(ctx, msg, data, &org) if !valid { return false, nil } @@ -38,7 +38,7 @@ func (sh *systemHandlers) handleOrganizationBroadcast(ctx context.Context, msg * } if org.Parent != "" { - parent, err := sh.database.GetOrganizationByIdentity(ctx, org.Parent) + parent, err := dh.database.GetOrganizationByIdentity(ctx, org.Parent) if err != nil { return false, err // We only return database errors } @@ -53,11 +53,11 @@ func (sh *systemHandlers) handleOrganizationBroadcast(ctx context.Context, msg * } } - existing, err := sh.database.GetOrganizationByIdentity(ctx, org.Identity) + existing, err := dh.database.GetOrganizationByIdentity(ctx, org.Identity) if err == nil && existing == nil { - existing, err = sh.database.GetOrganizationByName(ctx, org.Name) + existing, err = dh.database.GetOrganizationByName(ctx, org.Name) if err == nil && existing == nil { - existing, err = sh.database.GetOrganizationByID(ctx, org.ID) + existing, err = dh.database.GetOrganizationByID(ctx, org.ID) } } if err != nil { @@ -71,7 +71,7 @@ func (sh *systemHandlers) handleOrganizationBroadcast(ctx context.Context, msg * org.ID = nil // we keep the existing ID } - if err = sh.database.UpsertOrganization(ctx, &org, true); err != nil { + if err = dh.database.UpsertOrganization(ctx, &org, true); err != nil { return false, err } diff --git a/internal/syshandlers/syshandler_network_org_test.go b/internal/definitions/definition_handler_network_org_test.go similarity index 81% rename from internal/syshandlers/syshandler_network_org_test.go rename to internal/definitions/definition_handler_network_org_test.go index 2636ff5a2e..18b7379baa 100644 --- a/internal/syshandlers/syshandler_network_org_test.go +++ b/internal/definitions/definition_handler_network_org_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -28,8 +28,8 @@ import ( "github.com/stretchr/testify/mock" ) -func TestHandleSystemBroadcastChildOrgOk(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastChildOrgOk(t *testing.T) { + dh := newTestDefinitionHandlers(t) parentOrg := &fftypes.Organization{ ID: fftypes.NewUUID(), @@ -52,13 +52,13 @@ func TestHandleSystemBroadcastChildOrgOk(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(parentOrg, nil) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(nil, nil) mdi.On("GetOrganizationByName", mock.Anything, "org1").Return(nil, nil) mdi.On("GetOrganizationByID", mock.Anything, org.ID).Return(nil, nil) mdi.On("UpsertOrganization", mock.Anything, mock.Anything, true).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -74,8 +74,8 @@ func TestHandleSystemBroadcastChildOrgOk(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastChildOrgDupOk(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastChildOrgDupOk(t *testing.T) { + dh := newTestDefinitionHandlers(t) parentOrg := &fftypes.Organization{ ID: fftypes.NewUUID(), @@ -98,11 +98,11 @@ func TestHandleSystemBroadcastChildOrgDupOk(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(parentOrg, nil) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(org, nil) mdi.On("UpsertOrganization", mock.Anything, mock.Anything, true).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -118,8 +118,8 @@ func TestHandleSystemBroadcastChildOrgDupOk(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastChildOrgBadKey(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastChildOrgBadKey(t *testing.T) { + dh := newTestDefinitionHandlers(t) parentOrg := &fftypes.Organization{ ID: fftypes.NewUUID(), @@ -142,9 +142,9 @@ func TestHandleSystemBroadcastChildOrgBadKey(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(parentOrg, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -160,8 +160,8 @@ func TestHandleSystemBroadcastChildOrgBadKey(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastOrgDupMismatch(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastOrgDupMismatch(t *testing.T) { + dh := newTestDefinitionHandlers(t) org := &fftypes.Organization{ ID: fftypes.NewUUID(), @@ -177,9 +177,9 @@ func TestHandleSystemBroadcastOrgDupMismatch(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x12345", Parent: "0x9999"}, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -195,8 +195,8 @@ func TestHandleSystemBroadcastOrgDupMismatch(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastOrgUpsertFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastOrgUpsertFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) org := &fftypes.Organization{ ID: fftypes.NewUUID(), @@ -211,12 +211,12 @@ func TestHandleSystemBroadcastOrgUpsertFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(nil, nil) mdi.On("GetOrganizationByName", mock.Anything, "org1").Return(nil, nil) mdi.On("GetOrganizationByID", mock.Anything, org.ID).Return(nil, nil) mdi.On("UpsertOrganization", mock.Anything, mock.Anything, true).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -231,8 +231,8 @@ func TestHandleSystemBroadcastOrgUpsertFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastOrgGetOrgFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastOrgGetOrgFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) org := &fftypes.Organization{ ID: fftypes.NewUUID(), @@ -247,9 +247,9 @@ func TestHandleSystemBroadcastOrgGetOrgFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(nil, fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -264,8 +264,8 @@ func TestHandleSystemBroadcastOrgGetOrgFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastOrgAuthorMismatch(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastOrgAuthorMismatch(t *testing.T) { + dh := newTestDefinitionHandlers(t) org := &fftypes.Organization{ ID: fftypes.NewUUID(), @@ -280,9 +280,9 @@ func TestHandleSystemBroadcastOrgAuthorMismatch(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x12345", Parent: "0x9999"}, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -298,8 +298,8 @@ func TestHandleSystemBroadcastOrgAuthorMismatch(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastGetParentFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastGetParentFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) org := &fftypes.Organization{ ID: fftypes.NewUUID(), @@ -315,9 +315,9 @@ func TestHandleSystemBroadcastGetParentFail(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -333,8 +333,8 @@ func TestHandleSystemBroadcastGetParentFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastGetParentNotFound(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastGetParentNotFound(t *testing.T) { + dh := newTestDefinitionHandlers(t) org := &fftypes.Organization{ ID: fftypes.NewUUID(), @@ -350,9 +350,9 @@ func TestHandleSystemBroadcastGetParentNotFound(t *testing.T) { Value: fftypes.Byteable(b), } - mdi := sh.database.(*databasemocks.Plugin) + mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -368,8 +368,8 @@ func TestHandleSystemBroadcastGetParentNotFound(t *testing.T) { mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastValidateFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastValidateFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) org := &fftypes.Organization{ ID: fftypes.NewUUID(), @@ -383,7 +383,7 @@ func TestHandleSystemBroadcastValidateFail(t *testing.T) { Value: fftypes.Byteable(b), } - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -397,14 +397,14 @@ func TestHandleSystemBroadcastValidateFail(t *testing.T) { assert.NoError(t, err) } -func TestHandleSystemBroadcastUnmarshalFail(t *testing.T) { - sh := newTestSystemHandlers(t) +func TestHandleDefinitionBroadcastUnmarshalFail(t *testing.T) { + dh := newTestDefinitionHandlers(t) data := &fftypes.Data{ Value: fftypes.Byteable(`!json`), } - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ diff --git a/internal/syshandlers/syshandler_test.go b/internal/definitions/definition_handler_test.go similarity index 75% rename from internal/syshandlers/syshandler_test.go rename to internal/definitions/definition_handler_test.go index 47f82e2d9a..ae0c49ab1b 100644 --- a/internal/syshandlers/syshandler_test.go +++ b/internal/definitions/definition_handler_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -31,19 +31,19 @@ import ( "github.com/stretchr/testify/mock" ) -func newTestSystemHandlers(t *testing.T) *systemHandlers { +func newTestDefinitionHandlers(t *testing.T) *definitionHandlers { mdi := &databasemocks.Plugin{} mdx := &dataexchangemocks.Plugin{} mdm := &datamocks.Manager{} mbm := &broadcastmocks.Manager{} mpm := &privatemessagingmocks.Manager{} mam := &assetmocks.Manager{} - return NewSystemHandlers(mdi, mdx, mdm, mbm, mpm, mam).(*systemHandlers) + return NewDefinitionHandlers(mdi, mdx, mdm, mbm, mpm, mam).(*definitionHandlers) } func TestHandleSystemBroadcastUnknown(t *testing.T) { - sh := newTestSystemHandlers(t) - action, err := sh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + dh := newTestDefinitionHandlers(t) + action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: "unknown", }, @@ -53,8 +53,8 @@ func TestHandleSystemBroadcastUnknown(t *testing.T) { } func TestGetSystemBroadcastPayloadMissingData(t *testing.T) { - sh := newTestSystemHandlers(t) - valid := sh.getSystemBroadcastPayload(context.Background(), &fftypes.Message{ + dh := newTestDefinitionHandlers(t) + valid := dh.getSystemBroadcastPayload(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: "unknown", }, @@ -63,8 +63,8 @@ func TestGetSystemBroadcastPayloadMissingData(t *testing.T) { } func TestGetSystemBroadcastPayloadBadJSON(t *testing.T) { - sh := newTestSystemHandlers(t) - valid := sh.getSystemBroadcastPayload(context.Background(), &fftypes.Message{ + dh := newTestDefinitionHandlers(t) + valid := dh.getSystemBroadcastPayload(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: "unknown", }, @@ -75,17 +75,17 @@ func TestGetSystemBroadcastPayloadBadJSON(t *testing.T) { func TestPrivateMessagingPassthroughs(t *testing.T) { ctx := context.Background() - sh := newTestSystemHandlers(t) - mpm := sh.messaging.(*privatemessagingmocks.Manager) + dh := newTestDefinitionHandlers(t) + mpm := dh.messaging.(*privatemessagingmocks.Manager) mpm.On("GetGroupByID", ctx, mock.Anything).Return(nil, nil) mpm.On("GetGroups", ctx, mock.Anything).Return(nil, nil, nil) mpm.On("ResolveInitGroup", ctx, mock.Anything).Return(nil, nil) mpm.On("EnsureLocalGroup", ctx, mock.Anything).Return(false, nil) - _, _ = sh.GetGroupByID(ctx, fftypes.NewUUID().String()) - _, _, _ = sh.GetGroups(ctx, nil) - _, _ = sh.ResolveInitGroup(ctx, nil) - _, _ = sh.EnsureLocalGroup(ctx, nil) + _, _ = dh.GetGroupByID(ctx, fftypes.NewUUID().String()) + _, _, _ = dh.GetGroups(ctx, nil) + _, _ = dh.ResolveInitGroup(ctx, nil) + _, _ = dh.EnsureLocalGroup(ctx, nil) mpm.AssertExpectations(t) diff --git a/internal/syshandlers/syshandler_tokenpool.go b/internal/definitions/definition_handler_tokenpool.go similarity index 70% rename from internal/syshandlers/syshandler_tokenpool.go rename to internal/definitions/definition_handler_tokenpool.go index 054c694ac5..42ae5d6992 100644 --- a/internal/syshandlers/syshandler_tokenpool.go +++ b/internal/definitions/definition_handler_tokenpool.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -24,38 +24,38 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (sh *systemHandlers) confirmPoolAnnounceOp(ctx context.Context, pool *fftypes.TokenPool) error { +func (dh *definitionHandlers) confirmPoolAnnounceOp(ctx context.Context, pool *fftypes.TokenPool) error { // Find a matching operation within this transaction fb := database.OperationQueryFactory.NewFilter(ctx) filter := fb.And( fb.Eq("tx", pool.TX.ID), fb.Eq("type", fftypes.OpTypeTokenAnnouncePool), ) - if operations, _, err := sh.database.GetOperations(ctx, filter); err != nil { + if operations, _, err := dh.database.GetOperations(ctx, filter); err != nil { return err } else if len(operations) > 0 { op := operations[0] update := database.OperationQueryFactory.NewUpdate(ctx). Set("status", fftypes.OpStatusSucceeded). Set("output", fftypes.JSONObject{"message": pool.Message}) - if err := sh.database.UpdateOperation(ctx, op.ID, update); err != nil { + if err := dh.database.UpdateOperation(ctx, op.ID, update); err != nil { return err } } return nil } -func (sh *systemHandlers) persistTokenPool(ctx context.Context, announce *fftypes.TokenPoolAnnouncement) (valid bool, err error) { +func (dh *definitionHandlers) persistTokenPool(ctx context.Context, announce *fftypes.TokenPoolAnnouncement) (valid bool, err error) { pool := announce.Pool // Mark announce operation (if any) completed - if err := sh.confirmPoolAnnounceOp(ctx, pool); err != nil { + if err := dh.confirmPoolAnnounceOp(ctx, pool); err != nil { return false, err // retryable } // Create the pool in pending state pool.State = fftypes.TokenPoolStatePending - err = sh.database.UpsertTokenPool(ctx, pool) + err = dh.database.UpsertTokenPool(ctx, pool) if err != nil { if err == database.IDMismatch { log.L(ctx).Errorf("Invalid token pool '%s'. ID mismatch with existing record", pool.ID) @@ -68,15 +68,15 @@ func (sh *systemHandlers) persistTokenPool(ctx context.Context, announce *fftype return true, nil } -func (sh *systemHandlers) rejectPool(ctx context.Context, pool *fftypes.TokenPool) error { +func (dh *definitionHandlers) rejectPool(ctx context.Context, pool *fftypes.TokenPool) error { event := fftypes.NewEvent(fftypes.EventTypePoolRejected, pool.Namespace, pool.ID) - err := sh.database.InsertEvent(ctx, event) + err := dh.database.InsertEvent(ctx, event) return err } -func (sh *systemHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { +func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { var announce fftypes.TokenPoolAnnouncement - if valid := sh.getSystemBroadcastPayload(ctx, msg, data, &announce); !valid { + if valid := dh.getSystemBroadcastPayload(ctx, msg, data, &announce); !valid { return ActionReject, nil } @@ -85,23 +85,23 @@ func (sh *systemHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fft if err := pool.Validate(ctx); err != nil { log.L(ctx).Warnf("Token pool '%s' rejected - validate failed: %s", pool.ID, err) - return ActionReject, sh.rejectPool(ctx, pool) + return ActionReject, dh.rejectPool(ctx, pool) } // Check if pool has already been confirmed on chain (and confirm the message if so) - if existingPool, err := sh.database.GetTokenPoolByID(ctx, pool.ID); err != nil { + if existingPool, err := dh.database.GetTokenPoolByID(ctx, pool.ID); err != nil { return ActionRetry, err } else if existingPool != nil && existingPool.State == fftypes.TokenPoolStateConfirmed { return ActionConfirm, nil } - if valid, err := sh.persistTokenPool(ctx, &announce); err != nil { + if valid, err := dh.persistTokenPool(ctx, &announce); err != nil { return ActionRetry, err } else if !valid { - return ActionReject, sh.rejectPool(ctx, pool) + return ActionReject, dh.rejectPool(ctx, pool) } - if err := sh.assets.ActivateTokenPool(ctx, pool, announce.TX); err != nil { + if err := dh.assets.ActivateTokenPool(ctx, pool, announce.TX); err != nil { log.L(ctx).Errorf("Failed to activate token pool '%s': %s", pool.ID, err) return ActionRetry, err } diff --git a/internal/syshandlers/syshandler_tokenpool_test.go b/internal/definitions/definition_handler_tokenpool_test.go similarity index 96% rename from internal/syshandlers/syshandler_tokenpool_test.go rename to internal/definitions/definition_handler_tokenpool_test.go index 01852c0b07..449a851846 100644 --- a/internal/syshandlers/syshandler_tokenpool_test.go +++ b/internal/definitions/definition_handler_tokenpool_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -67,7 +67,7 @@ func buildPoolDefinitionMessage(announce *fftypes.TokenPoolAnnouncement) (*fftyp } func TestHandleSystemBroadcastTokenPoolActivateOK(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() pool := announce.Pool @@ -98,7 +98,7 @@ func TestHandleSystemBroadcastTokenPoolActivateOK(t *testing.T) { } func TestHandleSystemBroadcastTokenPoolUpdateOpFail(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() pool := announce.Pool @@ -120,7 +120,7 @@ func TestHandleSystemBroadcastTokenPoolUpdateOpFail(t *testing.T) { } func TestHandleSystemBroadcastTokenPoolGetPoolFail(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() pool := announce.Pool @@ -138,7 +138,7 @@ func TestHandleSystemBroadcastTokenPoolGetPoolFail(t *testing.T) { } func TestHandleSystemBroadcastTokenPoolExisting(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() pool := announce.Pool @@ -167,7 +167,7 @@ func TestHandleSystemBroadcastTokenPoolExisting(t *testing.T) { } func TestHandleSystemBroadcastTokenPoolExistingConfirmed(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() pool := announce.Pool @@ -188,7 +188,7 @@ func TestHandleSystemBroadcastTokenPoolExistingConfirmed(t *testing.T) { } func TestHandleSystemBroadcastTokenPoolIDMismatch(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() pool := announce.Pool @@ -216,7 +216,7 @@ func TestHandleSystemBroadcastTokenPoolIDMismatch(t *testing.T) { } func TestHandleSystemBroadcastTokenPoolFailUpsert(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() pool := announce.Pool @@ -241,7 +241,7 @@ func TestHandleSystemBroadcastTokenPoolFailUpsert(t *testing.T) { } func TestHandleSystemBroadcastTokenPoolOpsFail(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() pool := announce.Pool @@ -260,7 +260,7 @@ func TestHandleSystemBroadcastTokenPoolOpsFail(t *testing.T) { } func TestHandleSystemBroadcastTokenPoolActivateFail(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() pool := announce.Pool @@ -291,7 +291,7 @@ func TestHandleSystemBroadcastTokenPoolActivateFail(t *testing.T) { } func TestHandleSystemBroadcastTokenPoolValidateFail(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) announce := &fftypes.TokenPoolAnnouncement{ Pool: &fftypes.TokenPool{}, @@ -313,7 +313,7 @@ func TestHandleSystemBroadcastTokenPoolValidateFail(t *testing.T) { } func TestHandleSystemBroadcastTokenPoolBadMessage(t *testing.T) { - sh := newTestSystemHandlers(t) + sh := newTestDefinitionHandlers(t) msg := &fftypes.Message{ Header: fftypes.MessageHeader{ diff --git a/internal/syshandlers/reply_sender.go b/internal/definitions/reply_sender.go similarity index 80% rename from internal/syshandlers/reply_sender.go rename to internal/definitions/reply_sender.go index 040796c662..495b2d9c40 100644 --- a/internal/syshandlers/reply_sender.go +++ b/internal/definitions/reply_sender.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -23,12 +23,12 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (sh *systemHandlers) SendReply(ctx context.Context, event *fftypes.Event, reply *fftypes.MessageInOut) { +func (dh *definitionHandlers) SendReply(ctx context.Context, event *fftypes.Event, reply *fftypes.MessageInOut) { var err error if reply.Header.Group != nil { - err = sh.messaging.NewMessage(event.Namespace, reply).Send(ctx) + err = dh.messaging.NewMessage(event.Namespace, reply).Send(ctx) } else { - err = sh.broadcast.NewBroadcast(event.Namespace, reply).Send(ctx) + err = dh.broadcast.NewBroadcast(event.Namespace, reply).Send(ctx) } if err != nil { log.L(ctx).Errorf("Failed to send reply: %s", err) diff --git a/internal/syshandlers/reply_sender_test.go b/internal/definitions/reply_sender_test.go similarity index 83% rename from internal/syshandlers/reply_sender_test.go rename to internal/definitions/reply_sender_test.go index 775a829d1a..074d79a91d 100644 --- a/internal/syshandlers/reply_sender_test.go +++ b/internal/definitions/reply_sender_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syshandlers +package definitions import ( "context" @@ -29,13 +29,13 @@ import ( ) func TestSendReplyBroadcastFail(t *testing.T) { - sh := newTestSystemHandlers(t) + dh := newTestDefinitionHandlers(t) mms := &sysmessagingmocks.MessageSender{} - mbm := sh.broadcast.(*broadcastmocks.Manager) + mbm := dh.broadcast.(*broadcastmocks.Manager) mbm.On("NewBroadcast", "ns1", mock.Anything).Return(mms) mms.On("Send", context.Background()).Return(fmt.Errorf("pop")) - sh.SendReply(context.Background(), &fftypes.Event{ + dh.SendReply(context.Background(), &fftypes.Event{ ID: fftypes.NewUUID(), Namespace: "ns1", }, &fftypes.MessageInOut{}) @@ -45,13 +45,13 @@ func TestSendReplyBroadcastFail(t *testing.T) { } func TestSendReplyPrivateFail(t *testing.T) { - sh := newTestSystemHandlers(t) + dh := newTestDefinitionHandlers(t) mms := &sysmessagingmocks.MessageSender{} - mpm := sh.messaging.(*privatemessagingmocks.Manager) + mpm := dh.messaging.(*privatemessagingmocks.Manager) mpm.On("NewMessage", "ns1", mock.Anything).Return(mms) mms.On("Send", context.Background()).Return(fmt.Errorf("pop")) - sh.SendReply(context.Background(), &fftypes.Event{ + dh.SendReply(context.Background(), &fftypes.Event{ ID: fftypes.NewUUID(), Namespace: "ns1", }, &fftypes.MessageInOut{ @@ -67,7 +67,7 @@ func TestSendReplyPrivateFail(t *testing.T) { } func TestSendReplyPrivateOk(t *testing.T) { - sh := newTestSystemHandlers(t) + dh := newTestDefinitionHandlers(t) msg := &fftypes.Message{ Header: fftypes.MessageHeader{ @@ -76,11 +76,11 @@ func TestSendReplyPrivateOk(t *testing.T) { } mms := &sysmessagingmocks.MessageSender{} - mpm := sh.messaging.(*privatemessagingmocks.Manager) + mpm := dh.messaging.(*privatemessagingmocks.Manager) mpm.On("NewMessage", "ns1", mock.Anything).Return(mms) mms.On("Send", context.Background()).Return(nil) - sh.SendReply(context.Background(), &fftypes.Event{ + dh.SendReply(context.Background(), &fftypes.Event{ ID: fftypes.NewUUID(), Namespace: "ns1", }, &fftypes.MessageInOut{ diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index d7ab6bc2a6..9c6b5bcb2a 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -24,9 +24,9 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" + "github.com/hyperledger/firefly/internal/definitions" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/retry" - "github.com/hyperledger/firefly/internal/syshandlers" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) @@ -38,7 +38,7 @@ const ( type aggregator struct { ctx context.Context database database.Plugin - syshandlers syshandlers.SystemHandlers + definitions definitions.DefinitionHandlers data data.Manager eventPoller *eventPoller newPins chan int64 @@ -47,12 +47,12 @@ type aggregator struct { retry *retry.Retry } -func newAggregator(ctx context.Context, di database.Plugin, sh syshandlers.SystemHandlers, dm data.Manager, en *eventNotifier) *aggregator { +func newAggregator(ctx context.Context, di database.Plugin, sh definitions.DefinitionHandlers, dm data.Manager, en *eventNotifier) *aggregator { batchSize := config.GetInt(config.EventAggregatorBatchSize) ag := &aggregator{ ctx: log.WithLogField(ctx, "role", "aggregator"), database: di, - syshandlers: sh, + definitions: sh, data: dm, newPins: make(chan int64), offchainBatches: make(chan *fftypes.UUID, 1), // hops to queuedRewinds with a shouldertab on the event poller @@ -340,7 +340,7 @@ func (ag *aggregator) attemptContextInit(ctx context.Context, msg *fftypes.Messa l := log.L(ctx) // It might be the system topic/context initializing the group - group, err := ag.syshandlers.ResolveInitGroup(ctx, msg) + group, err := ag.definitions.ResolveInitGroup(ctx, msg) if err != nil || group == nil { return nil, err } @@ -431,15 +431,15 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M valid := true eventType := fftypes.EventTypeMessageConfirmed switch { - case msg.Header.Namespace == fftypes.SystemNamespace: - // We handle system events in-line on the aggregator, as it would be confusing for apps to be - // dispatched subsequent events before we have processed the system events they depend on. - var action syshandlers.SystemBroadcastAction - action, err = ag.syshandlers.HandleSystemBroadcast(ctx, msg, data) - if action == syshandlers.ActionRetry || action == syshandlers.ActionWait { + case msg.Header.Type == fftypes.MessageTypeDefinition: + // We handle definition events in-line on the aggregator, as it would be confusing for apps to be + // dispatched subsequent events before we have processed the definition events they depend on. + var action definitions.SystemBroadcastAction + action, err = ag.definitions.HandleSystemBroadcast(ctx, msg, data) + if action == definitions.ActionRetry || action == definitions.ActionWait { return false, err } - valid = action == syshandlers.ActionConfirm + valid = action == definitions.ActionConfirm case msg.Header.Type == fftypes.MessageTypeGroupInit: // Already handled as part of resolving the context - do nothing. diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index dbccb518b1..a4cfbd66f8 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -23,10 +23,10 @@ import ( "testing" "github.com/hyperledger/firefly/internal/config" - "github.com/hyperledger/firefly/internal/syshandlers" + "github.com/hyperledger/firefly/internal/definitions" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" - "github.com/hyperledger/firefly/mocks/syshandlersmocks" + "github.com/hyperledger/firefly/mocks/definitionsmocks" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/assert" @@ -36,7 +36,7 @@ import ( func newTestAggregator() (*aggregator, func()) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} - msh := &syshandlersmocks.SystemHandlers{} + msh := &definitionsmocks.DefinitionHandlers{} ctx, cancel := context.WithCancel(context.Background()) ag := newAggregator(ctx, mdi, msh, mdm, newEventNotifier(ctx, "ut")) return ag, cancel @@ -65,7 +65,7 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) - msh := ag.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) // Get the batch mdi.On("GetBatchByID", ag.ctx, batchID).Return(&fftypes.Batch{ @@ -631,7 +631,7 @@ func TestAttemptContextInitGetGroupByIDFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - msh := ag.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")) _, err := ag.attemptContextInit(ag.ctx, &fftypes.Message{ @@ -652,7 +652,7 @@ func TestAttemptContextInitGroupNotFound(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - msh := ag.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(nil, nil) _, err := ag.attemptContextInit(ag.ctx, &fftypes.Message{ @@ -675,7 +675,7 @@ func TestAttemptContextInitAuthorMismatch(t *testing.T) { groupID := fftypes.NewRandB32() zeroHash := ag.calcHash("topic1", groupID, "author2", 0) - msh := ag.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(&fftypes.Group{ GroupIdentity: fftypes.GroupIdentity{ Members: fftypes.Members{ @@ -703,7 +703,7 @@ func TestAttemptContextInitNoMatch(t *testing.T) { defer cancel() groupID := fftypes.NewRandB32() - msh := ag.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(&fftypes.Group{ GroupIdentity: fftypes.GroupIdentity{ Members: fftypes.Members{ @@ -732,7 +732,7 @@ func TestAttemptContextInitGetPinsFail(t *testing.T) { groupID := fftypes.NewRandB32() zeroHash := ag.calcHash("topic1", groupID, "author1", 0) - msh := ag.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) mdi := ag.database.(*databasemocks.Plugin) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(&fftypes.Group{ GroupIdentity: fftypes.GroupIdentity{ @@ -764,7 +764,7 @@ func TestAttemptContextInitGetPinsBlocked(t *testing.T) { groupID := fftypes.NewRandB32() zeroHash := ag.calcHash("topic1", groupID, "author1", 0) mdi := ag.database.(*databasemocks.Plugin) - msh := ag.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(&fftypes.Group{ GroupIdentity: fftypes.GroupIdentity{ Members: fftypes.Members{ @@ -798,7 +798,7 @@ func TestAttemptContextInitInsertPinsFail(t *testing.T) { groupID := fftypes.NewRandB32() zeroHash := ag.calcHash("topic1", groupID, "author1", 0) mdi := ag.database.(*databasemocks.Plugin) - msh := ag.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(&fftypes.Group{ GroupIdentity: fftypes.GroupIdentity{ Members: fftypes.Members{ @@ -968,8 +968,8 @@ func TestAttemptMessageDispatchFailValidateBadSystem(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - msh := ag.syshandlers.(*syshandlersmocks.SystemHandlers) - msh.On("HandleSystemBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(syshandlers.ActionReject, nil) + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) + msh.On("HandleSystemBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionReject, nil) mdm := ag.data.(*datamocks.Manager) mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) @@ -996,8 +996,9 @@ func TestAttemptMessageDispatchFailValidateBadSystem(t *testing.T) { _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypeDefinition, ID: fftypes.NewUUID(), - Namespace: fftypes.SystemNamespace, + Namespace: "any", }, Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, @@ -1011,16 +1012,17 @@ func TestAttemptMessageDispatchFailValidateSystemFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - msh := ag.syshandlers.(*syshandlersmocks.SystemHandlers) - msh.On("HandleSystemBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(syshandlers.ActionRetry, fmt.Errorf("pop")) + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) + msh.On("HandleSystemBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionRetry, fmt.Errorf("pop")) mdm := ag.data.(*datamocks.Manager) mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypeDefinition, ID: fftypes.NewUUID(), - Namespace: fftypes.SystemNamespace, + Namespace: "any", }, Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, diff --git a/internal/events/dx_callbacks.go b/internal/events/dx_callbacks.go index 0878a8d628..9f11f3d9cc 100644 --- a/internal/events/dx_callbacks.go +++ b/internal/events/dx_callbacks.go @@ -275,7 +275,7 @@ func (em *eventManager) unpinnedMessageReceived(peerID string, message *fftypes. return em.retry.Do(em.ctx, "unpinned message received", func(attempt int) (bool, error) { err := em.database.RunAsGroup(em.ctx, func(ctx context.Context) error { - if valid, err := em.syshandlers.EnsureLocalGroup(ctx, group); err != nil || !valid { + if valid, err := em.definitions.EnsureLocalGroup(ctx, group); err != nil || !valid { return err } diff --git a/internal/events/dx_callbacks_test.go b/internal/events/dx_callbacks_test.go index 01182db389..facf7f5a0f 100644 --- a/internal/events/dx_callbacks_test.go +++ b/internal/events/dx_callbacks_test.go @@ -23,7 +23,7 @@ import ( "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/dataexchangemocks" - "github.com/hyperledger/firefly/mocks/syshandlersmocks" + "github.com/hyperledger/firefly/mocks/definitionsmocks" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/assert" @@ -599,7 +599,7 @@ func TestMessageReceiveMessageIdentityFail(t *testing.T) { mdi := em.database.(*databasemocks.Plugin) mdx := &dataexchangemocks.Plugin{} - msh := em.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := em.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("EnsureLocalGroup", em.ctx, mock.Anything).Return(true, nil) mdi.On("GetNodes", em.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) @@ -636,7 +636,7 @@ func TestMessageReceiveMessageIdentityIncorrect(t *testing.T) { mdi := em.database.(*databasemocks.Plugin) mdx := &dataexchangemocks.Plugin{} - msh := em.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := em.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("EnsureLocalGroup", em.ctx, mock.Anything).Return(true, nil) mdi.On("GetNodes", em.ctx, mock.Anything).Return([]*fftypes.Node{}, nil, nil) @@ -673,7 +673,7 @@ func TestMessageReceiveMessagePersistMessageFail(t *testing.T) { mdi := em.database.(*databasemocks.Plugin) mdx := &dataexchangemocks.Plugin{} - msh := em.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := em.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("EnsureLocalGroup", em.ctx, mock.Anything).Return(true, nil) mdi.On("GetNodes", em.ctx, mock.Anything).Return([]*fftypes.Node{ @@ -723,7 +723,7 @@ func TestMessageReceiveMessagePersistDataFail(t *testing.T) { mdi := em.database.(*databasemocks.Plugin) mdx := &dataexchangemocks.Plugin{} - msh := em.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := em.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("EnsureLocalGroup", em.ctx, mock.Anything).Return(true, nil) mdi.On("GetNodes", em.ctx, mock.Anything).Return([]*fftypes.Node{ @@ -773,7 +773,7 @@ func TestMessageReceiveMessagePersistEventFail(t *testing.T) { mdi := em.database.(*databasemocks.Plugin) mdx := &dataexchangemocks.Plugin{} - msh := em.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := em.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("EnsureLocalGroup", em.ctx, mock.Anything).Return(true, nil) mdi.On("GetNodes", em.ctx, mock.Anything).Return([]*fftypes.Node{ @@ -825,7 +825,7 @@ func TestMessageReceiveMessageEnsureLocalGroupFail(t *testing.T) { mdi := em.database.(*databasemocks.Plugin) mdx := &dataexchangemocks.Plugin{} - msh := em.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := em.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("EnsureLocalGroup", em.ctx, mock.Anything).Return(false, fmt.Errorf("pop")) err = em.MessageReceived(mdx, "peer1", b) @@ -867,7 +867,7 @@ func TestMessageReceiveMessageEnsureLocalGroupReject(t *testing.T) { mdi := em.database.(*databasemocks.Plugin) mdx := &dataexchangemocks.Plugin{} - msh := em.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := em.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("EnsureLocalGroup", em.ctx, mock.Anything).Return(false, nil) err = em.MessageReceived(mdx, "peer1", b) diff --git a/internal/events/event_dispatcher.go b/internal/events/event_dispatcher.go index 99312890f7..27d91326b2 100644 --- a/internal/events/event_dispatcher.go +++ b/internal/events/event_dispatcher.go @@ -24,10 +24,10 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" + "github.com/hyperledger/firefly/internal/definitions" "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/retry" - "github.com/hyperledger/firefly/internal/syshandlers" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/events" "github.com/hyperledger/firefly/pkg/fftypes" @@ -52,7 +52,7 @@ type eventDispatcher struct { data data.Manager database database.Plugin transport events.Plugin - syshandlers syshandlers.SystemHandlers + definitions definitions.DefinitionHandlers elected bool eventPoller *eventPoller inflight map[fftypes.UUID]*fftypes.Event @@ -65,7 +65,7 @@ type eventDispatcher struct { changeEvents chan *fftypes.ChangeEvent } -func newEventDispatcher(ctx context.Context, ei events.Plugin, di database.Plugin, dm data.Manager, sh syshandlers.SystemHandlers, connID string, sub *subscription, en *eventNotifier, cel *changeEventListener) *eventDispatcher { +func newEventDispatcher(ctx context.Context, ei events.Plugin, di database.Plugin, dm data.Manager, sh definitions.DefinitionHandlers, connID string, sub *subscription, en *eventNotifier, cel *changeEventListener) *eventDispatcher { ctx, cancelCtx := context.WithCancel(ctx) readAhead := config.GetUint(config.SubscriptionDefaultsReadAhead) if sub.definition.Options.ReadAhead != nil { @@ -80,7 +80,7 @@ func newEventDispatcher(ctx context.Context, ei events.Plugin, di database.Plugi "sub", fmt.Sprintf("%s/%s:%s", sub.definition.ID, sub.definition.Namespace, sub.definition.Name)), database: di, transport: ei, - syshandlers: sh, + definitions: sh, data: dm, connID: connID, cancelCtx: cancelCtx, @@ -424,7 +424,7 @@ func (ed *eventDispatcher) deliveryResponse(response *fftypes.EventDeliveryRespo // We might have a message to send, do that before we dispatch the ack // Note a failure to send the reply does not invalidate the ack if response.Reply != nil { - ed.syshandlers.SendReply(ed.ctx, event, response.Reply) + ed.definitions.SendReply(ed.ctx, event, response.Reply) } l.Debugf("Response for %s event: %.10d/%s [%s]: ref=%s/%s rejected=%t info='%s'", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference, response.Rejected, response.Info) diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go index b67468f7d5..079a1a9dd5 100644 --- a/internal/events/event_dispatcher_test.go +++ b/internal/events/event_dispatcher_test.go @@ -26,8 +26,8 @@ import ( "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" + "github.com/hyperledger/firefly/mocks/definitionsmocks" "github.com/hyperledger/firefly/mocks/eventsmocks" - "github.com/hyperledger/firefly/mocks/syshandlersmocks" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/events" "github.com/hyperledger/firefly/pkg/fftypes" @@ -41,7 +41,7 @@ func newTestEventDispatcher(sub *subscription) (*eventDispatcher, func()) { mei.On("Capabilities").Return(&events.Capabilities{ChangeEvents: true}).Maybe() mei.On("Name").Return("ut").Maybe() mdm := &datamocks.Manager{} - msh := &syshandlersmocks.SystemHandlers{} + msh := &definitionsmocks.DefinitionHandlers{} ctx, cancel := context.WithCancel(context.Background()) return newEventDispatcher(ctx, mei, mdi, mdm, msh, fftypes.NewUUID().String(), sub, newEventNotifier(ctx, "ut"), newChangeEventListener(ctx)), func() { cancel() @@ -859,7 +859,7 @@ func TestEventDispatcherWithReply(t *testing.T) { ed, cancel := newTestEventDispatcher(sub) cancel() ed.acksNacks = make(chan ackNack, 2) - msh := ed.syshandlers.(*syshandlersmocks.SystemHandlers) + msh := ed.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("SendReply", ed.ctx, mock.Anything, mock.Anything).Return(&fftypes.Message{}, nil) event1 := fftypes.NewUUID() diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 70d6e0d56d..9f0d7789a6 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -26,6 +26,7 @@ import ( "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" + "github.com/hyperledger/firefly/internal/definitions" "github.com/hyperledger/firefly/internal/events/eifactory" "github.com/hyperledger/firefly/internal/events/system" "github.com/hyperledger/firefly/internal/i18n" @@ -33,7 +34,6 @@ import ( "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/privatemessaging" "github.com/hyperledger/firefly/internal/retry" - "github.com/hyperledger/firefly/internal/syshandlers" "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/blockchain" @@ -79,7 +79,7 @@ type eventManager struct { publicstorage publicstorage.Plugin database database.Plugin identity identity.Manager - syshandlers syshandlers.SystemHandlers + definitions definitions.DefinitionHandlers data data.Manager subManager *subscriptionManager retry retry.Retry @@ -95,7 +95,7 @@ type eventManager struct { internalEvents *system.Events } -func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, sh syshandlers.SystemHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager) (EventManager, error) { +func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, dh definitions.DefinitionHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager) (EventManager, error) { if ni == nil || pi == nil || di == nil || im == nil || dm == nil || bm == nil || pm == nil || am == nil { return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError) } @@ -107,7 +107,7 @@ func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publ publicstorage: pi, database: di, identity: im, - syshandlers: sh, + definitions: dh, data: dm, broadcast: bm, messaging: pm, @@ -122,13 +122,13 @@ func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publ opCorrelationRetries: config.GetInt(config.EventAggregatorOpCorrelationRetries), newEventNotifier: newEventNotifier, newPinNotifier: newPinNotifier, - aggregator: newAggregator(ctx, di, sh, dm, newPinNotifier), + aggregator: newAggregator(ctx, di, dh, dm, newPinNotifier), } ie, _ := eifactory.GetPlugin(ctx, system.SystemEventsTransport) em.internalEvents = ie.(*system.Events) var err error - if em.subManager, err = newSubscriptionManager(ctx, di, dm, newEventNotifier, sh); err != nil { + if em.subManager, err = newSubscriptionManager(ctx, di, dm, newEventNotifier, dh); err != nil { return nil, err } diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index 389c6471dd..a9b47a8204 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -27,11 +27,11 @@ import ( "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" + "github.com/hyperledger/firefly/mocks/definitionsmocks" "github.com/hyperledger/firefly/mocks/eventsmocks" "github.com/hyperledger/firefly/mocks/identitymanagermocks" "github.com/hyperledger/firefly/mocks/privatemessagingmocks" "github.com/hyperledger/firefly/mocks/publicstoragemocks" - "github.com/hyperledger/firefly/mocks/syshandlersmocks" "github.com/hyperledger/firefly/mocks/sysmessagingmocks" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/assert" @@ -48,7 +48,7 @@ func newTestEventManager(t *testing.T) (*eventManager, func()) { mpi := &publicstoragemocks.Plugin{} met := &eventsmocks.Plugin{} mdm := &datamocks.Manager{} - msh := &syshandlersmocks.SystemHandlers{} + msh := &definitionsmocks.DefinitionHandlers{} mbm := &broadcastmocks.Manager{} mpm := &privatemessagingmocks.Manager{} mam := &assetmocks.Manager{} @@ -97,7 +97,7 @@ func TestStartStopBadTransports(t *testing.T) { mim := &identitymanagermocks.Manager{} mpi := &publicstoragemocks.Plugin{} mdm := &datamocks.Manager{} - msh := &syshandlersmocks.SystemHandlers{} + msh := &definitionsmocks.DefinitionHandlers{} mbm := &broadcastmocks.Manager{} mpm := &privatemessagingmocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} diff --git a/internal/events/subscription_manager.go b/internal/events/subscription_manager.go index 3cd8d9a2d8..c9f6382267 100644 --- a/internal/events/subscription_manager.go +++ b/internal/events/subscription_manager.go @@ -23,12 +23,12 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" + "github.com/hyperledger/firefly/internal/definitions" "github.com/hyperledger/firefly/internal/events/eifactory" "github.com/hyperledger/firefly/internal/events/system" "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/retry" - "github.com/hyperledger/firefly/internal/syshandlers" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/events" "github.com/hyperledger/firefly/pkg/fftypes" @@ -58,7 +58,7 @@ type subscriptionManager struct { database database.Plugin data data.Manager eventNotifier *eventNotifier - syshandlers syshandlers.SystemHandlers + definitions definitions.DefinitionHandlers transports map[string]events.Plugin connections map[string]*connection mux sync.Mutex @@ -71,7 +71,7 @@ type subscriptionManager struct { retry retry.Retry } -func newSubscriptionManager(ctx context.Context, di database.Plugin, dm data.Manager, en *eventNotifier, sh syshandlers.SystemHandlers) (*subscriptionManager, error) { +func newSubscriptionManager(ctx context.Context, di database.Plugin, dm data.Manager, en *eventNotifier, sh definitions.DefinitionHandlers) (*subscriptionManager, error) { ctx, cancelCtx := context.WithCancel(ctx) sm := &subscriptionManager{ ctx: ctx, @@ -85,7 +85,7 @@ func newSubscriptionManager(ctx context.Context, di database.Plugin, dm data.Man maxSubs: uint64(config.GetUint(config.SubscriptionMax)), cancelCtx: cancelCtx, eventNotifier: en, - syshandlers: sh, + definitions: sh, retry: retry.Retry{ InitialDelay: config.GetDuration(config.SubscriptionsRetryInitialDelay), MaximumDelay: config.GetDuration(config.SubscriptionsRetryMaxDelay), @@ -381,7 +381,7 @@ func (sm *subscriptionManager) matchSubToConnLocked(conn *connection, sub *subsc } if conn.transport == sub.definition.Transport && conn.matcher(sub.definition.SubscriptionRef) { if _, ok := conn.dispatchers[*sub.definition.ID]; !ok { - dispatcher := newEventDispatcher(sm.ctx, conn.ei, sm.database, sm.data, sm.syshandlers, conn.id, sub, sm.eventNotifier, sm.cel) + dispatcher := newEventDispatcher(sm.ctx, conn.ei, sm.database, sm.data, sm.definitions, conn.id, sub, sm.eventNotifier, sm.cel) conn.dispatchers[*sub.definition.ID] = dispatcher dispatcher.start() } @@ -418,7 +418,7 @@ func (sm *subscriptionManager) ephemeralSubscription(ei events.Plugin, connID, n } // Create the dispatcher, and start immediately - dispatcher := newEventDispatcher(sm.ctx, ei, sm.database, sm.data, sm.syshandlers, connID, newSub, sm.eventNotifier, sm.cel) + dispatcher := newEventDispatcher(sm.ctx, ei, sm.database, sm.data, sm.definitions, connID, newSub, sm.eventNotifier, sm.cel) dispatcher.start() conn.dispatchers[*subID] = dispatcher diff --git a/internal/events/subscription_manager_test.go b/internal/events/subscription_manager_test.go index f442e93f4b..51e5179080 100644 --- a/internal/events/subscription_manager_test.go +++ b/internal/events/subscription_manager_test.go @@ -24,8 +24,8 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" + "github.com/hyperledger/firefly/mocks/definitionsmocks" "github.com/hyperledger/firefly/mocks/eventsmocks" - "github.com/hyperledger/firefly/mocks/syshandlersmocks" "github.com/hyperledger/firefly/pkg/events" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/assert" @@ -38,7 +38,7 @@ func newTestSubManager(t *testing.T, mei *eventsmocks.PluginAll) (*subscriptionM mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} - msh := &syshandlersmocks.SystemHandlers{} + msh := &definitionsmocks.DefinitionHandlers{} ctx, cancel := context.WithCancel(context.Background()) mei.On("Name").Return("ut") diff --git a/internal/networkmap/register_node.go b/internal/networkmap/register_node.go index f8b78b40e2..01dc631d81 100644 --- a/internal/networkmap/register_node.go +++ b/internal/networkmap/register_node.go @@ -63,7 +63,7 @@ func (nm *networkMap) RegisterNode(ctx context.Context, waitConfirm bool) (node return nil, nil, err } - msg, err = nm.broadcast.BroadcastDefinitionAsNode(ctx, node, fftypes.SystemTagDefineNode, waitConfirm) + msg, err = nm.broadcast.BroadcastDefinitionAsNode(ctx, fftypes.SystemNamespace, node, fftypes.SystemTagDefineNode, waitConfirm) if msg != nil { node.Message = msg.Header.ID } diff --git a/internal/networkmap/register_node_test.go b/internal/networkmap/register_node_test.go index a093819b25..9fb9f21893 100644 --- a/internal/networkmap/register_node_test.go +++ b/internal/networkmap/register_node_test.go @@ -53,7 +53,7 @@ func TestRegisterNodeOk(t *testing.T) { mockMsg := &fftypes.Message{Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}} mbm := nm.broadcast.(*broadcastmocks.Manager) - mbm.On("BroadcastDefinitionAsNode", nm.ctx, mock.Anything, fftypes.SystemTagDefineNode, true).Return(mockMsg, nil) + mbm.On("BroadcastDefinitionAsNode", nm.ctx, fftypes.SystemNamespace, mock.Anything, fftypes.SystemTagDefineNode, true).Return(mockMsg, nil) node, msg, err := nm.RegisterNode(nm.ctx, true) assert.NoError(t, err) diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 3966837284..55c912df2b 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -29,6 +29,7 @@ import ( "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/database/difactory" "github.com/hyperledger/firefly/internal/dataexchange/dxfactory" + "github.com/hyperledger/firefly/internal/definitions" "github.com/hyperledger/firefly/internal/events" "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/identity" @@ -38,7 +39,6 @@ import ( "github.com/hyperledger/firefly/internal/privatemessaging" "github.com/hyperledger/firefly/internal/publicstorage/psfactory" "github.com/hyperledger/firefly/internal/syncasync" - "github.com/hyperledger/firefly/internal/syshandlers" "github.com/hyperledger/firefly/internal/tokens/tifactory" "github.com/hyperledger/firefly/pkg/blockchain" "github.com/hyperledger/firefly/pkg/database" @@ -135,7 +135,7 @@ type orchestrator struct { batch batch.Manager broadcast broadcast.Manager messaging privatemessaging.Manager - syshandlers syshandlers.SystemHandlers + definitions definitions.DefinitionHandlers data data.Manager syncasync syncasync.Bridge batchpin batchpin.Submitter @@ -413,10 +413,10 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } } - or.syshandlers = syshandlers.NewSystemHandlers(or.database, or.dataexchange, or.data, or.broadcast, or.messaging, or.assets) + or.definitions = definitions.NewDefinitionHandlers(or.database, or.dataexchange, or.data, or.broadcast, or.messaging, or.assets) if or.events == nil { - or.events, err = events.NewEventManager(ctx, or, or.publicstorage, or.database, or.identity, or.syshandlers, or.data, or.broadcast, or.messaging, or.assets) + or.events, err = events.NewEventManager(ctx, or, or.publicstorage, or.database, or.identity, or.definitions, or.data, or.broadcast, or.messaging, or.assets) if err != nil { return err } diff --git a/mocks/broadcastmocks/manager.go b/mocks/broadcastmocks/manager.go index 816b4d61c9..f419341a4f 100644 --- a/mocks/broadcastmocks/manager.go +++ b/mocks/broadcastmocks/manager.go @@ -39,13 +39,13 @@ func (_m *Manager) BroadcastDatatype(ctx context.Context, ns string, datatype *f return r0, r1 } -// BroadcastDefinition provides a mock function with given fields: ctx, def, signingIdentity, tag, waitConfirm -func (_m *Manager) BroadcastDefinition(ctx context.Context, def fftypes.Definition, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (*fftypes.Message, error) { - ret := _m.Called(ctx, def, signingIdentity, tag, waitConfirm) +// BroadcastDefinition provides a mock function with given fields: ctx, ns, def, signingIdentity, tag, waitConfirm +func (_m *Manager) BroadcastDefinition(ctx context.Context, ns string, def fftypes.Definition, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (*fftypes.Message, error) { + ret := _m.Called(ctx, ns, def, signingIdentity, tag, waitConfirm) var r0 *fftypes.Message - if rf, ok := ret.Get(0).(func(context.Context, fftypes.Definition, *fftypes.Identity, fftypes.SystemTag, bool) *fftypes.Message); ok { - r0 = rf(ctx, def, signingIdentity, tag, waitConfirm) + if rf, ok := ret.Get(0).(func(context.Context, string, fftypes.Definition, *fftypes.Identity, fftypes.SystemTag, bool) *fftypes.Message); ok { + r0 = rf(ctx, ns, def, signingIdentity, tag, waitConfirm) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*fftypes.Message) @@ -53,8 +53,8 @@ func (_m *Manager) BroadcastDefinition(ctx context.Context, def fftypes.Definiti } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, fftypes.Definition, *fftypes.Identity, fftypes.SystemTag, bool) error); ok { - r1 = rf(ctx, def, signingIdentity, tag, waitConfirm) + if rf, ok := ret.Get(1).(func(context.Context, string, fftypes.Definition, *fftypes.Identity, fftypes.SystemTag, bool) error); ok { + r1 = rf(ctx, ns, def, signingIdentity, tag, waitConfirm) } else { r1 = ret.Error(1) } @@ -62,13 +62,13 @@ func (_m *Manager) BroadcastDefinition(ctx context.Context, def fftypes.Definiti return r0, r1 } -// BroadcastDefinitionAsNode provides a mock function with given fields: ctx, def, tag, waitConfirm -func (_m *Manager) BroadcastDefinitionAsNode(ctx context.Context, def fftypes.Definition, tag fftypes.SystemTag, waitConfirm bool) (*fftypes.Message, error) { - ret := _m.Called(ctx, def, tag, waitConfirm) +// BroadcastDefinitionAsNode provides a mock function with given fields: ctx, ns, def, tag, waitConfirm +func (_m *Manager) BroadcastDefinitionAsNode(ctx context.Context, ns string, def fftypes.Definition, tag fftypes.SystemTag, waitConfirm bool) (*fftypes.Message, error) { + ret := _m.Called(ctx, ns, def, tag, waitConfirm) var r0 *fftypes.Message - if rf, ok := ret.Get(0).(func(context.Context, fftypes.Definition, fftypes.SystemTag, bool) *fftypes.Message); ok { - r0 = rf(ctx, def, tag, waitConfirm) + if rf, ok := ret.Get(0).(func(context.Context, string, fftypes.Definition, fftypes.SystemTag, bool) *fftypes.Message); ok { + r0 = rf(ctx, ns, def, tag, waitConfirm) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*fftypes.Message) @@ -76,8 +76,8 @@ func (_m *Manager) BroadcastDefinitionAsNode(ctx context.Context, def fftypes.De } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, fftypes.Definition, fftypes.SystemTag, bool) error); ok { - r1 = rf(ctx, def, tag, waitConfirm) + if rf, ok := ret.Get(1).(func(context.Context, string, fftypes.Definition, fftypes.SystemTag, bool) error); ok { + r1 = rf(ctx, ns, def, tag, waitConfirm) } else { r1 = ret.Error(1) } diff --git a/mocks/syshandlersmocks/system_handlers.go b/mocks/definitionsmocks/definition_handlers.go similarity index 72% rename from mocks/syshandlersmocks/system_handlers.go rename to mocks/definitionsmocks/definition_handlers.go index 30f2986e5d..2e81092fae 100644 --- a/mocks/syshandlersmocks/system_handlers.go +++ b/mocks/definitionsmocks/definition_handlers.go @@ -1,25 +1,25 @@ // Code generated by mockery v1.0.0. DO NOT EDIT. -package syshandlersmocks +package definitionsmocks import ( context "context" + definitions "github.com/hyperledger/firefly/internal/definitions" database "github.com/hyperledger/firefly/pkg/database" + fftypes "github.com/hyperledger/firefly/pkg/fftypes" mock "github.com/stretchr/testify/mock" - - syshandlers "github.com/hyperledger/firefly/internal/syshandlers" ) -// SystemHandlers is an autogenerated mock type for the SystemHandlers type -type SystemHandlers struct { +// DefinitionHandlers is an autogenerated mock type for the DefinitionHandlers type +type DefinitionHandlers struct { mock.Mock } // EnsureLocalGroup provides a mock function with given fields: ctx, group -func (_m *SystemHandlers) EnsureLocalGroup(ctx context.Context, group *fftypes.Group) (bool, error) { +func (_m *DefinitionHandlers) EnsureLocalGroup(ctx context.Context, group *fftypes.Group) (bool, error) { ret := _m.Called(ctx, group) var r0 bool @@ -40,7 +40,7 @@ func (_m *SystemHandlers) EnsureLocalGroup(ctx context.Context, group *fftypes.G } // GetGroupByID provides a mock function with given fields: ctx, id -func (_m *SystemHandlers) GetGroupByID(ctx context.Context, id string) (*fftypes.Group, error) { +func (_m *DefinitionHandlers) GetGroupByID(ctx context.Context, id string) (*fftypes.Group, error) { ret := _m.Called(ctx, id) var r0 *fftypes.Group @@ -63,7 +63,7 @@ func (_m *SystemHandlers) GetGroupByID(ctx context.Context, id string) (*fftypes } // GetGroups provides a mock function with given fields: ctx, filter -func (_m *SystemHandlers) GetGroups(ctx context.Context, filter database.AndFilter) ([]*fftypes.Group, *database.FilterResult, error) { +func (_m *DefinitionHandlers) GetGroups(ctx context.Context, filter database.AndFilter) ([]*fftypes.Group, *database.FilterResult, error) { ret := _m.Called(ctx, filter) var r0 []*fftypes.Group @@ -95,14 +95,14 @@ func (_m *SystemHandlers) GetGroups(ctx context.Context, filter database.AndFilt } // HandleSystemBroadcast provides a mock function with given fields: ctx, msg, data -func (_m *SystemHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (syshandlers.SystemBroadcastAction, error) { +func (_m *DefinitionHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (definitions.SystemBroadcastAction, error) { ret := _m.Called(ctx, msg, data) - var r0 syshandlers.SystemBroadcastAction - if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Message, []*fftypes.Data) syshandlers.SystemBroadcastAction); ok { + var r0 definitions.SystemBroadcastAction + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Message, []*fftypes.Data) definitions.SystemBroadcastAction); ok { r0 = rf(ctx, msg, data) } else { - r0 = ret.Get(0).(syshandlers.SystemBroadcastAction) + r0 = ret.Get(0).(definitions.SystemBroadcastAction) } var r1 error @@ -116,7 +116,7 @@ func (_m *SystemHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftype } // ResolveInitGroup provides a mock function with given fields: ctx, msg -func (_m *SystemHandlers) ResolveInitGroup(ctx context.Context, msg *fftypes.Message) (*fftypes.Group, error) { +func (_m *DefinitionHandlers) ResolveInitGroup(ctx context.Context, msg *fftypes.Message) (*fftypes.Group, error) { ret := _m.Called(ctx, msg) var r0 *fftypes.Group @@ -139,6 +139,6 @@ func (_m *SystemHandlers) ResolveInitGroup(ctx context.Context, msg *fftypes.Mes } // SendReply provides a mock function with given fields: ctx, event, reply -func (_m *SystemHandlers) SendReply(ctx context.Context, event *fftypes.Event, reply *fftypes.MessageInOut) { +func (_m *DefinitionHandlers) SendReply(ctx context.Context, event *fftypes.Event, reply *fftypes.MessageInOut) { _m.Called(ctx, event, reply) } diff --git a/test/e2e/tokens_test.go b/test/e2e/tokens_test.go index 4e5f10cf25..f50566ff95 100644 --- a/test/e2e/tokens_test.go +++ b/test/e2e/tokens_test.go @@ -50,7 +50,8 @@ func (suite *TokensTestSuite) TestE2EFungibleTokensAsync() { } CreateTokenPool(suite.T(), suite.testState.client1, pool, false) - <-received1 + <-received1 // event for token pool creation + <-received1 // event for token pool announcement pools = GetTokenPools(suite.T(), suite.testState.client1, suite.testState.startTime) assert.Equal(suite.T(), 1, len(pools)) assert.Equal(suite.T(), "default", pools[0].Namespace) @@ -61,7 +62,8 @@ func (suite *TokensTestSuite) TestE2EFungibleTokensAsync() { poolID := pools[0].ID - <-received2 + <-received2 // event for token pool creation + <-received2 // event for token pool announcement pools = GetTokenPools(suite.T(), suite.testState.client1, suite.testState.startTime) assert.Equal(suite.T(), 1, len(pools)) assert.Equal(suite.T(), "default", pools[0].Namespace) @@ -192,8 +194,10 @@ func (suite *TokensTestSuite) TestE2ENonFungibleTokensSync() { poolID := poolOut.ID - <-received1 - <-received2 + <-received1 // event for token pool creation + <-received2 // event for token pool announcement + <-received1 // event for token pool creation + <-received2 // event for token pool announcement pools = GetTokenPools(suite.T(), suite.testState.client1, suite.testState.startTime) assert.Equal(suite.T(), 1, len(pools)) assert.Equal(suite.T(), "default", pools[0].Namespace)