From c8fa1d056091ebb35b95803aac2c65feff968e93 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Wed, 20 Oct 2021 21:02:04 -0400 Subject: [PATCH 1/3] Add EventTypeTransferOpFailed for token transfer operations that fail Signed-off-by: Andrew Richardson --- internal/assets/manager.go | 2 +- internal/assets/token_transfer.go | 10 +- internal/events/operation_update.go | 8 + internal/events/operation_update_test.go | 54 +++++++ internal/events/tokens_transferred.go | 12 +- internal/i18n/en_translations.go | 1 + internal/orchestrator/bound_callbacks.go | 2 +- internal/orchestrator/bound_callbacks_test.go | 2 +- internal/syncasync/sync_async_bridge.go | 61 ++++++-- internal/syncasync/sync_async_bridge_test.go | 148 +++++++++++++++++- internal/tokens/fftokens/fftokens.go | 2 +- internal/tokens/fftokens/fftokens_test.go | 4 +- internal/txcommon/token_inputs.go | 38 +++++ internal/txcommon/token_inputs_test.go | 61 ++++++++ mocks/assetmocks/manager.go | 8 +- mocks/tokenmocks/callbacks.go | 20 +-- pkg/fftypes/event.go | 2 + pkg/tokens/plugin.go | 2 +- 18 files changed, 375 insertions(+), 62 deletions(-) create mode 100644 internal/txcommon/token_inputs.go create mode 100644 internal/txcommon/token_inputs_test.go diff --git a/internal/assets/manager.go b/internal/assets/manager.go index 8750aa3ccb..c2970ea28e 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -53,7 +53,7 @@ type Manager interface { GetTokenConnectors(ctx context.Context, ns string) ([]*fftypes.TokenConnector, error) // Bound token callbacks - TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error + TokenPoolCreated(ti tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error Start() error WaitStop() diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index 6bb2addd6d..8f1a921044 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -22,17 +22,11 @@ import ( "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/sysmessaging" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) -// Note: the counterpart to below (retrieveTokenTransferInputs) lives in the events package -func addTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) { - op.Input = fftypes.JSONObject{ - "id": transfer.LocalID.String(), - } -} - func (am *assetManager) GetTokenTransfers(ctx context.Context, ns string, filter database.AndFilter) ([]*fftypes.TokenTransfer, *database.FilterResult, error) { return am.database.GetTokenTransfers(ctx, am.scopeNS(ns, filter)) } @@ -258,7 +252,7 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er fftypes.OpTypeTokenTransfer, fftypes.OpStatusPending, "") - addTokenTransferInputs(op, &s.transfer.TokenTransfer) + txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer) err = s.mgr.database.RunAsGroup(ctx, func(ctx context.Context) (err error) { pool, err := s.mgr.GetTokenPool(ctx, s.namespace, s.connector, s.poolName) diff --git a/internal/events/operation_update.go b/internal/events/operation_update.go index 9b28699f7f..cdb4cc29cc 100644 --- a/internal/events/operation_update.go +++ b/internal/events/operation_update.go @@ -36,5 +36,13 @@ func (em *eventManager) OperationUpdate(plugin fftypes.Named, operationID *fftyp if err := em.database.UpdateOperation(em.ctx, op.ID, update); err != nil { return err } + + // Special handling for OpTypeTokenTransfer, which writes an event when it fails + if op.Type == fftypes.OpTypeTokenTransfer && txState == fftypes.OpStatusFailed { + event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID) + if err := em.database.InsertEvent(em.ctx, event); err != nil { + return err + } + } return nil } diff --git a/internal/events/operation_update_test.go b/internal/events/operation_update_test.go index 4c7ae4f511..751586582b 100644 --- a/internal/events/operation_update_test.go +++ b/internal/events/operation_update_test.go @@ -79,3 +79,57 @@ func TestOperationUpdateError(t *testing.T) { mdi.AssertExpectations(t) mbi.AssertExpectations(t) } + +func TestOperationUpdateTransferFail(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + mdi := em.database.(*databasemocks.Plugin) + mbi := &blockchainmocks.Plugin{} + + opID := fftypes.NewUUID() + op := &fftypes.Operation{ + ID: opID, + Type: fftypes.OpTypeTokenTransfer, + Namespace: "ns1", + } + + mdi.On("GetOperationByID", em.ctx, opID).Return(op, nil) + mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(nil) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { + return e.Type == fftypes.EventTypeTransferOpFailed && e.Namespace == "ns1" + })).Return(nil) + + info := fftypes.JSONObject{"some": "info"} + err := em.OperationUpdate(mbi, opID, fftypes.OpStatusFailed, "some error", info) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mbi.AssertExpectations(t) +} + +func TestOperationUpdateTransferEventFail(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + mdi := em.database.(*databasemocks.Plugin) + mbi := &blockchainmocks.Plugin{} + + opID := fftypes.NewUUID() + op := &fftypes.Operation{ + ID: opID, + Type: fftypes.OpTypeTokenTransfer, + Namespace: "ns1", + } + + mdi.On("GetOperationByID", em.ctx, opID).Return(op, nil) + mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(nil) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { + return e.Type == fftypes.EventTypeTransferOpFailed && e.Namespace == "ns1" + })).Return(fmt.Errorf("pop")) + + info := fftypes.JSONObject{"some": "info"} + err := em.OperationUpdate(mbi, opID, fftypes.OpStatusFailed, "some error", info) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) + mbi.AssertExpectations(t) +} diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 846f7247dc..98b11457a1 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -20,20 +20,12 @@ import ( "context" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/hyperledger/firefly/pkg/tokens" ) -func retrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { - input := &op.Input - transfer.LocalID, err = fftypes.ParseUUID(ctx, input.GetString("id")) - if err != nil { - return err - } - return nil -} - func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) { transfer.LocalID = nil @@ -48,7 +40,7 @@ func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, return false, err } if len(operations) > 0 { - err = retrieveTokenTransferInputs(ctx, operations[0], transfer) + err = txcommon.RetrieveTokenTransferInputs(ctx, operations[0], transfer) if err != nil { log.L(ctx).Warnf("Failed to read operation inputs for token transfer '%s': %s", transfer.ProtocolID, err) } diff --git a/internal/i18n/en_translations.go b/internal/i18n/en_translations.go index 681c226848..c53d72815e 100644 --- a/internal/i18n/en_translations.go +++ b/internal/i18n/en_translations.go @@ -206,4 +206,5 @@ var ( MsgFailedToDecodeCertificate = ffm("FF10286", "Failed to decode certificate: %s", 500) MsgInvalidMessageType = ffm("FF10287", "Invalid message type - allowed types are %s", 400) MsgNoUUID = ffm("FF10288", "Field '%s' must not be a UUID", 400) + MsgTokenTransferFailed = ffm("FF10289", "Token transfer with ID '%s' failed. Please check the FireFly logs for more information") ) diff --git a/internal/orchestrator/bound_callbacks.go b/internal/orchestrator/bound_callbacks.go index 891b1296b7..b4bb3d6db2 100644 --- a/internal/orchestrator/bound_callbacks.go +++ b/internal/orchestrator/bound_callbacks.go @@ -36,7 +36,7 @@ func (bc *boundCallbacks) BlockchainOpUpdate(operationID *fftypes.UUID, txState return bc.ei.OperationUpdate(bc.bi, operationID, txState, errorMessage, opOutput) } -func (bc *boundCallbacks) TokensOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { +func (bc *boundCallbacks) TokenOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { return bc.ei.OperationUpdate(plugin, operationID, txState, errorMessage, opOutput) } diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index 65e9fb7ab2..4735dffef4 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -54,7 +54,7 @@ func TestBoundCallbacks(t *testing.T) { assert.EqualError(t, err, "pop") mei.On("OperationUpdate", mti, opID, fftypes.OpStatusFailed, "error info", info).Return(fmt.Errorf("pop")) - err = bc.TokensOpUpdate(mti, opID, fftypes.OpStatusFailed, "error info", info) + err = bc.TokenOpUpdate(mti, opID, fftypes.OpStatusFailed, "error info", info) assert.EqualError(t, err, "pop") mei.On("TransferResult", mdx, "tracking12345", fftypes.OpStatusFailed, "error info", info).Return(fmt.Errorf("pop")) diff --git a/internal/syncasync/sync_async_bridge.go b/internal/syncasync/sync_async_bridge.go index 9ddfb96aa2..b1a87a1c77 100644 --- a/internal/syncasync/sync_async_bridge.go +++ b/internal/syncasync/sync_async_bridge.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/sysmessaging" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) @@ -149,9 +150,8 @@ func (inflight *inflightRequest) msInflight() float64 { return float64(dur) / float64(time.Millisecond) } -func (sa *syncAsyncBridge) getMessageFromEvent(event *fftypes.EventDelivery) (*fftypes.Message, error) { - msg, err := sa.database.GetMessageByID(sa.ctx, event.Reference) - if err != nil { +func (sa *syncAsyncBridge) getMessageFromEvent(event *fftypes.EventDelivery) (msg *fftypes.Message, err error) { + if msg, err = sa.database.GetMessageByID(sa.ctx, event.Reference); err != nil { return nil, err } if msg == nil { @@ -161,9 +161,8 @@ func (sa *syncAsyncBridge) getMessageFromEvent(event *fftypes.EventDelivery) (*f return msg, nil } -func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (*fftypes.TokenPool, error) { - pool, err := sa.database.GetTokenPoolByID(sa.ctx, event.Reference) - if err != nil { +func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (pool *fftypes.TokenPool, err error) { + if pool, err = sa.database.GetTokenPoolByID(sa.ctx, event.Reference); err != nil { return nil, err } if pool == nil { @@ -173,9 +172,8 @@ func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (*ffty return pool, nil } -func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (*fftypes.TokenTransfer, error) { - transfer, err := sa.database.GetTokenTransfer(sa.ctx, event.Reference) - if err != nil { +func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (transfer *fftypes.TokenTransfer, err error) { + if transfer, err = sa.database.GetTokenTransfer(sa.ctx, event.Reference); err != nil { return nil, err } if transfer == nil { @@ -185,6 +183,17 @@ func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (* return transfer, nil } +func (sa *syncAsyncBridge) getOperationFromEvent(event *fftypes.EventDelivery) (op *fftypes.Operation, err error) { + if op, err = sa.database.GetOperationByID(sa.ctx, event.Reference); err != nil { + return nil, err + } + if op == nil { + // This should not happen (but we need to move on) + log.L(sa.ctx).Errorf("Unable to resolve operation '%s' for %s event '%s'", event.Reference, event.Type, event.ID) + } + return op, nil +} + func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error { sa.inflightMux.Lock() defer sa.inflightMux.Unlock() @@ -221,7 +230,7 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error { // See if this is a rejection of an inflight message inflight := sa.getInFlight(event.Namespace, messageConfirm, msg.Header.ID) if inflight != nil { - go sa.resolveRejected(inflight, msg) + go sa.resolveRejected(inflight, msg.Header.ID) } case fftypes.EventTypePoolConfirmed: @@ -243,7 +252,7 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error { // See if this is a rejection of an inflight token pool inflight := sa.getInFlight(event.Namespace, tokenPoolConfirm, pool.ID) if inflight != nil { - go sa.resolveRejectedTokenPool(inflight, pool) + go sa.resolveRejectedTokenPool(inflight, pool.ID) } case fftypes.EventTypeTransferConfirmed: @@ -256,6 +265,22 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error { if inflight != nil { go sa.resolveConfirmedTokenTransfer(inflight, transfer) } + + case fftypes.EventTypeTransferOpFailed: + op, err := sa.getOperationFromEvent(event) + if err != nil || op == nil { + return err + } + // Extract the LocalID of the transfer + var transfer fftypes.TokenTransfer + if err := txcommon.RetrieveTokenTransferInputs(sa.ctx, op, &transfer); err != nil { + log.L(sa.ctx).Warnf("Failed to extract token transfer inputs for operation '%s': %s", op.ID, err) + } + // See if this is a failure of an inflight token transfer operation + inflight := sa.getInFlight(event.Namespace, tokenTransferConfirm, transfer.LocalID) + if inflight != nil { + go sa.resolveFailedTokenTransfer(inflight, transfer.LocalID) + } } return nil @@ -279,8 +304,8 @@ func (sa *syncAsyncBridge) resolveConfirmed(inflight *inflightRequest, msg *ffty inflight.response <- inflightResponse{id: msg.Header.ID, data: msg} } -func (sa *syncAsyncBridge) resolveRejected(inflight *inflightRequest, msg *fftypes.Message) { - err := i18n.NewError(sa.ctx, i18n.MsgRejected, msg.Header.ID) +func (sa *syncAsyncBridge) resolveRejected(inflight *inflightRequest, msgID *fftypes.UUID) { + err := i18n.NewError(sa.ctx, i18n.MsgRejected, msgID) log.L(sa.ctx).Errorf("Resolving message confirmation request '%s' with error: %s", inflight.id, err) inflight.response <- inflightResponse{err: err} } @@ -290,8 +315,8 @@ func (sa *syncAsyncBridge) resolveConfirmedTokenPool(inflight *inflightRequest, inflight.response <- inflightResponse{id: pool.ID, data: pool} } -func (sa *syncAsyncBridge) resolveRejectedTokenPool(inflight *inflightRequest, pool *fftypes.TokenPool) { - err := i18n.NewError(sa.ctx, i18n.MsgTokenPoolRejected, pool.ID) +func (sa *syncAsyncBridge) resolveRejectedTokenPool(inflight *inflightRequest, poolID *fftypes.UUID) { + err := i18n.NewError(sa.ctx, i18n.MsgTokenPoolRejected, poolID) log.L(sa.ctx).Errorf("Resolving token pool confirmation request '%s' with error '%s'", inflight.id, err) inflight.response <- inflightResponse{err: err} } @@ -301,6 +326,12 @@ func (sa *syncAsyncBridge) resolveConfirmedTokenTransfer(inflight *inflightReque inflight.response <- inflightResponse{id: transfer.LocalID, data: transfer} } +func (sa *syncAsyncBridge) resolveFailedTokenTransfer(inflight *inflightRequest, transferID *fftypes.UUID) { + err := i18n.NewError(sa.ctx, i18n.MsgTokenTransferFailed, transferID) + log.L(sa.ctx).Debugf("Resolving token transfer confirmation request '%s' with error '%s'", inflight.id, err) + inflight.response <- inflightResponse{err: err} +} + func (sa *syncAsyncBridge) sendAndWait(ctx context.Context, ns string, id *fftypes.UUID, reqType requestType, send RequestSender) (interface{}, error) { inflight, err := sa.addInFlight(ns, id, reqType) if err != nil { diff --git a/internal/syncasync/sync_async_bridge_test.go b/internal/syncasync/sync_async_bridge_test.go index 3fbf1c1ad9..5d576a9424 100644 --- a/internal/syncasync/sync_async_bridge_test.go +++ b/internal/syncasync/sync_async_bridge_test.go @@ -54,7 +54,6 @@ func TestRequestReplyOk(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetMessageByID", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) gmid.ReturnArguments = mock.Arguments{ &fftypes.Message{ Header: fftypes.MessageHeader{ @@ -106,7 +105,6 @@ func TestAwaitConfirmationOk(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetMessageByID", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) msgSent := &fftypes.Message{} msgSent.Header.ID = requestID msgSent.Confirmed = fftypes.Now() @@ -153,7 +151,6 @@ func TestAwaitConfirmationRejected(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetMessageByID", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) msgSent := &fftypes.Message{} msgSent.Header.ID = requestID msgSent.Confirmed = fftypes.Now() @@ -506,7 +503,6 @@ func TestAwaitTokenPoolConfirmation(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetTokenPoolByID", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) pool := &fftypes.TokenPool{ ID: requestID, Name: "my-pool", @@ -561,7 +557,6 @@ func TestAwaitTokenPoolConfirmationRejected(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetTokenPoolByID", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) pool := &fftypes.TokenPool{ ID: requestID, Name: "my-pool", @@ -600,13 +595,12 @@ func TestAwaitTokenTransferConfirmation(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetTokenTransfer", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) - pool := &fftypes.TokenTransfer{ + transfer := &fftypes.TokenTransfer{ LocalID: requestID, ProtocolID: "abc", } gmid.ReturnArguments = mock.Arguments{ - pool, nil, + transfer, nil, } } @@ -641,3 +635,141 @@ func TestAwaitTokenTransferConfirmationSendFail(t *testing.T) { }) assert.EqualError(t, err, "pop") } + +func TestAwaitFailedTokenTransfer(t *testing.T) { + + sa, cancel := newTestSyncAsyncBridge(t) + defer cancel() + + requestID := fftypes.NewUUID() + op := &fftypes.Operation{ + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": requestID.String(), + }, + } + + mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) + mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil) + + mdi := sa.database.(*databasemocks.Plugin) + mdi.On("GetOperationByID", sa.ctx, op.ID).Return(op, nil) + + _, err := sa.WaitForTokenTransfer(sa.ctx, "ns1", requestID, func(ctx context.Context) error { + go func() { + sa.eventCallback(&fftypes.EventDelivery{ + Event: fftypes.Event{ + ID: fftypes.NewUUID(), + Type: fftypes.EventTypeTransferOpFailed, + Reference: op.ID, + Namespace: "ns1", + }, + }) + }() + return nil + }) + assert.Regexp(t, "FF10289", err) +} + +func TestFailedTokenTransferOpError(t *testing.T) { + + sa, cancel := newTestSyncAsyncBridge(t) + defer cancel() + + requestID := fftypes.NewUUID() + sa.inflight = map[string]map[fftypes.UUID]*inflightRequest{ + "ns1": { + *requestID: &inflightRequest{}, + }, + } + + op := &fftypes.Operation{ + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": requestID.String(), + }, + } + + mdi := sa.database.(*databasemocks.Plugin) + mdi.On("GetOperationByID", sa.ctx, op.ID).Return(nil, fmt.Errorf("pop")) + + err := sa.eventCallback(&fftypes.EventDelivery{ + Event: fftypes.Event{ + ID: fftypes.NewUUID(), + Type: fftypes.EventTypeTransferOpFailed, + Reference: op.ID, + Namespace: "ns1", + }, + }) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestFailedTokenTransferOpNotFound(t *testing.T) { + + sa, cancel := newTestSyncAsyncBridge(t) + defer cancel() + + requestID := fftypes.NewUUID() + sa.inflight = map[string]map[fftypes.UUID]*inflightRequest{ + "ns1": { + *requestID: &inflightRequest{}, + }, + } + + op := &fftypes.Operation{ + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": requestID.String(), + }, + } + + mdi := sa.database.(*databasemocks.Plugin) + mdi.On("GetOperationByID", sa.ctx, op.ID).Return(nil, nil) + + err := sa.eventCallback(&fftypes.EventDelivery{ + Event: fftypes.Event{ + ID: fftypes.NewUUID(), + Type: fftypes.EventTypeTransferOpFailed, + Reference: op.ID, + Namespace: "ns1", + }, + }) + assert.NoError(t, err) + + mdi.AssertExpectations(t) +} + +func TestFailedTokenTransferIDLookupFail(t *testing.T) { + + sa, cancel := newTestSyncAsyncBridge(t) + defer cancel() + + requestID := fftypes.NewUUID() + sa.inflight = map[string]map[fftypes.UUID]*inflightRequest{ + "ns1": { + *requestID: &inflightRequest{}, + }, + } + + op := &fftypes.Operation{ + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{}, + } + + mdi := sa.database.(*databasemocks.Plugin) + mdi.On("GetOperationByID", sa.ctx, op.ID).Return(op, nil) + + err := sa.eventCallback(&fftypes.EventDelivery{ + Event: fftypes.Event{ + ID: fftypes.NewUUID(), + Type: fftypes.EventTypeTransferOpFailed, + Reference: op.ID, + Namespace: "ns1", + }, + }) + assert.NoError(t, err) + + mdi.AssertExpectations(t) +} diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 8ff6f37bd8..228a199e5b 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -151,7 +151,7 @@ func (ft *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject) replyType = fftypes.OpStatusFailed } l.Infof("Tokens '%s' reply: request=%s message=%s", replyType, requestID, message) - return ft.callbacks.TokensOpUpdate(ft, operationID, replyType, message, data) + return ft.callbacks.TokenOpUpdate(ft, operationID, replyType, message, data) } func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSONObject) (err error) { diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index bc3f6228d0..dc5e01eff7 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -356,11 +356,11 @@ func TestEvents(t *testing.T) { fromServer <- `{"id":"3","event":"receipt","data":{"id":"abc"}}` // receipt: success - mcb.On("TokensOpUpdate", h, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(nil).Once() + mcb.On("TokenOpUpdate", h, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(nil).Once() fromServer <- `{"id":"4","event":"receipt","data":{"id":"` + opID.String() + `","success":true}}` // receipt: failure - mcb.On("TokensOpUpdate", h, opID, fftypes.OpStatusFailed, "", mock.Anything).Return(nil).Once() + mcb.On("TokenOpUpdate", h, opID, fftypes.OpStatusFailed, "", mock.Anything).Return(nil).Once() fromServer <- `{"id":"5","event":"receipt","data":{"id":"` + opID.String() + `","success":false}}` // token-pool: missing data diff --git a/internal/txcommon/token_inputs.go b/internal/txcommon/token_inputs.go new file mode 100644 index 0000000000..cb1e775419 --- /dev/null +++ b/internal/txcommon/token_inputs.go @@ -0,0 +1,38 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txcommon + +import ( + "context" + + "github.com/hyperledger/firefly/pkg/fftypes" +) + +func AddTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) { + op.Input = fftypes.JSONObject{ + "id": transfer.LocalID.String(), + } +} + +func RetrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { + input := &op.Input + transfer.LocalID, err = fftypes.ParseUUID(ctx, input.GetString("id")) + if err != nil { + return err + } + return nil +} diff --git a/internal/txcommon/token_inputs_test.go b/internal/txcommon/token_inputs_test.go new file mode 100644 index 0000000000..d56fb61a8e --- /dev/null +++ b/internal/txcommon/token_inputs_test.go @@ -0,0 +1,61 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txcommon + +import ( + "context" + "testing" + + "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/stretchr/testify/assert" +) + +func TestAddTokenTransferInputs(t *testing.T) { + op := &fftypes.Operation{} + transfer := &fftypes.TokenTransfer{ + LocalID: fftypes.NewUUID(), + } + + AddTokenTransferInputs(op, transfer) + assert.Equal(t, transfer.LocalID.String(), op.Input.GetString("id")) +} + +func TestRetrieveTokenTransferInputs(t *testing.T) { + id := fftypes.NewUUID() + op := &fftypes.Operation{ + Input: fftypes.JSONObject{ + "id": id.String(), + }, + } + transfer := &fftypes.TokenTransfer{} + + err := RetrieveTokenTransferInputs(context.Background(), op, transfer) + assert.NoError(t, err) + assert.Equal(t, *id, *transfer.LocalID) +} + +func TestRetrieveTokenTransferInputsBadID(t *testing.T) { + op := &fftypes.Operation{ + Input: fftypes.JSONObject{ + "id": "bad", + }, + } + transfer := &fftypes.TokenTransfer{} + + err := RetrieveTokenTransferInputs(context.Background(), op, transfer) + assert.Regexp(t, "FF10142", err) +} diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index 059502600a..7837aa8bd6 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -403,13 +403,13 @@ func (_m *Manager) Start() error { return r0 } -// TokenPoolCreated provides a mock function with given fields: tk, pool, protocolTxID, additionalInfo -func (_m *Manager) TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error { - ret := _m.Called(tk, pool, protocolTxID, additionalInfo) +// TokenPoolCreated provides a mock function with given fields: ti, pool, protocolTxID, additionalInfo +func (_m *Manager) TokenPoolCreated(ti tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error { + ret := _m.Called(ti, pool, protocolTxID, additionalInfo) var r0 error if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenPool, string, fftypes.JSONObject) error); ok { - r0 = rf(tk, pool, protocolTxID, additionalInfo) + r0 = rf(ti, pool, protocolTxID, additionalInfo) } else { r0 = ret.Error(0) } diff --git a/mocks/tokenmocks/callbacks.go b/mocks/tokenmocks/callbacks.go index 10940a06e6..2ee9240a38 100644 --- a/mocks/tokenmocks/callbacks.go +++ b/mocks/tokenmocks/callbacks.go @@ -14,13 +14,13 @@ type Callbacks struct { mock.Mock } -// TokenPoolCreated provides a mock function with given fields: plugin, pool, protocolTxID, additionalInfo -func (_m *Callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error { - ret := _m.Called(plugin, pool, protocolTxID, additionalInfo) +// TokenOpUpdate provides a mock function with given fields: plugin, operationID, txState, errorMessage, opOutput +func (_m *Callbacks) TokenOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { + ret := _m.Called(plugin, operationID, txState, errorMessage, opOutput) var r0 error - if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenPool, string, fftypes.JSONObject) error); ok { - r0 = rf(plugin, pool, protocolTxID, additionalInfo) + if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.UUID, fftypes.OpStatus, string, fftypes.JSONObject) error); ok { + r0 = rf(plugin, operationID, txState, errorMessage, opOutput) } else { r0 = ret.Error(0) } @@ -28,13 +28,13 @@ func (_m *Callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.TokenP return r0 } -// TokensOpUpdate provides a mock function with given fields: plugin, operationID, txState, errorMessage, opOutput -func (_m *Callbacks) TokensOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { - ret := _m.Called(plugin, operationID, txState, errorMessage, opOutput) +// TokenPoolCreated provides a mock function with given fields: plugin, pool, protocolTxID, additionalInfo +func (_m *Callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error { + ret := _m.Called(plugin, pool, protocolTxID, additionalInfo) var r0 error - if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.UUID, fftypes.OpStatus, string, fftypes.JSONObject) error); ok { - r0 = rf(plugin, operationID, txState, errorMessage, opOutput) + if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenPool, string, fftypes.JSONObject) error); ok { + r0 = rf(plugin, pool, protocolTxID, additionalInfo) } else { r0 = ret.Error(0) } diff --git a/pkg/fftypes/event.go b/pkg/fftypes/event.go index d4471d34d9..6f6e27ee16 100644 --- a/pkg/fftypes/event.go +++ b/pkg/fftypes/event.go @@ -37,6 +37,8 @@ var ( EventTypePoolRejected EventType = ffEnum("eventtype", "token_pool_rejected") // EventTypeTransferConfirmed occurs when a token transfer has been confirmed EventTypeTransferConfirmed EventType = ffEnum("eventtype", "token_transfer_confirmed") + // EventTypeTransferOpFailed occurs when a token transfer submitted by this node has failed (based on feedback from connector) + EventTypeTransferOpFailed EventType = ffEnum("eventtype", "token_transfer_op_failed") ) // Event is an activity in the system, delivered reliably to applications, that indicates something has happened in the network diff --git a/pkg/tokens/plugin.go b/pkg/tokens/plugin.go index 07bb5970a7..d79bc9365c 100644 --- a/pkg/tokens/plugin.go +++ b/pkg/tokens/plugin.go @@ -66,7 +66,7 @@ type Callbacks interface { // Only the party submitting the transaction will see this data. // // Error should will only be returned in shutdown scenarios - TokensOpUpdate(plugin Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error + TokenOpUpdate(plugin Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error // TokenPoolCreated notifies on the creation of a new token pool, which might have been // submitted by us, or by any other authorized party in the network. From eda999578cde2f74aa7342d2afac68a2a10ef0eb Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 26 Oct 2021 16:31:27 -0400 Subject: [PATCH 2/3] Only send transfer message if token transfer is successful The message is now stashed away on the Operation inputs, then loaded and sent by the event manager if the transfer succeeds. Signed-off-by: Andrew Richardson --- docs/swagger/swagger.yaml | 3 + internal/assets/token_transfer.go | 44 +++++---- internal/assets/token_transfer_test.go | 107 +++------------------ internal/events/event_manager.go | 10 +- internal/events/event_manager_test.go | 12 ++- internal/events/tokens_transferred.go | 34 +++++-- internal/events/tokens_transferred_test.go | 65 ++++++++++++- internal/orchestrator/orchestrator.go | 2 +- internal/txcommon/token_inputs.go | 4 +- pkg/fftypes/message.go | 2 + 10 files changed, 151 insertions(+), 132 deletions(-) diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index d0b1745b63..c530053d23 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -4709,6 +4709,7 @@ paths: type: array state: enum: + - notready - ready - pending - confirmed @@ -4926,6 +4927,7 @@ paths: type: array state: enum: + - notready - ready - pending - confirmed @@ -5307,6 +5309,7 @@ paths: type: array state: enum: + - notready - ready - pending - confirmed diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index 8f1a921044..fcf7075e0c 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -174,41 +174,38 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns, connector, poolN } func (s *transferSender) resolveAndSend(ctx context.Context, method sendMethod) (err error) { - var messageSender sysmessaging.MessageSender if !s.resolved { - if messageSender, err = s.resolve(ctx); err != nil { + if err = s.resolve(ctx); err != nil { return err } s.resolved = true } - if messageSender != nil { - if method == methodSendAndWait { - if err = s.sendInternal(ctx, method); err != nil { - return err - } - return messageSender.SendAndWait(ctx) - } - - if err := messageSender.Send(ctx); err != nil { - return err - } + if method == methodSendAndWait && s.transfer.Message != nil { + // Begin waiting for the message, and trigger the transfer. + // A successful transfer will trigger the message via the event handler, so we can wait for it all to complete. + _, err := s.mgr.syncasync.WaitForMessage(ctx, s.namespace, s.transfer.Message.Header.ID, func(ctx context.Context) error { + return s.sendInternal(ctx, methodSendAndWait) + }) + return err } + return s.sendInternal(ctx, method) } -func (s *transferSender) resolve(ctx context.Context) (sender sysmessaging.MessageSender, err error) { +func (s *transferSender) resolve(ctx context.Context) error { // Resolve the attached message if s.transfer.Message != nil { - if sender, err = s.buildTransferMessage(ctx, s.namespace, s.transfer.Message); err != nil { - return nil, err + sender, err := s.buildTransferMessage(ctx, s.namespace, s.transfer.Message) + if err != nil { + return err } if err = sender.Prepare(ctx); err != nil { - return nil, err + return err } s.transfer.MessageHash = s.transfer.Message.Hash } - return sender, nil + return nil } func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) error { @@ -262,8 +259,15 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er s.transfer.PoolProtocolID = pool.ProtocolID err = s.mgr.database.UpsertTransaction(ctx, tx, false /* should be new, or idempotent replay */) - if err == nil { - err = s.mgr.database.UpsertOperation(ctx, op, false) + if err != nil { + return err + } + if err = s.mgr.database.UpsertOperation(ctx, op, false); err != nil { + return err + } + if s.transfer.Message != nil { + s.transfer.Message.State = fftypes.MessageStateNotReady + err = s.mgr.database.UpsertMessage(ctx, &s.transfer.Message.Message, false, false) } return err }) diff --git a/internal/assets/token_transfer_test.go b/internal/assets/token_transfer_test.go index 51e6d058c4..71c9956ae2 100644 --- a/internal/assets/token_transfer_test.go +++ b/internal/assets/token_transfer_test.go @@ -486,7 +486,9 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) { mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) - mms.On("Send", context.Background()).Return(nil) + mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateNotReady + }), false, false).Return(nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) assert.NoError(t, err) @@ -499,41 +501,6 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) { mms.AssertExpectations(t) } -func TestTransferTokensWithBroadcastMessageFail(t *testing.T) { - am, cancel := newTestAssets(t) - defer cancel() - - transfer := &fftypes.TokenTransferInput{ - TokenTransfer: fftypes.TokenTransfer{ - From: "A", - To: "B", - }, - Message: &fftypes.MessageInOut{ - InlineData: fftypes.InlineData{ - { - Value: []byte("test data"), - }, - }, - }, - } - transfer.Amount.Int().SetInt64(5) - - mim := am.identity.(*identitymanagermocks.Manager) - mbm := am.broadcast.(*broadcastmocks.Manager) - mms := &sysmessagingmocks.MessageSender{} - mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) - mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) - mms.On("Prepare", context.Background()).Return(nil) - mms.On("Send", context.Background()).Return(fmt.Errorf("pop")) - - _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) - assert.EqualError(t, err, "pop") - - mbm.AssertExpectations(t) - mim.AssertExpectations(t) - mms.AssertExpectations(t) -} - func TestTransferTokensWithBroadcastPrepareFail(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() @@ -608,7 +575,9 @@ func TestTransferTokensWithPrivateMessage(t *testing.T) { mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) mpm.On("NewMessage", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) - mms.On("Send", context.Background()).Return(nil) + mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateNotReady + }), false, false).Return(nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) assert.NoError(t, err) @@ -751,73 +720,25 @@ func TestTransferTokensWithBroadcastConfirm(t *testing.T) { }), false).Return(nil) mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) - mms.On("SendAndWait", context.Background()).Return(nil) - msa.On("WaitForTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything). + mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateNotReady + }), false, false).Return(nil) + msa.On("WaitForMessage", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { send := args[3].(syncasync.RequestSender) send(context.Background()) }). - Return(&transfer.TokenTransfer, nil) - - _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, true) - assert.NoError(t, err) - assert.Equal(t, *hash, *transfer.MessageHash) - - mbm.AssertExpectations(t) - mim.AssertExpectations(t) - mdi.AssertExpectations(t) - mti.AssertExpectations(t) - mms.AssertExpectations(t) - msa.AssertExpectations(t) -} - -func TestTransferTokensWithBroadcastConfirmTransferFail(t *testing.T) { - am, cancel := newTestAssets(t) - defer cancel() - - hash := fftypes.NewRandB32() - transfer := &fftypes.TokenTransferInput{ - TokenTransfer: fftypes.TokenTransfer{ - From: "A", - To: "B", - }, - Message: &fftypes.MessageInOut{ - Message: fftypes.Message{ - Hash: hash, - }, - InlineData: fftypes.InlineData{ - { - Value: []byte("test data"), - }, - }, - }, - } - transfer.Amount.Int().SetInt64(5) - - mdi := am.database.(*databasemocks.Plugin) - mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) - mim := am.identity.(*identitymanagermocks.Manager) - mbm := am.broadcast.(*broadcastmocks.Manager) - mms := &sysmessagingmocks.MessageSender{} - msa := am.syncasync.(*syncasyncmocks.Bridge) - mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) - mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) - mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil) - mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { - return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer - }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) - mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) - mms.On("Prepare", context.Background()).Return(nil) + Return(&fftypes.Message{}, nil) msa.On("WaitForTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { send := args[3].(syncasync.RequestSender) send(context.Background()) }). - Return(nil, fmt.Errorf("pop")) + Return(&transfer.TokenTransfer, nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, true) - assert.EqualError(t, err, "pop") + assert.NoError(t, err) + assert.Equal(t, *hash, *transfer.MessageHash) mbm.AssertExpectations(t) mim.AssertExpectations(t) diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 57ed08e910..cee07c5b72 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -22,6 +22,7 @@ import ( "encoding/json" "strconv" + "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/events/eifactory" @@ -29,6 +30,7 @@ import ( "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/identity" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/privatemessaging" "github.com/hyperledger/firefly/internal/retry" "github.com/hyperledger/firefly/internal/syshandlers" "github.com/hyperledger/firefly/internal/sysmessaging" @@ -80,6 +82,8 @@ type eventManager struct { retry retry.Retry txhelper txcommon.Helper aggregator *aggregator + broadcast broadcast.Manager + messaging privatemessaging.Manager newEventNotifier *eventNotifier newPinNotifier *eventNotifier opCorrelationRetries int @@ -87,8 +91,8 @@ type eventManager struct { internalEvents *system.Events } -func NewEventManager(ctx context.Context, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, sh syshandlers.SystemHandlers, dm data.Manager) (EventManager, error) { - if pi == nil || di == nil || im == nil || dm == nil { +func NewEventManager(ctx context.Context, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, sh syshandlers.SystemHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager) (EventManager, error) { + if pi == nil || di == nil || im == nil || dm == nil || bm == nil || pm == nil { return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError) } newPinNotifier := newEventNotifier(ctx, "pins") @@ -100,6 +104,8 @@ func NewEventManager(ctx context.Context, pi publicstorage.Plugin, di database.P identity: im, syshandlers: sh, data: dm, + broadcast: bm, + messaging: pm, retry: retry.Retry{ InitialDelay: config.GetDuration(config.EventAggregatorRetryInitDelay), MaximumDelay: config.GetDuration(config.EventAggregatorRetryMaxDelay), diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index 0518d747ea..9d512b1300 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -23,10 +23,12 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/events/system" + "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/eventsmocks" "github.com/hyperledger/firefly/mocks/identitymanagermocks" + "github.com/hyperledger/firefly/mocks/privatemessagingmocks" "github.com/hyperledger/firefly/mocks/publicstoragemocks" "github.com/hyperledger/firefly/mocks/syshandlersmocks" "github.com/hyperledger/firefly/pkg/fftypes" @@ -43,8 +45,10 @@ func newTestEventManager(t *testing.T) (*eventManager, func()) { met := &eventsmocks.Plugin{} mdm := &datamocks.Manager{} msh := &syshandlersmocks.SystemHandlers{} + mbm := &broadcastmocks.Manager{} + mpm := &privatemessagingmocks.Manager{} met.On("Name").Return("ut").Maybe() - emi, err := NewEventManager(ctx, mpi, mdi, mim, msh, mdm) + emi, err := NewEventManager(ctx, mpi, mdi, mim, msh, mdm, mbm, mpm) em := emi.(*eventManager) rag := mdi.On("RunAsGroup", em.ctx, mock.Anything).Maybe() rag.RunFn = func(a mock.Arguments) { @@ -74,7 +78,7 @@ func TestStartStop(t *testing.T) { } func TestStartStopBadDependencies(t *testing.T) { - _, err := NewEventManager(context.Background(), nil, nil, nil, nil, nil) + _, err := NewEventManager(context.Background(), nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) } @@ -87,7 +91,9 @@ func TestStartStopBadTransports(t *testing.T) { mpi := &publicstoragemocks.Plugin{} mdm := &datamocks.Manager{} msh := &syshandlersmocks.SystemHandlers{} - _, err := NewEventManager(context.Background(), mpi, mdi, mim, msh, mdm) + mbm := &broadcastmocks.Manager{} + mpm := &privatemessagingmocks.Manager{} + _, err := NewEventManager(context.Background(), mpi, mdi, mim, msh, mdm, mbm, mpm) assert.Regexp(t, "FF10172", err) } diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 98b11457a1..94890aeae8 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -26,7 +26,7 @@ import ( "github.com/hyperledger/firefly/pkg/tokens" ) -func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) { +func (em *eventManager) loadTransferOperation(ctx context.Context, transfer *fftypes.TokenTransfer) error { transfer.LocalID = nil // Find a matching operation within this transaction @@ -37,11 +37,10 @@ func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, ) operations, _, err := em.database.GetOperations(ctx, filter) if err != nil { - return false, err + return err } if len(operations) > 0 { - err = txcommon.RetrieveTokenTransferInputs(ctx, operations[0], transfer) - if err != nil { + if err = txcommon.RetrieveTokenTransferInputs(ctx, operations[0], transfer); err != nil { log.L(ctx).Warnf("Failed to read operation inputs for token transfer '%s': %s", transfer.ProtocolID, err) } } @@ -49,7 +48,10 @@ func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, if transfer.LocalID == nil { transfer.LocalID = fftypes.NewUUID() } + return nil +} +func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) { transaction := &fftypes.Transaction{ ID: transfer.TX.ID, Status: fftypes.OpStatusSucceeded, @@ -65,8 +67,7 @@ func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, return em.txhelper.PersistTransaction(ctx, transaction) } -func (em *eventManager) getBatchForTransfer(ctx context.Context, transfer *fftypes.TokenTransfer) (*fftypes.UUID, error) { - // Find the messages assocated with that data +func (em *eventManager) getMessageForTransfer(ctx context.Context, transfer *fftypes.TokenTransfer) (*fftypes.Message, error) { var messages []*fftypes.Message fb := database.MessageQueryFactory.NewFilter(ctx) filter := fb.And( @@ -77,7 +78,7 @@ func (em *eventManager) getBatchForTransfer(ctx context.Context, transfer *fftyp if err != nil || len(messages) == 0 { return nil, err } - return messages[0].BatchID, nil + return messages[0], nil } func (em *eventManager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) error { @@ -97,6 +98,9 @@ func (em *eventManager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.To transfer.Namespace = pool.Namespace if transfer.TX.ID != nil { + if err := em.loadTransferOperation(ctx, transfer); err != nil { + return err + } if valid, err := em.persistTokenTransaction(ctx, pool.Namespace, transfer, protocolTxID, additionalInfo); err != nil || !valid { return err } @@ -137,10 +141,22 @@ func (em *eventManager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.To log.L(ctx).Infof("Token transfer recorded id=%s author=%s", transfer.ProtocolID, transfer.Key) if transfer.MessageHash != nil { - if batchID, err = em.getBatchForTransfer(ctx, transfer); err != nil { - log.L(ctx).Errorf("Failed to lookup batch for token transfer '%s': %s", transfer.ProtocolID, err) + msg, err := em.getMessageForTransfer(ctx, transfer) + if err != nil { return err } + if msg != nil { + if msg.State == fftypes.MessageStateNotReady { + // Message can now be sent + msg.State = fftypes.MessageStateReady + if err := em.database.UpsertMessage(ctx, msg, true, false); err != nil { + return err + } + } else { + // Message was already received - aggregator will need to be rewound + batchID = msg.BatchID + } + } } event := fftypes.NewEvent(fftypes.EventTypeTransferConfirmed, pool.Namespace, transfer.LocalID) diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index a952a42ed2..9d58264785 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -164,7 +164,7 @@ func TestTokensTransferredAddBalanceIgnore(t *testing.T) { mti.AssertExpectations(t) } -func TestTokensTransferredWithMessage(t *testing.T) { +func TestTokensTransferredWithMessageReceived(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() @@ -222,3 +222,66 @@ func TestTokensTransferredWithMessage(t *testing.T) { mdi.AssertExpectations(t) mti.AssertExpectations(t) } + +func TestTokensTransferredWithMessageSend(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + + mdi := em.database.(*databasemocks.Plugin) + mti := &tokenmocks.Plugin{} + + transfer := &fftypes.TokenTransfer{ + Type: fftypes.TokenTransferTypeTransfer, + PoolProtocolID: "F1", + TokenIndex: "0", + Connector: "erc1155", + Key: "0x12345", + From: "0x1", + To: "0x2", + MessageHash: fftypes.NewRandB32(), + } + transfer.Amount.Int().SetInt64(1) + fromBalance := &fftypes.TokenBalanceChange{ + PoolProtocolID: "F1", + TokenIndex: "0", + Connector: "erc1155", + Namespace: "ns1", + Key: "0x1", + } + fromBalance.Amount.Int().SetInt64(-1) + toBalance := &fftypes.TokenBalanceChange{ + PoolProtocolID: "F1", + TokenIndex: "0", + Connector: "erc1155", + Namespace: "ns1", + Key: "0x2", + } + toBalance.Amount.Int().SetInt64(1) + pool := &fftypes.TokenPool{ + Namespace: "ns1", + } + messages := []*fftypes.Message{{ + BatchID: fftypes.NewUUID(), + State: fftypes.MessageStateNotReady, + }} + + mdi.On("GetTokenPoolByProtocolID", em.ctx, "F1").Return(pool, nil).Times(2) + mdi.On("UpsertTokenTransfer", em.ctx, transfer).Return(nil).Times(2) + mdi.On("AddTokenAccountBalance", em.ctx, fromBalance).Return(nil).Times(2) + mdi.On("AddTokenAccountBalance", em.ctx, toBalance).Return(nil).Times(2) + mdi.On("GetMessages", em.ctx, mock.Anything).Return(messages, nil, nil).Times(2) + mdi.On("UpsertMessage", em.ctx, mock.Anything, true, false).Return(fmt.Errorf("pop")) + mdi.On("UpsertMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateReady + }), true, false).Return(nil) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace + })).Return(nil).Once() + + info := fftypes.JSONObject{"some": "info"} + err := em.TokensTransferred(mti, transfer, "tx1", info) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index fd14c34022..baf2d90d34 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -414,7 +414,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { or.syshandlers = syshandlers.NewSystemHandlers(or.database, or.dataexchange, or.data, or.broadcast, or.messaging, or.assets) if or.events == nil { - or.events, err = events.NewEventManager(ctx, or.publicstorage, or.database, or.identity, or.syshandlers, or.data) + or.events, err = events.NewEventManager(ctx, or.publicstorage, or.database, or.identity, or.syshandlers, or.data, or.broadcast, or.messaging) if err != nil { return err } diff --git a/internal/txcommon/token_inputs.go b/internal/txcommon/token_inputs.go index cb1e775419..4383c2248e 100644 --- a/internal/txcommon/token_inputs.go +++ b/internal/txcommon/token_inputs.go @@ -29,9 +29,7 @@ func AddTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransf } func RetrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { - input := &op.Input - transfer.LocalID, err = fftypes.ParseUUID(ctx, input.GetString("id")) - if err != nil { + if transfer.LocalID, err = fftypes.ParseUUID(ctx, op.Input.GetString("id")); err != nil { return err } return nil diff --git a/pkg/fftypes/message.go b/pkg/fftypes/message.go index 11ce41b393..daa998003f 100644 --- a/pkg/fftypes/message.go +++ b/pkg/fftypes/message.go @@ -51,6 +51,8 @@ var ( type MessageState = FFEnum var ( + // MessageStateNotReady is a message created locally which is not ready to send + MessageStateNotReady MessageState = ffEnum("messagestate", "notready") // MessageStateReady is a message created locally which is ready to send MessageStateReady MessageState = ffEnum("messagestate", "ready") // MessageStatePending is a message that has been received but is awaiting aggregation/confirmation From a766f1cdd0a52c555884be7d14410484bb3aca5c Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 29 Oct 2021 14:52:05 -0400 Subject: [PATCH 3/3] Rename message state "notready" to "staged" Signed-off-by: Andrew Richardson --- docs/swagger/swagger.yaml | 6 +++--- internal/assets/token_transfer.go | 2 +- internal/assets/token_transfer_test.go | 6 +++--- internal/events/tokens_transferred.go | 2 +- internal/events/tokens_transferred_test.go | 2 +- pkg/fftypes/message.go | 4 ++-- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index c530053d23..d39be52e5e 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -4709,7 +4709,7 @@ paths: type: array state: enum: - - notready + - staged - ready - pending - confirmed @@ -4927,7 +4927,7 @@ paths: type: array state: enum: - - notready + - staged - ready - pending - confirmed @@ -5309,7 +5309,7 @@ paths: type: array state: enum: - - notready + - staged - ready - pending - confirmed diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index fcf7075e0c..c9835528ae 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -266,7 +266,7 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er return err } if s.transfer.Message != nil { - s.transfer.Message.State = fftypes.MessageStateNotReady + s.transfer.Message.State = fftypes.MessageStateStaged err = s.mgr.database.UpsertMessage(ctx, &s.transfer.Message.Message, false, false) } return err diff --git a/internal/assets/token_transfer_test.go b/internal/assets/token_transfer_test.go index 71c9956ae2..2cc9e631e8 100644 --- a/internal/assets/token_transfer_test.go +++ b/internal/assets/token_transfer_test.go @@ -487,7 +487,7 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) { mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { - return msg.State == fftypes.MessageStateNotReady + return msg.State == fftypes.MessageStateStaged }), false, false).Return(nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) @@ -576,7 +576,7 @@ func TestTransferTokensWithPrivateMessage(t *testing.T) { mpm.On("NewMessage", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { - return msg.State == fftypes.MessageStateNotReady + return msg.State == fftypes.MessageStateStaged }), false, false).Return(nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) @@ -721,7 +721,7 @@ func TestTransferTokensWithBroadcastConfirm(t *testing.T) { mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { - return msg.State == fftypes.MessageStateNotReady + return msg.State == fftypes.MessageStateStaged }), false, false).Return(nil) msa.On("WaitForMessage", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 94890aeae8..963d95b116 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -146,7 +146,7 @@ func (em *eventManager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.To return err } if msg != nil { - if msg.State == fftypes.MessageStateNotReady { + if msg.State == fftypes.MessageStateStaged { // Message can now be sent msg.State = fftypes.MessageStateReady if err := em.database.UpsertMessage(ctx, msg, true, false); err != nil { diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index 9d58264785..3ae2993305 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -262,7 +262,7 @@ func TestTokensTransferredWithMessageSend(t *testing.T) { } messages := []*fftypes.Message{{ BatchID: fftypes.NewUUID(), - State: fftypes.MessageStateNotReady, + State: fftypes.MessageStateStaged, }} mdi.On("GetTokenPoolByProtocolID", em.ctx, "F1").Return(pool, nil).Times(2) diff --git a/pkg/fftypes/message.go b/pkg/fftypes/message.go index daa998003f..b2fcddc17b 100644 --- a/pkg/fftypes/message.go +++ b/pkg/fftypes/message.go @@ -51,8 +51,8 @@ var ( type MessageState = FFEnum var ( - // MessageStateNotReady is a message created locally which is not ready to send - MessageStateNotReady MessageState = ffEnum("messagestate", "notready") + // MessageStateStaged is a message created locally which is not ready to send + MessageStateStaged MessageState = ffEnum("messagestate", "staged") // MessageStateReady is a message created locally which is ready to send MessageStateReady MessageState = ffEnum("messagestate", "ready") // MessageStatePending is a message that has been received but is awaiting aggregation/confirmation