Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3868,13 +3868,10 @@ paths:
- datatype_confirmed
- group_confirmed
- token_pool_confirmed
- token_pool_rejected
- token_transfer_confirmed
- token_transfer_op_failed
- contract_interface_confirmed
- contract_interface_rejected
- contract_api_confirmed
- contract_api_rejected
- blockchain_event
type: string
type: object
Expand Down Expand Up @@ -3929,13 +3926,10 @@ paths:
- datatype_confirmed
- group_confirmed
- token_pool_confirmed
- token_pool_rejected
- token_transfer_confirmed
- token_transfer_op_failed
- contract_interface_confirmed
- contract_interface_rejected
- contract_api_confirmed
- contract_api_rejected
- blockchain_event
type: string
type: object
Expand Down Expand Up @@ -4595,13 +4589,10 @@ paths:
- datatype_confirmed
- group_confirmed
- token_pool_confirmed
- token_pool_rejected
- token_transfer_confirmed
- token_transfer_op_failed
- contract_interface_confirmed
- contract_interface_rejected
- contract_api_confirmed
- contract_api_rejected
- blockchain_event
type: string
type: object
Expand Down
34 changes: 12 additions & 22 deletions internal/definitions/definition_handler_contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,15 @@ func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftyp
}
}

var eventType fftypes.EventType
var actionResult DefinitionMessageAction
if valid {
l.Infof("Contract interface created id=%s author=%s", broadcast.ID, msg.Header.Author)
eventType = fftypes.EventTypeContractInterfaceConfirmed
actionResult = ActionConfirm
} else {
if !valid {
l.Warnf("Contract interface rejected id=%s author=%s", broadcast.ID, msg.Header.Author)
eventType = fftypes.EventTypeContractInterfaceRejected
actionResult = ActionReject
return ActionReject, nil, nil
}
return actionResult, &DefinitionBatchActions{

l.Infof("Contract interface created id=%s author=%s", broadcast.ID, msg.Header.Author)
return ActionConfirm, &DefinitionBatchActions{
Finalize: func(ctx context.Context) error {
event := fftypes.NewEvent(eventType, broadcast.Namespace, broadcast.ID)
event := fftypes.NewEvent(fftypes.EventTypeContractInterfaceConfirmed, broadcast.Namespace, broadcast.ID)
return dh.database.InsertEvent(ctx, event)
},
}, nil
Expand All @@ -120,20 +115,15 @@ func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, ms
}
}

var eventType fftypes.EventType
var actionResult DefinitionMessageAction
if valid {
l.Infof("Contract API created id=%s author=%s", broadcast.ID, msg.Header.Author)
eventType = fftypes.EventTypeContractAPIConfirmed
actionResult = ActionConfirm
} else {
if !valid {
l.Warnf("Contract API rejected id=%s author=%s", broadcast.ID, msg.Header.Author)
eventType = fftypes.EventTypeContractAPIRejected
actionResult = ActionReject
return ActionReject, nil, nil
}
return actionResult, &DefinitionBatchActions{

l.Infof("Contract API created id=%s author=%s", broadcast.ID, msg.Header.Author)
return ActionConfirm, &DefinitionBatchActions{
Finalize: func(ctx context.Context) error {
event := fftypes.NewEvent(eventType, broadcast.Namespace, broadcast.ID)
event := fftypes.NewEvent(fftypes.EventTypeContractAPIConfirmed, broadcast.Namespace, broadcast.ID)
return dh.database.InsertEvent(ctx, event)
},
}, nil
Expand Down
4 changes: 1 addition & 3 deletions internal/definitions/definition_handler_contracts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,13 @@ func TestHandleFFIBroadcastReject(t *testing.T) {
mcm := dh.contracts.(*contractmocks.Manager)
mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
action, ba, err := dh.handleFFIBroadcast(context.Background(), &fftypes.Message{
action, _, err := dh.handleFFIBroadcast(context.Background(), &fftypes.Message{
Header: fftypes.MessageHeader{
Tag: string(fftypes.SystemTagDefineFFI),
},
}, []*fftypes.Data{})
assert.Equal(t, ActionReject, action)
assert.NoError(t, err)
err = ba.Finalize(context.Background())
assert.NoError(t, err)
}

func TestPersistFFIUpsertFFIFail(t *testing.T) {
Expand Down
10 changes: 2 additions & 8 deletions internal/definitions/definition_handler_tokenpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ func (dh *definitionHandlers) persistTokenPool(ctx context.Context, announce *ff
return true, nil
}

func (dh *definitionHandlers) rejectPool(ctx context.Context, pool *fftypes.TokenPool) error {
event := fftypes.NewEvent(fftypes.EventTypePoolRejected, pool.Namespace, pool.ID)
err := dh.database.InsertEvent(ctx, event)
return err
}

func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) {
var announce fftypes.TokenPoolAnnouncement
if valid := dh.getSystemBroadcastPayload(ctx, msg, data, &announce); !valid {
Expand All @@ -59,7 +53,7 @@ func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg

if err := pool.Validate(ctx); err != nil {
log.L(ctx).Warnf("Token pool '%s' rejected - validate failed: %s", pool.ID, err)
return ActionReject, nil, dh.rejectPool(ctx, pool)
return ActionReject, nil, nil
}

// Check if pool has already been confirmed on chain (and confirm the message if so)
Expand All @@ -72,7 +66,7 @@ func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg
if valid, err := dh.persistTokenPool(ctx, &announce); err != nil {
return ActionRetry, nil, err
} else if !valid {
return ActionReject, nil, dh.rejectPool(ctx, pool)
return ActionReject, nil, nil
}

// Message will remain unconfirmed, but plugin will be notified to activate the pool
Expand Down
10 changes: 0 additions & 10 deletions internal/definitions/definition_handler_tokenpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,6 @@ func TestHandleDefinitionBroadcastTokenPoolIDMismatch(t *testing.T) {
mdi.On("UpsertTokenPool", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool {
return *p.ID == *pool.ID && p.Message == msg.Header.ID
})).Return(database.IDMismatch)
mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool {
return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolRejected
})).Return(nil)

action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data)
assert.Equal(t, ActionReject, action)
Expand Down Expand Up @@ -235,16 +232,9 @@ func TestHandleDefinitionBroadcastTokenPoolValidateFail(t *testing.T) {
msg, data, err := buildPoolDefinitionMessage(announce)
assert.NoError(t, err)

mdi := sh.database.(*databasemocks.Plugin)
mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool {
return event.Type == fftypes.EventTypePoolRejected
})).Return(nil)

action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data)
assert.Equal(t, ActionReject, action)
assert.NoError(t, err)

mdi.AssertExpectations(t)
}

func TestHandleDefinitionBroadcastTokenPoolBadMessage(t *testing.T) {
Expand Down
39 changes: 27 additions & 12 deletions internal/syncasync/sync_async_bridge.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,6 +18,7 @@ package syncasync

import (
"context"
"encoding/json"
"sync"
"time"

Expand Down Expand Up @@ -172,6 +173,20 @@ func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (pool
return pool, nil
}

func (sa *syncAsyncBridge) getPoolFromMessage(msg *fftypes.Message) (*fftypes.TokenPool, error) {
if len(msg.Data) > 0 {
data, err := sa.database.GetDataByID(sa.ctx, msg.Data[0].ID, true)
if err != nil || data == nil {
return nil, err
}
var pool fftypes.TokenPoolAnnouncement
if err := json.Unmarshal(data.Value.Bytes(), &pool); err == nil {
return pool.Pool, nil
}
}
return nil, nil
}

func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (transfer *fftypes.TokenTransfer, err error) {
if transfer, err = sa.database.GetTokenTransfer(sa.ctx, event.Reference); err != nil {
return nil, err
Expand Down Expand Up @@ -232,6 +247,17 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error {
if inflight != nil {
go sa.resolveRejected(inflight, msg.Header.ID)
}
// See if this is a rejection of an inflight token pool
if msg.Header.Type == fftypes.MessageTypeDefinition && msg.Header.Tag == string(fftypes.SystemTagDefinePool) {
if pool, err := sa.getPoolFromMessage(msg); err != nil {
return err
} else if pool != nil {
inflight := sa.getInFlight(event.Namespace, tokenPoolConfirm, pool.ID)
if inflight != nil {
go sa.resolveRejectedTokenPool(inflight, pool.ID)
}
}
}

case fftypes.EventTypePoolConfirmed:
pool, err := sa.getPoolFromEvent(event)
Expand All @@ -244,17 +270,6 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error {
go sa.resolveConfirmedTokenPool(inflight, pool)
}

case fftypes.EventTypePoolRejected:
pool, err := sa.getPoolFromEvent(event)
if err != nil || pool == nil {
return err
}
// See if this is a rejection of an inflight token pool
inflight := sa.getInFlight(event.Namespace, tokenPoolConfirm, pool.ID)
if inflight != nil {
go sa.resolveRejectedTokenPool(inflight, pool.ID)
}

case fftypes.EventTypeTransferConfirmed:
transfer, err := sa.getTransferFromEvent(event)
if err != nil || transfer == nil {
Expand Down
98 changes: 80 additions & 18 deletions internal/syncasync/sync_async_bridge_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,6 +18,7 @@ package syncasync

import (
"context"
"encoding/json"
"fmt"
"testing"

Expand Down Expand Up @@ -444,7 +445,7 @@ func TestEventCallbackTokenTransferNotFound(t *testing.T) {
mdi.AssertExpectations(t)
}

func TestEventCallbackTokenPoolRejectedNotFound(t *testing.T) {
func TestEventCallbackTokenPoolRejectedNoData(t *testing.T) {

sa, cancel := newTestSyncAsyncBridge(t)
defer cancel()
Expand All @@ -456,22 +457,72 @@ func TestEventCallbackTokenPoolRejectedNotFound(t *testing.T) {
},
}

msg := &fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
Type: fftypes.MessageTypeDefinition,
Tag: string(fftypes.SystemTagDefinePool),
},
Data: fftypes.DataRefs{},
}

mdi := sa.database.(*databasemocks.Plugin)
mdi.On("GetTokenPoolByID", sa.ctx, mock.Anything).Return(nil, nil)
mdi.On("GetMessageByID", sa.ctx, mock.Anything).Return(msg, nil)

err := sa.eventCallback(&fftypes.EventDelivery{
Event: fftypes.Event{
Namespace: "ns1",
ID: fftypes.NewUUID(),
Reference: fftypes.NewUUID(),
Type: fftypes.EventTypePoolRejected,
Type: fftypes.EventTypeMessageRejected,
},
})
assert.NoError(t, err)

mdi.AssertExpectations(t)
}

func TestEventCallbackTokenPoolRejectedDataError(t *testing.T) {

sa, cancel := newTestSyncAsyncBridge(t)
defer cancel()

responseID := fftypes.NewUUID()
sa.inflight = map[string]map[fftypes.UUID]*inflightRequest{
"ns1": {
*responseID: &inflightRequest{},
},
}

dataID := fftypes.NewUUID()
msg := &fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
Type: fftypes.MessageTypeDefinition,
Tag: string(fftypes.SystemTagDefinePool),
},
Data: fftypes.DataRefs{
{ID: dataID},
},
}

mdi := sa.database.(*databasemocks.Plugin)
mdi.On("GetMessageByID", sa.ctx, mock.Anything).Return(msg, nil)
mdi.On("GetDataByID", sa.ctx, dataID, true).Return(nil, fmt.Errorf("pop"))

err := sa.eventCallback(&fftypes.EventDelivery{
Event: fftypes.Event{
Namespace: "ns1",
ID: fftypes.NewUUID(),
Reference: fftypes.NewUUID(),
Type: fftypes.EventTypeMessageRejected,
},
})
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
}

func TestEventCallbackMsgDataLookupFail(t *testing.T) {

sa, cancel := newTestSyncAsyncBridge(t)
Expand Down Expand Up @@ -549,30 +600,41 @@ func TestAwaitTokenPoolConfirmationRejected(t *testing.T) {
sa, cancel := newTestSyncAsyncBridge(t)
defer cancel()

requestID := fftypes.NewUUID()
pool := &fftypes.TokenPoolAnnouncement{
Pool: &fftypes.TokenPool{
ID: fftypes.NewUUID(),
},
}
poolJSON, _ := json.Marshal(pool)
data := &fftypes.Data{
ID: fftypes.NewUUID(),
Value: fftypes.JSONAnyPtrBytes(poolJSON),
}
msg := &fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
Type: fftypes.MessageTypeDefinition,
Tag: string(fftypes.SystemTagDefinePool),
},
Data: fftypes.DataRefs{
{ID: data.ID},
},
}

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
gmid := mdi.On("GetTokenPoolByID", sa.ctx, mock.Anything)
gmid.RunFn = func(a mock.Arguments) {
pool := &fftypes.TokenPool{
ID: requestID,
Name: "my-pool",
}
gmid.ReturnArguments = mock.Arguments{
pool, nil,
}
}
mdi.On("GetMessageByID", sa.ctx, msg.Header.ID).Return(msg, nil)
mdi.On("GetDataByID", sa.ctx, data.ID, true).Return(data, nil)

_, err := sa.WaitForTokenPool(sa.ctx, "ns1", requestID, func(ctx context.Context) error {
_, err := sa.WaitForTokenPool(sa.ctx, "ns1", pool.Pool.ID, func(ctx context.Context) error {
go func() {
sa.eventCallback(&fftypes.EventDelivery{
Event: fftypes.Event{
ID: fftypes.NewUUID(),
Type: fftypes.EventTypePoolRejected,
Reference: requestID,
Type: fftypes.EventTypeMessageRejected,
Reference: msg.Header.ID,
Namespace: "ns1",
},
})
Expand Down
Loading