Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 31 additions & 22 deletions internal/definitions/definition_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,37 @@ import (
type DefinitionHandlers interface {
privatemessaging.GroupManager

HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error)
HandleDefinitionBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error)
SendReply(ctx context.Context, event *fftypes.Event, reply *fftypes.MessageInOut)
}

type SystemBroadcastAction int
// DefinitionMessageAction is the action to be taken on an individual definition message
type DefinitionMessageAction int

const (
ActionReject SystemBroadcastAction = iota
// ActionReject the message was successfully processed, but was malformed/invalid and should be marked as rejected
ActionReject DefinitionMessageAction = iota

// ActionConfirm the message was valid and should be confirmed
ActionConfirm

// ActionRetry a recoverable error was encountered - batch should be halted and then re-processed from the start
ActionRetry

// ActionWait the message is still awaiting further pieces for aggregation and should be held in pending state
ActionWait
)

// DefinitionBatchActions are actions to be taken at the end of a definition batch
// See further notes on "batchActions" in the event aggregator
type DefinitionBatchActions struct {
// PreFinalize may perform a blocking action (possibly to an external connector) that should execute outside database RunAsGroup
PreFinalize func(ctx context.Context) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth noting that error returned from here, will cause the whole batch to "block" (indefinitely retry), just like ActionRetry would from HandleDefinitionBroadcast.

There is not (currently) any option to return an ActionReject in the PreFinalize phase

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... after I wrote this, I found it's actually covered in the internal/events/aggregator comments - not sure if one should refer to the other, to avoid two places that we might forget to update the rules on.


// Finalize may perform final, non-idempotent database operations (such as inserting Events)
Finalize func(ctx context.Context) error
}

type definitionHandlers struct {
database database.Plugin
exchange dataexchange.Plugin
Expand Down Expand Up @@ -86,38 +104,29 @@ func (dh *definitionHandlers) EnsureLocalGroup(ctx context.Context, group *fftyp
return dh.messaging.EnsureLocalGroup(ctx, group)
}

func (dh *definitionHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (SystemBroadcastAction, error) {
func (dh *definitionHandlers) HandleDefinitionBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (msgAction DefinitionMessageAction, batchActions *DefinitionBatchActions, err error) {
l := log.L(ctx)
l.Infof("Confirming system broadcast '%s' [%s]", msg.Header.Tag, msg.Header.ID)
var valid bool
var err error
l.Infof("Confirming system definition broadcast '%s' [%s]", msg.Header.Tag, msg.Header.ID)
switch fftypes.SystemTag(msg.Header.Tag) {
case fftypes.SystemTagDefineDatatype:
valid, err = dh.handleDatatypeBroadcast(ctx, msg, data)
msgAction, err = dh.handleDatatypeBroadcast(ctx, msg, data)
case fftypes.SystemTagDefineNamespace:
valid, err = dh.handleNamespaceBroadcast(ctx, msg, data)
msgAction, err = dh.handleNamespaceBroadcast(ctx, msg, data)
case fftypes.SystemTagDefineOrganization:
valid, err = dh.handleOrganizationBroadcast(ctx, msg, data)
msgAction, err = dh.handleOrganizationBroadcast(ctx, msg, data)
case fftypes.SystemTagDefineNode:
valid, err = dh.handleNodeBroadcast(ctx, msg, data)
msgAction, err = dh.handleNodeBroadcast(ctx, msg, data)
case fftypes.SystemTagDefinePool:
return dh.handleTokenPoolBroadcast(ctx, msg, data)
case fftypes.SystemTagDefineFFI:
return dh.handleFFIBroadcast(ctx, msg, data)
msgAction, err = dh.handleFFIBroadcast(ctx, msg, data)
case fftypes.SystemTagDefineContractAPI:
return dh.handleContractAPIBroadcast(ctx, msg, data)
msgAction, err = dh.handleContractAPIBroadcast(ctx, msg, data)
default:
l.Warnf("Unknown SystemTag '%s' for definition ID '%s'", msg.Header.Tag, msg.Header.ID)
return ActionReject, nil
}
switch {
case err != nil:
return ActionRetry, err
case !valid:
return ActionReject, nil
default:
return ActionConfirm, nil
return ActionReject, nil, nil
}
return
}

func (dh *definitionHandlers) getSystemBroadcastPayload(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data, res fftypes.Definition) (valid bool) {
Expand Down
4 changes: 2 additions & 2 deletions internal/definitions/definition_handler_contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (dh *definitionHandlers) persistContractAPI(ctx context.Context, api *fftyp
return err == nil, err
}

func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult SystemBroadcastAction, err error) {
func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult DefinitionMessageAction, err error) {
l := log.L(ctx)
var broadcast fftypes.FFI
valid := dh.getSystemBroadcastPayload(ctx, msg, data, &broadcast)
Expand Down Expand Up @@ -97,7 +97,7 @@ func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftyp
return
}

func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult SystemBroadcastAction, err error) {
func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult DefinitionMessageAction, err error) {
l := log.L(ctx)
var broadcast fftypes.ContractAPI
valid := dh.getSystemBroadcastPayload(ctx, msg, data, &broadcast)
Expand Down
12 changes: 6 additions & 6 deletions internal/definitions/definition_handler_contracts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestHandleFFIBroadcastOk(t *testing.T) {
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mcm := dh.contracts.(*contractmocks.Manager)
mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(nil)
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineFFI),
},
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestHandleFFIBroadcastValidateFail(t *testing.T) {
}
mbi := dh.database.(*databasemocks.Plugin)
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineFFI),
},
Expand All @@ -209,7 +209,7 @@ func TestHandleFFIBroadcastPersistFail(t *testing.T) {
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mcm := dh.contracts.(*contractmocks.Manager)
mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(nil)
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineFFI),
},
Expand All @@ -231,7 +231,7 @@ func TestHandleContractAPIBroadcastOk(t *testing.T) {
mbi.On("UpsertContractAPI", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mbi.On("GetContractAPIByName", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineContractAPI),
},
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestHandleContractAPIBroadcastValidateFail(t *testing.T) {
}
mbi := dh.database.(*databasemocks.Plugin)
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineContractAPI),
},
Expand All @@ -304,7 +304,7 @@ func TestHandleContractAPIBroadcastPersistFail(t *testing.T) {
mbi.On("GetContractAPIByName", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
mbi.On("UpsertContractAPI", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineContractAPI),
},
Expand Down
26 changes: 13 additions & 13 deletions internal/definitions/definition_handler_datatype.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -23,42 +23,42 @@ import (
"github.com/hyperledger/firefly/pkg/fftypes"
)

func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) {
func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) {
l := log.L(ctx)

var dt fftypes.Datatype
valid = dh.getSystemBroadcastPayload(ctx, msg, data, &dt)
valid := dh.getSystemBroadcastPayload(ctx, msg, data, &dt)
if !valid {
return false, nil
return ActionReject, nil
}

if err = dt.Validate(ctx, true); err != nil {
if err := dt.Validate(ctx, true); err != nil {
l.Warnf("Unable to process datatype broadcast %s - validate failed: %s", msg.Header.ID, err)
return false, nil
return ActionReject, nil
}

if err = dh.data.CheckDatatype(ctx, dt.Namespace, &dt); err != nil {
if err := dh.data.CheckDatatype(ctx, dt.Namespace, &dt); err != nil {
l.Warnf("Unable to process datatype broadcast %s - schema check: %s", msg.Header.ID, err)
return false, nil
return ActionReject, nil
}

existing, err := dh.database.GetDatatypeByName(ctx, dt.Namespace, dt.Name, dt.Version)
if err != nil {
return false, err // We only return database errors
return ActionRetry, err // We only return database errors
}
if existing != nil {
l.Warnf("Unable to process datatype broadcast %s (%s:%s) - duplicate of %v", msg.Header.ID, dt.Namespace, dt, existing.ID)
return false, nil
return ActionReject, nil
}

if err = dh.database.UpsertDatatype(ctx, &dt, false); err != nil {
return false, err
return ActionRetry, err
}

event := fftypes.NewEvent(fftypes.EventTypeDatatypeConfirmed, dt.Namespace, dt.ID)
if err = dh.database.InsertEvent(ctx, event); err != nil {
return false, err
return ActionRetry, err
}

return true, nil
return ActionConfirm, nil
}
16 changes: 8 additions & 8 deletions internal/definitions/definition_handler_datatype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestHandleDefinitionBroadcastDatatypeOk(t *testing.T) {
mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil)
mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil)
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineDatatype),
},
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestHandleDefinitionBroadcastDatatypeEventFail(t *testing.T) {
mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil)
mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil)
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineDatatype),
},
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestHandleDefinitionBroadcastDatatypeMissingID(t *testing.T) {
Value: fftypes.JSONAnyPtrBytes(b),
}

action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineDatatype),
},
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestHandleDefinitionBroadcastBadSchema(t *testing.T) {

mdm := dh.data.(*datamocks.Manager)
mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(fmt.Errorf("pop"))
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineDatatype),
},
Expand All @@ -171,7 +171,7 @@ func TestHandleDefinitionBroadcastMissingData(t *testing.T) {
}
dt.Hash = dt.Value.Hash()

action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineDatatype),
},
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestHandleDefinitionBroadcastDatatypeLookupFail(t *testing.T) {
mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil)
mbi := dh.database.(*databasemocks.Plugin)
mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, fmt.Errorf("pop"))
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Namespace: fftypes.SystemNamespace,
Tag: string(fftypes.SystemTagDefineDatatype),
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestHandleDefinitionBroadcastUpsertFail(t *testing.T) {
mbi := dh.database.(*databasemocks.Plugin)
mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil)
mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop"))
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineDatatype),
},
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestHandleDefinitionBroadcastDatatypeDuplicate(t *testing.T) {
mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil)
mbi := dh.database.(*databasemocks.Plugin)
mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(dt, nil)
action, err := dh.HandleSystemBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineDatatype),
},
Expand Down
22 changes: 11 additions & 11 deletions internal/definitions/definition_handler_namespace.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -23,42 +23,42 @@ import (
"github.com/hyperledger/firefly/pkg/fftypes"
)

func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) {
func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) {
l := log.L(ctx)

var ns fftypes.Namespace
valid = dh.getSystemBroadcastPayload(ctx, msg, data, &ns)
valid := dh.getSystemBroadcastPayload(ctx, msg, data, &ns)
if !valid {
return false, nil
return ActionReject, nil
}
if err := ns.Validate(ctx, true); err != nil {
l.Warnf("Unable to process namespace broadcast %s - validate failed: %s", msg.Header.ID, err)
return false, nil
return ActionReject, nil
}

existing, err := dh.database.GetNamespace(ctx, ns.Name)
if err != nil {
return false, err // We only return database errors
return ActionRetry, err // We only return database errors
}
if existing != nil {
if existing.Type != fftypes.NamespaceTypeLocal {
l.Warnf("Unable to process namespace broadcast %s (name=%s) - duplicate of %v", msg.Header.ID, existing.Name, existing.ID)
return false, nil
return ActionReject, nil
}
// Remove the local definition
if err = dh.database.DeleteNamespace(ctx, existing.ID); err != nil {
return false, err
return ActionRetry, err
}
}

if err = dh.database.UpsertNamespace(ctx, &ns, false); err != nil {
return false, err
return ActionRetry, err
}

event := fftypes.NewEvent(fftypes.EventTypeNamespaceConfirmed, ns.Name, ns.ID)
if err = dh.database.InsertEvent(ctx, event); err != nil {
return false, err
return ActionRetry, err
}

return true, nil
return ActionConfirm, nil
}
Loading