From 8e57e878baea216472068eb5d7e126ef82ccc963 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Mon, 31 Jan 2022 12:45:28 -0500 Subject: [PATCH 1/4] Adjust all definition handlers to return SystemBroadcastAction Signed-off-by: Andrew Richardson --- internal/definitions/definition_handler.go | 18 +++--------- .../definition_handler_datatype.go | 26 ++++++++--------- .../definition_handler_namespace.go | 22 +++++++-------- .../definition_handler_network_node.go | 28 +++++++++---------- .../definition_handler_network_org.go | 26 ++++++++--------- 5 files changed, 55 insertions(+), 65 deletions(-) diff --git a/internal/definitions/definition_handler.go b/internal/definitions/definition_handler.go index 1f0d0430aa..16d17b8f71 100644 --- a/internal/definitions/definition_handler.go +++ b/internal/definitions/definition_handler.go @@ -89,17 +89,15 @@ func (dh *definitionHandlers) EnsureLocalGroup(ctx context.Context, group *fftyp func (dh *definitionHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { l := log.L(ctx) l.Infof("Confirming system broadcast '%s' [%s]", msg.Header.Tag, msg.Header.ID) - var valid bool - var err error switch fftypes.SystemTag(msg.Header.Tag) { case fftypes.SystemTagDefineDatatype: - valid, err = dh.handleDatatypeBroadcast(ctx, msg, data) + return dh.handleDatatypeBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNamespace: - valid, err = dh.handleNamespaceBroadcast(ctx, msg, data) + return dh.handleNamespaceBroadcast(ctx, msg, data) case fftypes.SystemTagDefineOrganization: - valid, err = dh.handleOrganizationBroadcast(ctx, msg, data) + return dh.handleOrganizationBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNode: - valid, err = dh.handleNodeBroadcast(ctx, msg, data) + return dh.handleNodeBroadcast(ctx, msg, data) case fftypes.SystemTagDefinePool: return dh.handleTokenPoolBroadcast(ctx, msg, data) case fftypes.SystemTagDefineFFI: @@ -110,14 +108,6 @@ func (dh *definitionHandlers) HandleSystemBroadcast(ctx context.Context, msg *ff l.Warnf("Unknown SystemTag '%s' for definition ID '%s'", msg.Header.Tag, msg.Header.ID) return ActionReject, nil } - switch { - case err != nil: - return ActionRetry, err - case !valid: - return ActionReject, nil - default: - return ActionConfirm, nil - } } func (dh *definitionHandlers) getSystemBroadcastPayload(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data, res fftypes.Definition) (valid bool) { diff --git a/internal/definitions/definition_handler_datatype.go b/internal/definitions/definition_handler_datatype.go index 10e355b16c..dc73efd90a 100644 --- a/internal/definitions/definition_handler_datatype.go +++ b/internal/definitions/definition_handler_datatype.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -23,42 +23,42 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { l := log.L(ctx) var dt fftypes.Datatype - valid = dh.getSystemBroadcastPayload(ctx, msg, data, &dt) + valid := dh.getSystemBroadcastPayload(ctx, msg, data, &dt) if !valid { - return false, nil + return ActionReject, nil } - if err = dt.Validate(ctx, true); err != nil { + if err := dt.Validate(ctx, true); err != nil { l.Warnf("Unable to process datatype broadcast %s - validate failed: %s", msg.Header.ID, err) - return false, nil + return ActionReject, nil } - if err = dh.data.CheckDatatype(ctx, dt.Namespace, &dt); err != nil { + if err := dh.data.CheckDatatype(ctx, dt.Namespace, &dt); err != nil { l.Warnf("Unable to process datatype broadcast %s - schema check: %s", msg.Header.ID, err) - return false, nil + return ActionReject, nil } existing, err := dh.database.GetDatatypeByName(ctx, dt.Namespace, dt.Name, dt.Version) if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if existing != nil { l.Warnf("Unable to process datatype broadcast %s (%s:%s) - duplicate of %v", msg.Header.ID, dt.Namespace, dt, existing.ID) - return false, nil + return ActionReject, nil } if err = dh.database.UpsertDatatype(ctx, &dt, false); err != nil { - return false, err + return ActionRetry, err } event := fftypes.NewEvent(fftypes.EventTypeDatatypeConfirmed, dt.Namespace, dt.ID) if err = dh.database.InsertEvent(ctx, event); err != nil { - return false, err + return ActionRetry, err } - return true, nil + return ActionConfirm, nil } diff --git a/internal/definitions/definition_handler_namespace.go b/internal/definitions/definition_handler_namespace.go index bb04c74c78..5d861b5125 100644 --- a/internal/definitions/definition_handler_namespace.go +++ b/internal/definitions/definition_handler_namespace.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -23,42 +23,42 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { l := log.L(ctx) var ns fftypes.Namespace - valid = dh.getSystemBroadcastPayload(ctx, msg, data, &ns) + valid := dh.getSystemBroadcastPayload(ctx, msg, data, &ns) if !valid { - return false, nil + return ActionReject, nil } if err := ns.Validate(ctx, true); err != nil { l.Warnf("Unable to process namespace broadcast %s - validate failed: %s", msg.Header.ID, err) - return false, nil + return ActionReject, nil } existing, err := dh.database.GetNamespace(ctx, ns.Name) if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if existing != nil { if existing.Type != fftypes.NamespaceTypeLocal { l.Warnf("Unable to process namespace broadcast %s (name=%s) - duplicate of %v", msg.Header.ID, existing.Name, existing.ID) - return false, nil + return ActionReject, nil } // Remove the local definition if err = dh.database.DeleteNamespace(ctx, existing.ID); err != nil { - return false, err + return ActionRetry, err } } if err = dh.database.UpsertNamespace(ctx, &ns, false); err != nil { - return false, err + return ActionRetry, err } event := fftypes.NewEvent(fftypes.EventTypeNamespaceConfirmed, ns.Name, ns.ID) if err = dh.database.InsertEvent(ctx, event); err != nil { - return false, err + return ActionRetry, err } - return true, nil + return ActionConfirm, nil } diff --git a/internal/definitions/definition_handler_network_node.go b/internal/definitions/definition_handler_network_node.go index d7f761a6a6..19aac1b62d 100644 --- a/internal/definitions/definition_handler_network_node.go +++ b/internal/definitions/definition_handler_network_node.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -23,32 +23,32 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { l := log.L(ctx) var node fftypes.Node - valid = dh.getSystemBroadcastPayload(ctx, msg, data, &node) + valid := dh.getSystemBroadcastPayload(ctx, msg, data, &node) if !valid { - return false, nil + return ActionReject, nil } - if err = node.Validate(ctx, true); err != nil { + if err := node.Validate(ctx, true); err != nil { l.Warnf("Unable to process node broadcast %s - validate failed: %s", msg.Header.ID, err) - return false, nil + return ActionReject, nil } owner, err := dh.database.GetOrganizationByIdentity(ctx, node.Owner) if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if owner == nil { l.Warnf("Unable to process node broadcast %s - parent identity not found: %s", msg.Header.ID, node.Owner) - return false, nil + return ActionReject, nil } if msg.Header.Key != node.Owner { l.Warnf("Unable to process node broadcast %s - incorrect signature. Expected=%s Received=%s", msg.Header.ID, node.Owner, msg.Header.Author) - return false, nil + return ActionReject, nil } existing, err := dh.database.GetNode(ctx, node.Owner, node.Name) @@ -56,24 +56,24 @@ func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *ffty existing, err = dh.database.GetNodeByID(ctx, node.ID) } if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if existing != nil { if existing.Owner != node.Owner { l.Warnf("Unable to process node broadcast %s - mismatch with existing %v", msg.Header.ID, existing.ID) - return false, nil + return ActionReject, nil } node.ID = nil // we keep the existing ID } if err = dh.database.UpsertNode(ctx, &node, true); err != nil { - return false, err + return ActionRetry, err } // Tell the data exchange about this node. Treat these errors like database errors - and return for retry processing if err = dh.exchange.AddPeer(ctx, node.DX.Peer, node.DX.Endpoint); err != nil { - return false, err + return ActionRetry, err } - return true, nil + return ActionConfirm, nil } diff --git a/internal/definitions/definition_handler_network_org.go b/internal/definitions/definition_handler_network_org.go index e8a6f68e71..c02a4b3cad 100644 --- a/internal/definitions/definition_handler_network_org.go +++ b/internal/definitions/definition_handler_network_org.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -23,33 +23,33 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { l := log.L(ctx) var org fftypes.Organization - valid = dh.getSystemBroadcastPayload(ctx, msg, data, &org) + valid := dh.getSystemBroadcastPayload(ctx, msg, data, &org) if !valid { - return false, nil + return ActionReject, nil } - if err = org.Validate(ctx, true); err != nil { + if err := org.Validate(ctx, true); err != nil { l.Warnf("Unable to process organization broadcast %s - validate failed: %s", msg.Header.ID, err) - return false, nil + return ActionReject, nil } if org.Parent != "" { parent, err := dh.database.GetOrganizationByIdentity(ctx, org.Parent) if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if parent == nil { l.Warnf("Unable to process organization broadcast %s - parent identity not found: %s", msg.Header.ID, org.Parent) - return false, nil + return ActionReject, nil } if msg.Header.Key != parent.Identity { l.Warnf("Unable to process organization broadcast %s - incorrect signature. Expected=%s Received=%s", msg.Header.ID, parent.Identity, msg.Header.Author) - return false, nil + return ActionReject, nil } } @@ -61,19 +61,19 @@ func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, m } } if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if existing != nil { if existing.Parent != org.Parent { l.Warnf("Unable to process organization broadcast %s - mismatch with existing %v", msg.Header.ID, existing.ID) - return false, nil + return ActionReject, nil } org.ID = nil // we keep the existing ID } if err = dh.database.UpsertOrganization(ctx, &org, true); err != nil { - return false, err + return ActionRetry, err } - return true, nil + return ActionConfirm, nil } From e8d029a3d981f9be0f849339cb8adba8280a8667 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 1 Feb 2022 13:46:19 -0500 Subject: [PATCH 2/4] Add DefinitionBatchActions to execute after a batch of system definitions The bulk of the processing for a system definition broadcast occurs synchronously, within a database RunAsGroup call. For definitions that trigger calls out of FireFly (ie via plugins), there must be an option to trigger steps outside RunAsGroup (and possibly another final set of steps inside RunAsGroup). Signed-off-by: Andrew Richardson --- internal/definitions/definition_handler.go | 42 +++-- .../definition_handler_contracts.go | 4 +- .../definition_handler_contracts_test.go | 12 +- .../definition_handler_datatype.go | 2 +- .../definition_handler_datatype_test.go | 16 +- .../definition_handler_namespace.go | 2 +- .../definition_handler_namespace_test.go | 20 +-- .../definition_handler_network_node.go | 2 +- .../definition_handler_network_node_test.go | 22 +-- .../definition_handler_network_org.go | 2 +- .../definition_handler_network_org_test.go | 22 +-- .../definitions/definition_handler_test.go | 4 +- .../definition_handler_tokenpool.go | 32 ++-- .../definition_handler_tokenpool_test.go | 49 ++--- internal/events/aggregator.go | 169 +++++++++++++----- internal/events/aggregator_test.go | 83 ++++++--- internal/txcommon/txcommon.go | 18 +- mocks/definitionsmocks/definition_handlers.go | 27 ++- 18 files changed, 333 insertions(+), 195 deletions(-) diff --git a/internal/definitions/definition_handler.go b/internal/definitions/definition_handler.go index 16d17b8f71..f5c096af58 100644 --- a/internal/definitions/definition_handler.go +++ b/internal/definitions/definition_handler.go @@ -35,19 +35,36 @@ import ( type DefinitionHandlers interface { privatemessaging.GroupManager - HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) + HandleDefinitionBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) SendReply(ctx context.Context, event *fftypes.Event, reply *fftypes.MessageInOut) } -type SystemBroadcastAction int +// DefinitionMessageAction is the action to be taken on an individual definition message +type DefinitionMessageAction int const ( - ActionReject SystemBroadcastAction = iota + // ActionReject the message was successfully processed, but was malformed/invalid and should be marked as rejected + ActionReject DefinitionMessageAction = iota + + // ActionConfirm the message was valid and should be confirmed ActionConfirm + + // ActionRetry a recoverable error was encountered - batch should be halted and then re-processed from the start ActionRetry + + // ActionWait the message is still awaiting further pieces for aggregation and should be held in pending state ActionWait ) +// DefinitionBatchActions are actions to be taken at the end of a definition batch - should only run if batch is successfully processed +type DefinitionBatchActions struct { + // PreFinalize may perform a blocking action (possibly to an external connector) that should execute outside database RunAsGroup + PreFinalize func(ctx context.Context) error + + // Finalize may perform final, non-idempotent database operations (such as inserting Events) + Finalize func(ctx context.Context) error +} + type definitionHandlers struct { database database.Plugin exchange dataexchange.Plugin @@ -86,28 +103,29 @@ func (dh *definitionHandlers) EnsureLocalGroup(ctx context.Context, group *fftyp return dh.messaging.EnsureLocalGroup(ctx, group) } -func (dh *definitionHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { +func (dh *definitionHandlers) HandleDefinitionBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (msgAction DefinitionMessageAction, batchActions *DefinitionBatchActions, err error) { l := log.L(ctx) - l.Infof("Confirming system broadcast '%s' [%s]", msg.Header.Tag, msg.Header.ID) + l.Infof("Confirming system definition broadcast '%s' [%s]", msg.Header.Tag, msg.Header.ID) switch fftypes.SystemTag(msg.Header.Tag) { case fftypes.SystemTagDefineDatatype: - return dh.handleDatatypeBroadcast(ctx, msg, data) + msgAction, err = dh.handleDatatypeBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNamespace: - return dh.handleNamespaceBroadcast(ctx, msg, data) + msgAction, err = dh.handleNamespaceBroadcast(ctx, msg, data) case fftypes.SystemTagDefineOrganization: - return dh.handleOrganizationBroadcast(ctx, msg, data) + msgAction, err = dh.handleOrganizationBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNode: - return dh.handleNodeBroadcast(ctx, msg, data) + msgAction, err = dh.handleNodeBroadcast(ctx, msg, data) case fftypes.SystemTagDefinePool: return dh.handleTokenPoolBroadcast(ctx, msg, data) case fftypes.SystemTagDefineFFI: - return dh.handleFFIBroadcast(ctx, msg, data) + msgAction, err = dh.handleFFIBroadcast(ctx, msg, data) case fftypes.SystemTagDefineContractAPI: - return dh.handleContractAPIBroadcast(ctx, msg, data) + msgAction, err = dh.handleContractAPIBroadcast(ctx, msg, data) default: l.Warnf("Unknown SystemTag '%s' for definition ID '%s'", msg.Header.Tag, msg.Header.ID) - return ActionReject, nil + return ActionReject, nil, nil } + return } func (dh *definitionHandlers) getSystemBroadcastPayload(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data, res fftypes.Definition) (valid bool) { diff --git a/internal/definitions/definition_handler_contracts.go b/internal/definitions/definition_handler_contracts.go index 6b8b3fd238..aca2433a7a 100644 --- a/internal/definitions/definition_handler_contracts.go +++ b/internal/definitions/definition_handler_contracts.go @@ -65,7 +65,7 @@ func (dh *definitionHandlers) persistContractAPI(ctx context.Context, api *fftyp return err == nil, err } -func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult SystemBroadcastAction, err error) { +func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult DefinitionMessageAction, err error) { l := log.L(ctx) var broadcast fftypes.FFI valid := dh.getSystemBroadcastPayload(ctx, msg, data, &broadcast) @@ -97,7 +97,7 @@ func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftyp return } -func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult SystemBroadcastAction, err error) { +func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult DefinitionMessageAction, err error) { l := log.L(ctx) var broadcast fftypes.ContractAPI valid := dh.getSystemBroadcastPayload(ctx, msg, data, &broadcast) diff --git a/internal/definitions/definition_handler_contracts_test.go b/internal/definitions/definition_handler_contracts_test.go index d2ecd11d48..c1db340ed8 100644 --- a/internal/definitions/definition_handler_contracts_test.go +++ b/internal/definitions/definition_handler_contracts_test.go @@ -102,7 +102,7 @@ func TestHandleFFIBroadcastOk(t *testing.T) { mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) mcm := dh.contracts.(*contractmocks.Manager) mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineFFI), }, @@ -187,7 +187,7 @@ func TestHandleFFIBroadcastValidateFail(t *testing.T) { } mbi := dh.database.(*databasemocks.Plugin) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineFFI), }, @@ -209,7 +209,7 @@ func TestHandleFFIBroadcastPersistFail(t *testing.T) { mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) mcm := dh.contracts.(*contractmocks.Manager) mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineFFI), }, @@ -231,7 +231,7 @@ func TestHandleContractAPIBroadcastOk(t *testing.T) { mbi.On("UpsertContractAPI", mock.Anything, mock.Anything, mock.Anything).Return(nil) mbi.On("GetContractAPIByName", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineContractAPI), }, @@ -283,7 +283,7 @@ func TestHandleContractAPIBroadcastValidateFail(t *testing.T) { } mbi := dh.database.(*databasemocks.Plugin) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineContractAPI), }, @@ -304,7 +304,7 @@ func TestHandleContractAPIBroadcastPersistFail(t *testing.T) { mbi.On("GetContractAPIByName", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mbi.On("UpsertContractAPI", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineContractAPI), }, diff --git a/internal/definitions/definition_handler_datatype.go b/internal/definitions/definition_handler_datatype.go index dc73efd90a..ea8de67cac 100644 --- a/internal/definitions/definition_handler_datatype.go +++ b/internal/definitions/definition_handler_datatype.go @@ -23,7 +23,7 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { +func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { l := log.L(ctx) var dt fftypes.Datatype diff --git a/internal/definitions/definition_handler_datatype_test.go b/internal/definitions/definition_handler_datatype_test.go index 5933e1dabc..d9988deef4 100644 --- a/internal/definitions/definition_handler_datatype_test.go +++ b/internal/definitions/definition_handler_datatype_test.go @@ -53,7 +53,7 @@ func TestHandleDefinitionBroadcastDatatypeOk(t *testing.T) { mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -89,7 +89,7 @@ func TestHandleDefinitionBroadcastDatatypeEventFail(t *testing.T) { mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -118,7 +118,7 @@ func TestHandleDefinitionBroadcastDatatypeMissingID(t *testing.T) { Value: fftypes.JSONAnyPtrBytes(b), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -147,7 +147,7 @@ func TestHandleDefinitionBroadcastBadSchema(t *testing.T) { mdm := dh.data.(*datamocks.Manager) mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -171,7 +171,7 @@ func TestHandleDefinitionBroadcastMissingData(t *testing.T) { } dt.Hash = dt.Value.Hash() - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -202,7 +202,7 @@ func TestHandleDefinitionBroadcastDatatypeLookupFail(t *testing.T) { mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil) mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: fftypes.SystemNamespace, Tag: string(fftypes.SystemTagDefineDatatype), @@ -238,7 +238,7 @@ func TestHandleDefinitionBroadcastUpsertFail(t *testing.T) { mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -272,7 +272,7 @@ func TestHandleDefinitionBroadcastDatatypeDuplicate(t *testing.T) { mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil) mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(dt, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, diff --git a/internal/definitions/definition_handler_namespace.go b/internal/definitions/definition_handler_namespace.go index 5d861b5125..1e65e742c0 100644 --- a/internal/definitions/definition_handler_namespace.go +++ b/internal/definitions/definition_handler_namespace.go @@ -23,7 +23,7 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { +func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { l := log.L(ctx) var ns fftypes.Namespace diff --git a/internal/definitions/definition_handler_namespace_test.go b/internal/definitions/definition_handler_namespace_test.go index a40bee861f..81b041dbe4 100644 --- a/internal/definitions/definition_handler_namespace_test.go +++ b/internal/definitions/definition_handler_namespace_test.go @@ -45,7 +45,7 @@ func TestHandleDefinitionBroadcastNSOk(t *testing.T) { mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -73,7 +73,7 @@ func TestHandleDefinitionBroadcastNSEventFail(t *testing.T) { mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -100,7 +100,7 @@ func TestHandleDefinitionBroadcastNSUpsertFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -114,7 +114,7 @@ func TestHandleDefinitionBroadcastNSUpsertFail(t *testing.T) { func TestHandleDefinitionBroadcastNSMissingData(t *testing.T) { dh := newTestDefinitionHandlers(t) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -133,7 +133,7 @@ func TestHandleDefinitionBroadcastNSBadID(t *testing.T) { Value: fftypes.JSONAnyPtrBytes(b), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -149,7 +149,7 @@ func TestHandleDefinitionBroadcastNSBadData(t *testing.T) { Value: fftypes.JSONAnyPtr(`!{json`), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -173,7 +173,7 @@ func TestHandleDefinitionBroadcastDuplicate(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(ns, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -203,7 +203,7 @@ func TestHandleDefinitionBroadcastDuplicateOverrideLocal(t *testing.T) { mdi.On("DeleteNamespace", mock.Anything, mock.Anything).Return(nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -231,7 +231,7 @@ func TestHandleDefinitionBroadcastDuplicateOverrideLocalFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(ns, nil) mdi.On("DeleteNamespace", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -257,7 +257,7 @@ func TestHandleDefinitionBroadcastDupCheckFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, diff --git a/internal/definitions/definition_handler_network_node.go b/internal/definitions/definition_handler_network_node.go index 19aac1b62d..c229e8ae4a 100644 --- a/internal/definitions/definition_handler_network_node.go +++ b/internal/definitions/definition_handler_network_node.go @@ -23,7 +23,7 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { +func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { l := log.L(ctx) var node fftypes.Node diff --git a/internal/definitions/definition_handler_network_node_test.go b/internal/definitions/definition_handler_network_node_test.go index 5c6b98efb2..1843221ae6 100644 --- a/internal/definitions/definition_handler_network_node_test.go +++ b/internal/definitions/definition_handler_network_node_test.go @@ -55,7 +55,7 @@ func TestHandleDefinitionBroadcastNodeOk(t *testing.T) { mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) mdx.On("AddPeer", mock.Anything, "peer1", node.DX.Endpoint).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -95,7 +95,7 @@ func TestHandleDefinitionBroadcastNodeUpsertFail(t *testing.T) { mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(nil, nil) mdi.On("GetNodeByID", mock.Anything, node.ID).Return(nil, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -137,7 +137,7 @@ func TestHandleDefinitionBroadcastNodeAddPeerFail(t *testing.T) { mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -175,7 +175,7 @@ func TestHandleDefinitionBroadcastNodeDupMismatch(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(&fftypes.Node{Owner: "0x99999"}, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -216,7 +216,7 @@ func TestHandleDefinitionBroadcastNodeDupOK(t *testing.T) { mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -254,7 +254,7 @@ func TestHandleDefinitionBroadcastNodeGetFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -291,7 +291,7 @@ func TestHandleDefinitionBroadcastNodeBadAuthor(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -328,7 +328,7 @@ func TestHandleDefinitionBroadcastNodeGetOrgNotFound(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -365,7 +365,7 @@ func TestHandleDefinitionBroadcastNodeGetOrgFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -400,7 +400,7 @@ func TestHandleDefinitionBroadcastNodeValidateFail(t *testing.T) { Value: fftypes.JSONAnyPtrBytes(b), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -421,7 +421,7 @@ func TestHandleDefinitionBroadcastNodeUnmarshalFail(t *testing.T) { Value: fftypes.JSONAnyPtr(`!json`), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ diff --git a/internal/definitions/definition_handler_network_org.go b/internal/definitions/definition_handler_network_org.go index c02a4b3cad..7cbf0ff060 100644 --- a/internal/definitions/definition_handler_network_org.go +++ b/internal/definitions/definition_handler_network_org.go @@ -23,7 +23,7 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { +func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { l := log.L(ctx) var org fftypes.Organization diff --git a/internal/definitions/definition_handler_network_org_test.go b/internal/definitions/definition_handler_network_org_test.go index 12b665129d..f60f2efa8f 100644 --- a/internal/definitions/definition_handler_network_org_test.go +++ b/internal/definitions/definition_handler_network_org_test.go @@ -58,7 +58,7 @@ func TestHandleDefinitionBroadcastChildOrgOk(t *testing.T) { mdi.On("GetOrganizationByName", mock.Anything, "org1").Return(nil, nil) mdi.On("GetOrganizationByID", mock.Anything, org.ID).Return(nil, nil) mdi.On("UpsertOrganization", mock.Anything, mock.Anything, true).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -102,7 +102,7 @@ func TestHandleDefinitionBroadcastChildOrgDupOk(t *testing.T) { mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(parentOrg, nil) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(org, nil) mdi.On("UpsertOrganization", mock.Anything, mock.Anything, true).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -144,7 +144,7 @@ func TestHandleDefinitionBroadcastChildOrgBadKey(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(parentOrg, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -179,7 +179,7 @@ func TestHandleDefinitionBroadcastOrgDupMismatch(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x12345", Parent: "0x9999"}, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -216,7 +216,7 @@ func TestHandleDefinitionBroadcastOrgUpsertFail(t *testing.T) { mdi.On("GetOrganizationByName", mock.Anything, "org1").Return(nil, nil) mdi.On("GetOrganizationByID", mock.Anything, org.ID).Return(nil, nil) mdi.On("UpsertOrganization", mock.Anything, mock.Anything, true).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -249,7 +249,7 @@ func TestHandleDefinitionBroadcastOrgGetOrgFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -282,7 +282,7 @@ func TestHandleDefinitionBroadcastOrgAuthorMismatch(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x12345", Parent: "0x9999"}, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -317,7 +317,7 @@ func TestHandleDefinitionBroadcastGetParentFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -352,7 +352,7 @@ func TestHandleDefinitionBroadcastGetParentNotFound(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -383,7 +383,7 @@ func TestHandleDefinitionBroadcastValidateFail(t *testing.T) { Value: fftypes.JSONAnyPtrBytes(b), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -404,7 +404,7 @@ func TestHandleDefinitionBroadcastUnmarshalFail(t *testing.T) { Value: fftypes.JSONAnyPtr(`!json`), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ diff --git a/internal/definitions/definition_handler_test.go b/internal/definitions/definition_handler_test.go index e6898fcc2e..3b54f27369 100644 --- a/internal/definitions/definition_handler_test.go +++ b/internal/definitions/definition_handler_test.go @@ -43,9 +43,9 @@ func newTestDefinitionHandlers(t *testing.T) *definitionHandlers { return NewDefinitionHandlers(mdi, mdx, mdm, mbm, mpm, mam, mcm).(*definitionHandlers) } -func TestHandleSystemBroadcastUnknown(t *testing.T) { +func TestHandleDefinitionBroadcastUnknown(t *testing.T) { dh := newTestDefinitionHandlers(t) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: "unknown", }, diff --git a/internal/definitions/definition_handler_tokenpool.go b/internal/definitions/definition_handler_tokenpool.go index a544b21f10..167632affc 100644 --- a/internal/definitions/definition_handler_tokenpool.go +++ b/internal/definitions/definition_handler_tokenpool.go @@ -74,10 +74,10 @@ func (dh *definitionHandlers) rejectPool(ctx context.Context, pool *fftypes.Toke return err } -func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { +func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) { var announce fftypes.TokenPoolAnnouncement if valid := dh.getSystemBroadcastPayload(ctx, msg, data, &announce); !valid { - return ActionReject, nil + return ActionReject, nil, nil } pool := announce.Pool @@ -85,27 +85,31 @@ func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg if err := pool.Validate(ctx); err != nil { log.L(ctx).Warnf("Token pool '%s' rejected - validate failed: %s", pool.ID, err) - return ActionReject, dh.rejectPool(ctx, pool) + return ActionReject, nil, dh.rejectPool(ctx, pool) } // Check if pool has already been confirmed on chain (and confirm the message if so) if existingPool, err := dh.database.GetTokenPoolByID(ctx, pool.ID); err != nil { - return ActionRetry, err + return ActionRetry, nil, err } else if existingPool != nil && existingPool.State == fftypes.TokenPoolStateConfirmed { - return ActionConfirm, nil + return ActionConfirm, nil, nil } if valid, err := dh.persistTokenPool(ctx, &announce); err != nil { - return ActionRetry, err + return ActionRetry, nil, err } else if !valid { - return ActionReject, dh.rejectPool(ctx, pool) + return ActionReject, nil, dh.rejectPool(ctx, pool) } - if err := dh.assets.ActivateTokenPool(ctx, pool, announce.Event); err != nil { - log.L(ctx).Errorf("Failed to activate token pool '%s': %s", pool.ID, err) - return ActionRetry, err - } - - // Message will remain unconfirmed until pool confirmation triggers a rewind - return ActionWait, nil + // Message will remain unconfirmed, but plugin will be notified to activate the pool + // This will ultimately trigger a pool creation event and a rewind + return ActionWait, &DefinitionBatchActions{ + PreFinalize: func(ctx context.Context) error { + if err := dh.assets.ActivateTokenPool(ctx, pool, announce.Event); err != nil { + log.L(ctx).Errorf("Failed to activate token pool '%s': %s", pool.ID, err) + return err + } + return nil + }, + }, nil } diff --git a/internal/definitions/definition_handler_tokenpool_test.go b/internal/definitions/definition_handler_tokenpool_test.go index c98c35d3a5..0947b865b4 100644 --- a/internal/definitions/definition_handler_tokenpool_test.go +++ b/internal/definitions/definition_handler_tokenpool_test.go @@ -66,7 +66,7 @@ func buildPoolDefinitionMessage(announce *fftypes.TokenPoolAnnouncement) (*fftyp return msg, data, nil } -func TestHandleSystemBroadcastTokenPoolActivateOK(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolActivateOK(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -86,14 +86,14 @@ func TestHandleSystemBroadcastTokenPoolActivateOK(t *testing.T) { })).Return(nil) mam.On("ActivateTokenPool", context.Background(), mock.AnythingOfType("*fftypes.TokenPool"), mock.AnythingOfType("*fftypes.BlockchainEvent")).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionWait, action) assert.NoError(t, err) mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolUpdateOpFail(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolUpdateOpFail(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -108,14 +108,14 @@ func TestHandleSystemBroadcastTokenPoolUpdateOpFail(t *testing.T) { mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(fmt.Errorf("pop")) mdi.On("GetTokenPoolByID", context.Background(), pool.ID).Return(nil, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionRetry, action) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolGetPoolFail(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolGetPoolFail(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -126,14 +126,14 @@ func TestHandleSystemBroadcastTokenPoolGetPoolFail(t *testing.T) { mdi := sh.database.(*databasemocks.Plugin) mdi.On("GetTokenPoolByID", context.Background(), pool.ID).Return(nil, fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionRetry, action) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolExisting(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolExisting(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -151,14 +151,14 @@ func TestHandleSystemBroadcastTokenPoolExisting(t *testing.T) { })).Return(nil) mam.On("ActivateTokenPool", context.Background(), mock.AnythingOfType("*fftypes.TokenPool"), mock.AnythingOfType("*fftypes.BlockchainEvent")).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionWait, action) assert.NoError(t, err) mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolExistingConfirmed(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolExistingConfirmed(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -172,14 +172,14 @@ func TestHandleSystemBroadcastTokenPoolExistingConfirmed(t *testing.T) { mdi := sh.database.(*databasemocks.Plugin) mdi.On("GetTokenPoolByID", context.Background(), pool.ID).Return(existing, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionConfirm, action) assert.NoError(t, err) mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolIDMismatch(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolIDMismatch(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -200,14 +200,14 @@ func TestHandleSystemBroadcastTokenPoolIDMismatch(t *testing.T) { return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolRejected })).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionReject, action) assert.NoError(t, err) mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolFailUpsert(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolFailUpsert(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -225,14 +225,14 @@ func TestHandleSystemBroadcastTokenPoolFailUpsert(t *testing.T) { return *p.ID == *pool.ID && p.Message == msg.Header.ID })).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionRetry, action) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolOpsFail(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolOpsFail(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -244,14 +244,14 @@ func TestHandleSystemBroadcastTokenPoolOpsFail(t *testing.T) { mdi.On("GetOperations", context.Background(), mock.Anything).Return(nil, nil, fmt.Errorf("pop")) mdi.On("GetTokenPoolByID", context.Background(), pool.ID).Return(nil, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionRetry, action) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolActivateFail(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolActivateFail(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -271,14 +271,17 @@ func TestHandleSystemBroadcastTokenPoolActivateFail(t *testing.T) { })).Return(nil) mam.On("ActivateTokenPool", context.Background(), mock.AnythingOfType("*fftypes.TokenPool"), mock.AnythingOfType("*fftypes.BlockchainEvent")).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) - assert.Equal(t, ActionRetry, action) + action, batchAction, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) + assert.Equal(t, ActionWait, action) + assert.NoError(t, err) + + err = batchAction.PreFinalize(context.Background()) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolValidateFail(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolValidateFail(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := &fftypes.TokenPoolAnnouncement{ @@ -293,14 +296,14 @@ func TestHandleSystemBroadcastTokenPoolValidateFail(t *testing.T) { return event.Type == fftypes.EventTypePoolRejected })).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionReject, action) assert.NoError(t, err) mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolBadMessage(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolBadMessage(t *testing.T) { sh := newTestDefinitionHandlers(t) msg := &fftypes.Message{ @@ -310,7 +313,7 @@ func TestHandleSystemBroadcastTokenPoolBadMessage(t *testing.T) { }, } - action, err := sh.HandleSystemBroadcast(context.Background(), msg, nil) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, nil) assert.Equal(t, ActionReject, action) assert.NoError(t, err) } diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 9c6b5bcb2a..ef49b3655e 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -136,15 +136,83 @@ func (ag *aggregator) rewindOffchainBatches() (rewind bool, offset int64) { return rewind, offset } +// batchActions are synchronous actions to be performed while processing system messages, but which must happen after reading the whole batch +type batchActions struct { + // PreFinalize callbacks may perform blocking actions (possibly to an external connector) + // - Will execute after all batch messages have been processed + // - Will execute outside database RunAsGroup + // - If any PreFinalize callback errors out, batch will be aborted and retried + PreFinalize []func(ctx context.Context) error + + // Finalize callbacks may perform final, non-idempotent database operations (such as inserting Events) + // - Will execute after all batch messages have been processed + // - Will execute inside database RunAsGroup + // - If any Finalize callback errors out, batch will be aborted and retried (small chance of duplicate execution here) + Finalize []func(ctx context.Context) error +} + +func (ba *batchActions) AddPreFinalize(action func(ctx context.Context) error) { + if action != nil { + ba.PreFinalize = append(ba.PreFinalize, action) + } +} + +func (ba *batchActions) AddFinalize(action func(ctx context.Context) error) { + if action != nil { + ba.Finalize = append(ba.Finalize, action) + } +} + +func (ba *batchActions) RunPreFinalize(ctx context.Context) error { + for _, action := range ba.PreFinalize { + if err := action(ctx); err != nil { + return err + } + } + return nil +} + +func (ba *batchActions) RunFinalize(ctx context.Context) error { + for _, action := range ba.Finalize { + if err := action(ctx); err != nil { + return err + } + } + return nil +} + func (ag *aggregator) processPinsDBGroup(items []fftypes.LocallySequenced) (repoll bool, err error) { pins := make([]*fftypes.Pin, len(items)) for i, item := range items { pins[i] = item.(*fftypes.Pin) } + + actions := &batchActions{ + PreFinalize: make([]func(ctx context.Context) error, 0), + Finalize: make([]func(ctx context.Context) error, 0), + } + err = ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) (err error) { - err = ag.processPins(ctx, pins) - return err + if err := ag.processPins(ctx, pins, actions); err != nil { + return err + } + if len(actions.PreFinalize) == 0 { + return actions.RunFinalize(ctx) + } + return nil }) + if err != nil { + return false, err + } + + if len(actions.PreFinalize) > 0 { + if err := actions.RunPreFinalize(ag.ctx); err != nil { + return false, err + } + err = ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) (err error) { + return actions.RunFinalize(ctx) + }) + } return false, err } @@ -157,7 +225,7 @@ func (ag *aggregator) getPins(ctx context.Context, filter database.Filter) ([]ff return ls, err } -func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin) (err error) { +func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, actions *batchActions) (err error) { l := log.L(ctx) // Keep a batch cache for this list of pins @@ -204,7 +272,7 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin) (err dupMsgCheck[*msg.Header.ID] = true // Attempt to process the message (only returns errors for database persistence issues) - if err = ag.processMessage(ctx, batch, pin.Masked, pin.Sequence, msg); err != nil { + if err = ag.processMessage(ctx, batch, pin.Masked, pin.Sequence, msg, actions); err != nil { return err } } @@ -224,7 +292,7 @@ func (ag *aggregator) calcHash(topic string, groupID *fftypes.Bytes32, identity return fftypes.HashResult(h) } -func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, masked bool, pinnedSequence int64, msg *fftypes.Message) (err error) { +func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, masked bool, pinnedSequence int64, msg *fftypes.Message, actions *batchActions) (err error) { l := log.L(ctx) // Check if it's ready to be processed @@ -273,28 +341,32 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, } } - dispatched, err := ag.attemptMessageDispatch(ctx, msg) + dispatched, err := ag.attemptMessageDispatch(ctx, msg, actions) if err != nil || !dispatched { return err } - // Move the nextPin forwards to the next sequence for this sender, on all - // topics associated with the message - if masked { - for i, nextPin := range nextPins { - nextPin.Nonce++ - nextPin.Hash = ag.calcHash(msg.Header.Topics[i], msg.Header.Group, nextPin.Identity, nextPin.Nonce) - if err = ag.database.UpdateNextPin(ctx, nextPin.Sequence, database.NextPinQueryFactory.NewUpdate(ctx). - Set("nonce", nextPin.Nonce). - Set("hash", nextPin.Hash), - ); err != nil { - return err + actions.AddFinalize(func(ctx context.Context) error { + // Move the nextPin forwards to the next sequence for this sender, on all + // topics associated with the message + if masked { + for i, nextPin := range nextPins { + nextPin.Nonce++ + nextPin.Hash = ag.calcHash(msg.Header.Topics[i], msg.Header.Group, nextPin.Identity, nextPin.Nonce) + if err = ag.database.UpdateNextPin(ctx, nextPin.Sequence, database.NextPinQueryFactory.NewUpdate(ctx). + Set("nonce", nextPin.Nonce). + Set("hash", nextPin.Hash), + ); err != nil { + return err + } } } - } - // Mark the pin dispatched - return ag.database.SetPinDispatched(ctx, pinnedSequence) + // Mark the pin dispatched + return ag.database.SetPinDispatched(ctx, pinnedSequence) + }) + + return nil } func (ag *aggregator) checkMaskedContextReady(ctx context.Context, msg *fftypes.Message, topic string, pinnedSequence int64, pin *fftypes.Bytes32) (*fftypes.NextPin, error) { @@ -399,7 +471,7 @@ func (ag *aggregator) attemptContextInit(ctx context.Context, msg *fftypes.Messa return nextPin, err } -func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.Message) (bool, error) { +func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.Message, actions *batchActions) (bool, error) { // If we don't find all the data, then we don't dispatch data, foundAll, err := ag.data.GetMessageData(ctx, msg, true) @@ -427,19 +499,24 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M } } - // We're going to dispatch it at this point, but we need to validate the data first + // Validate the message data valid := true - eventType := fftypes.EventTypeMessageConfirmed switch { case msg.Header.Type == fftypes.MessageTypeDefinition: // We handle definition events in-line on the aggregator, as it would be confusing for apps to be // dispatched subsequent events before we have processed the definition events they depend on. - var action definitions.SystemBroadcastAction - action, err = ag.definitions.HandleSystemBroadcast(ctx, msg, data) - if action == definitions.ActionRetry || action == definitions.ActionWait { + msgAction, batchAction, err := ag.definitions.HandleDefinitionBroadcast(ctx, msg, data) + if msgAction == definitions.ActionRetry { return false, err } - valid = action == definitions.ActionConfirm + if batchAction != nil { + actions.AddPreFinalize(batchAction.PreFinalize) + actions.AddFinalize(batchAction.Finalize) + } + if msgAction == definitions.ActionWait { + return false, nil + } + valid = msgAction == definitions.ActionConfirm case msg.Header.Type == fftypes.MessageTypeGroupInit: // Already handled as part of resolving the context - do nothing. @@ -451,30 +528,30 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M } } - // This message is now confirmed state := fftypes.MessageStateConfirmed + eventType := fftypes.EventTypeMessageConfirmed if !valid { state = fftypes.MessageStateRejected - } - setConfirmed := database.MessageQueryFactory.NewUpdate(ctx). - Set("confirmed", fftypes.Now()). // the timestamp of the aggregator provides ordering - Set("state", state) // mark if the message was confirmed or rejected - err = ag.database.UpdateMessage(ctx, msg.Header.ID, setConfirmed) - if err != nil { - return false, err - } - if !valid { - // An message with invalid (but complete) data is still considered dispatched. - // However, we drive a different event to the applications. eventType = fftypes.EventTypeMessageRejected } - // Generate the appropriate event - event := fftypes.NewEvent(eventType, msg.Header.Namespace, msg.Header.ID) - if err = ag.database.InsertEvent(ctx, event); err != nil { - return false, err - } - log.L(ctx).Infof("Emitting %s for message %s:%s", eventType, msg.Header.Namespace, msg.Header.ID) + actions.AddFinalize(func(ctx context.Context) error { + // This message is now confirmed + setConfirmed := database.MessageQueryFactory.NewUpdate(ctx). + Set("confirmed", fftypes.Now()). // the timestamp of the aggregator provides ordering + Set("state", state) // mark if the message was confirmed or rejected + if err = ag.database.UpdateMessage(ctx, msg.Header.ID, setConfirmed); err != nil { + return err + } + + // Generate the appropriate event + event := fftypes.NewEvent(eventType, msg.Header.Namespace, msg.Header.ID) + if err = ag.database.InsertEvent(ctx, event); err != nil { + return err + } + log.L(ctx).Infof("Emitting %s for message %s:%s", eventType, msg.Header.Namespace, msg.Header.ID) + return nil + }) return true, nil } diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index 623b75a1a5..d23a266abb 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -46,6 +46,7 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} // Generate some pin data member1org := "org1" @@ -166,7 +167,10 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { Index: 0, Dispatched: false, }, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -177,6 +181,7 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} // Generate some pin data member1org := "org1" @@ -261,7 +266,10 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { Index: 0, Dispatched: false, }, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -272,6 +280,7 @@ func TestAggregationBroadcast(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} // Generate some pin data topic := "some-topic" @@ -331,7 +340,10 @@ func TestAggregationBroadcast(t *testing.T) { Index: 0, Dispatched: false, }, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -393,6 +405,7 @@ func TestGetPins(t *testing.T) { func TestProcessPinsMissingBatch(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetBatchByID", ag.ctx, mock.Anything).Return(nil, nil) @@ -400,7 +413,7 @@ func TestProcessPinsMissingBatch(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: fftypes.NewUUID()}, - }) + }, ba) assert.NoError(t, err) } @@ -408,6 +421,7 @@ func TestProcessPinsMissingBatch(t *testing.T) { func TestProcessPinsMissingNoMsg(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetBatchByID", ag.ctx, mock.Anything).Return(&fftypes.Batch{ @@ -422,7 +436,7 @@ func TestProcessPinsMissingNoMsg(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: fftypes.NewUUID(), Index: 25}, - }) + }, ba) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -431,6 +445,7 @@ func TestProcessPinsMissingNoMsg(t *testing.T) { func TestProcessPinsBadMsgHeader(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetBatchByID", ag.ctx, mock.Anything).Return(&fftypes.Batch{ @@ -448,7 +463,7 @@ func TestProcessPinsBadMsgHeader(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: fftypes.NewUUID(), Index: 0}, - }) + }, ba) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -457,6 +472,7 @@ func TestProcessPinsBadMsgHeader(t *testing.T) { func TestProcessSkipDupMsg(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} batchID := fftypes.NewUUID() mdi := ag.database.(*databasemocks.Plugin) @@ -479,7 +495,7 @@ func TestProcessSkipDupMsg(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: batchID, Index: 0, Hash: fftypes.NewRandB32()}, {Sequence: 12345, Batch: batchID, Index: 1, Hash: fftypes.NewRandB32()}, - }) + }, ba) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -488,6 +504,7 @@ func TestProcessSkipDupMsg(t *testing.T) { func TestProcessMsgFailGetPins(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} batchID := fftypes.NewUUID() mdi := ag.database.(*databasemocks.Plugin) @@ -506,7 +523,7 @@ func TestProcessMsgFailGetPins(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: batchID, Index: 0, Hash: fftypes.NewRandB32()}, - }) + }, ba) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } @@ -515,7 +532,7 @@ func TestProcessMsgFailMissingGroup(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - err := ag.processMessage(ag.ctx, &fftypes.Batch{}, true, 12345, &fftypes.Message{}) + err := ag.processMessage(ag.ctx, &fftypes.Batch{}, true, 12345, &fftypes.Message{}, nil) assert.NoError(t, err) } @@ -531,7 +548,7 @@ func TestProcessMsgFailBadPin(t *testing.T) { Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{"!Wrong"}, - }) + }, nil) assert.NoError(t, err) } @@ -550,7 +567,7 @@ func TestProcessMsgFailGetNextPins(t *testing.T) { Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{fftypes.NewRandB32().String()}, - }) + }, nil) assert.EqualError(t, err, "pop") } @@ -570,7 +587,7 @@ func TestProcessMsgFailDispatch(t *testing.T) { Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{fftypes.NewRandB32().String()}, - }) + }, nil) assert.EqualError(t, err, "pop") } @@ -578,6 +595,7 @@ func TestProcessMsgFailDispatch(t *testing.T) { func TestProcessMsgFailPinUpdate(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} pin := fftypes.NewRandB32() mdi := ag.database.(*databasemocks.Plugin) @@ -598,7 +616,10 @@ func TestProcessMsgFailPinUpdate(t *testing.T) { Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{pin.String()}, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.EqualError(t, err, "pop") } @@ -833,7 +854,7 @@ func TestAttemptMessageDispatchFailGetData(t *testing.T) { _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, - }) + }, nil) assert.EqualError(t, err, "pop") } @@ -851,7 +872,7 @@ func TestAttemptMessageDispatchFailValidateData(t *testing.T) { Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, }, - }) + }, nil) assert.EqualError(t, err, "pop") } @@ -877,7 +898,7 @@ func TestAttemptMessageDispatchMissingBlobs(t *testing.T) { dispatched, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, - }) + }, nil) assert.NoError(t, err) assert.False(t, dispatched) @@ -900,7 +921,7 @@ func TestAttemptMessageDispatchMissingTransfers(t *testing.T) { }, } msg.Hash = msg.Header.Hash() - dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg) + dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, nil) assert.NoError(t, err) assert.False(t, dispatched) @@ -925,7 +946,7 @@ func TestAttemptMessageDispatchGetTransfersFail(t *testing.T) { }, } msg.Hash = msg.Header.Hash() - dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg) + dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, nil) assert.EqualError(t, err, "pop") assert.False(t, dispatched) @@ -956,7 +977,7 @@ func TestAttemptMessageDispatchTransferMismatch(t *testing.T) { mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetTokenTransfers", ag.ctx, mock.Anything).Return(transfers, nil, nil) - dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg) + dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, nil) assert.NoError(t, err) assert.False(t, dispatched) @@ -967,9 +988,10 @@ func TestAttemptMessageDispatchTransferMismatch(t *testing.T) { func TestAttemptMessageDispatchFailValidateBadSystem(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) - msh.On("HandleSystemBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionReject, nil) + msh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionReject, &definitions.DefinitionBatchActions{}, nil) mdm := ag.data.(*datamocks.Manager) mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) @@ -1003,7 +1025,7 @@ func TestAttemptMessageDispatchFailValidateBadSystem(t *testing.T) { Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, }, - }) + }, ba) assert.NoError(t, err) } @@ -1013,7 +1035,7 @@ func TestAttemptMessageDispatchFailValidateSystemFail(t *testing.T) { defer cancel() msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) - msh.On("HandleSystemBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionRetry, fmt.Errorf("pop")) + msh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionRetry, &definitions.DefinitionBatchActions{}, fmt.Errorf("pop")) mdm := ag.data.(*datamocks.Manager) mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) @@ -1027,7 +1049,7 @@ func TestAttemptMessageDispatchFailValidateSystemFail(t *testing.T) { Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, }, - }) + }, nil) assert.EqualError(t, err, "pop") } @@ -1035,6 +1057,7 @@ func TestAttemptMessageDispatchFailValidateSystemFail(t *testing.T) { func TestAttemptMessageDispatchEventFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) @@ -1045,7 +1068,10 @@ func TestAttemptMessageDispatchEventFail(t *testing.T) { _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.EqualError(t, err, "pop") } @@ -1053,6 +1079,7 @@ func TestAttemptMessageDispatchEventFail(t *testing.T) { func TestAttemptMessageDispatchGroupInit(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) @@ -1066,7 +1093,7 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) { ID: fftypes.NewUUID(), Type: fftypes.MessageTypeGroupInit, }, - }) + }, ba) assert.NoError(t, err) } @@ -1074,6 +1101,7 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) { func TestAttemptMessageUpdateMessageFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) @@ -1083,7 +1111,10 @@ func TestAttemptMessageUpdateMessageFail(t *testing.T) { _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.EqualError(t, err, "pop") } diff --git a/internal/txcommon/txcommon.go b/internal/txcommon/txcommon.go index 77760a8825..6f5e64d941 100644 --- a/internal/txcommon/txcommon.go +++ b/internal/txcommon/txcommon.go @@ -90,17 +90,13 @@ func (t *transactionHelper) PersistTransaction(ctx context.Context, ns string, i return false, err } - } else { - - if err = t.database.InsertTransaction(ctx, &fftypes.Transaction{ - ID: id, - Namespace: ns, - Type: txType, - BlockchainIDs: fftypes.NewFFStringArray(strings.ToLower(blockchainTXID)), - }); err != nil { - return false, err - } - + } else if err = t.database.InsertTransaction(ctx, &fftypes.Transaction{ + ID: id, + Namespace: ns, + Type: txType, + BlockchainIDs: fftypes.NewFFStringArray(strings.ToLower(blockchainTXID)), + }); err != nil { + return false, err } return true, nil diff --git a/mocks/definitionsmocks/definition_handlers.go b/mocks/definitionsmocks/definition_handlers.go index f292a1bf1c..1bb1a35a0b 100644 --- a/mocks/definitionsmocks/definition_handlers.go +++ b/mocks/definitionsmocks/definition_handlers.go @@ -94,25 +94,34 @@ func (_m *DefinitionHandlers) GetGroupsNS(ctx context.Context, ns string, filter return r0, r1, r2 } -// HandleSystemBroadcast provides a mock function with given fields: ctx, msg, data -func (_m *DefinitionHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (definitions.SystemBroadcastAction, error) { +// HandleDefinitionBroadcast provides a mock function with given fields: ctx, msg, data +func (_m *DefinitionHandlers) HandleDefinitionBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (definitions.DefinitionMessageAction, *definitions.DefinitionBatchActions, error) { ret := _m.Called(ctx, msg, data) - var r0 definitions.SystemBroadcastAction - if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Message, []*fftypes.Data) definitions.SystemBroadcastAction); ok { + var r0 definitions.DefinitionMessageAction + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Message, []*fftypes.Data) definitions.DefinitionMessageAction); ok { r0 = rf(ctx, msg, data) } else { - r0 = ret.Get(0).(definitions.SystemBroadcastAction) + r0 = ret.Get(0).(definitions.DefinitionMessageAction) } - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, *fftypes.Message, []*fftypes.Data) error); ok { + var r1 *definitions.DefinitionBatchActions + if rf, ok := ret.Get(1).(func(context.Context, *fftypes.Message, []*fftypes.Data) *definitions.DefinitionBatchActions); ok { r1 = rf(ctx, msg, data) } else { - r1 = ret.Error(1) + if ret.Get(1) != nil { + r1 = ret.Get(1).(*definitions.DefinitionBatchActions) + } } - return r0, r1 + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, *fftypes.Message, []*fftypes.Data) error); ok { + r2 = rf(ctx, msg, data) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 } // ResolveInitGroup provides a mock function with given fields: ctx, msg From 6cd23318c690714b6352845bb2fbd990699db7a4 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 1 Feb 2022 16:43:24 -0500 Subject: [PATCH 3/4] Add test coverage for BatchActions Signed-off-by: Andrew Richardson --- .../definition_handler_tokenpool_test.go | 5 +- internal/events/aggregator.go | 41 +++--- internal/events/aggregator_test.go | 125 ++++++++++++++++-- 3 files changed, 144 insertions(+), 27 deletions(-) diff --git a/internal/definitions/definition_handler_tokenpool_test.go b/internal/definitions/definition_handler_tokenpool_test.go index 0947b865b4..53e8ff1500 100644 --- a/internal/definitions/definition_handler_tokenpool_test.go +++ b/internal/definitions/definition_handler_tokenpool_test.go @@ -86,10 +86,13 @@ func TestHandleDefinitionBroadcastTokenPoolActivateOK(t *testing.T) { })).Return(nil) mam.On("ActivateTokenPool", context.Background(), mock.AnythingOfType("*fftypes.TokenPool"), mock.AnythingOfType("*fftypes.BlockchainEvent")).Return(nil) - action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) + action, ba, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionWait, action) assert.NoError(t, err) + err = ba.PreFinalize(context.Background()) + assert.NoError(t, err) + mdi.AssertExpectations(t) } diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index ef49b3655e..d7d2790542 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -181,19 +181,14 @@ func (ba *batchActions) RunFinalize(ctx context.Context) error { return nil } -func (ag *aggregator) processPinsDBGroup(items []fftypes.LocallySequenced) (repoll bool, err error) { - pins := make([]*fftypes.Pin, len(items)) - for i, item := range items { - pins[i] = item.(*fftypes.Pin) - } - +func (ag *aggregator) processWithBatchActions(callback func(ctx context.Context, actions *batchActions) error) error { actions := &batchActions{ PreFinalize: make([]func(ctx context.Context) error, 0), Finalize: make([]func(ctx context.Context) error, 0), } - err = ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) (err error) { - if err := ag.processPins(ctx, pins, actions); err != nil { + err := ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) (err error) { + if err := callback(ctx, actions); err != nil { return err } if len(actions.PreFinalize) == 0 { @@ -202,18 +197,30 @@ func (ag *aggregator) processPinsDBGroup(items []fftypes.LocallySequenced) (repo return nil }) if err != nil { - return false, err + return err } - if len(actions.PreFinalize) > 0 { - if err := actions.RunPreFinalize(ag.ctx); err != nil { - return false, err - } - err = ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) (err error) { - return actions.RunFinalize(ctx) - }) + if len(actions.PreFinalize) == 0 { + return err } - return false, err + + if err := actions.RunPreFinalize(ag.ctx); err != nil { + return err + } + return ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) error { + return actions.RunFinalize(ctx) + }) +} + +func (ag *aggregator) processPinsDBGroup(items []fftypes.LocallySequenced) (repoll bool, err error) { + pins := make([]*fftypes.Pin, len(items)) + for i, item := range items { + pins[i] = item.(*fftypes.Pin) + } + + return false, ag.processWithBatchActions(func(ctx context.Context, actions *batchActions) error { + return ag.processPins(ctx, pins, actions) + }) } func (ag *aggregator) getPins(ctx context.Context, filter database.Filter) ([]fftypes.LocallySequenced, error) { diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index d23a266abb..461ae1e24d 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -181,7 +181,6 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} // Generate some pin data member1org := "org1" @@ -202,6 +201,11 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) + rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() + rag.RunFn = func(a mock.Arguments) { + rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} + } + // Get the batch mdi.On("GetBatchByID", ag.ctx, batchID).Return(&fftypes.Batch{ ID: batchID, @@ -257,8 +261,8 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { // Confirm the offset mdi.On("UpdateOffset", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) - err := ag.processPins(ag.ctx, []*fftypes.Pin{ - { + _, err := ag.processPinsDBGroup([]fftypes.LocallySequenced{ + &fftypes.Pin{ Sequence: 10001, Masked: true, Hash: member2Nonce500, @@ -266,10 +270,7 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { Index: 0, Dispatched: false, }, - }, ba) - assert.NoError(t, err) - - err = ba.RunFinalize(ag.ctx) + }) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -985,7 +986,7 @@ func TestAttemptMessageDispatchTransferMismatch(t *testing.T) { mdi.AssertExpectations(t) } -func TestAttemptMessageDispatchFailValidateBadSystem(t *testing.T) { +func TestDefinitionBroadcastActionReject(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() ba := &batchActions{} @@ -1030,7 +1031,7 @@ func TestAttemptMessageDispatchFailValidateBadSystem(t *testing.T) { } -func TestAttemptMessageDispatchFailValidateSystemFail(t *testing.T) { +func TestDefinitionBroadcastActionRetry(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() @@ -1054,6 +1055,30 @@ func TestAttemptMessageDispatchFailValidateSystemFail(t *testing.T) { } +func TestDefinitionBroadcastActionWait(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) + msh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionWait, &definitions.DefinitionBatchActions{}, nil) + + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) + + _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypeDefinition, + ID: fftypes.NewUUID(), + Namespace: "any", + }, + Data: fftypes.DataRefs{ + {ID: fftypes.NewUUID()}, + }, + }, nil) + assert.NoError(t, err) + +} + func TestAttemptMessageDispatchEventFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() @@ -1313,3 +1338,85 @@ func TestResolveBlobsCopyOk(t *testing.T) { assert.NoError(t, err) assert.True(t, resolved) } + +func TestBatchActions(t *testing.T) { + prefinalizeCalled := false + finalizeCalled := false + + ba := &batchActions{ + PreFinalize: make([]func(ctx context.Context) error, 0), + Finalize: make([]func(ctx context.Context) error, 0), + } + + ba.AddPreFinalize(func(ctx context.Context) error { + prefinalizeCalled = true + return nil + }) + ba.AddFinalize(func(ctx context.Context) error { + finalizeCalled = true + return nil + }) + + err := ba.RunPreFinalize(context.Background()) + assert.NoError(t, err) + assert.True(t, prefinalizeCalled) + + err = ba.RunFinalize(context.Background()) + assert.NoError(t, err) + assert.True(t, finalizeCalled) +} + +func TestBatchActionsError(t *testing.T) { + ba := &batchActions{ + PreFinalize: make([]func(ctx context.Context) error, 0), + Finalize: make([]func(ctx context.Context) error, 0), + } + + ba.AddPreFinalize(func(ctx context.Context) error { + return fmt.Errorf("pop") + }) + ba.AddFinalize(func(ctx context.Context) error { + return fmt.Errorf("pop") + }) + + err := ba.RunPreFinalize(context.Background()) + assert.EqualError(t, err, "pop") + + err = ba.RunFinalize(context.Background()) + assert.EqualError(t, err, "pop") +} + +func TestProcessWithBatchActionsPreFinalizeError(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + + mdi := ag.database.(*databasemocks.Plugin) + rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() + rag.RunFn = func(a mock.Arguments) { + rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} + } + + err := ag.processWithBatchActions(func(ctx context.Context, actions *batchActions) error { + actions.AddPreFinalize(func(ctx context.Context) error { return fmt.Errorf("pop") }) + return nil + }) + assert.EqualError(t, err, "pop") +} + +func TestProcessWithBatchActionsSuccess(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + + mdi := ag.database.(*databasemocks.Plugin) + rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() + rag.RunFn = func(a mock.Arguments) { + rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} + } + + err := ag.processWithBatchActions(func(ctx context.Context, actions *batchActions) error { + actions.AddPreFinalize(func(ctx context.Context) error { return nil }) + actions.AddFinalize(func(ctx context.Context) error { return nil }) + return nil + }) + assert.NoError(t, err) +} From 17b2ffcdfacea4af8d46d273c2c10889d119c4af Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Wed, 2 Feb 2022 08:07:32 -0500 Subject: [PATCH 4/4] Add a few additional comments on batch actions Signed-off-by: Andrew Richardson --- internal/definitions/definition_handler.go | 3 ++- internal/events/aggregator.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/definitions/definition_handler.go b/internal/definitions/definition_handler.go index f5c096af58..6620176dba 100644 --- a/internal/definitions/definition_handler.go +++ b/internal/definitions/definition_handler.go @@ -56,7 +56,8 @@ const ( ActionWait ) -// DefinitionBatchActions are actions to be taken at the end of a definition batch - should only run if batch is successfully processed +// DefinitionBatchActions are actions to be taken at the end of a definition batch +// See further notes on "batchActions" in the event aggregator type DefinitionBatchActions struct { // PreFinalize may perform a blocking action (possibly to an external connector) that should execute outside database RunAsGroup PreFinalize func(ctx context.Context) error diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index d7d2790542..a12617b82f 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -145,7 +145,7 @@ type batchActions struct { PreFinalize []func(ctx context.Context) error // Finalize callbacks may perform final, non-idempotent database operations (such as inserting Events) - // - Will execute after all batch messages have been processed + // - Will execute after all batch messages have been processed and any PreFinalize callbacks have succeeded // - Will execute inside database RunAsGroup // - If any Finalize callback errors out, batch will be aborted and retried (small chance of duplicate execution here) Finalize []func(ctx context.Context) error