Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions internal/assets/token_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ func (am *assetManager) createTokenPoolInternal(ctx context.Context, pool *fftyp
pool.TX.ID = txid
pool.TX.Type = fftypes.TransactionTypeTokenPool

op = fftypes.NewTXOperation(
op = fftypes.NewOperation(
plugin,
pool.Namespace,
txid,
"",
fftypes.OpTypeTokenCreatePool,
fftypes.OpStatusPending)
fftypes.OpTypeTokenCreatePool)
txcommon.AddTokenPoolCreateInputs(op, pool)

return am.database.InsertOperation(ctx, op)
Expand Down
13 changes: 7 additions & 6 deletions internal/assets/token_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,19 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er
s.transfer.TX.ID = txid
s.transfer.TX.Type = fftypes.TransactionTypeTokenTransfer

op = fftypes.NewTXOperation(
op = fftypes.NewOperation(
plugin,
s.namespace,
txid,
"",
fftypes.OpTypeTokenTransfer,
fftypes.OpStatusPending)
txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer)

if err = s.mgr.database.InsertOperation(ctx, op); err != nil {
fftypes.OpTypeTokenTransfer)
if err = txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer); err == nil {
err = s.mgr.database.InsertOperation(ctx, op)
}
if err != nil {
return err
}

if s.transfer.Message != nil {
s.transfer.Message.State = fftypes.MessageStateStaged
err = s.mgr.database.UpsertMessage(ctx, &s.transfer.Message.Message, database.UpsertOptimizationNew)
Expand Down
5 changes: 2 additions & 3 deletions internal/batchpin/batchpin.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,12 @@ func NewBatchPinSubmitter(di database.Plugin, im identity.Manager, bi blockchain
func (bp *batchPinSubmitter) SubmitPinnedBatch(ctx context.Context, batch *fftypes.Batch, contexts []*fftypes.Bytes32) error {

// The pending blockchain transaction
op := fftypes.NewTXOperation(
op := fftypes.NewOperation(
bp.blockchain,
batch.Namespace,
batch.Payload.TX.ID,
"",
fftypes.OpTypeBlockchainBatchPin,
fftypes.OpStatusPending)
fftypes.OpTypeBlockchainBatchPin)
if err := bp.database.InsertOperation(ctx, op); err != nil {
return err
}
Expand Down
7 changes: 3 additions & 4 deletions internal/broadcast/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,13 @@ func (bm *broadcastManager) submitTXAndUpdateDB(ctx context.Context, batch *ffty
}

// The completed PublicStorage upload
op := fftypes.NewTXOperation(
op := fftypes.NewOperation(
bm.publicstorage,
batch.Namespace,
batch.Payload.TX.ID,
batch.PayloadRef,
fftypes.OpTypePublicStorageBatchBroadcast,
fftypes.OpStatusSucceeded, // Note we performed the action synchronously above
)
fftypes.OpTypePublicStorageBatchBroadcast)
op.Status = fftypes.OpStatusSucceeded // Note we performed the action synchronously above
err = bm.database.InsertOperation(ctx, op)
if err != nil {
return err
Expand Down
5 changes: 2 additions & 3 deletions internal/contracts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,12 @@ func (cm *contractManager) InvokeContract(ctx context.Context, ns string, req *f
return err
}

op = fftypes.NewTXOperation(
op = fftypes.NewOperation(
cm.blockchain,
ns,
txid,
"",
fftypes.OpTypeContractInvoke,
fftypes.OpStatusPending)
fftypes.OpTypeContractInvoke)
op.Input = req.Input
return cm.database.InsertOperation(ctx, op)
})
Expand Down
5 changes: 2 additions & 3 deletions internal/events/token_pool_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,12 @@ func (em *eventManager) shouldAnnounce(ctx context.Context, ti tokens.Plugin, po
}
addPoolDetailsFromPlugin(announcePool, pool)

nextOp := fftypes.NewTXOperation(
nextOp := fftypes.NewOperation(
ti,
op.Namespace,
op.Transaction,
"",
fftypes.OpTypeTokenAnnouncePool,
fftypes.OpStatusPending)
fftypes.OpTypeTokenAnnouncePool)
return announcePool, em.database.InsertOperation(ctx, nextOp)
}

Expand Down
10 changes: 5 additions & 5 deletions internal/events/tokens_transferred_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestPersistTransferBadOp(t *testing.T) {
}
ops := []*fftypes.Operation{{
Input: fftypes.JSONObject{
"id": "bad",
"localId": "bad",
},
Transaction: fftypes.NewUUID(),
}}
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestPersistTransferTxFail(t *testing.T) {
localID := fftypes.NewUUID()
ops := []*fftypes.Operation{{
Input: fftypes.JSONObject{
"id": localID.String(),
"localId": localID.String(),
},
}}

Expand Down Expand Up @@ -209,7 +209,7 @@ func TestPersistTransferGetTransferFail(t *testing.T) {
localID := fftypes.NewUUID()
ops := []*fftypes.Operation{{
Input: fftypes.JSONObject{
"id": localID.String(),
"localId": localID.String(),
},
}}

Expand Down Expand Up @@ -240,7 +240,7 @@ func TestPersistTransferBlockchainEventFail(t *testing.T) {
localID := fftypes.NewUUID()
ops := []*fftypes.Operation{{
Input: fftypes.JSONObject{
"id": localID.String(),
"localId": localID.String(),
},
}}

Expand Down Expand Up @@ -275,7 +275,7 @@ func TestTokensTransferredWithTransactionRegenerateLocalID(t *testing.T) {
localID := fftypes.NewUUID()
operations := []*fftypes.Operation{{
Input: fftypes.JSONObject{
"id": localID.String(),
"localId": localID.String(),
},
}}

Expand Down
10 changes: 4 additions & 6 deletions internal/privatemessaging/privatemessaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,12 @@ func (pm *privateMessaging) transferBlobs(ctx context.Context, data []*fftypes.D
}

if txid != nil {
op := fftypes.NewTXOperation(
op := fftypes.NewOperation(
pm.exchange,
d.Namespace,
txid,
trackingID,
fftypes.OpTypeDataExchangeBlobSend,
fftypes.OpStatusPending)
fftypes.OpTypeDataExchangeBlobSend)
if err = pm.database.InsertOperation(ctx, op); err != nil {
return err
}
Expand Down Expand Up @@ -210,13 +209,12 @@ func (pm *privateMessaging) sendData(ctx context.Context, mType string, mID *fft
}

if txid != nil {
op := fftypes.NewTXOperation(
op := fftypes.NewOperation(
pm.exchange,
ns,
txid,
trackingID,
fftypes.OpTypeDataExchangeBatchSend,
fftypes.OpStatusPending)
fftypes.OpTypeDataExchangeBatchSend)
op.Input = fftypes.JSONObject{
"manifest": tw.Manifest().String(),
}
Expand Down
6 changes: 3 additions & 3 deletions internal/syncasync/sync_async_bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func TestAwaitFailedTokenTransfer(t *testing.T) {
op := &fftypes.Operation{
ID: fftypes.NewUUID(),
Input: fftypes.JSONObject{
"id": requestID.String(),
"localId": requestID.String(),
},
}

Expand Down Expand Up @@ -686,7 +686,7 @@ func TestFailedTokenTransferOpError(t *testing.T) {
op := &fftypes.Operation{
ID: fftypes.NewUUID(),
Input: fftypes.JSONObject{
"id": requestID.String(),
"localId": requestID.String(),
},
}

Expand Down Expand Up @@ -721,7 +721,7 @@ func TestFailedTokenTransferOpNotFound(t *testing.T) {
op := &fftypes.Operation{
ID: fftypes.NewUUID(),
Input: fftypes.JSONObject{
"id": requestID.String(),
"localId": requestID.String(),
},
}

Expand Down
24 changes: 18 additions & 6 deletions internal/txcommon/token_inputs.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,8 +18,10 @@ package txcommon

import (
"context"
"encoding/json"
"fmt"

"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/pkg/fftypes"
)

Expand Down Expand Up @@ -49,15 +51,25 @@ func RetrieveTokenPoolCreateInputs(ctx context.Context, op *fftypes.Operation, p
return nil
}

func AddTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) {
op.Input = fftypes.JSONObject{
"id": transfer.LocalID.String(),
func AddTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) {
var j []byte
if j, err = json.Marshal(transfer); err == nil {
err = json.Unmarshal(j, &op.Input)
}
return err
}

func RetrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) {
if transfer.LocalID, err = fftypes.ParseUUID(ctx, op.Input.GetString("id")); err != nil {
return err
var t fftypes.TokenTransfer
s := op.Input.String()
if err = json.Unmarshal([]byte(s), &t); err != nil {
return i18n.WrapError(ctx, err, i18n.MsgJSONObjectParseFailed, s)
}
if t.LocalID == nil {
return i18n.NewError(ctx, i18n.MsgInvalidUUID)
}
// The LocalID is the only thing that needs to be read back out when processing an event
// (everything else should be unpacked from the event)
transfer.LocalID = t.LocalID
return nil
}
36 changes: 31 additions & 5 deletions internal/txcommon/token_inputs_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -99,34 +99,60 @@ func TestAddTokenTransferInputs(t *testing.T) {
op := &fftypes.Operation{}
transfer := &fftypes.TokenTransfer{
LocalID: fftypes.NewUUID(),
Type: fftypes.TokenTransferTypeTransfer,
Amount: *fftypes.NewFFBigInt(1),
TX: fftypes.TransactionRef{
Type: fftypes.TransactionTypeTokenTransfer,
ID: fftypes.NewUUID(),
},
}

AddTokenTransferInputs(op, transfer)
assert.Equal(t, transfer.LocalID.String(), op.Input.GetString("id"))
assert.Equal(t, fftypes.JSONObject{
"amount": "1",
"localId": transfer.LocalID.String(),
"tx": map[string]interface{}{
"id": transfer.TX.ID.String(),
"type": "token_transfer",
},
"type": "transfer",
}, op.Input)
}

func TestRetrieveTokenTransferInputs(t *testing.T) {
id := fftypes.NewUUID()
op := &fftypes.Operation{
Input: fftypes.JSONObject{
"id": id.String(),
"amount": "1",
"localId": id.String(),
},
}
transfer := &fftypes.TokenTransfer{}
transfer := &fftypes.TokenTransfer{Amount: *fftypes.NewFFBigInt(2)}

err := RetrieveTokenTransferInputs(context.Background(), op, transfer)
assert.NoError(t, err)
assert.Equal(t, *id, *transfer.LocalID)
assert.Equal(t, int64(2), transfer.Amount.Int().Int64())
}

func TestRetrieveTokenTransferInputsBadID(t *testing.T) {
op := &fftypes.Operation{
Input: fftypes.JSONObject{
"id": "bad",
"localId": "bad",
},
}
transfer := &fftypes.TokenTransfer{}

err := RetrieveTokenTransferInputs(context.Background(), op, transfer)
assert.Regexp(t, "FF10151", err)
}

func TestRetrieveTokenTransferInputsMissingID(t *testing.T) {
op := &fftypes.Operation{
Input: fftypes.JSONObject{},
}
transfer := &fftypes.TokenTransfer{}

err := RetrieveTokenTransferInputs(context.Background(), op, transfer)
assert.Regexp(t, "FF10142", err)
}
6 changes: 3 additions & 3 deletions pkg/fftypes/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ type Named interface {
Name() string
}

// NewTXOperation creates a new operation for a transaction
func NewTXOperation(plugin Named, namespace string, tx *UUID, backendID string, opType OpType, opStatus OpStatus) *Operation {
// NewOperation creates a new operation in a transaction
func NewOperation(plugin Named, namespace string, tx *UUID, backendID string, opType OpType) *Operation {
return &Operation{
ID: NewUUID(),
Namespace: namespace,
Plugin: plugin.Name(),
BackendID: backendID,
Transaction: tx,
Type: opType,
Status: opStatus,
Status: OpStatusPending,
Created: Now(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fftypes/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (f *fakePlugin) Name() string { return "fake" }
func TestNewPendingMessageOp(t *testing.T) {

txID := NewUUID()
op := NewTXOperation(&fakePlugin{}, "ns1", txID, "testBackend", OpTypePublicStorageBatchBroadcast, OpStatusPending)
op := NewOperation(&fakePlugin{}, "ns1", txID, "testBackend", OpTypePublicStorageBatchBroadcast)
assert.Equal(t, Operation{
ID: op.ID,
Namespace: "ns1",
Expand Down