diff --git a/internal/definitions/definition_handler.go b/internal/definitions/definition_handler.go index 6620176dba..8f6321f27a 100644 --- a/internal/definitions/definition_handler.go +++ b/internal/definitions/definition_handler.go @@ -109,24 +109,23 @@ func (dh *definitionHandlers) HandleDefinitionBroadcast(ctx context.Context, msg l.Infof("Confirming system definition broadcast '%s' [%s]", msg.Header.Tag, msg.Header.ID) switch fftypes.SystemTag(msg.Header.Tag) { case fftypes.SystemTagDefineDatatype: - msgAction, err = dh.handleDatatypeBroadcast(ctx, msg, data) + return dh.handleDatatypeBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNamespace: - msgAction, err = dh.handleNamespaceBroadcast(ctx, msg, data) + return dh.handleNamespaceBroadcast(ctx, msg, data) case fftypes.SystemTagDefineOrganization: - msgAction, err = dh.handleOrganizationBroadcast(ctx, msg, data) + return dh.handleOrganizationBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNode: - msgAction, err = dh.handleNodeBroadcast(ctx, msg, data) + return dh.handleNodeBroadcast(ctx, msg, data) case fftypes.SystemTagDefinePool: return dh.handleTokenPoolBroadcast(ctx, msg, data) case fftypes.SystemTagDefineFFI: - msgAction, err = dh.handleFFIBroadcast(ctx, msg, data) + return dh.handleFFIBroadcast(ctx, msg, data) case fftypes.SystemTagDefineContractAPI: - msgAction, err = dh.handleContractAPIBroadcast(ctx, msg, data) + return dh.handleContractAPIBroadcast(ctx, msg, data) default: l.Warnf("Unknown SystemTag '%s' for definition ID '%s'", msg.Header.Tag, msg.Header.ID) return ActionReject, nil, nil } - return } func (dh *definitionHandlers) getSystemBroadcastPayload(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data, res fftypes.Definition) (valid bool) { diff --git a/internal/definitions/definition_handler_contracts.go b/internal/definitions/definition_handler_contracts.go index aca2433a7a..0ebb77a6ef 100644 --- a/internal/definitions/definition_handler_contracts.go +++ b/internal/definitions/definition_handler_contracts.go @@ -65,7 +65,7 @@ func (dh *definitionHandlers) persistContractAPI(ctx context.Context, api *fftyp return err == nil, err } -func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult DefinitionMessageAction, err error) { +func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) { l := log.L(ctx) var broadcast fftypes.FFI valid := dh.getSystemBroadcastPayload(ctx, msg, data, &broadcast) @@ -74,57 +74,67 @@ func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftyp l.Warnf("Unable to process contract definition broadcast %s - validate failed: %s", msg.Header.ID, validationErr) valid = false } else { + var err error broadcast.Message = msg.Header.ID valid, err = dh.persistFFI(ctx, &broadcast) + if err != nil { + return ActionRetry, nil, err + } } } - var event *fftypes.Event - switch { - case err != nil: - actionResult = ActionRetry - case valid: + var eventType fftypes.EventType + var actionResult DefinitionMessageAction + if valid { l.Infof("Contract interface created id=%s author=%s", broadcast.ID, msg.Header.Author) - event = fftypes.NewEvent(fftypes.EventTypeContractInterfaceConfirmed, broadcast.Namespace, broadcast.ID) + eventType = fftypes.EventTypeContractInterfaceConfirmed actionResult = ActionConfirm - err = dh.database.InsertEvent(ctx, event) - default: + } else { l.Warnf("Contract interface rejected id=%s author=%s", broadcast.ID, msg.Header.Author) - event = fftypes.NewEvent(fftypes.EventTypeContractInterfaceRejected, broadcast.Namespace, broadcast.ID) + eventType = fftypes.EventTypeContractInterfaceRejected actionResult = ActionReject - err = dh.database.InsertEvent(ctx, event) } - return + return actionResult, &DefinitionBatchActions{ + Finalize: func(ctx context.Context) error { + event := fftypes.NewEvent(eventType, broadcast.Namespace, broadcast.ID) + return dh.database.InsertEvent(ctx, event) + }, + }, nil } -func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult DefinitionMessageAction, err error) { +func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) { l := log.L(ctx) var broadcast fftypes.ContractAPI valid := dh.getSystemBroadcastPayload(ctx, msg, data, &broadcast) if valid { if validateErr := broadcast.Validate(ctx, true); validateErr != nil { - l.Warnf("Unable to process contract API broadcast %s - validate failed: %s", msg.Header.ID, err) + l.Warnf("Unable to process contract API broadcast %s - validate failed: %s", msg.Header.ID, validateErr) valid = false } else { + var err error broadcast.Message = msg.Header.ID valid, err = dh.persistContractAPI(ctx, &broadcast) + if err != nil { + return ActionRetry, nil, err + } } } - var event *fftypes.Event - switch { - case err != nil: - actionResult = ActionRetry - case valid: + var eventType fftypes.EventType + var actionResult DefinitionMessageAction + if valid { l.Infof("Contract API created id=%s author=%s", broadcast.ID, msg.Header.Author) - event = fftypes.NewEvent(fftypes.EventTypeContractAPIConfirmed, broadcast.Namespace, broadcast.ID) + eventType = fftypes.EventTypeContractAPIConfirmed actionResult = ActionConfirm - err = dh.database.InsertEvent(ctx, event) - default: + } else { l.Warnf("Contract API rejected id=%s author=%s", broadcast.ID, msg.Header.Author) - event = fftypes.NewEvent(fftypes.EventTypeContractAPIRejected, broadcast.Namespace, broadcast.ID) + eventType = fftypes.EventTypeContractAPIRejected actionResult = ActionReject - err = dh.database.InsertEvent(ctx, event) } - return + return actionResult, &DefinitionBatchActions{ + Finalize: func(ctx context.Context) error { + event := fftypes.NewEvent(eventType, broadcast.Namespace, broadcast.ID) + return dh.database.InsertEvent(ctx, event) + }, + }, nil } diff --git a/internal/definitions/definition_handler_contracts_test.go b/internal/definitions/definition_handler_contracts_test.go index c1db340ed8..2ae9e2ac77 100644 --- a/internal/definitions/definition_handler_contracts_test.go +++ b/internal/definitions/definition_handler_contracts_test.go @@ -102,13 +102,15 @@ func TestHandleFFIBroadcastOk(t *testing.T) { mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) mcm := dh.contracts.(*contractmocks.Manager) mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(nil) - action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ + action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineFFI), }, }, []*fftypes.Data{data}) assert.Equal(t, ActionConfirm, action) assert.NoError(t, err) + err = ba.Finalize(context.Background()) + assert.NoError(t, err) mbi.AssertExpectations(t) } @@ -128,13 +130,15 @@ func TestHandleFFIBroadcastReject(t *testing.T) { mcm := dh.contracts.(*contractmocks.Manager) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, err := dh.handleFFIBroadcast(context.Background(), &fftypes.Message{ + action, ba, err := dh.handleFFIBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineFFI), }, }, []*fftypes.Data{}) assert.Equal(t, ActionReject, action) assert.NoError(t, err) + err = ba.Finalize(context.Background()) + assert.NoError(t, err) } func TestPersistFFIUpsertFFIFail(t *testing.T) { @@ -231,13 +235,15 @@ func TestHandleContractAPIBroadcastOk(t *testing.T) { mbi.On("UpsertContractAPI", mock.Anything, mock.Anything, mock.Anything).Return(nil) mbi.On("GetContractAPIByName", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ + action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineContractAPI), }, }, []*fftypes.Data{data}) assert.Equal(t, ActionConfirm, action) assert.NoError(t, err) + err = ba.Finalize(context.Background()) + assert.NoError(t, err) mbi.AssertExpectations(t) } diff --git a/internal/definitions/definition_handler_datatype.go b/internal/definitions/definition_handler_datatype.go index ea8de67cac..a601f46e39 100644 --- a/internal/definitions/definition_handler_datatype.go +++ b/internal/definitions/definition_handler_datatype.go @@ -23,42 +23,42 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { +func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) { l := log.L(ctx) var dt fftypes.Datatype valid := dh.getSystemBroadcastPayload(ctx, msg, data, &dt) if !valid { - return ActionReject, nil + return ActionReject, nil, nil } if err := dt.Validate(ctx, true); err != nil { l.Warnf("Unable to process datatype broadcast %s - validate failed: %s", msg.Header.ID, err) - return ActionReject, nil + return ActionReject, nil, nil } if err := dh.data.CheckDatatype(ctx, dt.Namespace, &dt); err != nil { l.Warnf("Unable to process datatype broadcast %s - schema check: %s", msg.Header.ID, err) - return ActionReject, nil + return ActionReject, nil, nil } existing, err := dh.database.GetDatatypeByName(ctx, dt.Namespace, dt.Name, dt.Version) if err != nil { - return ActionRetry, err // We only return database errors + return ActionRetry, nil, err // We only return database errors } if existing != nil { l.Warnf("Unable to process datatype broadcast %s (%s:%s) - duplicate of %v", msg.Header.ID, dt.Namespace, dt, existing.ID) - return ActionReject, nil + return ActionReject, nil, nil } if err = dh.database.UpsertDatatype(ctx, &dt, false); err != nil { - return ActionRetry, err + return ActionRetry, nil, err } - event := fftypes.NewEvent(fftypes.EventTypeDatatypeConfirmed, dt.Namespace, dt.ID) - if err = dh.database.InsertEvent(ctx, event); err != nil { - return ActionRetry, err - } - - return ActionConfirm, nil + return ActionConfirm, &DefinitionBatchActions{ + Finalize: func(ctx context.Context) error { + event := fftypes.NewEvent(fftypes.EventTypeDatatypeConfirmed, dt.Namespace, dt.ID) + return dh.database.InsertEvent(ctx, event) + }, + }, nil } diff --git a/internal/definitions/definition_handler_datatype_test.go b/internal/definitions/definition_handler_datatype_test.go index d9988deef4..6c1fd7e6ef 100644 --- a/internal/definitions/definition_handler_datatype_test.go +++ b/internal/definitions/definition_handler_datatype_test.go @@ -53,13 +53,15 @@ func TestHandleDefinitionBroadcastDatatypeOk(t *testing.T) { mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ + action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, }, []*fftypes.Data{data}) assert.Equal(t, ActionConfirm, action) assert.NoError(t, err) + err = ba.Finalize(context.Background()) + assert.NoError(t, err) mdm.AssertExpectations(t) mbi.AssertExpectations(t) @@ -89,12 +91,14 @@ func TestHandleDefinitionBroadcastDatatypeEventFail(t *testing.T) { mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil) mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ + action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineDatatype), }, }, []*fftypes.Data{data}) - assert.Equal(t, ActionRetry, action) + assert.Equal(t, ActionConfirm, action) + assert.NoError(t, err) + err = ba.Finalize(context.Background()) assert.EqualError(t, err, "pop") mdm.AssertExpectations(t) diff --git a/internal/definitions/definition_handler_namespace.go b/internal/definitions/definition_handler_namespace.go index 1e65e742c0..3b4785827c 100644 --- a/internal/definitions/definition_handler_namespace.go +++ b/internal/definitions/definition_handler_namespace.go @@ -23,42 +23,42 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { +func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) { l := log.L(ctx) var ns fftypes.Namespace valid := dh.getSystemBroadcastPayload(ctx, msg, data, &ns) if !valid { - return ActionReject, nil + return ActionReject, nil, nil } if err := ns.Validate(ctx, true); err != nil { l.Warnf("Unable to process namespace broadcast %s - validate failed: %s", msg.Header.ID, err) - return ActionReject, nil + return ActionReject, nil, nil } existing, err := dh.database.GetNamespace(ctx, ns.Name) if err != nil { - return ActionRetry, err // We only return database errors + return ActionRetry, nil, err // We only return database errors } if existing != nil { if existing.Type != fftypes.NamespaceTypeLocal { l.Warnf("Unable to process namespace broadcast %s (name=%s) - duplicate of %v", msg.Header.ID, existing.Name, existing.ID) - return ActionReject, nil + return ActionReject, nil, nil } // Remove the local definition if err = dh.database.DeleteNamespace(ctx, existing.ID); err != nil { - return ActionRetry, err + return ActionRetry, nil, err } } if err = dh.database.UpsertNamespace(ctx, &ns, false); err != nil { - return ActionRetry, err + return ActionRetry, nil, err } - event := fftypes.NewEvent(fftypes.EventTypeNamespaceConfirmed, ns.Name, ns.ID) - if err = dh.database.InsertEvent(ctx, event); err != nil { - return ActionRetry, err - } - - return ActionConfirm, nil + return ActionConfirm, &DefinitionBatchActions{ + Finalize: func(ctx context.Context) error { + event := fftypes.NewEvent(fftypes.EventTypeNamespaceConfirmed, ns.Name, ns.ID) + return dh.database.InsertEvent(ctx, event) + }, + }, nil } diff --git a/internal/definitions/definition_handler_namespace_test.go b/internal/definitions/definition_handler_namespace_test.go index 81b041dbe4..d7df428b2a 100644 --- a/internal/definitions/definition_handler_namespace_test.go +++ b/internal/definitions/definition_handler_namespace_test.go @@ -45,13 +45,15 @@ func TestHandleDefinitionBroadcastNSOk(t *testing.T) { mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ + action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, }, []*fftypes.Data{data}) assert.Equal(t, ActionConfirm, action) assert.NoError(t, err) + err = ba.Finalize(context.Background()) + assert.NoError(t, err) mdi.AssertExpectations(t) } @@ -73,12 +75,14 @@ func TestHandleDefinitionBroadcastNSEventFail(t *testing.T) { mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ + action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, }, []*fftypes.Data{data}) - assert.Equal(t, ActionRetry, action) + assert.Equal(t, ActionConfirm, action) + assert.NoError(t, err) + err = ba.Finalize(context.Background()) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) @@ -203,13 +207,15 @@ func TestHandleDefinitionBroadcastDuplicateOverrideLocal(t *testing.T) { mdi.On("DeleteNamespace", mock.Anything, mock.Anything).Return(nil) mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) - action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ + action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineNamespace), }, }, []*fftypes.Data{data}) assert.Equal(t, ActionConfirm, action) assert.NoError(t, err) + err = ba.Finalize(context.Background()) + assert.NoError(t, err) mdi.AssertExpectations(t) } diff --git a/internal/definitions/definition_handler_network_node.go b/internal/definitions/definition_handler_network_node.go index c229e8ae4a..d6ea0b01b4 100644 --- a/internal/definitions/definition_handler_network_node.go +++ b/internal/definitions/definition_handler_network_node.go @@ -23,32 +23,32 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { +func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) { l := log.L(ctx) var node fftypes.Node valid := dh.getSystemBroadcastPayload(ctx, msg, data, &node) if !valid { - return ActionReject, nil + return ActionReject, nil, nil } if err := node.Validate(ctx, true); err != nil { l.Warnf("Unable to process node broadcast %s - validate failed: %s", msg.Header.ID, err) - return ActionReject, nil + return ActionReject, nil, nil } owner, err := dh.database.GetOrganizationByIdentity(ctx, node.Owner) if err != nil { - return ActionRetry, err // We only return database errors + return ActionRetry, nil, err // We only return database errors } if owner == nil { l.Warnf("Unable to process node broadcast %s - parent identity not found: %s", msg.Header.ID, node.Owner) - return ActionReject, nil + return ActionReject, nil, nil } if msg.Header.Key != node.Owner { l.Warnf("Unable to process node broadcast %s - incorrect signature. Expected=%s Received=%s", msg.Header.ID, node.Owner, msg.Header.Author) - return ActionReject, nil + return ActionReject, nil, nil } existing, err := dh.database.GetNode(ctx, node.Owner, node.Name) @@ -56,24 +56,24 @@ func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *ffty existing, err = dh.database.GetNodeByID(ctx, node.ID) } if err != nil { - return ActionRetry, err // We only return database errors + return ActionRetry, nil, err // We only return database errors } if existing != nil { if existing.Owner != node.Owner { l.Warnf("Unable to process node broadcast %s - mismatch with existing %v", msg.Header.ID, existing.ID) - return ActionReject, nil + return ActionReject, nil, nil } node.ID = nil // we keep the existing ID } if err = dh.database.UpsertNode(ctx, &node, true); err != nil { - return ActionRetry, err + return ActionRetry, nil, err } - // Tell the data exchange about this node. Treat these errors like database errors - and return for retry processing - if err = dh.exchange.AddPeer(ctx, node.DX.Peer, node.DX.Endpoint); err != nil { - return ActionRetry, err - } - - return ActionConfirm, nil + return ActionConfirm, &DefinitionBatchActions{ + PreFinalize: func(ctx context.Context) error { + // Tell the data exchange about this node. Treat these errors like database errors - and return for retry processing + return dh.exchange.AddPeer(ctx, node.DX.Peer, node.DX.Endpoint) + }, + }, nil } diff --git a/internal/definitions/definition_handler_network_node_test.go b/internal/definitions/definition_handler_network_node_test.go index 1843221ae6..5dd611b63d 100644 --- a/internal/definitions/definition_handler_network_node_test.go +++ b/internal/definitions/definition_handler_network_node_test.go @@ -137,7 +137,7 @@ func TestHandleDefinitionBroadcastNodeAddPeerFail(t *testing.T) { mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(fmt.Errorf("pop")) - action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ + action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -147,7 +147,9 @@ func TestHandleDefinitionBroadcastNodeAddPeerFail(t *testing.T) { Tag: string(fftypes.SystemTagDefineNode), }, }, []*fftypes.Data{data}) - assert.Equal(t, ActionRetry, action) + assert.Equal(t, ActionConfirm, action) + assert.NoError(t, err) + err = ba.PreFinalize(context.Background()) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) diff --git a/internal/definitions/definition_handler_network_org.go b/internal/definitions/definition_handler_network_org.go index 7cbf0ff060..9695f2f205 100644 --- a/internal/definitions/definition_handler_network_org.go +++ b/internal/definitions/definition_handler_network_org.go @@ -23,33 +23,33 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) { +func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) { l := log.L(ctx) var org fftypes.Organization valid := dh.getSystemBroadcastPayload(ctx, msg, data, &org) if !valid { - return ActionReject, nil + return ActionReject, nil, nil } if err := org.Validate(ctx, true); err != nil { l.Warnf("Unable to process organization broadcast %s - validate failed: %s", msg.Header.ID, err) - return ActionReject, nil + return ActionReject, nil, nil } if org.Parent != "" { parent, err := dh.database.GetOrganizationByIdentity(ctx, org.Parent) if err != nil { - return ActionRetry, err // We only return database errors + return ActionRetry, nil, err // We only return database errors } if parent == nil { l.Warnf("Unable to process organization broadcast %s - parent identity not found: %s", msg.Header.ID, org.Parent) - return ActionReject, nil + return ActionReject, nil, nil } if msg.Header.Key != parent.Identity { l.Warnf("Unable to process organization broadcast %s - incorrect signature. Expected=%s Received=%s", msg.Header.ID, parent.Identity, msg.Header.Author) - return ActionReject, nil + return ActionReject, nil, nil } } @@ -61,19 +61,19 @@ func (dh *definitionHandlers) handleOrganizationBroadcast(ctx context.Context, m } } if err != nil { - return ActionRetry, err // We only return database errors + return ActionRetry, nil, err // We only return database errors } if existing != nil { if existing.Parent != org.Parent { l.Warnf("Unable to process organization broadcast %s - mismatch with existing %v", msg.Header.ID, existing.ID) - return ActionReject, nil + return ActionReject, nil, nil } org.ID = nil // we keep the existing ID } if err = dh.database.UpsertOrganization(ctx, &org, true); err != nil { - return ActionRetry, err + return ActionRetry, nil, err } - return ActionConfirm, nil + return ActionConfirm, nil, nil }