diff --git a/internal/definitions/definition_handler.go b/internal/definitions/definition_handler.go index 1f0d0430aa..6620176dba 100644 --- a/internal/definitions/definition_handler.go +++ b/internal/definitions/definition_handler.go @@ -35,19 +35,37 @@ import ( type DefinitionHandlers interface { privatemessaging.GroupManager - HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) + HandleDefinitionBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) SendReply(ctx context.Context, event *fftypes.Event, reply *fftypes.MessageInOut) } -type SystemBroadcastAction int +// DefinitionMessageAction is the action to be taken on an individual definition message +type DefinitionMessageAction int const ( - ActionReject SystemBroadcastAction = iota + // ActionReject the message was successfully processed, but was malformed/invalid and should be marked as rejected + ActionReject DefinitionMessageAction = iota + + // ActionConfirm the message was valid and should be confirmed ActionConfirm + + // ActionRetry a recoverable error was encountered - batch should be halted and then re-processed from the start ActionRetry + + // ActionWait the message is still awaiting further pieces for aggregation and should be held in pending state ActionWait ) +// DefinitionBatchActions are actions to be taken at the end of a definition batch +// See further notes on "batchActions" in the event aggregator +type DefinitionBatchActions struct { + // PreFinalize may perform a blocking action (possibly to an external connector) that should execute outside database RunAsGroup + PreFinalize func(ctx context.Context) error + + // Finalize may perform final, non-idempotent database operations (such as inserting Events) + Finalize func(ctx context.Context) error +} + type definitionHandlers struct { database database.Plugin exchange dataexchange.Plugin @@ -86,38 +104,29 @@ func (dh *definitionHandlers) EnsureLocalGroup(ctx context.Context, group *fftyp return dh.messaging.EnsureLocalGroup(ctx, group) } -func (dh *definitionHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { +func (dh *definitionHandlers) HandleDefinitionBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (msgAction DefinitionMessageAction, batchActions *DefinitionBatchActions, err error) { l := log.L(ctx) - l.Infof("Confirming system broadcast '%s' [%s]", msg.Header.Tag, msg.Header.ID) - var valid bool - var err error + l.Infof("Confirming system definition broadcast '%s' [%s]", msg.Header.Tag, msg.Header.ID) switch fftypes.SystemTag(msg.Header.Tag) { case fftypes.SystemTagDefineDatatype: - valid, err = dh.handleDatatypeBroadcast(ctx, msg, data) + msgAction, err = dh.handleDatatypeBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNamespace: - valid, err = dh.handleNamespaceBroadcast(ctx, msg, data) + msgAction, err = dh.handleNamespaceBroadcast(ctx, msg, data) case fftypes.SystemTagDefineOrganization: - valid, err = dh.handleOrganizationBroadcast(ctx, msg, data) + msgAction, err = dh.handleOrganizationBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNode: - valid, err = dh.handleNodeBroadcast(ctx, msg, data) + msgAction, err = dh.handleNodeBroadcast(ctx, msg, data) case fftypes.SystemTagDefinePool: return dh.handleTokenPoolBroadcast(ctx, msg, data) case fftypes.SystemTagDefineFFI: - return dh.handleFFIBroadcast(ctx, msg, data) + msgAction, err = dh.handleFFIBroadcast(ctx, msg, data) case fftypes.SystemTagDefineContractAPI: - return dh.handleContractAPIBroadcast(ctx, msg, data) + msgAction, err = dh.handleContractAPIBroadcast(ctx, msg, data) default: l.Warnf("Unknown SystemTag '%s' for definition ID '%s'", msg.Header.Tag, msg.Header.ID) - return ActionReject, nil - } - switch { - case err != nil: - return ActionRetry, err - case !valid: - return ActionReject, nil - default: - return ActionConfirm, nil + return ActionReject, nil, nil } + return } func (dh *definitionHandlers) getSystemBroadcastPayload(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data, res fftypes.Definition) (valid bool) { diff --git a/internal/definitions/definition_handler_contracts.go b/internal/definitions/definition_handler_contracts.go index 6b8b3fd238..aca2433a7a 100644 --- a/internal/definitions/definition_handler_contracts.go +++ b/internal/definitions/definition_handler_contracts.go @@ -65,7 +65,7 @@ func (dh *definitionHandlers) persistContractAPI(ctx context.Context, api *fftyp return err == nil, err } -func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult SystemBroadcastAction, err error) { +func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult DefinitionMessageAction, err error) { l := log.L(ctx) var broadcast fftypes.FFI valid := dh.getSystemBroadcastPayload(ctx, msg, data, &broadcast) @@ -97,7 +97,7 @@ func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftyp return } -func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult SystemBroadcastAction, err error) { +func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult DefinitionMessageAction, err error) { l := log.L(ctx) var broadcast fftypes.ContractAPI valid := dh.getSystemBroadcastPayload(ctx, msg, data, &broadcast) diff --git a/internal/definitions/definition_handler_contracts_test.go b/internal/definitions/definition_handler_contracts_test.go index d2ecd11d48..c1db340ed8 100644 --- a/internal/definitions/definition_handler_contracts_test.go +++ b/internal/definitions/definition_handler_contracts_test.go @@ -102,7 +102,7 @@ func TestHandleFFIBroadcastOk(t *testing.T) { mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) mcm := dh.contracts.(*contractmocks.Manager) mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineFFI), }, @@ -187,7 +187,7 @@ func TestHandleFFIBroadcastValidateFail(t *testing.T) { } mbi := dh.database.(*databasemocks.Plugin) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineFFI), }, @@ -209,7 +209,7 @@ func TestHandleFFIBroadcastPersistFail(t *testing.T) { mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) mcm := dh.contracts.(*contractmocks.Manager) mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineFFI), }, @@ -231,7 +231,7 @@ func TestHandleContractAPIBroadcastOk(t *testing.T) { mbi.On("UpsertContractAPI", mock.Anything, mock.Anything, mock.Anything).Return(nil) mbi.On("GetContractAPIByName", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineContractAPI), }, @@ -283,7 +283,7 @@ func TestHandleContractAPIBroadcastValidateFail(t *testing.T) { } mbi := dh.database.(*databasemocks.Plugin) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineContractAPI), }, @@ -304,7 +304,7 @@ func TestHandleContractAPIBroadcastPersistFail(t *testing.T) { mbi.On("GetContractAPIByName", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mbi.On("UpsertContractAPI", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineContractAPI), }, diff --git a/internal/definitions/definition_handler_datatype.go b/internal/definitions/definition_handler_datatype.go index 10e355b16c..ea8de67cac 100644 --- a/internal/definitions/definition_handler_datatype.go +++ b/internal/definitions/definition_handler_datatype.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -23,42 +23,42 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { l := log.L(ctx) var dt fftypes.Datatype - valid = dh.getSystemBroadcastPayload(ctx, msg, data, &dt) + valid := dh.getSystemBroadcastPayload(ctx, msg, data, &dt) if !valid { - return false, nil + return ActionReject, nil } - if err = dt.Validate(ctx, true); err != nil { + if err := dt.Validate(ctx, true); err != nil { l.Warnf("Unable to process datatype broadcast %s - validate failed: %s", msg.Header.ID, err) - return false, nil + return ActionReject, nil } - if err = dh.data.CheckDatatype(ctx, dt.Namespace, &dt); err != nil { + if err := dh.data.CheckDatatype(ctx, dt.Namespace, &dt); err != nil { l.Warnf("Unable to process datatype broadcast %s - schema check: %s", msg.Header.ID, err) - return false, nil + return ActionReject, nil } existing, err := dh.database.GetDatatypeByName(ctx, dt.Namespace, dt.Name, dt.Version) if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if existing != nil { l.Warnf("Unable to process datatype broadcast %s (%s:%s) - duplicate of %v", msg.Header.ID, dt.Namespace, dt, existing.ID) - return false, nil + return ActionReject, nil } if err = dh.database.UpsertDatatype(ctx, &dt, false); err != nil { - return false, err + return ActionRetry, err } event := fftypes.NewEvent(fftypes.EventTypeDatatypeConfirmed, dt.Namespace, dt.ID) if err = dh.database.InsertEvent(ctx, event); err != nil { - return false, err + return ActionRetry, err } - return true, nil + return ActionConfirm, nil } diff --git a/internal/definitions/definition_handler_datatype_test.go b/internal/definitions/definition_handler_datatype_test.go index 5933e1dabc..d9988deef4 100644 --- a/internal/definitions/definition_handler_datatype_test.go +++ b/internal/definitions/definition_handler_datatype_test.go @@ -53,7 +53,7 @@ func TestHandleDefinitionBroadcastDatatypeOk(t *testing.T) { mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -89,7 +89,7 @@ func TestHandleDefinitionBroadcastDatatypeEventFail(t *testing.T) { mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -118,7 +118,7 @@ func TestHandleDefinitionBroadcastDatatypeMissingID(t *testing.T) { Value: fftypes.JSONAnyPtrBytes(b), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -147,7 +147,7 @@ func TestHandleDefinitionBroadcastBadSchema(t *testing.T) { mdm := dh.data.(*datamocks.Manager) mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -171,7 +171,7 @@ func TestHandleDefinitionBroadcastMissingData(t *testing.T) { } dt.Hash = dt.Value.Hash() - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -202,7 +202,7 @@ func TestHandleDefinitionBroadcastDatatypeLookupFail(t *testing.T) { mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil) mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: fftypes.SystemNamespace, Tag: string(fftypes.SystemTagDefineDatatype), @@ -238,7 +238,7 @@ func TestHandleDefinitionBroadcastUpsertFail(t *testing.T) { mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, @@ -272,7 +272,7 @@ func TestHandleDefinitionBroadcastDatatypeDuplicate(t *testing.T) { mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil) mbi := dh.database.(*databasemocks.Plugin) mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(dt, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, diff --git a/internal/definitions/definition_handler_namespace.go b/internal/definitions/definition_handler_namespace.go index bb04c74c78..1e65e742c0 100644 --- a/internal/definitions/definition_handler_namespace.go +++ b/internal/definitions/definition_handler_namespace.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -23,42 +23,42 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { l := log.L(ctx) var ns fftypes.Namespace - valid = dh.getSystemBroadcastPayload(ctx, msg, data, &ns) + valid := dh.getSystemBroadcastPayload(ctx, msg, data, &ns) if !valid { - return false, nil + return ActionReject, nil } if err := ns.Validate(ctx, true); err != nil { l.Warnf("Unable to process namespace broadcast %s - validate failed: %s", msg.Header.ID, err) - return false, nil + return ActionReject, nil } existing, err := dh.database.GetNamespace(ctx, ns.Name) if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if existing != nil { if existing.Type != fftypes.NamespaceTypeLocal { l.Warnf("Unable to process namespace broadcast %s (name=%s) - duplicate of %v", msg.Header.ID, existing.Name, existing.ID) - return false, nil + return ActionReject, nil } // Remove the local definition if err = dh.database.DeleteNamespace(ctx, existing.ID); err != nil { - return false, err + return ActionRetry, err } } if err = dh.database.UpsertNamespace(ctx, &ns, false); err != nil { - return false, err + return ActionRetry, err } event := fftypes.NewEvent(fftypes.EventTypeNamespaceConfirmed, ns.Name, ns.ID) if err = dh.database.InsertEvent(ctx, event); err != nil { - return false, err + return ActionRetry, err } - return true, nil + return ActionConfirm, nil } diff --git a/internal/definitions/definition_handler_namespace_test.go b/internal/definitions/definition_handler_namespace_test.go index a40bee861f..81b041dbe4 100644 --- a/internal/definitions/definition_handler_namespace_test.go +++ b/internal/definitions/definition_handler_namespace_test.go @@ -45,7 +45,7 @@ func TestHandleDefinitionBroadcastNSOk(t *testing.T) { mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -73,7 +73,7 @@ func TestHandleDefinitionBroadcastNSEventFail(t *testing.T) { mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -100,7 +100,7 @@ func TestHandleDefinitionBroadcastNSUpsertFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -114,7 +114,7 @@ func TestHandleDefinitionBroadcastNSUpsertFail(t *testing.T) { func TestHandleDefinitionBroadcastNSMissingData(t *testing.T) { dh := newTestDefinitionHandlers(t) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -133,7 +133,7 @@ func TestHandleDefinitionBroadcastNSBadID(t *testing.T) { Value: fftypes.JSONAnyPtrBytes(b), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -149,7 +149,7 @@ func TestHandleDefinitionBroadcastNSBadData(t *testing.T) { Value: fftypes.JSONAnyPtr(`!{json`), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -173,7 +173,7 @@ func TestHandleDefinitionBroadcastDuplicate(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(ns, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -203,7 +203,7 @@ func TestHandleDefinitionBroadcastDuplicateOverrideLocal(t *testing.T) { mdi.On("DeleteNamespace", mock.Anything, mock.Anything).Return(nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -231,7 +231,7 @@ func TestHandleDefinitionBroadcastDuplicateOverrideLocalFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(ns, nil) mdi.On("DeleteNamespace", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, @@ -257,7 +257,7 @@ func TestHandleDefinitionBroadcastDupCheckFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, diff --git a/internal/definitions/definition_handler_network_node.go b/internal/definitions/definition_handler_network_node.go index d7f761a6a6..c229e8ae4a 100644 --- a/internal/definitions/definition_handler_network_node.go +++ b/internal/definitions/definition_handler_network_node.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -23,32 +23,32 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { l := log.L(ctx) var node fftypes.Node - valid = dh.getSystemBroadcastPayload(ctx, msg, data, &node) + valid := dh.getSystemBroadcastPayload(ctx, msg, data, &node) if !valid { - return false, nil + return ActionReject, nil } - if err = node.Validate(ctx, true); err != nil { + if err := node.Validate(ctx, true); err != nil { l.Warnf("Unable to process node broadcast %s - validate failed: %s", msg.Header.ID, err) - return false, nil + return ActionReject, nil } owner, err := dh.database.GetOrganizationByIdentity(ctx, node.Owner) if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if owner == nil { l.Warnf("Unable to process node broadcast %s - parent identity not found: %s", msg.Header.ID, node.Owner) - return false, nil + return ActionReject, nil } if msg.Header.Key != node.Owner { l.Warnf("Unable to process node broadcast %s - incorrect signature. Expected=%s Received=%s", msg.Header.ID, node.Owner, msg.Header.Author) - return false, nil + return ActionReject, nil } existing, err := dh.database.GetNode(ctx, node.Owner, node.Name) @@ -56,24 +56,24 @@ func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *ffty existing, err = dh.database.GetNodeByID(ctx, node.ID) } if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if existing != nil { if existing.Owner != node.Owner { l.Warnf("Unable to process node broadcast %s - mismatch with existing %v", msg.Header.ID, existing.ID) - return false, nil + return ActionReject, nil } node.ID = nil // we keep the existing ID } if err = dh.database.UpsertNode(ctx, &node, true); err != nil { - return false, err + return ActionRetry, err } // Tell the data exchange about this node. Treat these errors like database errors - and return for retry processing if err = dh.exchange.AddPeer(ctx, node.DX.Peer, node.DX.Endpoint); err != nil { - return false, err + return ActionRetry, err } - return true, nil + return ActionConfirm, nil } diff --git a/internal/definitions/definition_handler_network_node_test.go b/internal/definitions/definition_handler_network_node_test.go index 5c6b98efb2..1843221ae6 100644 --- a/internal/definitions/definition_handler_network_node_test.go +++ b/internal/definitions/definition_handler_network_node_test.go @@ -55,7 +55,7 @@ func TestHandleDefinitionBroadcastNodeOk(t *testing.T) { mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) mdx.On("AddPeer", mock.Anything, "peer1", node.DX.Endpoint).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -95,7 +95,7 @@ func TestHandleDefinitionBroadcastNodeUpsertFail(t *testing.T) { mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(nil, nil) mdi.On("GetNodeByID", mock.Anything, node.ID).Return(nil, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -137,7 +137,7 @@ func TestHandleDefinitionBroadcastNodeAddPeerFail(t *testing.T) { mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -175,7 +175,7 @@ func TestHandleDefinitionBroadcastNodeDupMismatch(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(&fftypes.Node{Owner: "0x99999"}, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -216,7 +216,7 @@ func TestHandleDefinitionBroadcastNodeDupOK(t *testing.T) { mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -254,7 +254,7 @@ func TestHandleDefinitionBroadcastNodeGetFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -291,7 +291,7 @@ func TestHandleDefinitionBroadcastNodeBadAuthor(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x23456"}, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -328,7 +328,7 @@ func TestHandleDefinitionBroadcastNodeGetOrgNotFound(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -365,7 +365,7 @@ func TestHandleDefinitionBroadcastNodeGetOrgFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -400,7 +400,7 @@ func TestHandleDefinitionBroadcastNodeValidateFail(t *testing.T) { Value: fftypes.JSONAnyPtrBytes(b), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -421,7 +421,7 @@ func TestHandleDefinitionBroadcastNodeUnmarshalFail(t *testing.T) { Value: fftypes.JSONAnyPtr(`!json`), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ diff --git a/internal/definitions/definition_handler_network_org.go b/internal/definitions/definition_handler_network_org.go index e8a6f68e71..7cbf0ff060 100644 --- a/internal/definitions/definition_handler_network_org.go +++ b/internal/definitions/definition_handler_network_org.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -23,33 +23,33 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { +func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { l := log.L(ctx) var org fftypes.Organization - valid = dh.getSystemBroadcastPayload(ctx, msg, data, &org) + valid := dh.getSystemBroadcastPayload(ctx, msg, data, &org) if !valid { - return false, nil + return ActionReject, nil } - if err = org.Validate(ctx, true); err != nil { + if err := org.Validate(ctx, true); err != nil { l.Warnf("Unable to process organization broadcast %s - validate failed: %s", msg.Header.ID, err) - return false, nil + return ActionReject, nil } if org.Parent != "" { parent, err := dh.database.GetOrganizationByIdentity(ctx, org.Parent) if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if parent == nil { l.Warnf("Unable to process organization broadcast %s - parent identity not found: %s", msg.Header.ID, org.Parent) - return false, nil + return ActionReject, nil } if msg.Header.Key != parent.Identity { l.Warnf("Unable to process organization broadcast %s - incorrect signature. Expected=%s Received=%s", msg.Header.ID, parent.Identity, msg.Header.Author) - return false, nil + return ActionReject, nil } } @@ -61,19 +61,19 @@ func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, m } } if err != nil { - return false, err // We only return database errors + return ActionRetry, err // We only return database errors } if existing != nil { if existing.Parent != org.Parent { l.Warnf("Unable to process organization broadcast %s - mismatch with existing %v", msg.Header.ID, existing.ID) - return false, nil + return ActionReject, nil } org.ID = nil // we keep the existing ID } if err = dh.database.UpsertOrganization(ctx, &org, true); err != nil { - return false, err + return ActionRetry, err } - return true, nil + return ActionConfirm, nil } diff --git a/internal/definitions/definition_handler_network_org_test.go b/internal/definitions/definition_handler_network_org_test.go index 12b665129d..f60f2efa8f 100644 --- a/internal/definitions/definition_handler_network_org_test.go +++ b/internal/definitions/definition_handler_network_org_test.go @@ -58,7 +58,7 @@ func TestHandleDefinitionBroadcastChildOrgOk(t *testing.T) { mdi.On("GetOrganizationByName", mock.Anything, "org1").Return(nil, nil) mdi.On("GetOrganizationByID", mock.Anything, org.ID).Return(nil, nil) mdi.On("UpsertOrganization", mock.Anything, mock.Anything, true).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -102,7 +102,7 @@ func TestHandleDefinitionBroadcastChildOrgDupOk(t *testing.T) { mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(parentOrg, nil) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(org, nil) mdi.On("UpsertOrganization", mock.Anything, mock.Anything, true).Return(nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -144,7 +144,7 @@ func TestHandleDefinitionBroadcastChildOrgBadKey(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(parentOrg, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -179,7 +179,7 @@ func TestHandleDefinitionBroadcastOrgDupMismatch(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x12345", Parent: "0x9999"}, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -216,7 +216,7 @@ func TestHandleDefinitionBroadcastOrgUpsertFail(t *testing.T) { mdi.On("GetOrganizationByName", mock.Anything, "org1").Return(nil, nil) mdi.On("GetOrganizationByID", mock.Anything, org.ID).Return(nil, nil) mdi.On("UpsertOrganization", mock.Anything, mock.Anything, true).Return(fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -249,7 +249,7 @@ func TestHandleDefinitionBroadcastOrgGetOrgFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -282,7 +282,7 @@ func TestHandleDefinitionBroadcastOrgAuthorMismatch(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x12345").Return(&fftypes.Organization{ID: fftypes.NewUUID(), Identity: "0x12345", Parent: "0x9999"}, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -317,7 +317,7 @@ func TestHandleDefinitionBroadcastGetParentFail(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, fmt.Errorf("pop")) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -352,7 +352,7 @@ func TestHandleDefinitionBroadcastGetParentNotFound(t *testing.T) { mdi := dh.database.(*databasemocks.Plugin) mdi.On("GetOrganizationByIdentity", mock.Anything, "0x23456").Return(nil, nil) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -383,7 +383,7 @@ func TestHandleDefinitionBroadcastValidateFail(t *testing.T) { Value: fftypes.JSONAnyPtrBytes(b), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -404,7 +404,7 @@ func TestHandleDefinitionBroadcastUnmarshalFail(t *testing.T) { Value: fftypes.JSONAnyPtr(`!json`), } - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ diff --git a/internal/definitions/definition_handler_test.go b/internal/definitions/definition_handler_test.go index e6898fcc2e..3b54f27369 100644 --- a/internal/definitions/definition_handler_test.go +++ b/internal/definitions/definition_handler_test.go @@ -43,9 +43,9 @@ func newTestDefinitionHandlers(t *testing.T) *definitionHandlers { return NewDefinitionHandlers(mdi, mdx, mdm, mbm, mpm, mam, mcm).(*definitionHandlers) } -func TestHandleSystemBroadcastUnknown(t *testing.T) { +func TestHandleDefinitionBroadcastUnknown(t *testing.T) { dh := newTestDefinitionHandlers(t) - action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: "unknown", }, diff --git a/internal/definitions/definition_handler_tokenpool.go b/internal/definitions/definition_handler_tokenpool.go index a544b21f10..167632affc 100644 --- a/internal/definitions/definition_handler_tokenpool.go +++ b/internal/definitions/definition_handler_tokenpool.go @@ -74,10 +74,10 @@ func (dh *definitionHandlers) rejectPool(ctx context.Context, pool *fftypes.Toke return err } -func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) { +func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) { var announce fftypes.TokenPoolAnnouncement if valid := dh.getSystemBroadcastPayload(ctx, msg, data, &announce); !valid { - return ActionReject, nil + return ActionReject, nil, nil } pool := announce.Pool @@ -85,27 +85,31 @@ func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg if err := pool.Validate(ctx); err != nil { log.L(ctx).Warnf("Token pool '%s' rejected - validate failed: %s", pool.ID, err) - return ActionReject, dh.rejectPool(ctx, pool) + return ActionReject, nil, dh.rejectPool(ctx, pool) } // Check if pool has already been confirmed on chain (and confirm the message if so) if existingPool, err := dh.database.GetTokenPoolByID(ctx, pool.ID); err != nil { - return ActionRetry, err + return ActionRetry, nil, err } else if existingPool != nil && existingPool.State == fftypes.TokenPoolStateConfirmed { - return ActionConfirm, nil + return ActionConfirm, nil, nil } if valid, err := dh.persistTokenPool(ctx, &announce); err != nil { - return ActionRetry, err + return ActionRetry, nil, err } else if !valid { - return ActionReject, dh.rejectPool(ctx, pool) + return ActionReject, nil, dh.rejectPool(ctx, pool) } - if err := dh.assets.ActivateTokenPool(ctx, pool, announce.Event); err != nil { - log.L(ctx).Errorf("Failed to activate token pool '%s': %s", pool.ID, err) - return ActionRetry, err - } - - // Message will remain unconfirmed until pool confirmation triggers a rewind - return ActionWait, nil + // Message will remain unconfirmed, but plugin will be notified to activate the pool + // This will ultimately trigger a pool creation event and a rewind + return ActionWait, &DefinitionBatchActions{ + PreFinalize: func(ctx context.Context) error { + if err := dh.assets.ActivateTokenPool(ctx, pool, announce.Event); err != nil { + log.L(ctx).Errorf("Failed to activate token pool '%s': %s", pool.ID, err) + return err + } + return nil + }, + }, nil } diff --git a/internal/definitions/definition_handler_tokenpool_test.go b/internal/definitions/definition_handler_tokenpool_test.go index c98c35d3a5..53e8ff1500 100644 --- a/internal/definitions/definition_handler_tokenpool_test.go +++ b/internal/definitions/definition_handler_tokenpool_test.go @@ -66,7 +66,7 @@ func buildPoolDefinitionMessage(announce *fftypes.TokenPoolAnnouncement) (*fftyp return msg, data, nil } -func TestHandleSystemBroadcastTokenPoolActivateOK(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolActivateOK(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -86,14 +86,17 @@ func TestHandleSystemBroadcastTokenPoolActivateOK(t *testing.T) { })).Return(nil) mam.On("ActivateTokenPool", context.Background(), mock.AnythingOfType("*fftypes.TokenPool"), mock.AnythingOfType("*fftypes.BlockchainEvent")).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, ba, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionWait, action) assert.NoError(t, err) + err = ba.PreFinalize(context.Background()) + assert.NoError(t, err) + mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolUpdateOpFail(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolUpdateOpFail(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -108,14 +111,14 @@ func TestHandleSystemBroadcastTokenPoolUpdateOpFail(t *testing.T) { mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(fmt.Errorf("pop")) mdi.On("GetTokenPoolByID", context.Background(), pool.ID).Return(nil, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionRetry, action) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolGetPoolFail(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolGetPoolFail(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -126,14 +129,14 @@ func TestHandleSystemBroadcastTokenPoolGetPoolFail(t *testing.T) { mdi := sh.database.(*databasemocks.Plugin) mdi.On("GetTokenPoolByID", context.Background(), pool.ID).Return(nil, fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionRetry, action) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolExisting(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolExisting(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -151,14 +154,14 @@ func TestHandleSystemBroadcastTokenPoolExisting(t *testing.T) { })).Return(nil) mam.On("ActivateTokenPool", context.Background(), mock.AnythingOfType("*fftypes.TokenPool"), mock.AnythingOfType("*fftypes.BlockchainEvent")).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionWait, action) assert.NoError(t, err) mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolExistingConfirmed(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolExistingConfirmed(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -172,14 +175,14 @@ func TestHandleSystemBroadcastTokenPoolExistingConfirmed(t *testing.T) { mdi := sh.database.(*databasemocks.Plugin) mdi.On("GetTokenPoolByID", context.Background(), pool.ID).Return(existing, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionConfirm, action) assert.NoError(t, err) mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolIDMismatch(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolIDMismatch(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -200,14 +203,14 @@ func TestHandleSystemBroadcastTokenPoolIDMismatch(t *testing.T) { return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolRejected })).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionReject, action) assert.NoError(t, err) mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolFailUpsert(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolFailUpsert(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -225,14 +228,14 @@ func TestHandleSystemBroadcastTokenPoolFailUpsert(t *testing.T) { return *p.ID == *pool.ID && p.Message == msg.Header.ID })).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionRetry, action) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolOpsFail(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolOpsFail(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -244,14 +247,14 @@ func TestHandleSystemBroadcastTokenPoolOpsFail(t *testing.T) { mdi.On("GetOperations", context.Background(), mock.Anything).Return(nil, nil, fmt.Errorf("pop")) mdi.On("GetTokenPoolByID", context.Background(), pool.ID).Return(nil, nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionRetry, action) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolActivateFail(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolActivateFail(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := newPoolAnnouncement() @@ -271,14 +274,17 @@ func TestHandleSystemBroadcastTokenPoolActivateFail(t *testing.T) { })).Return(nil) mam.On("ActivateTokenPool", context.Background(), mock.AnythingOfType("*fftypes.TokenPool"), mock.AnythingOfType("*fftypes.BlockchainEvent")).Return(fmt.Errorf("pop")) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) - assert.Equal(t, ActionRetry, action) + action, batchAction, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) + assert.Equal(t, ActionWait, action) + assert.NoError(t, err) + + err = batchAction.PreFinalize(context.Background()) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolValidateFail(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolValidateFail(t *testing.T) { sh := newTestDefinitionHandlers(t) announce := &fftypes.TokenPoolAnnouncement{ @@ -293,14 +299,14 @@ func TestHandleSystemBroadcastTokenPoolValidateFail(t *testing.T) { return event.Type == fftypes.EventTypePoolRejected })).Return(nil) - action, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionReject, action) assert.NoError(t, err) mdi.AssertExpectations(t) } -func TestHandleSystemBroadcastTokenPoolBadMessage(t *testing.T) { +func TestHandleDefinitionBroadcastTokenPoolBadMessage(t *testing.T) { sh := newTestDefinitionHandlers(t) msg := &fftypes.Message{ @@ -310,7 +316,7 @@ func TestHandleSystemBroadcastTokenPoolBadMessage(t *testing.T) { }, } - action, err := sh.HandleSystemBroadcast(context.Background(), msg, nil) + action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, nil) assert.Equal(t, ActionReject, action) assert.NoError(t, err) } diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 9c6b5bcb2a..a12617b82f 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -136,16 +136,91 @@ func (ag *aggregator) rewindOffchainBatches() (rewind bool, offset int64) { return rewind, offset } +// batchActions are synchronous actions to be performed while processing system messages, but which must happen after reading the whole batch +type batchActions struct { + // PreFinalize callbacks may perform blocking actions (possibly to an external connector) + // - Will execute after all batch messages have been processed + // - Will execute outside database RunAsGroup + // - If any PreFinalize callback errors out, batch will be aborted and retried + PreFinalize []func(ctx context.Context) error + + // Finalize callbacks may perform final, non-idempotent database operations (such as inserting Events) + // - Will execute after all batch messages have been processed and any PreFinalize callbacks have succeeded + // - Will execute inside database RunAsGroup + // - If any Finalize callback errors out, batch will be aborted and retried (small chance of duplicate execution here) + Finalize []func(ctx context.Context) error +} + +func (ba *batchActions) AddPreFinalize(action func(ctx context.Context) error) { + if action != nil { + ba.PreFinalize = append(ba.PreFinalize, action) + } +} + +func (ba *batchActions) AddFinalize(action func(ctx context.Context) error) { + if action != nil { + ba.Finalize = append(ba.Finalize, action) + } +} + +func (ba *batchActions) RunPreFinalize(ctx context.Context) error { + for _, action := range ba.PreFinalize { + if err := action(ctx); err != nil { + return err + } + } + return nil +} + +func (ba *batchActions) RunFinalize(ctx context.Context) error { + for _, action := range ba.Finalize { + if err := action(ctx); err != nil { + return err + } + } + return nil +} + +func (ag *aggregator) processWithBatchActions(callback func(ctx context.Context, actions *batchActions) error) error { + actions := &batchActions{ + PreFinalize: make([]func(ctx context.Context) error, 0), + Finalize: make([]func(ctx context.Context) error, 0), + } + + err := ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) (err error) { + if err := callback(ctx, actions); err != nil { + return err + } + if len(actions.PreFinalize) == 0 { + return actions.RunFinalize(ctx) + } + return nil + }) + if err != nil { + return err + } + + if len(actions.PreFinalize) == 0 { + return err + } + + if err := actions.RunPreFinalize(ag.ctx); err != nil { + return err + } + return ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) error { + return actions.RunFinalize(ctx) + }) +} + func (ag *aggregator) processPinsDBGroup(items []fftypes.LocallySequenced) (repoll bool, err error) { pins := make([]*fftypes.Pin, len(items)) for i, item := range items { pins[i] = item.(*fftypes.Pin) } - err = ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) (err error) { - err = ag.processPins(ctx, pins) - return err + + return false, ag.processWithBatchActions(func(ctx context.Context, actions *batchActions) error { + return ag.processPins(ctx, pins, actions) }) - return false, err } func (ag *aggregator) getPins(ctx context.Context, filter database.Filter) ([]fftypes.LocallySequenced, error) { @@ -157,7 +232,7 @@ func (ag *aggregator) getPins(ctx context.Context, filter database.Filter) ([]ff return ls, err } -func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin) (err error) { +func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, actions *batchActions) (err error) { l := log.L(ctx) // Keep a batch cache for this list of pins @@ -204,7 +279,7 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin) (err dupMsgCheck[*msg.Header.ID] = true // Attempt to process the message (only returns errors for database persistence issues) - if err = ag.processMessage(ctx, batch, pin.Masked, pin.Sequence, msg); err != nil { + if err = ag.processMessage(ctx, batch, pin.Masked, pin.Sequence, msg, actions); err != nil { return err } } @@ -224,7 +299,7 @@ func (ag *aggregator) calcHash(topic string, groupID *fftypes.Bytes32, identity return fftypes.HashResult(h) } -func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, masked bool, pinnedSequence int64, msg *fftypes.Message) (err error) { +func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, masked bool, pinnedSequence int64, msg *fftypes.Message, actions *batchActions) (err error) { l := log.L(ctx) // Check if it's ready to be processed @@ -273,28 +348,32 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, } } - dispatched, err := ag.attemptMessageDispatch(ctx, msg) + dispatched, err := ag.attemptMessageDispatch(ctx, msg, actions) if err != nil || !dispatched { return err } - // Move the nextPin forwards to the next sequence for this sender, on all - // topics associated with the message - if masked { - for i, nextPin := range nextPins { - nextPin.Nonce++ - nextPin.Hash = ag.calcHash(msg.Header.Topics[i], msg.Header.Group, nextPin.Identity, nextPin.Nonce) - if err = ag.database.UpdateNextPin(ctx, nextPin.Sequence, database.NextPinQueryFactory.NewUpdate(ctx). - Set("nonce", nextPin.Nonce). - Set("hash", nextPin.Hash), - ); err != nil { - return err + actions.AddFinalize(func(ctx context.Context) error { + // Move the nextPin forwards to the next sequence for this sender, on all + // topics associated with the message + if masked { + for i, nextPin := range nextPins { + nextPin.Nonce++ + nextPin.Hash = ag.calcHash(msg.Header.Topics[i], msg.Header.Group, nextPin.Identity, nextPin.Nonce) + if err = ag.database.UpdateNextPin(ctx, nextPin.Sequence, database.NextPinQueryFactory.NewUpdate(ctx). + Set("nonce", nextPin.Nonce). + Set("hash", nextPin.Hash), + ); err != nil { + return err + } } } - } - // Mark the pin dispatched - return ag.database.SetPinDispatched(ctx, pinnedSequence) + // Mark the pin dispatched + return ag.database.SetPinDispatched(ctx, pinnedSequence) + }) + + return nil } func (ag *aggregator) checkMaskedContextReady(ctx context.Context, msg *fftypes.Message, topic string, pinnedSequence int64, pin *fftypes.Bytes32) (*fftypes.NextPin, error) { @@ -399,7 +478,7 @@ func (ag *aggregator) attemptContextInit(ctx context.Context, msg *fftypes.Messa return nextPin, err } -func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.Message) (bool, error) { +func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.Message, actions *batchActions) (bool, error) { // If we don't find all the data, then we don't dispatch data, foundAll, err := ag.data.GetMessageData(ctx, msg, true) @@ -427,19 +506,24 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M } } - // We're going to dispatch it at this point, but we need to validate the data first + // Validate the message data valid := true - eventType := fftypes.EventTypeMessageConfirmed switch { case msg.Header.Type == fftypes.MessageTypeDefinition: // We handle definition events in-line on the aggregator, as it would be confusing for apps to be // dispatched subsequent events before we have processed the definition events they depend on. - var action definitions.SystemBroadcastAction - action, err = ag.definitions.HandleSystemBroadcast(ctx, msg, data) - if action == definitions.ActionRetry || action == definitions.ActionWait { + msgAction, batchAction, err := ag.definitions.HandleDefinitionBroadcast(ctx, msg, data) + if msgAction == definitions.ActionRetry { return false, err } - valid = action == definitions.ActionConfirm + if batchAction != nil { + actions.AddPreFinalize(batchAction.PreFinalize) + actions.AddFinalize(batchAction.Finalize) + } + if msgAction == definitions.ActionWait { + return false, nil + } + valid = msgAction == definitions.ActionConfirm case msg.Header.Type == fftypes.MessageTypeGroupInit: // Already handled as part of resolving the context - do nothing. @@ -451,30 +535,30 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M } } - // This message is now confirmed state := fftypes.MessageStateConfirmed + eventType := fftypes.EventTypeMessageConfirmed if !valid { state = fftypes.MessageStateRejected - } - setConfirmed := database.MessageQueryFactory.NewUpdate(ctx). - Set("confirmed", fftypes.Now()). // the timestamp of the aggregator provides ordering - Set("state", state) // mark if the message was confirmed or rejected - err = ag.database.UpdateMessage(ctx, msg.Header.ID, setConfirmed) - if err != nil { - return false, err - } - if !valid { - // An message with invalid (but complete) data is still considered dispatched. - // However, we drive a different event to the applications. eventType = fftypes.EventTypeMessageRejected } - // Generate the appropriate event - event := fftypes.NewEvent(eventType, msg.Header.Namespace, msg.Header.ID) - if err = ag.database.InsertEvent(ctx, event); err != nil { - return false, err - } - log.L(ctx).Infof("Emitting %s for message %s:%s", eventType, msg.Header.Namespace, msg.Header.ID) + actions.AddFinalize(func(ctx context.Context) error { + // This message is now confirmed + setConfirmed := database.MessageQueryFactory.NewUpdate(ctx). + Set("confirmed", fftypes.Now()). // the timestamp of the aggregator provides ordering + Set("state", state) // mark if the message was confirmed or rejected + if err = ag.database.UpdateMessage(ctx, msg.Header.ID, setConfirmed); err != nil { + return err + } + + // Generate the appropriate event + event := fftypes.NewEvent(eventType, msg.Header.Namespace, msg.Header.ID) + if err = ag.database.InsertEvent(ctx, event); err != nil { + return err + } + log.L(ctx).Infof("Emitting %s for message %s:%s", eventType, msg.Header.Namespace, msg.Header.ID) + return nil + }) return true, nil } diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index 623b75a1a5..461ae1e24d 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -46,6 +46,7 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} // Generate some pin data member1org := "org1" @@ -166,7 +167,10 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { Index: 0, Dispatched: false, }, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -197,6 +201,11 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) + rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() + rag.RunFn = func(a mock.Arguments) { + rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} + } + // Get the batch mdi.On("GetBatchByID", ag.ctx, batchID).Return(&fftypes.Batch{ ID: batchID, @@ -252,8 +261,8 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { // Confirm the offset mdi.On("UpdateOffset", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) - err := ag.processPins(ag.ctx, []*fftypes.Pin{ - { + _, err := ag.processPinsDBGroup([]fftypes.LocallySequenced{ + &fftypes.Pin{ Sequence: 10001, Masked: true, Hash: member2Nonce500, @@ -272,6 +281,7 @@ func TestAggregationBroadcast(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} // Generate some pin data topic := "some-topic" @@ -331,7 +341,10 @@ func TestAggregationBroadcast(t *testing.T) { Index: 0, Dispatched: false, }, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -393,6 +406,7 @@ func TestGetPins(t *testing.T) { func TestProcessPinsMissingBatch(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetBatchByID", ag.ctx, mock.Anything).Return(nil, nil) @@ -400,7 +414,7 @@ func TestProcessPinsMissingBatch(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: fftypes.NewUUID()}, - }) + }, ba) assert.NoError(t, err) } @@ -408,6 +422,7 @@ func TestProcessPinsMissingBatch(t *testing.T) { func TestProcessPinsMissingNoMsg(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetBatchByID", ag.ctx, mock.Anything).Return(&fftypes.Batch{ @@ -422,7 +437,7 @@ func TestProcessPinsMissingNoMsg(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: fftypes.NewUUID(), Index: 25}, - }) + }, ba) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -431,6 +446,7 @@ func TestProcessPinsMissingNoMsg(t *testing.T) { func TestProcessPinsBadMsgHeader(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetBatchByID", ag.ctx, mock.Anything).Return(&fftypes.Batch{ @@ -448,7 +464,7 @@ func TestProcessPinsBadMsgHeader(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: fftypes.NewUUID(), Index: 0}, - }) + }, ba) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -457,6 +473,7 @@ func TestProcessPinsBadMsgHeader(t *testing.T) { func TestProcessSkipDupMsg(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} batchID := fftypes.NewUUID() mdi := ag.database.(*databasemocks.Plugin) @@ -479,7 +496,7 @@ func TestProcessSkipDupMsg(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: batchID, Index: 0, Hash: fftypes.NewRandB32()}, {Sequence: 12345, Batch: batchID, Index: 1, Hash: fftypes.NewRandB32()}, - }) + }, ba) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -488,6 +505,7 @@ func TestProcessSkipDupMsg(t *testing.T) { func TestProcessMsgFailGetPins(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} batchID := fftypes.NewUUID() mdi := ag.database.(*databasemocks.Plugin) @@ -506,7 +524,7 @@ func TestProcessMsgFailGetPins(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: batchID, Index: 0, Hash: fftypes.NewRandB32()}, - }) + }, ba) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } @@ -515,7 +533,7 @@ func TestProcessMsgFailMissingGroup(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - err := ag.processMessage(ag.ctx, &fftypes.Batch{}, true, 12345, &fftypes.Message{}) + err := ag.processMessage(ag.ctx, &fftypes.Batch{}, true, 12345, &fftypes.Message{}, nil) assert.NoError(t, err) } @@ -531,7 +549,7 @@ func TestProcessMsgFailBadPin(t *testing.T) { Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{"!Wrong"}, - }) + }, nil) assert.NoError(t, err) } @@ -550,7 +568,7 @@ func TestProcessMsgFailGetNextPins(t *testing.T) { Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{fftypes.NewRandB32().String()}, - }) + }, nil) assert.EqualError(t, err, "pop") } @@ -570,7 +588,7 @@ func TestProcessMsgFailDispatch(t *testing.T) { Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{fftypes.NewRandB32().String()}, - }) + }, nil) assert.EqualError(t, err, "pop") } @@ -578,6 +596,7 @@ func TestProcessMsgFailDispatch(t *testing.T) { func TestProcessMsgFailPinUpdate(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} pin := fftypes.NewRandB32() mdi := ag.database.(*databasemocks.Plugin) @@ -598,7 +617,10 @@ func TestProcessMsgFailPinUpdate(t *testing.T) { Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{pin.String()}, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.EqualError(t, err, "pop") } @@ -833,7 +855,7 @@ func TestAttemptMessageDispatchFailGetData(t *testing.T) { _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, - }) + }, nil) assert.EqualError(t, err, "pop") } @@ -851,7 +873,7 @@ func TestAttemptMessageDispatchFailValidateData(t *testing.T) { Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, }, - }) + }, nil) assert.EqualError(t, err, "pop") } @@ -877,7 +899,7 @@ func TestAttemptMessageDispatchMissingBlobs(t *testing.T) { dispatched, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, - }) + }, nil) assert.NoError(t, err) assert.False(t, dispatched) @@ -900,7 +922,7 @@ func TestAttemptMessageDispatchMissingTransfers(t *testing.T) { }, } msg.Hash = msg.Header.Hash() - dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg) + dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, nil) assert.NoError(t, err) assert.False(t, dispatched) @@ -925,7 +947,7 @@ func TestAttemptMessageDispatchGetTransfersFail(t *testing.T) { }, } msg.Hash = msg.Header.Hash() - dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg) + dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, nil) assert.EqualError(t, err, "pop") assert.False(t, dispatched) @@ -956,7 +978,7 @@ func TestAttemptMessageDispatchTransferMismatch(t *testing.T) { mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetTokenTransfers", ag.ctx, mock.Anything).Return(transfers, nil, nil) - dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg) + dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, nil) assert.NoError(t, err) assert.False(t, dispatched) @@ -964,12 +986,13 @@ func TestAttemptMessageDispatchTransferMismatch(t *testing.T) { mdi.AssertExpectations(t) } -func TestAttemptMessageDispatchFailValidateBadSystem(t *testing.T) { +func TestDefinitionBroadcastActionReject(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) - msh.On("HandleSystemBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionReject, nil) + msh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionReject, &definitions.DefinitionBatchActions{}, nil) mdm := ag.data.(*datamocks.Manager) mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) @@ -1003,17 +1026,17 @@ func TestAttemptMessageDispatchFailValidateBadSystem(t *testing.T) { Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, }, - }) + }, ba) assert.NoError(t, err) } -func TestAttemptMessageDispatchFailValidateSystemFail(t *testing.T) { +func TestDefinitionBroadcastActionRetry(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) - msh.On("HandleSystemBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionRetry, fmt.Errorf("pop")) + msh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionRetry, &definitions.DefinitionBatchActions{}, fmt.Errorf("pop")) mdm := ag.data.(*datamocks.Manager) mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) @@ -1027,14 +1050,39 @@ func TestAttemptMessageDispatchFailValidateSystemFail(t *testing.T) { Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, }, - }) + }, nil) assert.EqualError(t, err, "pop") } +func TestDefinitionBroadcastActionWait(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + + msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) + msh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionWait, &definitions.DefinitionBatchActions{}, nil) + + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) + + _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypeDefinition, + ID: fftypes.NewUUID(), + Namespace: "any", + }, + Data: fftypes.DataRefs{ + {ID: fftypes.NewUUID()}, + }, + }, nil) + assert.NoError(t, err) + +} + func TestAttemptMessageDispatchEventFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) @@ -1045,7 +1093,10 @@ func TestAttemptMessageDispatchEventFail(t *testing.T) { _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.EqualError(t, err, "pop") } @@ -1053,6 +1104,7 @@ func TestAttemptMessageDispatchEventFail(t *testing.T) { func TestAttemptMessageDispatchGroupInit(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) @@ -1066,7 +1118,7 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) { ID: fftypes.NewUUID(), Type: fftypes.MessageTypeGroupInit, }, - }) + }, ba) assert.NoError(t, err) } @@ -1074,6 +1126,7 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) { func TestAttemptMessageUpdateMessageFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() + ba := &batchActions{} mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) @@ -1083,7 +1136,10 @@ func TestAttemptMessageUpdateMessageFail(t *testing.T) { _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, - }) + }, ba) + assert.NoError(t, err) + + err = ba.RunFinalize(ag.ctx) assert.EqualError(t, err, "pop") } @@ -1282,3 +1338,85 @@ func TestResolveBlobsCopyOk(t *testing.T) { assert.NoError(t, err) assert.True(t, resolved) } + +func TestBatchActions(t *testing.T) { + prefinalizeCalled := false + finalizeCalled := false + + ba := &batchActions{ + PreFinalize: make([]func(ctx context.Context) error, 0), + Finalize: make([]func(ctx context.Context) error, 0), + } + + ba.AddPreFinalize(func(ctx context.Context) error { + prefinalizeCalled = true + return nil + }) + ba.AddFinalize(func(ctx context.Context) error { + finalizeCalled = true + return nil + }) + + err := ba.RunPreFinalize(context.Background()) + assert.NoError(t, err) + assert.True(t, prefinalizeCalled) + + err = ba.RunFinalize(context.Background()) + assert.NoError(t, err) + assert.True(t, finalizeCalled) +} + +func TestBatchActionsError(t *testing.T) { + ba := &batchActions{ + PreFinalize: make([]func(ctx context.Context) error, 0), + Finalize: make([]func(ctx context.Context) error, 0), + } + + ba.AddPreFinalize(func(ctx context.Context) error { + return fmt.Errorf("pop") + }) + ba.AddFinalize(func(ctx context.Context) error { + return fmt.Errorf("pop") + }) + + err := ba.RunPreFinalize(context.Background()) + assert.EqualError(t, err, "pop") + + err = ba.RunFinalize(context.Background()) + assert.EqualError(t, err, "pop") +} + +func TestProcessWithBatchActionsPreFinalizeError(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + + mdi := ag.database.(*databasemocks.Plugin) + rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() + rag.RunFn = func(a mock.Arguments) { + rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} + } + + err := ag.processWithBatchActions(func(ctx context.Context, actions *batchActions) error { + actions.AddPreFinalize(func(ctx context.Context) error { return fmt.Errorf("pop") }) + return nil + }) + assert.EqualError(t, err, "pop") +} + +func TestProcessWithBatchActionsSuccess(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + + mdi := ag.database.(*databasemocks.Plugin) + rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() + rag.RunFn = func(a mock.Arguments) { + rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} + } + + err := ag.processWithBatchActions(func(ctx context.Context, actions *batchActions) error { + actions.AddPreFinalize(func(ctx context.Context) error { return nil }) + actions.AddFinalize(func(ctx context.Context) error { return nil }) + return nil + }) + assert.NoError(t, err) +} diff --git a/internal/txcommon/txcommon.go b/internal/txcommon/txcommon.go index 77760a8825..6f5e64d941 100644 --- a/internal/txcommon/txcommon.go +++ b/internal/txcommon/txcommon.go @@ -90,17 +90,13 @@ func (t *transactionHelper) PersistTransaction(ctx context.Context, ns string, i return false, err } - } else { - - if err = t.database.InsertTransaction(ctx, &fftypes.Transaction{ - ID: id, - Namespace: ns, - Type: txType, - BlockchainIDs: fftypes.NewFFStringArray(strings.ToLower(blockchainTXID)), - }); err != nil { - return false, err - } - + } else if err = t.database.InsertTransaction(ctx, &fftypes.Transaction{ + ID: id, + Namespace: ns, + Type: txType, + BlockchainIDs: fftypes.NewFFStringArray(strings.ToLower(blockchainTXID)), + }); err != nil { + return false, err } return true, nil diff --git a/mocks/definitionsmocks/definition_handlers.go b/mocks/definitionsmocks/definition_handlers.go index f292a1bf1c..1bb1a35a0b 100644 --- a/mocks/definitionsmocks/definition_handlers.go +++ b/mocks/definitionsmocks/definition_handlers.go @@ -94,25 +94,34 @@ func (_m *DefinitionHandlers) GetGroupsNS(ctx context.Context, ns string, filter return r0, r1, r2 } -// HandleSystemBroadcast provides a mock function with given fields: ctx, msg, data -func (_m *DefinitionHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (definitions.SystemBroadcastAction, error) { +// HandleDefinitionBroadcast provides a mock function with given fields: ctx, msg, data +func (_m *DefinitionHandlers) HandleDefinitionBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (definitions.DefinitionMessageAction, *definitions.DefinitionBatchActions, error) { ret := _m.Called(ctx, msg, data) - var r0 definitions.SystemBroadcastAction - if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Message, []*fftypes.Data) definitions.SystemBroadcastAction); ok { + var r0 definitions.DefinitionMessageAction + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Message, []*fftypes.Data) definitions.DefinitionMessageAction); ok { r0 = rf(ctx, msg, data) } else { - r0 = ret.Get(0).(definitions.SystemBroadcastAction) + r0 = ret.Get(0).(definitions.DefinitionMessageAction) } - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, *fftypes.Message, []*fftypes.Data) error); ok { + var r1 *definitions.DefinitionBatchActions + if rf, ok := ret.Get(1).(func(context.Context, *fftypes.Message, []*fftypes.Data) *definitions.DefinitionBatchActions); ok { r1 = rf(ctx, msg, data) } else { - r1 = ret.Error(1) + if ret.Get(1) != nil { + r1 = ret.Get(1).(*definitions.DefinitionBatchActions) + } } - return r0, r1 + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, *fftypes.Message, []*fftypes.Data) error); ok { + r2 = rf(ctx, msg, data) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 } // ResolveInitGroup provides a mock function with given fields: ctx, msg