diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 70e5041f5e..88726790ca 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -4709,6 +4709,7 @@ paths: type: array state: enum: + - staged - ready - pending - confirmed @@ -4926,6 +4927,7 @@ paths: type: array state: enum: + - staged - ready - pending - confirmed @@ -5307,6 +5309,7 @@ paths: type: array state: enum: + - staged - ready - pending - confirmed diff --git a/internal/assets/manager.go b/internal/assets/manager.go index 8750aa3ccb..c2970ea28e 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -53,7 +53,7 @@ type Manager interface { GetTokenConnectors(ctx context.Context, ns string) ([]*fftypes.TokenConnector, error) // Bound token callbacks - TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error + TokenPoolCreated(ti tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error Start() error WaitStop() diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index 625c9dab12..3a5133a873 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -22,17 +22,11 @@ import ( "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/sysmessaging" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) -// Note: the counterpart to below (retrieveTokenTransferInputs) lives in the events package -func addTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) { - op.Input = fftypes.JSONObject{ - "id": transfer.LocalID.String(), - } -} - func (am *assetManager) GetTokenTransfers(ctx context.Context, ns string, filter database.AndFilter) ([]*fftypes.TokenTransfer, *database.FilterResult, error) { return am.database.GetTokenTransfers(ctx, am.scopeNS(ns, filter)) } @@ -180,41 +174,38 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns, connector, poolN } func (s *transferSender) resolveAndSend(ctx context.Context, method sendMethod) (err error) { - var messageSender sysmessaging.MessageSender if !s.resolved { - if messageSender, err = s.resolve(ctx); err != nil { + if err = s.resolve(ctx); err != nil { return err } s.resolved = true } - if messageSender != nil { - if method == methodSendAndWait { - if err = s.sendInternal(ctx, method); err != nil { - return err - } - return messageSender.SendAndWait(ctx) - } - - if err := messageSender.Send(ctx); err != nil { - return err - } + if method == methodSendAndWait && s.transfer.Message != nil { + // Begin waiting for the message, and trigger the transfer. + // A successful transfer will trigger the message via the event handler, so we can wait for it all to complete. + _, err := s.mgr.syncasync.WaitForMessage(ctx, s.namespace, s.transfer.Message.Header.ID, func(ctx context.Context) error { + return s.sendInternal(ctx, methodSendAndWait) + }) + return err } + return s.sendInternal(ctx, method) } -func (s *transferSender) resolve(ctx context.Context) (sender sysmessaging.MessageSender, err error) { +func (s *transferSender) resolve(ctx context.Context) error { // Resolve the attached message if s.transfer.Message != nil { - if sender, err = s.buildTransferMessage(ctx, s.namespace, s.transfer.Message); err != nil { - return nil, err + sender, err := s.buildTransferMessage(ctx, s.namespace, s.transfer.Message) + if err != nil { + return err } if err = sender.Prepare(ctx); err != nil { - return nil, err + return err } s.transfer.MessageHash = s.transfer.Message.Hash } - return sender, nil + return nil } func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) error { @@ -257,7 +248,7 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er "", fftypes.OpTypeTokenTransfer, fftypes.OpStatusPending) - addTokenTransferInputs(op, &s.transfer.TokenTransfer) + txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer) err = s.mgr.database.RunAsGroup(ctx, func(ctx context.Context) (err error) { pool, err := s.mgr.GetTokenPool(ctx, s.namespace, s.connector, s.poolName) @@ -267,8 +258,15 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er s.transfer.PoolProtocolID = pool.ProtocolID err = s.mgr.database.UpsertTransaction(ctx, tx, false /* should be new, or idempotent replay */) - if err == nil { - err = s.mgr.database.UpsertOperation(ctx, op, false) + if err != nil { + return err + } + if err = s.mgr.database.UpsertOperation(ctx, op, false); err != nil { + return err + } + if s.transfer.Message != nil { + s.transfer.Message.State = fftypes.MessageStateStaged + err = s.mgr.database.UpsertMessage(ctx, &s.transfer.Message.Message, false, false) } return err }) diff --git a/internal/assets/token_transfer_test.go b/internal/assets/token_transfer_test.go index 51e6d058c4..2cc9e631e8 100644 --- a/internal/assets/token_transfer_test.go +++ b/internal/assets/token_transfer_test.go @@ -486,7 +486,9 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) { mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) - mms.On("Send", context.Background()).Return(nil) + mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateStaged + }), false, false).Return(nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) assert.NoError(t, err) @@ -499,41 +501,6 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) { mms.AssertExpectations(t) } -func TestTransferTokensWithBroadcastMessageFail(t *testing.T) { - am, cancel := newTestAssets(t) - defer cancel() - - transfer := &fftypes.TokenTransferInput{ - TokenTransfer: fftypes.TokenTransfer{ - From: "A", - To: "B", - }, - Message: &fftypes.MessageInOut{ - InlineData: fftypes.InlineData{ - { - Value: []byte("test data"), - }, - }, - }, - } - transfer.Amount.Int().SetInt64(5) - - mim := am.identity.(*identitymanagermocks.Manager) - mbm := am.broadcast.(*broadcastmocks.Manager) - mms := &sysmessagingmocks.MessageSender{} - mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) - mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) - mms.On("Prepare", context.Background()).Return(nil) - mms.On("Send", context.Background()).Return(fmt.Errorf("pop")) - - _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) - assert.EqualError(t, err, "pop") - - mbm.AssertExpectations(t) - mim.AssertExpectations(t) - mms.AssertExpectations(t) -} - func TestTransferTokensWithBroadcastPrepareFail(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() @@ -608,7 +575,9 @@ func TestTransferTokensWithPrivateMessage(t *testing.T) { mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) mpm.On("NewMessage", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) - mms.On("Send", context.Background()).Return(nil) + mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateStaged + }), false, false).Return(nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) assert.NoError(t, err) @@ -751,73 +720,25 @@ func TestTransferTokensWithBroadcastConfirm(t *testing.T) { }), false).Return(nil) mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) - mms.On("SendAndWait", context.Background()).Return(nil) - msa.On("WaitForTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything). + mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateStaged + }), false, false).Return(nil) + msa.On("WaitForMessage", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { send := args[3].(syncasync.RequestSender) send(context.Background()) }). - Return(&transfer.TokenTransfer, nil) - - _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, true) - assert.NoError(t, err) - assert.Equal(t, *hash, *transfer.MessageHash) - - mbm.AssertExpectations(t) - mim.AssertExpectations(t) - mdi.AssertExpectations(t) - mti.AssertExpectations(t) - mms.AssertExpectations(t) - msa.AssertExpectations(t) -} - -func TestTransferTokensWithBroadcastConfirmTransferFail(t *testing.T) { - am, cancel := newTestAssets(t) - defer cancel() - - hash := fftypes.NewRandB32() - transfer := &fftypes.TokenTransferInput{ - TokenTransfer: fftypes.TokenTransfer{ - From: "A", - To: "B", - }, - Message: &fftypes.MessageInOut{ - Message: fftypes.Message{ - Hash: hash, - }, - InlineData: fftypes.InlineData{ - { - Value: []byte("test data"), - }, - }, - }, - } - transfer.Amount.Int().SetInt64(5) - - mdi := am.database.(*databasemocks.Plugin) - mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) - mim := am.identity.(*identitymanagermocks.Manager) - mbm := am.broadcast.(*broadcastmocks.Manager) - mms := &sysmessagingmocks.MessageSender{} - msa := am.syncasync.(*syncasyncmocks.Bridge) - mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) - mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) - mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil) - mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { - return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer - }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) - mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) - mms.On("Prepare", context.Background()).Return(nil) + Return(&fftypes.Message{}, nil) msa.On("WaitForTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { send := args[3].(syncasync.RequestSender) send(context.Background()) }). - Return(nil, fmt.Errorf("pop")) + Return(&transfer.TokenTransfer, nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, true) - assert.EqualError(t, err, "pop") + assert.NoError(t, err) + assert.Equal(t, *hash, *transfer.MessageHash) mbm.AssertExpectations(t) mim.AssertExpectations(t) diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 57ed08e910..cee07c5b72 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -22,6 +22,7 @@ import ( "encoding/json" "strconv" + "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/events/eifactory" @@ -29,6 +30,7 @@ import ( "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/identity" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/privatemessaging" "github.com/hyperledger/firefly/internal/retry" "github.com/hyperledger/firefly/internal/syshandlers" "github.com/hyperledger/firefly/internal/sysmessaging" @@ -80,6 +82,8 @@ type eventManager struct { retry retry.Retry txhelper txcommon.Helper aggregator *aggregator + broadcast broadcast.Manager + messaging privatemessaging.Manager newEventNotifier *eventNotifier newPinNotifier *eventNotifier opCorrelationRetries int @@ -87,8 +91,8 @@ type eventManager struct { internalEvents *system.Events } -func NewEventManager(ctx context.Context, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, sh syshandlers.SystemHandlers, dm data.Manager) (EventManager, error) { - if pi == nil || di == nil || im == nil || dm == nil { +func NewEventManager(ctx context.Context, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, sh syshandlers.SystemHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager) (EventManager, error) { + if pi == nil || di == nil || im == nil || dm == nil || bm == nil || pm == nil { return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError) } newPinNotifier := newEventNotifier(ctx, "pins") @@ -100,6 +104,8 @@ func NewEventManager(ctx context.Context, pi publicstorage.Plugin, di database.P identity: im, syshandlers: sh, data: dm, + broadcast: bm, + messaging: pm, retry: retry.Retry{ InitialDelay: config.GetDuration(config.EventAggregatorRetryInitDelay), MaximumDelay: config.GetDuration(config.EventAggregatorRetryMaxDelay), diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index 0518d747ea..9d512b1300 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -23,10 +23,12 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/events/system" + "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/eventsmocks" "github.com/hyperledger/firefly/mocks/identitymanagermocks" + "github.com/hyperledger/firefly/mocks/privatemessagingmocks" "github.com/hyperledger/firefly/mocks/publicstoragemocks" "github.com/hyperledger/firefly/mocks/syshandlersmocks" "github.com/hyperledger/firefly/pkg/fftypes" @@ -43,8 +45,10 @@ func newTestEventManager(t *testing.T) (*eventManager, func()) { met := &eventsmocks.Plugin{} mdm := &datamocks.Manager{} msh := &syshandlersmocks.SystemHandlers{} + mbm := &broadcastmocks.Manager{} + mpm := &privatemessagingmocks.Manager{} met.On("Name").Return("ut").Maybe() - emi, err := NewEventManager(ctx, mpi, mdi, mim, msh, mdm) + emi, err := NewEventManager(ctx, mpi, mdi, mim, msh, mdm, mbm, mpm) em := emi.(*eventManager) rag := mdi.On("RunAsGroup", em.ctx, mock.Anything).Maybe() rag.RunFn = func(a mock.Arguments) { @@ -74,7 +78,7 @@ func TestStartStop(t *testing.T) { } func TestStartStopBadDependencies(t *testing.T) { - _, err := NewEventManager(context.Background(), nil, nil, nil, nil, nil) + _, err := NewEventManager(context.Background(), nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) } @@ -87,7 +91,9 @@ func TestStartStopBadTransports(t *testing.T) { mpi := &publicstoragemocks.Plugin{} mdm := &datamocks.Manager{} msh := &syshandlersmocks.SystemHandlers{} - _, err := NewEventManager(context.Background(), mpi, mdi, mim, msh, mdm) + mbm := &broadcastmocks.Manager{} + mpm := &privatemessagingmocks.Manager{} + _, err := NewEventManager(context.Background(), mpi, mdi, mim, msh, mdm, mbm, mpm) assert.Regexp(t, "FF10172", err) } diff --git a/internal/events/operation_update.go b/internal/events/operation_update.go index 9b28699f7f..cdb4cc29cc 100644 --- a/internal/events/operation_update.go +++ b/internal/events/operation_update.go @@ -36,5 +36,13 @@ func (em *eventManager) OperationUpdate(plugin fftypes.Named, operationID *fftyp if err := em.database.UpdateOperation(em.ctx, op.ID, update); err != nil { return err } + + // Special handling for OpTypeTokenTransfer, which writes an event when it fails + if op.Type == fftypes.OpTypeTokenTransfer && txState == fftypes.OpStatusFailed { + event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID) + if err := em.database.InsertEvent(em.ctx, event); err != nil { + return err + } + } return nil } diff --git a/internal/events/operation_update_test.go b/internal/events/operation_update_test.go index 4c7ae4f511..751586582b 100644 --- a/internal/events/operation_update_test.go +++ b/internal/events/operation_update_test.go @@ -79,3 +79,57 @@ func TestOperationUpdateError(t *testing.T) { mdi.AssertExpectations(t) mbi.AssertExpectations(t) } + +func TestOperationUpdateTransferFail(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + mdi := em.database.(*databasemocks.Plugin) + mbi := &blockchainmocks.Plugin{} + + opID := fftypes.NewUUID() + op := &fftypes.Operation{ + ID: opID, + Type: fftypes.OpTypeTokenTransfer, + Namespace: "ns1", + } + + mdi.On("GetOperationByID", em.ctx, opID).Return(op, nil) + mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(nil) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { + return e.Type == fftypes.EventTypeTransferOpFailed && e.Namespace == "ns1" + })).Return(nil) + + info := fftypes.JSONObject{"some": "info"} + err := em.OperationUpdate(mbi, opID, fftypes.OpStatusFailed, "some error", info) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mbi.AssertExpectations(t) +} + +func TestOperationUpdateTransferEventFail(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + mdi := em.database.(*databasemocks.Plugin) + mbi := &blockchainmocks.Plugin{} + + opID := fftypes.NewUUID() + op := &fftypes.Operation{ + ID: opID, + Type: fftypes.OpTypeTokenTransfer, + Namespace: "ns1", + } + + mdi.On("GetOperationByID", em.ctx, opID).Return(op, nil) + mdi.On("UpdateOperation", em.ctx, opID, mock.Anything).Return(nil) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { + return e.Type == fftypes.EventTypeTransferOpFailed && e.Namespace == "ns1" + })).Return(fmt.Errorf("pop")) + + info := fftypes.JSONObject{"some": "info"} + err := em.OperationUpdate(mbi, opID, fftypes.OpStatusFailed, "some error", info) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) + mbi.AssertExpectations(t) +} diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 846f7247dc..963d95b116 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -20,21 +20,13 @@ import ( "context" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/hyperledger/firefly/pkg/tokens" ) -func retrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { - input := &op.Input - transfer.LocalID, err = fftypes.ParseUUID(ctx, input.GetString("id")) - if err != nil { - return err - } - return nil -} - -func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) { +func (em *eventManager) loadTransferOperation(ctx context.Context, transfer *fftypes.TokenTransfer) error { transfer.LocalID = nil // Find a matching operation within this transaction @@ -45,11 +37,10 @@ func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, ) operations, _, err := em.database.GetOperations(ctx, filter) if err != nil { - return false, err + return err } if len(operations) > 0 { - err = retrieveTokenTransferInputs(ctx, operations[0], transfer) - if err != nil { + if err = txcommon.RetrieveTokenTransferInputs(ctx, operations[0], transfer); err != nil { log.L(ctx).Warnf("Failed to read operation inputs for token transfer '%s': %s", transfer.ProtocolID, err) } } @@ -57,7 +48,10 @@ func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, if transfer.LocalID == nil { transfer.LocalID = fftypes.NewUUID() } + return nil +} +func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) { transaction := &fftypes.Transaction{ ID: transfer.TX.ID, Status: fftypes.OpStatusSucceeded, @@ -73,8 +67,7 @@ func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, return em.txhelper.PersistTransaction(ctx, transaction) } -func (em *eventManager) getBatchForTransfer(ctx context.Context, transfer *fftypes.TokenTransfer) (*fftypes.UUID, error) { - // Find the messages assocated with that data +func (em *eventManager) getMessageForTransfer(ctx context.Context, transfer *fftypes.TokenTransfer) (*fftypes.Message, error) { var messages []*fftypes.Message fb := database.MessageQueryFactory.NewFilter(ctx) filter := fb.And( @@ -85,7 +78,7 @@ func (em *eventManager) getBatchForTransfer(ctx context.Context, transfer *fftyp if err != nil || len(messages) == 0 { return nil, err } - return messages[0].BatchID, nil + return messages[0], nil } func (em *eventManager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) error { @@ -105,6 +98,9 @@ func (em *eventManager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.To transfer.Namespace = pool.Namespace if transfer.TX.ID != nil { + if err := em.loadTransferOperation(ctx, transfer); err != nil { + return err + } if valid, err := em.persistTokenTransaction(ctx, pool.Namespace, transfer, protocolTxID, additionalInfo); err != nil || !valid { return err } @@ -145,10 +141,22 @@ func (em *eventManager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.To log.L(ctx).Infof("Token transfer recorded id=%s author=%s", transfer.ProtocolID, transfer.Key) if transfer.MessageHash != nil { - if batchID, err = em.getBatchForTransfer(ctx, transfer); err != nil { - log.L(ctx).Errorf("Failed to lookup batch for token transfer '%s': %s", transfer.ProtocolID, err) + msg, err := em.getMessageForTransfer(ctx, transfer) + if err != nil { return err } + if msg != nil { + if msg.State == fftypes.MessageStateStaged { + // Message can now be sent + msg.State = fftypes.MessageStateReady + if err := em.database.UpsertMessage(ctx, msg, true, false); err != nil { + return err + } + } else { + // Message was already received - aggregator will need to be rewound + batchID = msg.BatchID + } + } } event := fftypes.NewEvent(fftypes.EventTypeTransferConfirmed, pool.Namespace, transfer.LocalID) diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index a952a42ed2..3ae2993305 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -164,7 +164,7 @@ func TestTokensTransferredAddBalanceIgnore(t *testing.T) { mti.AssertExpectations(t) } -func TestTokensTransferredWithMessage(t *testing.T) { +func TestTokensTransferredWithMessageReceived(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() @@ -222,3 +222,66 @@ func TestTokensTransferredWithMessage(t *testing.T) { mdi.AssertExpectations(t) mti.AssertExpectations(t) } + +func TestTokensTransferredWithMessageSend(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + + mdi := em.database.(*databasemocks.Plugin) + mti := &tokenmocks.Plugin{} + + transfer := &fftypes.TokenTransfer{ + Type: fftypes.TokenTransferTypeTransfer, + PoolProtocolID: "F1", + TokenIndex: "0", + Connector: "erc1155", + Key: "0x12345", + From: "0x1", + To: "0x2", + MessageHash: fftypes.NewRandB32(), + } + transfer.Amount.Int().SetInt64(1) + fromBalance := &fftypes.TokenBalanceChange{ + PoolProtocolID: "F1", + TokenIndex: "0", + Connector: "erc1155", + Namespace: "ns1", + Key: "0x1", + } + fromBalance.Amount.Int().SetInt64(-1) + toBalance := &fftypes.TokenBalanceChange{ + PoolProtocolID: "F1", + TokenIndex: "0", + Connector: "erc1155", + Namespace: "ns1", + Key: "0x2", + } + toBalance.Amount.Int().SetInt64(1) + pool := &fftypes.TokenPool{ + Namespace: "ns1", + } + messages := []*fftypes.Message{{ + BatchID: fftypes.NewUUID(), + State: fftypes.MessageStateStaged, + }} + + mdi.On("GetTokenPoolByProtocolID", em.ctx, "F1").Return(pool, nil).Times(2) + mdi.On("UpsertTokenTransfer", em.ctx, transfer).Return(nil).Times(2) + mdi.On("AddTokenAccountBalance", em.ctx, fromBalance).Return(nil).Times(2) + mdi.On("AddTokenAccountBalance", em.ctx, toBalance).Return(nil).Times(2) + mdi.On("GetMessages", em.ctx, mock.Anything).Return(messages, nil, nil).Times(2) + mdi.On("UpsertMessage", em.ctx, mock.Anything, true, false).Return(fmt.Errorf("pop")) + mdi.On("UpsertMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateReady + }), true, false).Return(nil) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace + })).Return(nil).Once() + + info := fftypes.JSONObject{"some": "info"} + err := em.TokensTransferred(mti, transfer, "tx1", info) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} diff --git a/internal/i18n/en_translations.go b/internal/i18n/en_translations.go index faae6f8f9f..71ecf2a6af 100644 --- a/internal/i18n/en_translations.go +++ b/internal/i18n/en_translations.go @@ -208,4 +208,5 @@ var ( MsgNoUUID = ffm("FF10288", "Field '%s' must not be a UUID", 400) MsgFetchDataDesc = ffm("FF10289", "Fetch the data and include it in the messages returned", 400) MsgWSClosed = ffm("FF10290", "Websocket closed") + MsgTokenTransferFailed = ffm("FF10291", "Token transfer with ID '%s' failed. Please check the FireFly logs for more information") ) diff --git a/internal/orchestrator/bound_callbacks.go b/internal/orchestrator/bound_callbacks.go index 891b1296b7..b4bb3d6db2 100644 --- a/internal/orchestrator/bound_callbacks.go +++ b/internal/orchestrator/bound_callbacks.go @@ -36,7 +36,7 @@ func (bc *boundCallbacks) BlockchainOpUpdate(operationID *fftypes.UUID, txState return bc.ei.OperationUpdate(bc.bi, operationID, txState, errorMessage, opOutput) } -func (bc *boundCallbacks) TokensOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { +func (bc *boundCallbacks) TokenOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { return bc.ei.OperationUpdate(plugin, operationID, txState, errorMessage, opOutput) } diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index 65e9fb7ab2..4735dffef4 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -54,7 +54,7 @@ func TestBoundCallbacks(t *testing.T) { assert.EqualError(t, err, "pop") mei.On("OperationUpdate", mti, opID, fftypes.OpStatusFailed, "error info", info).Return(fmt.Errorf("pop")) - err = bc.TokensOpUpdate(mti, opID, fftypes.OpStatusFailed, "error info", info) + err = bc.TokenOpUpdate(mti, opID, fftypes.OpStatusFailed, "error info", info) assert.EqualError(t, err, "pop") mei.On("TransferResult", mdx, "tracking12345", fftypes.OpStatusFailed, "error info", info).Return(fmt.Errorf("pop")) diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index d8a5e2fa00..d45edbad2e 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -416,7 +416,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { or.syshandlers = syshandlers.NewSystemHandlers(or.database, or.dataexchange, or.data, or.broadcast, or.messaging, or.assets) if or.events == nil { - or.events, err = events.NewEventManager(ctx, or.publicstorage, or.database, or.identity, or.syshandlers, or.data) + or.events, err = events.NewEventManager(ctx, or.publicstorage, or.database, or.identity, or.syshandlers, or.data, or.broadcast, or.messaging) if err != nil { return err } diff --git a/internal/syncasync/sync_async_bridge.go b/internal/syncasync/sync_async_bridge.go index 9ddfb96aa2..b1a87a1c77 100644 --- a/internal/syncasync/sync_async_bridge.go +++ b/internal/syncasync/sync_async_bridge.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/sysmessaging" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) @@ -149,9 +150,8 @@ func (inflight *inflightRequest) msInflight() float64 { return float64(dur) / float64(time.Millisecond) } -func (sa *syncAsyncBridge) getMessageFromEvent(event *fftypes.EventDelivery) (*fftypes.Message, error) { - msg, err := sa.database.GetMessageByID(sa.ctx, event.Reference) - if err != nil { +func (sa *syncAsyncBridge) getMessageFromEvent(event *fftypes.EventDelivery) (msg *fftypes.Message, err error) { + if msg, err = sa.database.GetMessageByID(sa.ctx, event.Reference); err != nil { return nil, err } if msg == nil { @@ -161,9 +161,8 @@ func (sa *syncAsyncBridge) getMessageFromEvent(event *fftypes.EventDelivery) (*f return msg, nil } -func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (*fftypes.TokenPool, error) { - pool, err := sa.database.GetTokenPoolByID(sa.ctx, event.Reference) - if err != nil { +func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (pool *fftypes.TokenPool, err error) { + if pool, err = sa.database.GetTokenPoolByID(sa.ctx, event.Reference); err != nil { return nil, err } if pool == nil { @@ -173,9 +172,8 @@ func (sa *syncAsyncBridge) getPoolFromEvent(event *fftypes.EventDelivery) (*ffty return pool, nil } -func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (*fftypes.TokenTransfer, error) { - transfer, err := sa.database.GetTokenTransfer(sa.ctx, event.Reference) - if err != nil { +func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (transfer *fftypes.TokenTransfer, err error) { + if transfer, err = sa.database.GetTokenTransfer(sa.ctx, event.Reference); err != nil { return nil, err } if transfer == nil { @@ -185,6 +183,17 @@ func (sa *syncAsyncBridge) getTransferFromEvent(event *fftypes.EventDelivery) (* return transfer, nil } +func (sa *syncAsyncBridge) getOperationFromEvent(event *fftypes.EventDelivery) (op *fftypes.Operation, err error) { + if op, err = sa.database.GetOperationByID(sa.ctx, event.Reference); err != nil { + return nil, err + } + if op == nil { + // This should not happen (but we need to move on) + log.L(sa.ctx).Errorf("Unable to resolve operation '%s' for %s event '%s'", event.Reference, event.Type, event.ID) + } + return op, nil +} + func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error { sa.inflightMux.Lock() defer sa.inflightMux.Unlock() @@ -221,7 +230,7 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error { // See if this is a rejection of an inflight message inflight := sa.getInFlight(event.Namespace, messageConfirm, msg.Header.ID) if inflight != nil { - go sa.resolveRejected(inflight, msg) + go sa.resolveRejected(inflight, msg.Header.ID) } case fftypes.EventTypePoolConfirmed: @@ -243,7 +252,7 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error { // See if this is a rejection of an inflight token pool inflight := sa.getInFlight(event.Namespace, tokenPoolConfirm, pool.ID) if inflight != nil { - go sa.resolveRejectedTokenPool(inflight, pool) + go sa.resolveRejectedTokenPool(inflight, pool.ID) } case fftypes.EventTypeTransferConfirmed: @@ -256,6 +265,22 @@ func (sa *syncAsyncBridge) eventCallback(event *fftypes.EventDelivery) error { if inflight != nil { go sa.resolveConfirmedTokenTransfer(inflight, transfer) } + + case fftypes.EventTypeTransferOpFailed: + op, err := sa.getOperationFromEvent(event) + if err != nil || op == nil { + return err + } + // Extract the LocalID of the transfer + var transfer fftypes.TokenTransfer + if err := txcommon.RetrieveTokenTransferInputs(sa.ctx, op, &transfer); err != nil { + log.L(sa.ctx).Warnf("Failed to extract token transfer inputs for operation '%s': %s", op.ID, err) + } + // See if this is a failure of an inflight token transfer operation + inflight := sa.getInFlight(event.Namespace, tokenTransferConfirm, transfer.LocalID) + if inflight != nil { + go sa.resolveFailedTokenTransfer(inflight, transfer.LocalID) + } } return nil @@ -279,8 +304,8 @@ func (sa *syncAsyncBridge) resolveConfirmed(inflight *inflightRequest, msg *ffty inflight.response <- inflightResponse{id: msg.Header.ID, data: msg} } -func (sa *syncAsyncBridge) resolveRejected(inflight *inflightRequest, msg *fftypes.Message) { - err := i18n.NewError(sa.ctx, i18n.MsgRejected, msg.Header.ID) +func (sa *syncAsyncBridge) resolveRejected(inflight *inflightRequest, msgID *fftypes.UUID) { + err := i18n.NewError(sa.ctx, i18n.MsgRejected, msgID) log.L(sa.ctx).Errorf("Resolving message confirmation request '%s' with error: %s", inflight.id, err) inflight.response <- inflightResponse{err: err} } @@ -290,8 +315,8 @@ func (sa *syncAsyncBridge) resolveConfirmedTokenPool(inflight *inflightRequest, inflight.response <- inflightResponse{id: pool.ID, data: pool} } -func (sa *syncAsyncBridge) resolveRejectedTokenPool(inflight *inflightRequest, pool *fftypes.TokenPool) { - err := i18n.NewError(sa.ctx, i18n.MsgTokenPoolRejected, pool.ID) +func (sa *syncAsyncBridge) resolveRejectedTokenPool(inflight *inflightRequest, poolID *fftypes.UUID) { + err := i18n.NewError(sa.ctx, i18n.MsgTokenPoolRejected, poolID) log.L(sa.ctx).Errorf("Resolving token pool confirmation request '%s' with error '%s'", inflight.id, err) inflight.response <- inflightResponse{err: err} } @@ -301,6 +326,12 @@ func (sa *syncAsyncBridge) resolveConfirmedTokenTransfer(inflight *inflightReque inflight.response <- inflightResponse{id: transfer.LocalID, data: transfer} } +func (sa *syncAsyncBridge) resolveFailedTokenTransfer(inflight *inflightRequest, transferID *fftypes.UUID) { + err := i18n.NewError(sa.ctx, i18n.MsgTokenTransferFailed, transferID) + log.L(sa.ctx).Debugf("Resolving token transfer confirmation request '%s' with error '%s'", inflight.id, err) + inflight.response <- inflightResponse{err: err} +} + func (sa *syncAsyncBridge) sendAndWait(ctx context.Context, ns string, id *fftypes.UUID, reqType requestType, send RequestSender) (interface{}, error) { inflight, err := sa.addInFlight(ns, id, reqType) if err != nil { diff --git a/internal/syncasync/sync_async_bridge_test.go b/internal/syncasync/sync_async_bridge_test.go index 3fbf1c1ad9..1f53b55a43 100644 --- a/internal/syncasync/sync_async_bridge_test.go +++ b/internal/syncasync/sync_async_bridge_test.go @@ -54,7 +54,6 @@ func TestRequestReplyOk(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetMessageByID", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) gmid.ReturnArguments = mock.Arguments{ &fftypes.Message{ Header: fftypes.MessageHeader{ @@ -106,7 +105,6 @@ func TestAwaitConfirmationOk(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetMessageByID", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) msgSent := &fftypes.Message{} msgSent.Header.ID = requestID msgSent.Confirmed = fftypes.Now() @@ -153,7 +151,6 @@ func TestAwaitConfirmationRejected(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetMessageByID", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) msgSent := &fftypes.Message{} msgSent.Header.ID = requestID msgSent.Confirmed = fftypes.Now() @@ -506,7 +503,6 @@ func TestAwaitTokenPoolConfirmation(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetTokenPoolByID", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) pool := &fftypes.TokenPool{ ID: requestID, Name: "my-pool", @@ -561,7 +557,6 @@ func TestAwaitTokenPoolConfirmationRejected(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetTokenPoolByID", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) pool := &fftypes.TokenPool{ ID: requestID, Name: "my-pool", @@ -600,13 +595,12 @@ func TestAwaitTokenTransferConfirmation(t *testing.T) { mdi := sa.database.(*databasemocks.Plugin) gmid := mdi.On("GetTokenTransfer", sa.ctx, mock.Anything) gmid.RunFn = func(a mock.Arguments) { - assert.NotNil(t, requestID) - pool := &fftypes.TokenTransfer{ + transfer := &fftypes.TokenTransfer{ LocalID: requestID, ProtocolID: "abc", } gmid.ReturnArguments = mock.Arguments{ - pool, nil, + transfer, nil, } } @@ -641,3 +635,141 @@ func TestAwaitTokenTransferConfirmationSendFail(t *testing.T) { }) assert.EqualError(t, err, "pop") } + +func TestAwaitFailedTokenTransfer(t *testing.T) { + + sa, cancel := newTestSyncAsyncBridge(t) + defer cancel() + + requestID := fftypes.NewUUID() + op := &fftypes.Operation{ + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": requestID.String(), + }, + } + + mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) + mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil) + + mdi := sa.database.(*databasemocks.Plugin) + mdi.On("GetOperationByID", sa.ctx, op.ID).Return(op, nil) + + _, err := sa.WaitForTokenTransfer(sa.ctx, "ns1", requestID, func(ctx context.Context) error { + go func() { + sa.eventCallback(&fftypes.EventDelivery{ + Event: fftypes.Event{ + ID: fftypes.NewUUID(), + Type: fftypes.EventTypeTransferOpFailed, + Reference: op.ID, + Namespace: "ns1", + }, + }) + }() + return nil + }) + assert.Regexp(t, "FF10291", err) +} + +func TestFailedTokenTransferOpError(t *testing.T) { + + sa, cancel := newTestSyncAsyncBridge(t) + defer cancel() + + requestID := fftypes.NewUUID() + sa.inflight = map[string]map[fftypes.UUID]*inflightRequest{ + "ns1": { + *requestID: &inflightRequest{}, + }, + } + + op := &fftypes.Operation{ + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": requestID.String(), + }, + } + + mdi := sa.database.(*databasemocks.Plugin) + mdi.On("GetOperationByID", sa.ctx, op.ID).Return(nil, fmt.Errorf("pop")) + + err := sa.eventCallback(&fftypes.EventDelivery{ + Event: fftypes.Event{ + ID: fftypes.NewUUID(), + Type: fftypes.EventTypeTransferOpFailed, + Reference: op.ID, + Namespace: "ns1", + }, + }) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestFailedTokenTransferOpNotFound(t *testing.T) { + + sa, cancel := newTestSyncAsyncBridge(t) + defer cancel() + + requestID := fftypes.NewUUID() + sa.inflight = map[string]map[fftypes.UUID]*inflightRequest{ + "ns1": { + *requestID: &inflightRequest{}, + }, + } + + op := &fftypes.Operation{ + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": requestID.String(), + }, + } + + mdi := sa.database.(*databasemocks.Plugin) + mdi.On("GetOperationByID", sa.ctx, op.ID).Return(nil, nil) + + err := sa.eventCallback(&fftypes.EventDelivery{ + Event: fftypes.Event{ + ID: fftypes.NewUUID(), + Type: fftypes.EventTypeTransferOpFailed, + Reference: op.ID, + Namespace: "ns1", + }, + }) + assert.NoError(t, err) + + mdi.AssertExpectations(t) +} + +func TestFailedTokenTransferIDLookupFail(t *testing.T) { + + sa, cancel := newTestSyncAsyncBridge(t) + defer cancel() + + requestID := fftypes.NewUUID() + sa.inflight = map[string]map[fftypes.UUID]*inflightRequest{ + "ns1": { + *requestID: &inflightRequest{}, + }, + } + + op := &fftypes.Operation{ + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{}, + } + + mdi := sa.database.(*databasemocks.Plugin) + mdi.On("GetOperationByID", sa.ctx, op.ID).Return(op, nil) + + err := sa.eventCallback(&fftypes.EventDelivery{ + Event: fftypes.Event{ + ID: fftypes.NewUUID(), + Type: fftypes.EventTypeTransferOpFailed, + Reference: op.ID, + Namespace: "ns1", + }, + }) + assert.NoError(t, err) + + mdi.AssertExpectations(t) +} diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 8ff6f37bd8..228a199e5b 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -151,7 +151,7 @@ func (ft *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject) replyType = fftypes.OpStatusFailed } l.Infof("Tokens '%s' reply: request=%s message=%s", replyType, requestID, message) - return ft.callbacks.TokensOpUpdate(ft, operationID, replyType, message, data) + return ft.callbacks.TokenOpUpdate(ft, operationID, replyType, message, data) } func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSONObject) (err error) { diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index bc3f6228d0..dc5e01eff7 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -356,11 +356,11 @@ func TestEvents(t *testing.T) { fromServer <- `{"id":"3","event":"receipt","data":{"id":"abc"}}` // receipt: success - mcb.On("TokensOpUpdate", h, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(nil).Once() + mcb.On("TokenOpUpdate", h, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(nil).Once() fromServer <- `{"id":"4","event":"receipt","data":{"id":"` + opID.String() + `","success":true}}` // receipt: failure - mcb.On("TokensOpUpdate", h, opID, fftypes.OpStatusFailed, "", mock.Anything).Return(nil).Once() + mcb.On("TokenOpUpdate", h, opID, fftypes.OpStatusFailed, "", mock.Anything).Return(nil).Once() fromServer <- `{"id":"5","event":"receipt","data":{"id":"` + opID.String() + `","success":false}}` // token-pool: missing data diff --git a/internal/txcommon/token_inputs.go b/internal/txcommon/token_inputs.go new file mode 100644 index 0000000000..4383c2248e --- /dev/null +++ b/internal/txcommon/token_inputs.go @@ -0,0 +1,36 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txcommon + +import ( + "context" + + "github.com/hyperledger/firefly/pkg/fftypes" +) + +func AddTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) { + op.Input = fftypes.JSONObject{ + "id": transfer.LocalID.String(), + } +} + +func RetrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { + if transfer.LocalID, err = fftypes.ParseUUID(ctx, op.Input.GetString("id")); err != nil { + return err + } + return nil +} diff --git a/internal/txcommon/token_inputs_test.go b/internal/txcommon/token_inputs_test.go new file mode 100644 index 0000000000..d56fb61a8e --- /dev/null +++ b/internal/txcommon/token_inputs_test.go @@ -0,0 +1,61 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txcommon + +import ( + "context" + "testing" + + "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/stretchr/testify/assert" +) + +func TestAddTokenTransferInputs(t *testing.T) { + op := &fftypes.Operation{} + transfer := &fftypes.TokenTransfer{ + LocalID: fftypes.NewUUID(), + } + + AddTokenTransferInputs(op, transfer) + assert.Equal(t, transfer.LocalID.String(), op.Input.GetString("id")) +} + +func TestRetrieveTokenTransferInputs(t *testing.T) { + id := fftypes.NewUUID() + op := &fftypes.Operation{ + Input: fftypes.JSONObject{ + "id": id.String(), + }, + } + transfer := &fftypes.TokenTransfer{} + + err := RetrieveTokenTransferInputs(context.Background(), op, transfer) + assert.NoError(t, err) + assert.Equal(t, *id, *transfer.LocalID) +} + +func TestRetrieveTokenTransferInputsBadID(t *testing.T) { + op := &fftypes.Operation{ + Input: fftypes.JSONObject{ + "id": "bad", + }, + } + transfer := &fftypes.TokenTransfer{} + + err := RetrieveTokenTransferInputs(context.Background(), op, transfer) + assert.Regexp(t, "FF10142", err) +} diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index 059502600a..7837aa8bd6 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -403,13 +403,13 @@ func (_m *Manager) Start() error { return r0 } -// TokenPoolCreated provides a mock function with given fields: tk, pool, protocolTxID, additionalInfo -func (_m *Manager) TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error { - ret := _m.Called(tk, pool, protocolTxID, additionalInfo) +// TokenPoolCreated provides a mock function with given fields: ti, pool, protocolTxID, additionalInfo +func (_m *Manager) TokenPoolCreated(ti tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error { + ret := _m.Called(ti, pool, protocolTxID, additionalInfo) var r0 error if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenPool, string, fftypes.JSONObject) error); ok { - r0 = rf(tk, pool, protocolTxID, additionalInfo) + r0 = rf(ti, pool, protocolTxID, additionalInfo) } else { r0 = ret.Error(0) } diff --git a/mocks/tokenmocks/callbacks.go b/mocks/tokenmocks/callbacks.go index 10940a06e6..2ee9240a38 100644 --- a/mocks/tokenmocks/callbacks.go +++ b/mocks/tokenmocks/callbacks.go @@ -14,13 +14,13 @@ type Callbacks struct { mock.Mock } -// TokenPoolCreated provides a mock function with given fields: plugin, pool, protocolTxID, additionalInfo -func (_m *Callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error { - ret := _m.Called(plugin, pool, protocolTxID, additionalInfo) +// TokenOpUpdate provides a mock function with given fields: plugin, operationID, txState, errorMessage, opOutput +func (_m *Callbacks) TokenOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { + ret := _m.Called(plugin, operationID, txState, errorMessage, opOutput) var r0 error - if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenPool, string, fftypes.JSONObject) error); ok { - r0 = rf(plugin, pool, protocolTxID, additionalInfo) + if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.UUID, fftypes.OpStatus, string, fftypes.JSONObject) error); ok { + r0 = rf(plugin, operationID, txState, errorMessage, opOutput) } else { r0 = ret.Error(0) } @@ -28,13 +28,13 @@ func (_m *Callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.TokenP return r0 } -// TokensOpUpdate provides a mock function with given fields: plugin, operationID, txState, errorMessage, opOutput -func (_m *Callbacks) TokensOpUpdate(plugin tokens.Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error { - ret := _m.Called(plugin, operationID, txState, errorMessage, opOutput) +// TokenPoolCreated provides a mock function with given fields: plugin, pool, protocolTxID, additionalInfo +func (_m *Callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error { + ret := _m.Called(plugin, pool, protocolTxID, additionalInfo) var r0 error - if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.UUID, fftypes.OpStatus, string, fftypes.JSONObject) error); ok { - r0 = rf(plugin, operationID, txState, errorMessage, opOutput) + if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenPool, string, fftypes.JSONObject) error); ok { + r0 = rf(plugin, pool, protocolTxID, additionalInfo) } else { r0 = ret.Error(0) } diff --git a/pkg/fftypes/event.go b/pkg/fftypes/event.go index d4471d34d9..6f6e27ee16 100644 --- a/pkg/fftypes/event.go +++ b/pkg/fftypes/event.go @@ -37,6 +37,8 @@ var ( EventTypePoolRejected EventType = ffEnum("eventtype", "token_pool_rejected") // EventTypeTransferConfirmed occurs when a token transfer has been confirmed EventTypeTransferConfirmed EventType = ffEnum("eventtype", "token_transfer_confirmed") + // EventTypeTransferOpFailed occurs when a token transfer submitted by this node has failed (based on feedback from connector) + EventTypeTransferOpFailed EventType = ffEnum("eventtype", "token_transfer_op_failed") ) // Event is an activity in the system, delivered reliably to applications, that indicates something has happened in the network diff --git a/pkg/fftypes/message.go b/pkg/fftypes/message.go index 11ce41b393..b2fcddc17b 100644 --- a/pkg/fftypes/message.go +++ b/pkg/fftypes/message.go @@ -51,6 +51,8 @@ var ( type MessageState = FFEnum var ( + // MessageStateStaged is a message created locally which is not ready to send + MessageStateStaged MessageState = ffEnum("messagestate", "staged") // MessageStateReady is a message created locally which is ready to send MessageStateReady MessageState = ffEnum("messagestate", "ready") // MessageStatePending is a message that has been received but is awaiting aggregation/confirmation diff --git a/pkg/tokens/plugin.go b/pkg/tokens/plugin.go index 07bb5970a7..d79bc9365c 100644 --- a/pkg/tokens/plugin.go +++ b/pkg/tokens/plugin.go @@ -66,7 +66,7 @@ type Callbacks interface { // Only the party submitting the transaction will see this data. // // Error should will only be returned in shutdown scenarios - TokensOpUpdate(plugin Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error + TokenOpUpdate(plugin Plugin, operationID *fftypes.UUID, txState fftypes.OpStatus, errorMessage string, opOutput fftypes.JSONObject) error // TokenPoolCreated notifies on the creation of a new token pool, which might have been // submitted by us, or by any other authorized party in the network.