From ecc0712ecb445675086aab5117e6f94f5544e32e Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Thu, 3 Feb 2022 16:26:29 -0500 Subject: [PATCH 1/2] Store all token transfer inputs on operation Since the transfer is only recorded upon success, all inputs must be stored on the operation in order to support retries or accurately report failures. Signed-off-by: Andrew Richardson --- internal/assets/token_transfer.go | 8 +++-- internal/events/tokens_transferred_test.go | 10 +++--- internal/syncasync/sync_async_bridge_test.go | 6 ++-- internal/txcommon/token_inputs.go | 24 +++++++++---- internal/txcommon/token_inputs_test.go | 36 +++++++++++++++++--- 5 files changed, 62 insertions(+), 22 deletions(-) diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index e67ea32b0e..f1c1a14bfc 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -246,11 +246,13 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er "", fftypes.OpTypeTokenTransfer, fftypes.OpStatusPending) - txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer) - - if err = s.mgr.database.InsertOperation(ctx, op); err != nil { + if err = txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer); err == nil { + err = s.mgr.database.InsertOperation(ctx, op) + } + if err != nil { return err } + if s.transfer.Message != nil { s.transfer.Message.State = fftypes.MessageStateStaged err = s.mgr.database.UpsertMessage(ctx, &s.transfer.Message.Message, database.UpsertOptimizationNew) diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index 1a7b86bfb7..c768a87d7e 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -148,7 +148,7 @@ func TestPersistTransferBadOp(t *testing.T) { } ops := []*fftypes.Operation{{ Input: fftypes.JSONObject{ - "id": "bad", + "localId": "bad", }, Transaction: fftypes.NewUUID(), }} @@ -179,7 +179,7 @@ func TestPersistTransferTxFail(t *testing.T) { localID := fftypes.NewUUID() ops := []*fftypes.Operation{{ Input: fftypes.JSONObject{ - "id": localID.String(), + "localId": localID.String(), }, }} @@ -209,7 +209,7 @@ func TestPersistTransferGetTransferFail(t *testing.T) { localID := fftypes.NewUUID() ops := []*fftypes.Operation{{ Input: fftypes.JSONObject{ - "id": localID.String(), + "localId": localID.String(), }, }} @@ -240,7 +240,7 @@ func TestPersistTransferBlockchainEventFail(t *testing.T) { localID := fftypes.NewUUID() ops := []*fftypes.Operation{{ Input: fftypes.JSONObject{ - "id": localID.String(), + "localId": localID.String(), }, }} @@ -275,7 +275,7 @@ func TestTokensTransferredWithTransactionRegenerateLocalID(t *testing.T) { localID := fftypes.NewUUID() operations := []*fftypes.Operation{{ Input: fftypes.JSONObject{ - "id": localID.String(), + "localId": localID.String(), }, }} diff --git a/internal/syncasync/sync_async_bridge_test.go b/internal/syncasync/sync_async_bridge_test.go index b24d74c8a6..33265e8c23 100644 --- a/internal/syncasync/sync_async_bridge_test.go +++ b/internal/syncasync/sync_async_bridge_test.go @@ -645,7 +645,7 @@ func TestAwaitFailedTokenTransfer(t *testing.T) { op := &fftypes.Operation{ ID: fftypes.NewUUID(), Input: fftypes.JSONObject{ - "id": requestID.String(), + "localId": requestID.String(), }, } @@ -686,7 +686,7 @@ func TestFailedTokenTransferOpError(t *testing.T) { op := &fftypes.Operation{ ID: fftypes.NewUUID(), Input: fftypes.JSONObject{ - "id": requestID.String(), + "localId": requestID.String(), }, } @@ -721,7 +721,7 @@ func TestFailedTokenTransferOpNotFound(t *testing.T) { op := &fftypes.Operation{ ID: fftypes.NewUUID(), Input: fftypes.JSONObject{ - "id": requestID.String(), + "localId": requestID.String(), }, } diff --git a/internal/txcommon/token_inputs.go b/internal/txcommon/token_inputs.go index aba4413a42..b173b67dcd 100644 --- a/internal/txcommon/token_inputs.go +++ b/internal/txcommon/token_inputs.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,8 +18,10 @@ package txcommon import ( "context" + "encoding/json" "fmt" + "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/pkg/fftypes" ) @@ -49,15 +51,25 @@ func RetrieveTokenPoolCreateInputs(ctx context.Context, op *fftypes.Operation, p return nil } -func AddTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) { - op.Input = fftypes.JSONObject{ - "id": transfer.LocalID.String(), +func AddTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { + var j []byte + if j, err = json.Marshal(transfer); err == nil { + err = json.Unmarshal(j, &op.Input) } + return err } func RetrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { - if transfer.LocalID, err = fftypes.ParseUUID(ctx, op.Input.GetString("id")); err != nil { - return err + var t fftypes.TokenTransfer + s := op.Input.String() + if err = json.Unmarshal([]byte(s), &t); err != nil { + return i18n.WrapError(ctx, err, i18n.MsgJSONObjectParseFailed, s) + } + if t.LocalID == nil { + return i18n.NewError(ctx, i18n.MsgInvalidUUID) } + // The LocalID is the only thing that needs to be read back out when processing an event + // (everything else should be unpacked from the event) + transfer.LocalID = t.LocalID return nil } diff --git a/internal/txcommon/token_inputs_test.go b/internal/txcommon/token_inputs_test.go index 0a2e00e685..218a684cef 100644 --- a/internal/txcommon/token_inputs_test.go +++ b/internal/txcommon/token_inputs_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -99,34 +99,60 @@ func TestAddTokenTransferInputs(t *testing.T) { op := &fftypes.Operation{} transfer := &fftypes.TokenTransfer{ LocalID: fftypes.NewUUID(), + Type: fftypes.TokenTransferTypeTransfer, + Amount: *fftypes.NewFFBigInt(1), + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenTransfer, + ID: fftypes.NewUUID(), + }, } AddTokenTransferInputs(op, transfer) - assert.Equal(t, transfer.LocalID.String(), op.Input.GetString("id")) + assert.Equal(t, fftypes.JSONObject{ + "amount": "1", + "localId": transfer.LocalID.String(), + "tx": map[string]interface{}{ + "id": transfer.TX.ID.String(), + "type": "token_transfer", + }, + "type": "transfer", + }, op.Input) } func TestRetrieveTokenTransferInputs(t *testing.T) { id := fftypes.NewUUID() op := &fftypes.Operation{ Input: fftypes.JSONObject{ - "id": id.String(), + "amount": "1", + "localId": id.String(), }, } - transfer := &fftypes.TokenTransfer{} + transfer := &fftypes.TokenTransfer{Amount: *fftypes.NewFFBigInt(2)} err := RetrieveTokenTransferInputs(context.Background(), op, transfer) assert.NoError(t, err) assert.Equal(t, *id, *transfer.LocalID) + assert.Equal(t, int64(2), transfer.Amount.Int().Int64()) } func TestRetrieveTokenTransferInputsBadID(t *testing.T) { op := &fftypes.Operation{ Input: fftypes.JSONObject{ - "id": "bad", + "localId": "bad", }, } transfer := &fftypes.TokenTransfer{} + err := RetrieveTokenTransferInputs(context.Background(), op, transfer) + assert.Regexp(t, "FF10151", err) +} + +func TestRetrieveTokenTransferInputsMissingID(t *testing.T) { + op := &fftypes.Operation{ + Input: fftypes.JSONObject{}, + } + transfer := &fftypes.TokenTransfer{} + err := RetrieveTokenTransferInputs(context.Background(), op, transfer) assert.Regexp(t, "FF10142", err) } From 815a971f8b4acd71dd3f6bcba092f4bd52dc84e2 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Thu, 3 Feb 2022 16:52:41 -0500 Subject: [PATCH 2/2] Clean up NewOperation helper Signed-off-by: Andrew Richardson --- internal/assets/token_pool.go | 5 ++--- internal/assets/token_transfer.go | 5 ++--- internal/batchpin/batchpin.go | 5 ++--- internal/broadcast/manager.go | 7 +++---- internal/contracts/manager.go | 5 ++--- internal/events/token_pool_created.go | 5 ++--- internal/privatemessaging/privatemessaging.go | 10 ++++------ pkg/fftypes/operation.go | 6 +++--- pkg/fftypes/operation_test.go | 2 +- 9 files changed, 21 insertions(+), 29 deletions(-) diff --git a/internal/assets/token_pool.go b/internal/assets/token_pool.go index 072a6dd951..42c928242f 100644 --- a/internal/assets/token_pool.go +++ b/internal/assets/token_pool.go @@ -76,13 +76,12 @@ func (am *assetManager) createTokenPoolInternal(ctx context.Context, pool *fftyp pool.TX.ID = txid pool.TX.Type = fftypes.TransactionTypeTokenPool - op = fftypes.NewTXOperation( + op = fftypes.NewOperation( plugin, pool.Namespace, txid, "", - fftypes.OpTypeTokenCreatePool, - fftypes.OpStatusPending) + fftypes.OpTypeTokenCreatePool) txcommon.AddTokenPoolCreateInputs(op, pool) return am.database.InsertOperation(ctx, op) diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index f1c1a14bfc..f11c4bc354 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -239,13 +239,12 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er s.transfer.TX.ID = txid s.transfer.TX.Type = fftypes.TransactionTypeTokenTransfer - op = fftypes.NewTXOperation( + op = fftypes.NewOperation( plugin, s.namespace, txid, "", - fftypes.OpTypeTokenTransfer, - fftypes.OpStatusPending) + fftypes.OpTypeTokenTransfer) if err = txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer); err == nil { err = s.mgr.database.InsertOperation(ctx, op) } diff --git a/internal/batchpin/batchpin.go b/internal/batchpin/batchpin.go index 38fea250fa..780cd16145 100644 --- a/internal/batchpin/batchpin.go +++ b/internal/batchpin/batchpin.go @@ -50,13 +50,12 @@ func NewBatchPinSubmitter(di database.Plugin, im identity.Manager, bi blockchain func (bp *batchPinSubmitter) SubmitPinnedBatch(ctx context.Context, batch *fftypes.Batch, contexts []*fftypes.Bytes32) error { // The pending blockchain transaction - op := fftypes.NewTXOperation( + op := fftypes.NewOperation( bp.blockchain, batch.Namespace, batch.Payload.TX.ID, "", - fftypes.OpTypeBlockchainBatchPin, - fftypes.OpStatusPending) + fftypes.OpTypeBlockchainBatchPin) if err := bp.database.InsertOperation(ctx, op); err != nil { return err } diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 292d331159..725b307e9d 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -124,14 +124,13 @@ func (bm *broadcastManager) submitTXAndUpdateDB(ctx context.Context, batch *ffty } // The completed PublicStorage upload - op := fftypes.NewTXOperation( + op := fftypes.NewOperation( bm.publicstorage, batch.Namespace, batch.Payload.TX.ID, batch.PayloadRef, - fftypes.OpTypePublicStorageBatchBroadcast, - fftypes.OpStatusSucceeded, // Note we performed the action synchronously above - ) + fftypes.OpTypePublicStorageBatchBroadcast) + op.Status = fftypes.OpStatusSucceeded // Note we performed the action synchronously above err = bm.database.InsertOperation(ctx, op) if err != nil { return err diff --git a/internal/contracts/manager.go b/internal/contracts/manager.go index 029e688560..ec7031fb9a 100644 --- a/internal/contracts/manager.go +++ b/internal/contracts/manager.go @@ -183,13 +183,12 @@ func (cm *contractManager) InvokeContract(ctx context.Context, ns string, req *f return err } - op = fftypes.NewTXOperation( + op = fftypes.NewOperation( cm.blockchain, ns, txid, "", - fftypes.OpTypeContractInvoke, - fftypes.OpStatusPending) + fftypes.OpTypeContractInvoke) op.Input = req.Input return cm.database.InsertOperation(ctx, op) }) diff --git a/internal/events/token_pool_created.go b/internal/events/token_pool_created.go index 1d5e4de810..20f0a2ab3c 100644 --- a/internal/events/token_pool_created.go +++ b/internal/events/token_pool_created.go @@ -106,13 +106,12 @@ func (em *eventManager) shouldAnnounce(ctx context.Context, ti tokens.Plugin, po } addPoolDetailsFromPlugin(announcePool, pool) - nextOp := fftypes.NewTXOperation( + nextOp := fftypes.NewOperation( ti, op.Namespace, op.Transaction, "", - fftypes.OpTypeTokenAnnouncePool, - fftypes.OpStatusPending) + fftypes.OpTypeTokenAnnouncePool) return announcePool, em.database.InsertOperation(ctx, nextOp) } diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index dde0e4deb4..b93b666749 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -158,13 +158,12 @@ func (pm *privateMessaging) transferBlobs(ctx context.Context, data []*fftypes.D } if txid != nil { - op := fftypes.NewTXOperation( + op := fftypes.NewOperation( pm.exchange, d.Namespace, txid, trackingID, - fftypes.OpTypeDataExchangeBlobSend, - fftypes.OpStatusPending) + fftypes.OpTypeDataExchangeBlobSend) if err = pm.database.InsertOperation(ctx, op); err != nil { return err } @@ -210,13 +209,12 @@ func (pm *privateMessaging) sendData(ctx context.Context, mType string, mID *fft } if txid != nil { - op := fftypes.NewTXOperation( + op := fftypes.NewOperation( pm.exchange, ns, txid, trackingID, - fftypes.OpTypeDataExchangeBatchSend, - fftypes.OpStatusPending) + fftypes.OpTypeDataExchangeBatchSend) op.Input = fftypes.JSONObject{ "manifest": tw.Manifest().String(), } diff --git a/pkg/fftypes/operation.go b/pkg/fftypes/operation.go index 472fdc75b7..463cd16669 100644 --- a/pkg/fftypes/operation.go +++ b/pkg/fftypes/operation.go @@ -56,8 +56,8 @@ type Named interface { Name() string } -// NewTXOperation creates a new operation for a transaction -func NewTXOperation(plugin Named, namespace string, tx *UUID, backendID string, opType OpType, opStatus OpStatus) *Operation { +// NewOperation creates a new operation in a transaction +func NewOperation(plugin Named, namespace string, tx *UUID, backendID string, opType OpType) *Operation { return &Operation{ ID: NewUUID(), Namespace: namespace, @@ -65,7 +65,7 @@ func NewTXOperation(plugin Named, namespace string, tx *UUID, backendID string, BackendID: backendID, Transaction: tx, Type: opType, - Status: opStatus, + Status: OpStatusPending, Created: Now(), } } diff --git a/pkg/fftypes/operation_test.go b/pkg/fftypes/operation_test.go index 1fe370b8ec..b94ab7bcc5 100644 --- a/pkg/fftypes/operation_test.go +++ b/pkg/fftypes/operation_test.go @@ -29,7 +29,7 @@ func (f *fakePlugin) Name() string { return "fake" } func TestNewPendingMessageOp(t *testing.T) { txID := NewUUID() - op := NewTXOperation(&fakePlugin{}, "ns1", txID, "testBackend", OpTypePublicStorageBatchBroadcast, OpStatusPending) + op := NewOperation(&fakePlugin{}, "ns1", txID, "testBackend", OpTypePublicStorageBatchBroadcast) assert.Equal(t, Operation{ ID: op.ID, Namespace: "ns1",