diff --git a/internal/assets/token_pool.go b/internal/assets/token_pool.go index 072a6dd951..42c928242f 100644 --- a/internal/assets/token_pool.go +++ b/internal/assets/token_pool.go @@ -76,13 +76,12 @@ func (am *assetManager) createTokenPoolInternal(ctx context.Context, pool *fftyp pool.TX.ID = txid pool.TX.Type = fftypes.TransactionTypeTokenPool - op = fftypes.NewTXOperation( + op = fftypes.NewOperation( plugin, pool.Namespace, txid, "", - fftypes.OpTypeTokenCreatePool, - fftypes.OpStatusPending) + fftypes.OpTypeTokenCreatePool) txcommon.AddTokenPoolCreateInputs(op, pool) return am.database.InsertOperation(ctx, op) diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index e67ea32b0e..f11c4bc354 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -239,18 +239,19 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er s.transfer.TX.ID = txid s.transfer.TX.Type = fftypes.TransactionTypeTokenTransfer - op = fftypes.NewTXOperation( + op = fftypes.NewOperation( plugin, s.namespace, txid, "", - fftypes.OpTypeTokenTransfer, - fftypes.OpStatusPending) - txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer) - - if err = s.mgr.database.InsertOperation(ctx, op); err != nil { + fftypes.OpTypeTokenTransfer) + if err = txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer); err == nil { + err = s.mgr.database.InsertOperation(ctx, op) + } + if err != nil { return err } + if s.transfer.Message != nil { s.transfer.Message.State = fftypes.MessageStateStaged err = s.mgr.database.UpsertMessage(ctx, &s.transfer.Message.Message, database.UpsertOptimizationNew) diff --git a/internal/batchpin/batchpin.go b/internal/batchpin/batchpin.go index 38fea250fa..780cd16145 100644 --- a/internal/batchpin/batchpin.go +++ b/internal/batchpin/batchpin.go @@ -50,13 +50,12 @@ func NewBatchPinSubmitter(di database.Plugin, im identity.Manager, bi blockchain func (bp *batchPinSubmitter) SubmitPinnedBatch(ctx context.Context, batch *fftypes.Batch, contexts []*fftypes.Bytes32) error { // The pending blockchain transaction - op := fftypes.NewTXOperation( + op := fftypes.NewOperation( bp.blockchain, batch.Namespace, batch.Payload.TX.ID, "", - fftypes.OpTypeBlockchainBatchPin, - fftypes.OpStatusPending) + fftypes.OpTypeBlockchainBatchPin) if err := bp.database.InsertOperation(ctx, op); err != nil { return err } diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 292d331159..725b307e9d 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -124,14 +124,13 @@ func (bm *broadcastManager) submitTXAndUpdateDB(ctx context.Context, batch *ffty } // The completed PublicStorage upload - op := fftypes.NewTXOperation( + op := fftypes.NewOperation( bm.publicstorage, batch.Namespace, batch.Payload.TX.ID, batch.PayloadRef, - fftypes.OpTypePublicStorageBatchBroadcast, - fftypes.OpStatusSucceeded, // Note we performed the action synchronously above - ) + fftypes.OpTypePublicStorageBatchBroadcast) + op.Status = fftypes.OpStatusSucceeded // Note we performed the action synchronously above err = bm.database.InsertOperation(ctx, op) if err != nil { return err diff --git a/internal/contracts/manager.go b/internal/contracts/manager.go index 029e688560..ec7031fb9a 100644 --- a/internal/contracts/manager.go +++ b/internal/contracts/manager.go @@ -183,13 +183,12 @@ func (cm *contractManager) InvokeContract(ctx context.Context, ns string, req *f return err } - op = fftypes.NewTXOperation( + op = fftypes.NewOperation( cm.blockchain, ns, txid, "", - fftypes.OpTypeContractInvoke, - fftypes.OpStatusPending) + fftypes.OpTypeContractInvoke) op.Input = req.Input return cm.database.InsertOperation(ctx, op) }) diff --git a/internal/events/token_pool_created.go b/internal/events/token_pool_created.go index 1d5e4de810..20f0a2ab3c 100644 --- a/internal/events/token_pool_created.go +++ b/internal/events/token_pool_created.go @@ -106,13 +106,12 @@ func (em *eventManager) shouldAnnounce(ctx context.Context, ti tokens.Plugin, po } addPoolDetailsFromPlugin(announcePool, pool) - nextOp := fftypes.NewTXOperation( + nextOp := fftypes.NewOperation( ti, op.Namespace, op.Transaction, "", - fftypes.OpTypeTokenAnnouncePool, - fftypes.OpStatusPending) + fftypes.OpTypeTokenAnnouncePool) return announcePool, em.database.InsertOperation(ctx, nextOp) } diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index 1a7b86bfb7..c768a87d7e 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -148,7 +148,7 @@ func TestPersistTransferBadOp(t *testing.T) { } ops := []*fftypes.Operation{{ Input: fftypes.JSONObject{ - "id": "bad", + "localId": "bad", }, Transaction: fftypes.NewUUID(), }} @@ -179,7 +179,7 @@ func TestPersistTransferTxFail(t *testing.T) { localID := fftypes.NewUUID() ops := []*fftypes.Operation{{ Input: fftypes.JSONObject{ - "id": localID.String(), + "localId": localID.String(), }, }} @@ -209,7 +209,7 @@ func TestPersistTransferGetTransferFail(t *testing.T) { localID := fftypes.NewUUID() ops := []*fftypes.Operation{{ Input: fftypes.JSONObject{ - "id": localID.String(), + "localId": localID.String(), }, }} @@ -240,7 +240,7 @@ func TestPersistTransferBlockchainEventFail(t *testing.T) { localID := fftypes.NewUUID() ops := []*fftypes.Operation{{ Input: fftypes.JSONObject{ - "id": localID.String(), + "localId": localID.String(), }, }} @@ -275,7 +275,7 @@ func TestTokensTransferredWithTransactionRegenerateLocalID(t *testing.T) { localID := fftypes.NewUUID() operations := []*fftypes.Operation{{ Input: fftypes.JSONObject{ - "id": localID.String(), + "localId": localID.String(), }, }} diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index dde0e4deb4..b93b666749 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -158,13 +158,12 @@ func (pm *privateMessaging) transferBlobs(ctx context.Context, data []*fftypes.D } if txid != nil { - op := fftypes.NewTXOperation( + op := fftypes.NewOperation( pm.exchange, d.Namespace, txid, trackingID, - fftypes.OpTypeDataExchangeBlobSend, - fftypes.OpStatusPending) + fftypes.OpTypeDataExchangeBlobSend) if err = pm.database.InsertOperation(ctx, op); err != nil { return err } @@ -210,13 +209,12 @@ func (pm *privateMessaging) sendData(ctx context.Context, mType string, mID *fft } if txid != nil { - op := fftypes.NewTXOperation( + op := fftypes.NewOperation( pm.exchange, ns, txid, trackingID, - fftypes.OpTypeDataExchangeBatchSend, - fftypes.OpStatusPending) + fftypes.OpTypeDataExchangeBatchSend) op.Input = fftypes.JSONObject{ "manifest": tw.Manifest().String(), } diff --git a/internal/syncasync/sync_async_bridge_test.go b/internal/syncasync/sync_async_bridge_test.go index b24d74c8a6..33265e8c23 100644 --- a/internal/syncasync/sync_async_bridge_test.go +++ b/internal/syncasync/sync_async_bridge_test.go @@ -645,7 +645,7 @@ func TestAwaitFailedTokenTransfer(t *testing.T) { op := &fftypes.Operation{ ID: fftypes.NewUUID(), Input: fftypes.JSONObject{ - "id": requestID.String(), + "localId": requestID.String(), }, } @@ -686,7 +686,7 @@ func TestFailedTokenTransferOpError(t *testing.T) { op := &fftypes.Operation{ ID: fftypes.NewUUID(), Input: fftypes.JSONObject{ - "id": requestID.String(), + "localId": requestID.String(), }, } @@ -721,7 +721,7 @@ func TestFailedTokenTransferOpNotFound(t *testing.T) { op := &fftypes.Operation{ ID: fftypes.NewUUID(), Input: fftypes.JSONObject{ - "id": requestID.String(), + "localId": requestID.String(), }, } diff --git a/internal/txcommon/token_inputs.go b/internal/txcommon/token_inputs.go index aba4413a42..b173b67dcd 100644 --- a/internal/txcommon/token_inputs.go +++ b/internal/txcommon/token_inputs.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,8 +18,10 @@ package txcommon import ( "context" + "encoding/json" "fmt" + "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/pkg/fftypes" ) @@ -49,15 +51,25 @@ func RetrieveTokenPoolCreateInputs(ctx context.Context, op *fftypes.Operation, p return nil } -func AddTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) { - op.Input = fftypes.JSONObject{ - "id": transfer.LocalID.String(), +func AddTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { + var j []byte + if j, err = json.Marshal(transfer); err == nil { + err = json.Unmarshal(j, &op.Input) } + return err } func RetrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { - if transfer.LocalID, err = fftypes.ParseUUID(ctx, op.Input.GetString("id")); err != nil { - return err + var t fftypes.TokenTransfer + s := op.Input.String() + if err = json.Unmarshal([]byte(s), &t); err != nil { + return i18n.WrapError(ctx, err, i18n.MsgJSONObjectParseFailed, s) + } + if t.LocalID == nil { + return i18n.NewError(ctx, i18n.MsgInvalidUUID) } + // The LocalID is the only thing that needs to be read back out when processing an event + // (everything else should be unpacked from the event) + transfer.LocalID = t.LocalID return nil } diff --git a/internal/txcommon/token_inputs_test.go b/internal/txcommon/token_inputs_test.go index 0a2e00e685..218a684cef 100644 --- a/internal/txcommon/token_inputs_test.go +++ b/internal/txcommon/token_inputs_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -99,34 +99,60 @@ func TestAddTokenTransferInputs(t *testing.T) { op := &fftypes.Operation{} transfer := &fftypes.TokenTransfer{ LocalID: fftypes.NewUUID(), + Type: fftypes.TokenTransferTypeTransfer, + Amount: *fftypes.NewFFBigInt(1), + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenTransfer, + ID: fftypes.NewUUID(), + }, } AddTokenTransferInputs(op, transfer) - assert.Equal(t, transfer.LocalID.String(), op.Input.GetString("id")) + assert.Equal(t, fftypes.JSONObject{ + "amount": "1", + "localId": transfer.LocalID.String(), + "tx": map[string]interface{}{ + "id": transfer.TX.ID.String(), + "type": "token_transfer", + }, + "type": "transfer", + }, op.Input) } func TestRetrieveTokenTransferInputs(t *testing.T) { id := fftypes.NewUUID() op := &fftypes.Operation{ Input: fftypes.JSONObject{ - "id": id.String(), + "amount": "1", + "localId": id.String(), }, } - transfer := &fftypes.TokenTransfer{} + transfer := &fftypes.TokenTransfer{Amount: *fftypes.NewFFBigInt(2)} err := RetrieveTokenTransferInputs(context.Background(), op, transfer) assert.NoError(t, err) assert.Equal(t, *id, *transfer.LocalID) + assert.Equal(t, int64(2), transfer.Amount.Int().Int64()) } func TestRetrieveTokenTransferInputsBadID(t *testing.T) { op := &fftypes.Operation{ Input: fftypes.JSONObject{ - "id": "bad", + "localId": "bad", }, } transfer := &fftypes.TokenTransfer{} + err := RetrieveTokenTransferInputs(context.Background(), op, transfer) + assert.Regexp(t, "FF10151", err) +} + +func TestRetrieveTokenTransferInputsMissingID(t *testing.T) { + op := &fftypes.Operation{ + Input: fftypes.JSONObject{}, + } + transfer := &fftypes.TokenTransfer{} + err := RetrieveTokenTransferInputs(context.Background(), op, transfer) assert.Regexp(t, "FF10142", err) } diff --git a/pkg/fftypes/operation.go b/pkg/fftypes/operation.go index 472fdc75b7..463cd16669 100644 --- a/pkg/fftypes/operation.go +++ b/pkg/fftypes/operation.go @@ -56,8 +56,8 @@ type Named interface { Name() string } -// NewTXOperation creates a new operation for a transaction -func NewTXOperation(plugin Named, namespace string, tx *UUID, backendID string, opType OpType, opStatus OpStatus) *Operation { +// NewOperation creates a new operation in a transaction +func NewOperation(plugin Named, namespace string, tx *UUID, backendID string, opType OpType) *Operation { return &Operation{ ID: NewUUID(), Namespace: namespace, @@ -65,7 +65,7 @@ func NewTXOperation(plugin Named, namespace string, tx *UUID, backendID string, BackendID: backendID, Transaction: tx, Type: opType, - Status: opStatus, + Status: OpStatusPending, Created: Now(), } } diff --git a/pkg/fftypes/operation_test.go b/pkg/fftypes/operation_test.go index 1fe370b8ec..b94ab7bcc5 100644 --- a/pkg/fftypes/operation_test.go +++ b/pkg/fftypes/operation_test.go @@ -29,7 +29,7 @@ func (f *fakePlugin) Name() string { return "fake" } func TestNewPendingMessageOp(t *testing.T) { txID := NewUUID() - op := NewTXOperation(&fakePlugin{}, "ns1", txID, "testBackend", OpTypePublicStorageBatchBroadcast, OpStatusPending) + op := NewOperation(&fakePlugin{}, "ns1", txID, "testBackend", OpTypePublicStorageBatchBroadcast) assert.Equal(t, Operation{ ID: op.ID, Namespace: "ns1",