Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions internal/definitions/definition_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,23 @@ func (dh *definitionHandlers) HandleDefinitionBroadcast(ctx context.Context, msg
l.Infof("Confirming system definition broadcast '%s' [%s]", msg.Header.Tag, msg.Header.ID)
switch fftypes.SystemTag(msg.Header.Tag) {
case fftypes.SystemTagDefineDatatype:
msgAction, err = dh.handleDatatypeBroadcast(ctx, msg, data)
return dh.handleDatatypeBroadcast(ctx, msg, data)
case fftypes.SystemTagDefineNamespace:
msgAction, err = dh.handleNamespaceBroadcast(ctx, msg, data)
return dh.handleNamespaceBroadcast(ctx, msg, data)
case fftypes.SystemTagDefineOrganization:
msgAction, err = dh.handleOrganizationBroadcast(ctx, msg, data)
return dh.handleOrganizationBroadcast(ctx, msg, data)
case fftypes.SystemTagDefineNode:
msgAction, err = dh.handleNodeBroadcast(ctx, msg, data)
return dh.handleNodeBroadcast(ctx, msg, data)
case fftypes.SystemTagDefinePool:
return dh.handleTokenPoolBroadcast(ctx, msg, data)
case fftypes.SystemTagDefineFFI:
msgAction, err = dh.handleFFIBroadcast(ctx, msg, data)
return dh.handleFFIBroadcast(ctx, msg, data)
case fftypes.SystemTagDefineContractAPI:
msgAction, err = dh.handleContractAPIBroadcast(ctx, msg, data)
return dh.handleContractAPIBroadcast(ctx, msg, data)
default:
l.Warnf("Unknown SystemTag '%s' for definition ID '%s'", msg.Header.Tag, msg.Header.ID)
return ActionReject, nil, nil
}
return
}

func (dh *definitionHandlers) getSystemBroadcastPayload(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data, res fftypes.Definition) (valid bool) {
Expand Down
60 changes: 35 additions & 25 deletions internal/definitions/definition_handler_contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (dh *definitionHandlers) persistContractAPI(ctx context.Context, api *fftyp
return err == nil, err
}

func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult DefinitionMessageAction, err error) {
func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) {
l := log.L(ctx)
var broadcast fftypes.FFI
valid := dh.getSystemBroadcastPayload(ctx, msg, data, &broadcast)
Expand All @@ -74,57 +74,67 @@ func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftyp
l.Warnf("Unable to process contract definition broadcast %s - validate failed: %s", msg.Header.ID, validationErr)
valid = false
} else {
var err error
broadcast.Message = msg.Header.ID
valid, err = dh.persistFFI(ctx, &broadcast)
if err != nil {
return ActionRetry, nil, err
}
}
}

var event *fftypes.Event
switch {
case err != nil:
actionResult = ActionRetry
case valid:
var eventType fftypes.EventType
var actionResult DefinitionMessageAction
if valid {
l.Infof("Contract interface created id=%s author=%s", broadcast.ID, msg.Header.Author)
event = fftypes.NewEvent(fftypes.EventTypeContractInterfaceConfirmed, broadcast.Namespace, broadcast.ID)
eventType = fftypes.EventTypeContractInterfaceConfirmed
actionResult = ActionConfirm
err = dh.database.InsertEvent(ctx, event)
default:
} else {
l.Warnf("Contract interface rejected id=%s author=%s", broadcast.ID, msg.Header.Author)
event = fftypes.NewEvent(fftypes.EventTypeContractInterfaceRejected, broadcast.Namespace, broadcast.ID)
eventType = fftypes.EventTypeContractInterfaceRejected
actionResult = ActionReject
err = dh.database.InsertEvent(ctx, event)
}
return
return actionResult, &DefinitionBatchActions{
Finalize: func(ctx context.Context) error {
event := fftypes.NewEvent(eventType, broadcast.Namespace, broadcast.ID)
return dh.database.InsertEvent(ctx, event)
},
}, nil
}

func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (actionResult DefinitionMessageAction, err error) {
func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) {
l := log.L(ctx)
var broadcast fftypes.ContractAPI
valid := dh.getSystemBroadcastPayload(ctx, msg, data, &broadcast)
if valid {
if validateErr := broadcast.Validate(ctx, true); validateErr != nil {
l.Warnf("Unable to process contract API broadcast %s - validate failed: %s", msg.Header.ID, err)
l.Warnf("Unable to process contract API broadcast %s - validate failed: %s", msg.Header.ID, validateErr)
valid = false
} else {
var err error
broadcast.Message = msg.Header.ID
valid, err = dh.persistContractAPI(ctx, &broadcast)
if err != nil {
return ActionRetry, nil, err
}
}
}

var event *fftypes.Event
switch {
case err != nil:
actionResult = ActionRetry
case valid:
var eventType fftypes.EventType
var actionResult DefinitionMessageAction
if valid {
l.Infof("Contract API created id=%s author=%s", broadcast.ID, msg.Header.Author)
event = fftypes.NewEvent(fftypes.EventTypeContractAPIConfirmed, broadcast.Namespace, broadcast.ID)
eventType = fftypes.EventTypeContractAPIConfirmed
actionResult = ActionConfirm
err = dh.database.InsertEvent(ctx, event)
default:
} else {
l.Warnf("Contract API rejected id=%s author=%s", broadcast.ID, msg.Header.Author)
event = fftypes.NewEvent(fftypes.EventTypeContractAPIRejected, broadcast.Namespace, broadcast.ID)
eventType = fftypes.EventTypeContractAPIRejected
actionResult = ActionReject
err = dh.database.InsertEvent(ctx, event)
}
return
return actionResult, &DefinitionBatchActions{
Finalize: func(ctx context.Context) error {
event := fftypes.NewEvent(eventType, broadcast.Namespace, broadcast.ID)
return dh.database.InsertEvent(ctx, event)
},
}, nil
}
12 changes: 9 additions & 3 deletions internal/definitions/definition_handler_contracts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,15 @@ func TestHandleFFIBroadcastOk(t *testing.T) {
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mcm := dh.contracts.(*contractmocks.Manager)
mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(nil)
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineFFI),
},
}, []*fftypes.Data{data})
assert.Equal(t, ActionConfirm, action)
assert.NoError(t, err)
err = ba.Finalize(context.Background())
assert.NoError(t, err)
mbi.AssertExpectations(t)
}

Expand All @@ -128,13 +130,15 @@ func TestHandleFFIBroadcastReject(t *testing.T) {
mcm := dh.contracts.(*contractmocks.Manager)
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
action, err := dh.handleFFIBroadcast(context.Background(), &fftypes.Message{
action, ba, err := dh.handleFFIBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineFFI),
},
}, []*fftypes.Data{})
assert.Equal(t, ActionReject, action)
assert.NoError(t, err)
err = ba.Finalize(context.Background())
assert.NoError(t, err)
}

func TestPersistFFIUpsertFFIFail(t *testing.T) {
Expand Down Expand Up @@ -231,13 +235,15 @@ func TestHandleContractAPIBroadcastOk(t *testing.T) {
mbi.On("UpsertContractAPI", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mbi.On("GetContractAPIByName", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineContractAPI),
},
}, []*fftypes.Data{data})
assert.Equal(t, ActionConfirm, action)
assert.NoError(t, err)
err = ba.Finalize(context.Background())
assert.NoError(t, err)
mbi.AssertExpectations(t)
}

Expand Down
26 changes: 13 additions & 13 deletions internal/definitions/definition_handler_datatype.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,42 @@ import (
"github.com/hyperledger/firefly/pkg/fftypes"
)

func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) {
func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) {
l := log.L(ctx)

var dt fftypes.Datatype
valid := dh.getSystemBroadcastPayload(ctx, msg, data, &dt)
if !valid {
return ActionReject, nil
return ActionReject, nil, nil
}

if err := dt.Validate(ctx, true); err != nil {
l.Warnf("Unable to process datatype broadcast %s - validate failed: %s", msg.Header.ID, err)
return ActionReject, nil
return ActionReject, nil, nil
}

if err := dh.data.CheckDatatype(ctx, dt.Namespace, &dt); err != nil {
l.Warnf("Unable to process datatype broadcast %s - schema check: %s", msg.Header.ID, err)
return ActionReject, nil
return ActionReject, nil, nil
}

existing, err := dh.database.GetDatatypeByName(ctx, dt.Namespace, dt.Name, dt.Version)
if err != nil {
return ActionRetry, err // We only return database errors
return ActionRetry, nil, err // We only return database errors
}
if existing != nil {
l.Warnf("Unable to process datatype broadcast %s (%s:%s) - duplicate of %v", msg.Header.ID, dt.Namespace, dt, existing.ID)
return ActionReject, nil
return ActionReject, nil, nil
}

if err = dh.database.UpsertDatatype(ctx, &dt, false); err != nil {
return ActionRetry, err
return ActionRetry, nil, err
}

event := fftypes.NewEvent(fftypes.EventTypeDatatypeConfirmed, dt.Namespace, dt.ID)
if err = dh.database.InsertEvent(ctx, event); err != nil {
return ActionRetry, err
}

return ActionConfirm, nil
return ActionConfirm, &DefinitionBatchActions{
Finalize: func(ctx context.Context) error {
event := fftypes.NewEvent(fftypes.EventTypeDatatypeConfirmed, dt.Namespace, dt.ID)
return dh.database.InsertEvent(ctx, event)
},
}, nil
}
10 changes: 7 additions & 3 deletions internal/definitions/definition_handler_datatype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ func TestHandleDefinitionBroadcastDatatypeOk(t *testing.T) {
mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil)
mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil)
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineDatatype),
},
}, []*fftypes.Data{data})
assert.Equal(t, ActionConfirm, action)
assert.NoError(t, err)
err = ba.Finalize(context.Background())
assert.NoError(t, err)

mdm.AssertExpectations(t)
mbi.AssertExpectations(t)
Expand Down Expand Up @@ -89,12 +91,14 @@ func TestHandleDefinitionBroadcastDatatypeEventFail(t *testing.T) {
mbi.On("GetDatatypeByName", mock.Anything, "ns1", "name1", "ver1").Return(nil, nil)
mbi.On("UpsertDatatype", mock.Anything, mock.Anything, false).Return(nil)
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineDatatype),
},
}, []*fftypes.Data{data})
assert.Equal(t, ActionRetry, action)
assert.Equal(t, ActionConfirm, action)
assert.NoError(t, err)
err = ba.Finalize(context.Background())
assert.EqualError(t, err, "pop")

mdm.AssertExpectations(t)
Expand Down
26 changes: 13 additions & 13 deletions internal/definitions/definition_handler_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,42 @@ import (
"github.com/hyperledger/firefly/pkg/fftypes"
)

func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, error) {
func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) {
l := log.L(ctx)

var ns fftypes.Namespace
valid := dh.getSystemBroadcastPayload(ctx, msg, data, &ns)
if !valid {
return ActionReject, nil
return ActionReject, nil, nil
}
if err := ns.Validate(ctx, true); err != nil {
l.Warnf("Unable to process namespace broadcast %s - validate failed: %s", msg.Header.ID, err)
return ActionReject, nil
return ActionReject, nil, nil
}

existing, err := dh.database.GetNamespace(ctx, ns.Name)
if err != nil {
return ActionRetry, err // We only return database errors
return ActionRetry, nil, err // We only return database errors
}
if existing != nil {
if existing.Type != fftypes.NamespaceTypeLocal {
l.Warnf("Unable to process namespace broadcast %s (name=%s) - duplicate of %v", msg.Header.ID, existing.Name, existing.ID)
return ActionReject, nil
return ActionReject, nil, nil
}
// Remove the local definition
if err = dh.database.DeleteNamespace(ctx, existing.ID); err != nil {
return ActionRetry, err
return ActionRetry, nil, err
}
}

if err = dh.database.UpsertNamespace(ctx, &ns, false); err != nil {
return ActionRetry, err
return ActionRetry, nil, err
}

event := fftypes.NewEvent(fftypes.EventTypeNamespaceConfirmed, ns.Name, ns.ID)
if err = dh.database.InsertEvent(ctx, event); err != nil {
return ActionRetry, err
}

return ActionConfirm, nil
return ActionConfirm, &DefinitionBatchActions{
Finalize: func(ctx context.Context) error {
event := fftypes.NewEvent(fftypes.EventTypeNamespaceConfirmed, ns.Name, ns.ID)
return dh.database.InsertEvent(ctx, event)
},
}, nil
}
14 changes: 10 additions & 4 deletions internal/definitions/definition_handler_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ func TestHandleDefinitionBroadcastNSOk(t *testing.T) {
mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil)
mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineNamespace),
},
}, []*fftypes.Data{data})
assert.Equal(t, ActionConfirm, action)
assert.NoError(t, err)
err = ba.Finalize(context.Background())
assert.NoError(t, err)

mdi.AssertExpectations(t)
}
Expand All @@ -73,12 +75,14 @@ func TestHandleDefinitionBroadcastNSEventFail(t *testing.T) {
mdi.On("GetNamespace", mock.Anything, "ns1").Return(nil, nil)
mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineNamespace),
},
}, []*fftypes.Data{data})
assert.Equal(t, ActionRetry, action)
assert.Equal(t, ActionConfirm, action)
assert.NoError(t, err)
err = ba.Finalize(context.Background())
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
Expand Down Expand Up @@ -203,13 +207,15 @@ func TestHandleDefinitionBroadcastDuplicateOverrideLocal(t *testing.T) {
mdi.On("DeleteNamespace", mock.Anything, mock.Anything).Return(nil)
mdi.On("UpsertNamespace", mock.Anything, mock.Anything, false).Return(nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineNamespace),
},
}, []*fftypes.Data{data})
assert.Equal(t, ActionConfirm, action)
assert.NoError(t, err)
err = ba.Finalize(context.Background())
assert.NoError(t, err)

mdi.AssertExpectations(t)
}
Expand Down
Loading