From 23fbf61bbc0ac2179416f7dbb748fee1e422bfcb Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 4 Feb 2022 23:23:46 -0500 Subject: [PATCH] Remove extra "rejected" events Only "message_rejected" will be emitted for rejected definition messages - no special per-type event will be emitted. This requires one extra lookup in syncasync to find rejected token pools, but seems better for overall consistency. Signed-off-by: Andrew Richardson --- docs/swagger/swagger.yaml | 9 -- .../definition_handler_contracts.go | 34 +++---- .../definition_handler_contracts_test.go | 4 +- .../definition_handler_tokenpool.go | 10 +- .../definition_handler_tokenpool_test.go | 10 -- internal/syncasync/sync_async_bridge.go | 39 +++++--- internal/syncasync/sync_async_bridge_test.go | 98 +++++++++++++++---- pkg/fftypes/event.go | 6 -- 8 files changed, 122 insertions(+), 88 deletions(-) diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 9b9112a699..35f99af619 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -3868,13 +3868,10 @@ paths: - datatype_confirmed - group_confirmed - token_pool_confirmed - - token_pool_rejected - token_transfer_confirmed - token_transfer_op_failed - contract_interface_confirmed - - contract_interface_rejected - contract_api_confirmed - - contract_api_rejected - blockchain_event type: string type: object @@ -3929,13 +3926,10 @@ paths: - datatype_confirmed - group_confirmed - token_pool_confirmed - - token_pool_rejected - token_transfer_confirmed - token_transfer_op_failed - contract_interface_confirmed - - contract_interface_rejected - contract_api_confirmed - - contract_api_rejected - blockchain_event type: string type: object @@ -4595,13 +4589,10 @@ paths: - datatype_confirmed - group_confirmed - token_pool_confirmed - - token_pool_rejected - token_transfer_confirmed - token_transfer_op_failed - contract_interface_confirmed - - contract_interface_rejected - contract_api_confirmed - - contract_api_rejected - blockchain_event type: string type: object diff --git a/internal/definitions/definition_handler_contracts.go b/internal/definitions/definition_handler_contracts.go index 0ebb77a6ef..9f62bad6b2 100644 --- a/internal/definitions/definition_handler_contracts.go +++ b/internal/definitions/definition_handler_contracts.go @@ -83,20 +83,15 @@ func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, msg *fftyp } } - var eventType fftypes.EventType - var actionResult DefinitionMessageAction - if valid { - l.Infof("Contract interface created id=%s author=%s", broadcast.ID, msg.Header.Author) - eventType = fftypes.EventTypeContractInterfaceConfirmed - actionResult = ActionConfirm - } else { + if !valid { l.Warnf("Contract interface rejected id=%s author=%s", broadcast.ID, msg.Header.Author) - eventType = fftypes.EventTypeContractInterfaceRejected - actionResult = ActionReject + return ActionReject, nil, nil } - return actionResult, &DefinitionBatchActions{ + + l.Infof("Contract interface created id=%s author=%s", broadcast.ID, msg.Header.Author) + return ActionConfirm, &DefinitionBatchActions{ Finalize: func(ctx context.Context) error { - event := fftypes.NewEvent(eventType, broadcast.Namespace, broadcast.ID) + event := fftypes.NewEvent(fftypes.EventTypeContractInterfaceConfirmed, broadcast.Namespace, broadcast.ID) return dh.database.InsertEvent(ctx, event) }, }, nil @@ -120,20 +115,15 @@ func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, ms } } - var eventType fftypes.EventType - var actionResult DefinitionMessageAction - if valid { - l.Infof("Contract API created id=%s author=%s", broadcast.ID, msg.Header.Author) - eventType = fftypes.EventTypeContractAPIConfirmed - actionResult = ActionConfirm - } else { + if !valid { l.Warnf("Contract API rejected id=%s author=%s", broadcast.ID, msg.Header.Author) - eventType = fftypes.EventTypeContractAPIRejected - actionResult = ActionReject + return ActionReject, nil, nil } - return actionResult, &DefinitionBatchActions{ + + l.Infof("Contract API created id=%s author=%s", broadcast.ID, msg.Header.Author) + return ActionConfirm, &DefinitionBatchActions{ Finalize: func(ctx context.Context) error { - event := fftypes.NewEvent(eventType, broadcast.Namespace, broadcast.ID) + event := fftypes.NewEvent(fftypes.EventTypeContractAPIConfirmed, broadcast.Namespace, broadcast.ID) return dh.database.InsertEvent(ctx, event) }, }, nil diff --git a/internal/definitions/definition_handler_contracts_test.go b/internal/definitions/definition_handler_contracts_test.go index 2ae9e2ac77..294e35aadd 100644 --- a/internal/definitions/definition_handler_contracts_test.go +++ b/internal/definitions/definition_handler_contracts_test.go @@ -130,15 +130,13 @@ func TestHandleFFIBroadcastReject(t *testing.T) { mcm := dh.contracts.(*contractmocks.Manager) mbi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) mcm.On("ValidateFFIAndSetPathnames", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - action, ba, err := dh.handleFFIBroadcast(context.Background(), &fftypes.Message{ + action, _, err := dh.handleFFIBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Tag: string(fftypes.SystemTagDefineFFI), }, }, []*fftypes.Data{}) assert.Equal(t, ActionReject, action) assert.NoError(t, err) - err = ba.Finalize(context.Background()) - assert.NoError(t, err) } func TestPersistFFIUpsertFFIFail(t *testing.T) { diff --git a/internal/definitions/definition_handler_tokenpool.go b/internal/definitions/definition_handler_tokenpool.go index 24bbfaa153..071b901a6b 100644 --- a/internal/definitions/definition_handler_tokenpool.go +++ b/internal/definitions/definition_handler_tokenpool.go @@ -42,12 +42,6 @@ func (dh *definitionHandlers) persistTokenPool(ctx context.Context, announce *ff return true, nil } -func (dh *definitionHandlers) rejectPool(ctx context.Context, pool *fftypes.TokenPool) error { - event := fftypes.NewEvent(fftypes.EventTypePoolRejected, pool.Namespace, pool.ID) - err := dh.database.InsertEvent(ctx, event) - return err -} - func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (DefinitionMessageAction, *DefinitionBatchActions, error) { var announce fftypes.TokenPoolAnnouncement if valid := dh.getSystemBroadcastPayload(ctx, msg, data, &announce); !valid { @@ -59,7 +53,7 @@ func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg if err := pool.Validate(ctx); err != nil { log.L(ctx).Warnf("Token pool '%s' rejected - validate failed: %s", pool.ID, err) - return ActionReject, nil, dh.rejectPool(ctx, pool) + return ActionReject, nil, nil } // Check if pool has already been confirmed on chain (and confirm the message if so) @@ -72,7 +66,7 @@ func (dh *definitionHandlers) handleTokenPoolBroadcast(ctx context.Context, msg if valid, err := dh.persistTokenPool(ctx, &announce); err != nil { return ActionRetry, nil, err } else if !valid { - return ActionReject, nil, dh.rejectPool(ctx, pool) + return ActionReject, nil, nil } // Message will remain unconfirmed, but plugin will be notified to activate the pool diff --git a/internal/definitions/definition_handler_tokenpool_test.go b/internal/definitions/definition_handler_tokenpool_test.go index 6a7d4e370f..9ae2088619 100644 --- a/internal/definitions/definition_handler_tokenpool_test.go +++ b/internal/definitions/definition_handler_tokenpool_test.go @@ -167,9 +167,6 @@ func TestHandleDefinitionBroadcastTokenPoolIDMismatch(t *testing.T) { mdi.On("UpsertTokenPool", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { return *p.ID == *pool.ID && p.Message == msg.Header.ID })).Return(database.IDMismatch) - mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { - return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolRejected - })).Return(nil) action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionReject, action) @@ -235,16 +232,9 @@ func TestHandleDefinitionBroadcastTokenPoolValidateFail(t *testing.T) { msg, data, err := buildPoolDefinitionMessage(announce) assert.NoError(t, err) - mdi := sh.database.(*databasemocks.Plugin) - mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { - return event.Type == fftypes.EventTypePoolRejected - })).Return(nil) - action, _, err := sh.HandleDefinitionBroadcast(context.Background(), msg, data) assert.Equal(t, ActionReject, action) assert.NoError(t, err) - - mdi.AssertExpectations(t) } func TestHandleDefinitionBroadcastTokenPoolBadMessage(t *testing.T) { diff --git a/internal/syncasync/sync_async_bridge.go b/internal/syncasync/sync_async_bridge.go index b1a87a1c77..d7683995eb 100644 --- a/internal/syncasync/sync_async_bridge.go +++ b/internal/syncasync/sync_async_bridge.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,6 +18,7 @@ package syncasync import ( "context" + "encoding/json" "sync" "time" @@ -172,6 +173,20 @@ func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (pool return pool, nil } +func (sa *syncAsyncBridge) getPoolFromMessage(msg *fftypes.Message) (*fftypes.TokenPool, error) { + if len(msg.Data) > 0 { + data, err := sa.database.GetDataByID(sa.ctx, msg.Data[0].ID, true) + if err != nil || data == nil { + return nil, err + } + var pool fftypes.TokenPoolAnnouncement + if err := json.Unmarshal(data.Value.Bytes(), &pool); err == nil { + return pool.Pool, nil + } + } + return nil, nil +} + func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (transfer *fftypes.TokenTransfer, err error) { if transfer, err = sa.database.GetTokenTransfer(sa.ctx, event.Reference); err != nil { return nil, err @@ -232,6 +247,17 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error { if inflight != nil { go sa.resolveRejected(inflight, msg.Header.ID) } + // See if this is a rejection of an inflight token pool + if msg.Header.Type == fftypes.MessageTypeDefinition && msg.Header.Tag == string(fftypes.SystemTagDefinePool) { + if pool, err := sa.getPoolFromMessage(msg); err != nil { + return err + } else if pool != nil { + inflight := sa.getInFlight(event.Namespace, tokenPoolConfirm, pool.ID) + if inflight != nil { + go sa.resolveRejectedTokenPool(inflight, pool.ID) + } + } + } case fftypes.EventTypePoolConfirmed: pool, err := sa.getPoolFromEvent(event) @@ -244,17 +270,6 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error { go sa.resolveConfirmedTokenPool(inflight, pool) } - case fftypes.EventTypePoolRejected: - pool, err := sa.getPoolFromEvent(event) - if err != nil || pool == nil { - return err - } - // See if this is a rejection of an inflight token pool - inflight := sa.getInFlight(event.Namespace, tokenPoolConfirm, pool.ID) - if inflight != nil { - go sa.resolveRejectedTokenPool(inflight, pool.ID) - } - case fftypes.EventTypeTransferConfirmed: transfer, err := sa.getTransferFromEvent(event) if err != nil || transfer == nil { diff --git a/internal/syncasync/sync_async_bridge_test.go b/internal/syncasync/sync_async_bridge_test.go index 33265e8c23..d73412bc06 100644 --- a/internal/syncasync/sync_async_bridge_test.go +++ b/internal/syncasync/sync_async_bridge_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,6 +18,7 @@ package syncasync import ( "context" + "encoding/json" "fmt" "testing" @@ -444,7 +445,7 @@ func TestEventCallbackTokenTransferNotFound(t *testing.T) { mdi.AssertExpectations(t) } -func TestEventCallbackTokenPoolRejectedNotFound(t *testing.T) { +func TestEventCallbackTokenPoolRejectedNoData(t *testing.T) { sa, cancel := newTestSyncAsyncBridge(t) defer cancel() @@ -456,15 +457,24 @@ func TestEventCallbackTokenPoolRejectedNotFound(t *testing.T) { }, } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypeDefinition, + Tag: string(fftypes.SystemTagDefinePool), + }, + Data: fftypes.DataRefs{}, + } + mdi := sa.database.(*databasemocks.Plugin) - mdi.On("GetTokenPoolByID", sa.ctx, mock.Anything).Return(nil, nil) + mdi.On("GetMessageByID", sa.ctx, mock.Anything).Return(msg, nil) err := sa.eventCallback(&fftypes.EventDelivery{ Event: fftypes.Event{ Namespace: "ns1", ID: fftypes.NewUUID(), Reference: fftypes.NewUUID(), - Type: fftypes.EventTypePoolRejected, + Type: fftypes.EventTypeMessageRejected, }, }) assert.NoError(t, err) @@ -472,6 +482,47 @@ func TestEventCallbackTokenPoolRejectedNotFound(t *testing.T) { mdi.AssertExpectations(t) } +func TestEventCallbackTokenPoolRejectedDataError(t *testing.T) { + + sa, cancel := newTestSyncAsyncBridge(t) + defer cancel() + + responseID := fftypes.NewUUID() + sa.inflight = map[string]map[fftypes.UUID]*inflightRequest{ + "ns1": { + *responseID: &inflightRequest{}, + }, + } + + dataID := fftypes.NewUUID() + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypeDefinition, + Tag: string(fftypes.SystemTagDefinePool), + }, + Data: fftypes.DataRefs{ + {ID: dataID}, + }, + } + + mdi := sa.database.(*databasemocks.Plugin) + mdi.On("GetMessageByID", sa.ctx, mock.Anything).Return(msg, nil) + mdi.On("GetDataByID", sa.ctx, dataID, true).Return(nil, fmt.Errorf("pop")) + + err := sa.eventCallback(&fftypes.EventDelivery{ + Event: fftypes.Event{ + Namespace: "ns1", + ID: fftypes.NewUUID(), + Reference: fftypes.NewUUID(), + Type: fftypes.EventTypeMessageRejected, + }, + }) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + func TestEventCallbackMsgDataLookupFail(t *testing.T) { sa, cancel := newTestSyncAsyncBridge(t) @@ -549,30 +600,41 @@ func TestAwaitTokenPoolConfirmationRejected(t *testing.T) { sa, cancel := newTestSyncAsyncBridge(t) defer cancel() - requestID := fftypes.NewUUID() + pool := &fftypes.TokenPoolAnnouncement{ + Pool: &fftypes.TokenPool{ + ID: fftypes.NewUUID(), + }, + } + poolJSON, _ := json.Marshal(pool) + data := &fftypes.Data{ + ID: fftypes.NewUUID(), + Value: fftypes.JSONAnyPtrBytes(poolJSON), + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypeDefinition, + Tag: string(fftypes.SystemTagDefinePool), + }, + Data: fftypes.DataRefs{ + {ID: data.ID}, + }, + } mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil) mdi := sa.database.(*databasemocks.Plugin) - gmid := mdi.On("GetTokenPoolByID", sa.ctx, mock.Anything) - gmid.RunFn = func(a mock.Arguments) { - pool := &fftypes.TokenPool{ - ID: requestID, - Name: "my-pool", - } - gmid.ReturnArguments = mock.Arguments{ - pool, nil, - } - } + mdi.On("GetMessageByID", sa.ctx, msg.Header.ID).Return(msg, nil) + mdi.On("GetDataByID", sa.ctx, data.ID, true).Return(data, nil) - _, err := sa.WaitForTokenPool(sa.ctx, "ns1", requestID, func(ctx context.Context) error { + _, err := sa.WaitForTokenPool(sa.ctx, "ns1", pool.Pool.ID, func(ctx context.Context) error { go func() { sa.eventCallback(&fftypes.EventDelivery{ Event: fftypes.Event{ ID: fftypes.NewUUID(), - Type: fftypes.EventTypePoolRejected, - Reference: requestID, + Type: fftypes.EventTypeMessageRejected, + Reference: msg.Header.ID, Namespace: "ns1", }, }) diff --git a/pkg/fftypes/event.go b/pkg/fftypes/event.go index 01fbeaa7c6..e0db255a69 100644 --- a/pkg/fftypes/event.go +++ b/pkg/fftypes/event.go @@ -35,20 +35,14 @@ var ( EventTypeGroupConfirmed EventType = ffEnum("eventtype", "group_confirmed") // EventTypePoolConfirmed occurs when a new token pool is ready for use EventTypePoolConfirmed EventType = ffEnum("eventtype", "token_pool_confirmed") - // EventTypePoolRejected occurs when a new token pool is rejected (due to validation errors, duplicates, etc) - EventTypePoolRejected EventType = ffEnum("eventtype", "token_pool_rejected") // EventTypeTransferConfirmed occurs when a token transfer has been confirmed EventTypeTransferConfirmed EventType = ffEnum("eventtype", "token_transfer_confirmed") // EventTypeTransferOpFailed occurs when a token transfer submitted by this node has failed (based on feedback from connector) EventTypeTransferOpFailed EventType = ffEnum("eventtype", "token_transfer_op_failed") // EventTypeContractInterfaceConfirmed occurs when a new contract interface has been confirmed EventTypeContractInterfaceConfirmed EventType = ffEnum("eventtype", "contract_interface_confirmed") - // EventTypeContractInterfaceRejected occurs when a new contract interface has been rejected - EventTypeContractInterfaceRejected EventType = ffEnum("eventtype", "contract_interface_rejected") // EventTypeContractAPIConfirmed occurs when a new contract API has been confirmed EventTypeContractAPIConfirmed EventType = ffEnum("eventtype", "contract_api_confirmed") - // EventTypeContractInterfaceRejected occurs when a new contract API has been rejected - EventTypeContractAPIRejected EventType = ffEnum("eventtype", "contract_api_rejected") // EventTypeBlockchainEvent occurs when a new event has been recorded from the blockchain EventTypeBlockchainEvent EventType = ffEnum("eventtype", "blockchain_event") )