From 25848769615055977db3d175ec0b4869af758e0b Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Mon, 27 Sep 2021 13:00:00 -0400 Subject: [PATCH 1/6] Announce token pool info via broadcast Rather than requiring all token pool metadata (namespace, name, id) to be written to the blockchain, send it out via a broadcast message after creating the actual pool on chain. Signed-off-by: Andrew Richardson --- internal/assets/manager.go | 50 +- internal/assets/manager_test.go | 14 +- internal/assets/token_pool_created.go | 103 +-- internal/assets/token_pool_created_test.go | 278 +++----- internal/broadcast/manager.go | 1 + internal/broadcast/tokenpool.go | 38 ++ internal/broadcast/tokenpool_test.go | 137 ++++ internal/orchestrator/bound_callbacks.go | 5 +- internal/orchestrator/bound_callbacks_test.go | 6 +- internal/orchestrator/orchestrator.go | 4 +- internal/syshandlers/syshandler.go | 10 +- internal/syshandlers/syshandler_test.go | 4 +- internal/syshandlers/syshandler_tokenpool.go | 128 ++++ .../syshandlers/syshandler_tokenpool_test.go | 601 ++++++++++++++++++ internal/tokens/fftokens/fftokens.go | 70 +- internal/tokens/fftokens/fftokens_test.go | 44 +- mocks/assetmocks/manager.go | 24 +- mocks/broadcastmocks/manager.go | 23 + mocks/tokenmocks/callbacks.go | 10 +- pkg/fftypes/constants.go | 3 + pkg/fftypes/operation.go | 2 + pkg/fftypes/tokenpool.go | 27 + pkg/fftypes/tokenpool_test.go | 60 ++ pkg/tokens/plugin.go | 2 +- 24 files changed, 1283 insertions(+), 361 deletions(-) create mode 100644 internal/broadcast/tokenpool.go create mode 100644 internal/broadcast/tokenpool_test.go create mode 100644 internal/syshandlers/syshandler_tokenpool.go create mode 100644 internal/syshandlers/syshandler_tokenpool_test.go create mode 100644 pkg/fftypes/tokenpool_test.go diff --git a/internal/assets/manager.go b/internal/assets/manager.go index d37e9e1728..68c8149466 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -18,7 +18,9 @@ package assets import ( "context" + "fmt" + "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/i18n" @@ -37,9 +39,10 @@ type Manager interface { GetTokenPools(ctx context.Context, ns, typeName string, filter database.AndFilter) ([]*fftypes.TokenPool, *database.FilterResult, error) GetTokenPool(ctx context.Context, ns, typeName, name string) (*fftypes.TokenPool, error) GetTokenAccounts(ctx context.Context, ns, typeName, name string, filter database.AndFilter) ([]*fftypes.TokenAccount, *database.FilterResult, error) + ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error // Bound token callbacks - TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error + TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error Start() error WaitStop() @@ -51,12 +54,13 @@ type assetManager struct { identity identity.Plugin data data.Manager syncasync syncasync.Bridge + broadcast broadcast.Manager tokens map[string]tokens.Plugin retry retry.Retry txhelper txcommon.Helper } -func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin, dm data.Manager, sa syncasync.Bridge, ti map[string]tokens.Plugin) (Manager, error) { +func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, ti map[string]tokens.Plugin) (Manager, error) { if di == nil || ii == nil || sa == nil || ti == nil { return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError) } @@ -66,6 +70,7 @@ func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin identity: ii, data: dm, syncasync: sa, + broadcast: bm, tokens: ti, retry: retry.Retry{ InitialDelay: config.GetDuration(config.AssetManagerRetryInitialDelay), @@ -86,6 +91,28 @@ func (am *assetManager) selectTokenPlugin(ctx context.Context, name string) (tok return nil, i18n.NewError(ctx, i18n.MsgUnknownTokensPlugin, name) } +func storeTokenOpInputs(op *fftypes.Operation, pool *fftypes.TokenPool) { + op.Input = fftypes.JSONObject{ + "id": pool.ID.String(), + "namespace": pool.Namespace, + "name": pool.Name, + } +} + +func retrieveTokenOpInputs(ctx context.Context, op *fftypes.Operation, pool *fftypes.TokenPool) (err error) { + input := &op.Input + pool.ID, err = fftypes.ParseUUID(ctx, input.GetString("id")) + if err != nil { + return err + } + pool.Namespace = input.GetString("namespace") + pool.Name = input.GetString("name") + if pool.Namespace == "" || pool.Name == "" { + return fmt.Errorf("namespace or name missing from inputs") + } + return nil +} + func (am *assetManager) CreateTokenPool(ctx context.Context, ns string, typeName string, pool *fftypes.TokenPool, waitConfirm bool) (*fftypes.TokenPool, error) { return am.CreateTokenPoolWithID(ctx, ns, fftypes.NewUUID(), typeName, pool, waitConfirm) } @@ -132,6 +159,13 @@ func (am *assetManager) CreateTokenPoolWithID(ctx context.Context, ns string, id return nil, err } + pool.ID = id + pool.Namespace = ns + pool.TX = fftypes.TransactionRef{ + ID: tx.ID, + Type: tx.Subject.Type, + } + op := fftypes.NewTXOperation( plugin, ns, @@ -140,17 +174,12 @@ func (am *assetManager) CreateTokenPoolWithID(ctx context.Context, ns string, id fftypes.OpTypeTokensCreatePool, fftypes.OpStatusPending, author.Identifier) + storeTokenOpInputs(op, pool) err = am.database.UpsertOperation(ctx, op, false) if err != nil { return nil, err } - pool.ID = id - pool.Namespace = ns - pool.TX = fftypes.TransactionRef{ - ID: tx.ID, - Type: tx.Subject.Type, - } return pool, plugin.CreateTokenPool(ctx, op.ID, author, pool) } @@ -189,6 +218,11 @@ func (am *assetManager) GetTokenAccounts(ctx context.Context, ns, typeName, name return am.database.GetTokenAccounts(ctx, filter.Condition(filter.Builder().Eq("protocolid", pool.ProtocolID))) } +func (am *assetManager) ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error { + // TODO: validate that the given token pool was created with the given protocolTxId + return nil +} + func (am *assetManager) Start() error { return nil } diff --git a/internal/assets/manager_test.go b/internal/assets/manager_test.go index 53935d6415..9d0627e489 100644 --- a/internal/assets/manager_test.go +++ b/internal/assets/manager_test.go @@ -21,6 +21,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/syncasync" + "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/identitymocks" @@ -40,12 +41,13 @@ func newTestAssets(t *testing.T) (*assetManager, func()) { mii := &identitymocks.Plugin{} mdm := &datamocks.Manager{} msa := &syncasyncmocks.Bridge{} + mbm := &broadcastmocks.Manager{} mti := &tokenmocks.Plugin{} mti.On("Name").Return("ut_tokens").Maybe() defaultIdentity := &fftypes.Identity{Identifier: "UTNodeID", OnChain: "0x12345"} mii.On("Resolve", mock.Anything, "UTNodeID").Return(defaultIdentity, nil).Maybe() ctx, cancel := context.WithCancel(context.Background()) - a, err := NewAssetManager(ctx, mdi, mii, mdm, msa, map[string]tokens.Plugin{"magic-tokens": mti}) + a, err := NewAssetManager(ctx, mdi, mii, mdm, msa, mbm, map[string]tokens.Plugin{"magic-tokens": mti}) rag := mdi.On("RunAsGroup", ctx, mock.Anything).Maybe() rag.RunFn = func(a mock.Arguments) { rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} @@ -55,7 +57,7 @@ func newTestAssets(t *testing.T) (*assetManager, func()) { } func TestInitFail(t *testing.T) { - _, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil) + _, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) } @@ -289,3 +291,11 @@ func TestGetTokenAccountsBadPool(t *testing.T) { _, _, err := am.GetTokenAccounts(context.Background(), "ns1", "magic-tokens", "test", f) assert.EqualError(t, err, "pop") } + +func TestValidateTokenPoolTx(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + err := am.ValidateTokenPoolTx(context.Background(), nil, "") + assert.NoError(t, err) +} diff --git a/internal/assets/token_pool_created.go b/internal/assets/token_pool_created.go index bcdb042dca..eb1a0f6fc0 100644 --- a/internal/assets/token_pool_created.go +++ b/internal/assets/token_pool_created.go @@ -17,69 +17,78 @@ package assets import ( - "context" - "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/hyperledger/firefly/pkg/tokens" ) -func (am *assetManager) persistTokenPoolTransaction(ctx context.Context, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) { - if pool.ID == nil || pool.TX.ID == nil { - log.L(ctx).Errorf("Invalid token pool '%s'. Missing ID (%v) or transaction ID (%v)", pool.ID, pool.ID, pool.TX.ID) - return false, nil // this is not retryable +func (am *assetManager) TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error { + // Find a matching operation within this transaction + fb := database.OperationQueryFactory.NewFilter(am.ctx) + filter := fb.And( + fb.Eq("tx", tx), + fb.Eq("type", fftypes.OpTypeTokensCreatePool), + ) + operations, _, err := am.database.GetOperations(am.ctx, filter) + if err != nil || len(operations) == 0 { + log.L(am.ctx).Debugf("Token pool transaction '%s' ignored, as it was not submitted by this node", tx) + return nil + } + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + Type: tokenType, + ProtocolID: protocolID, + Author: signingIdentity, + }, + ProtocolTxID: protocolTxID, } - return am.txhelper.PersistTransaction(ctx, &fftypes.Transaction{ - ID: pool.TX.ID, + err = retrieveTokenOpInputs(am.ctx, operations[0], &pool.TokenPool) + if err != nil { + log.L(am.ctx).Errorf("Error retrieving pool info from transaction '%s' (%s) - ignoring: %v", tx, err, operations[0].Input) + return nil + } + + // Update the transaction with the info received (but leave transaction as "pending") + transaction := &fftypes.Transaction{ + ID: tx, + Status: fftypes.OpStatusPending, Subject: fftypes.TransactionSubject{ Namespace: pool.Namespace, - Type: pool.TX.Type, + Type: fftypes.TransactionTypeTokenPool, Signer: signingIdentity, Reference: pool.ID, }, ProtocolID: protocolTxID, Info: additionalInfo, - }) -} - -func (am *assetManager) persistTokenPool(ctx context.Context, pool *fftypes.TokenPool) (valid bool, err error) { - l := log.L(ctx) - if err := fftypes.ValidateFFNameField(ctx, pool.Name, "name"); err != nil { - l.Errorf("Invalid token pool '%s' - invalid name '%s': %a", pool.ID, pool.Name, err) - return false, nil // This is not retryable } - err = am.database.UpsertTokenPool(ctx, pool) - if err != nil { - if err == database.IDMismatch { - log.L(ctx).Errorf("Invalid token pool '%s'. ID mismatch with existing record", pool.ID) - return false, nil // This is not retryable + pool.TX.ID = transaction.ID + pool.TX.Type = transaction.Subject.Type + + // Add a new operation for the announcement + op := fftypes.NewTXOperation( + tk, + pool.Namespace, + tx, + "", + fftypes.OpTypeTokensAnnouncePool, + fftypes.OpStatusPending, + signingIdentity) + + var valid bool + err = am.retry.Do(am.ctx, "persist token pool transaction", func(attempt int) (bool, error) { + valid, err = am.txhelper.PersistTransaction(am.ctx, transaction) + if valid && err == nil { + err = am.database.UpsertOperation(am.ctx, op, false) } - l.Errorf("Failed to insert token pool '%s': %s", pool.ID, err) - return false, err // a persistence failure here is considered retryable (so returned) + return err != nil, err + }) + if !valid || err != nil { + return err } - return true, nil -} -func (am *assetManager) TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { - return am.retry.Do(am.ctx, "persist token pool", func(attempt int) (bool, error) { - err := am.database.RunAsGroup(am.ctx, func(ctx context.Context) error { - valid, err := am.persistTokenPoolTransaction(ctx, pool, signingIdentity, protocolTxID, additionalInfo) - if valid && err == nil { - valid, err = am.persistTokenPool(ctx, pool) - } - if err != nil { - return err - } - if !valid { - log.L(ctx).Warnf("Token pool rejected id=%s author=%s", pool.ID, signingIdentity) - event := fftypes.NewEvent(fftypes.EventTypePoolRejected, pool.Namespace, pool.ID) - return am.database.InsertEvent(ctx, event) - } - log.L(ctx).Infof("Token pool created id=%s author=%s", pool.ID, signingIdentity) - event := fftypes.NewEvent(fftypes.EventTypePoolConfirmed, pool.Namespace, pool.ID) - return am.database.InsertEvent(ctx, event) - }) - return err != nil, err // retry indefinitely (until context closes) - }) + // Announce the details of the new token pool + _, err = am.broadcast.BroadcastTokenPool(am.ctx, pool.Namespace, pool, false) + return err } diff --git a/internal/assets/token_pool_created_test.go b/internal/assets/token_pool_created_test.go index 484f931023..5c0419e2d5 100644 --- a/internal/assets/token_pool_created_test.go +++ b/internal/assets/token_pool_created_test.go @@ -17,9 +17,9 @@ package assets import ( - "fmt" "testing" + "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/tokenmocks" "github.com/hyperledger/firefly/pkg/database" @@ -33,267 +33,149 @@ func TestTokenPoolCreatedSuccess(t *testing.T) { defer cancel() mdi := am.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, + mbm := am.broadcast.(*broadcastmocks.Manager) + + poolID := fftypes.NewUUID() + txID := fftypes.NewUUID() + operations := []*fftypes.Operation{ + { + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": poolID.String(), + "namespace": "test-ns", + "name": "my-pool", + }, }, - Namespace: "test-ns", - Name: "my-pool", } - mdi.On("GetTransactionByID", mock.Anything, pool.TX.ID).Return(nil, nil) + mti.On("Name").Return("mock-tokens") + mdi.On("GetOperations", am.ctx, mock.Anything).Return(operations, nil, nil) + mdi.On("GetTransactionByID", mock.Anything, txID).Return(nil, nil) mdi.On("UpsertTransaction", am.ctx, mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertTokenPool", am.ctx, pool).Return(nil) - mdi.On("InsertEvent", am.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolConfirmed && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) + mdi.On("UpsertOperation", am.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { + return op.Type == fftypes.OpTypeTokensAnnouncePool + }), false).Return(nil) + mbm.On("BroadcastTokenPool", am.ctx, "test-ns", mock.MatchedBy(func(pool *fftypes.TokenPoolAnnouncement) bool { + return pool.Namespace == "test-ns" && pool.Name == "my-pool" && *pool.ID == *poolID + }), false).Return(nil, nil) info := fftypes.JSONObject{"some": "info"} - err := am.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) + err := am.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x0", "tx1", info) assert.NoError(t, err) - mdi.AssertExpectations(t) -} - -func TestTokenPoolMissingID(t *testing.T) { - am, cancel := newTestAssets(t) - defer cancel() - mdi := am.database.(*databasemocks.Plugin) - mti := &tokenmocks.Plugin{} - - pool := &fftypes.TokenPool{} - - mdi.On("InsertEvent", am.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolRejected && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) - info := fftypes.JSONObject{"some": "info"} - err := am.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) - assert.NoError(t, err) mdi.AssertExpectations(t) + mbm.AssertExpectations(t) } -func TestTokenPoolBadNamespace(t *testing.T) { +func TestTokenPoolCreatedOpNotFound(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() mdi := am.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} + mbm := am.broadcast.(*broadcastmocks.Manager) - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, - }, - } + txID := fftypes.NewUUID() + operations := []*fftypes.Operation{} - mdi.On("InsertEvent", am.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolRejected && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) + mti.On("Name").Return("mock-tokens") + mdi.On("GetOperations", am.ctx, mock.Anything).Return(operations, nil, nil) info := fftypes.JSONObject{"some": "info"} - err := am.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) + err := am.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x0", "tx1", info) assert.NoError(t, err) + mdi.AssertExpectations(t) + mbm.AssertExpectations(t) } -func TestTokenPoolBadName(t *testing.T) { +func TestTokenPoolMissingID(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() mdi := am.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} + mbm := am.broadcast.(*broadcastmocks.Manager) - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, + txID := fftypes.NewUUID() + operations := []*fftypes.Operation{ + { + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{}, }, - Namespace: "test-ns", } - mdi.On("GetTransactionByID", mock.Anything, pool.TX.ID).Return(nil, nil) - mdi.On("UpsertTransaction", am.ctx, mock.MatchedBy(func(tx *fftypes.Transaction) bool { - return tx.Subject.Type == fftypes.TransactionTypeTokenPool - }), false).Return(nil) - mdi.On("InsertEvent", am.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolRejected && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) + mti.On("Name").Return("mock-tokens") + mdi.On("GetOperations", am.ctx, mock.Anything).Return(operations, nil, nil) info := fftypes.JSONObject{"some": "info"} - err := am.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) + err := am.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x0", "tx1", info) assert.NoError(t, err) - mdi.AssertExpectations(t) -} - -func TestTokenPoolGetTransactionFail(t *testing.T) { - am, cancel := newTestAssets(t) - defer cancel() - mdi := am.database.(*databasemocks.Plugin) - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, - }, - Namespace: "test-ns", - Name: "my-pool", - } - - mdi.On("GetTransactionByID", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) - - info := fftypes.JSONObject{"some": "info"} - valid, err := am.persistTokenPoolTransaction(am.ctx, pool, "0x12345", "tx1", info) - assert.EqualError(t, err, "pop") - assert.False(t, valid) mdi.AssertExpectations(t) + mbm.AssertExpectations(t) } -func TestTokenPoolGetTransactionInvalidMatch(t *testing.T) { +func TestTokenPoolCreatedMissingNamespace(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() mdi := am.database.(*databasemocks.Plugin) - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, + mti := &tokenmocks.Plugin{} + mbm := am.broadcast.(*broadcastmocks.Manager) + + poolID := fftypes.NewUUID() + txID := fftypes.NewUUID() + operations := []*fftypes.Operation{ + { + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": poolID.String(), + }, }, - Namespace: "test-ns", - Name: "my-pool", } - mdi.On("GetTransactionByID", mock.Anything, mock.Anything).Return(&fftypes.Transaction{ - ID: fftypes.NewUUID(), // wrong - }, nil) + mti.On("Name").Return("mock-tokens") + mdi.On("GetOperations", am.ctx, mock.Anything).Return(operations, nil, nil) info := fftypes.JSONObject{"some": "info"} - valid, err := am.persistTokenPoolTransaction(am.ctx, pool, "0x12345", "tx1", info) + err := am.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x0", "tx1", info) assert.NoError(t, err) - assert.False(t, valid) - mdi.AssertExpectations(t) -} -func TestTokenPoolNewTXUpsertFail(t *testing.T) { - am, cancel := newTestAssets(t) - defer cancel() - mdi := am.database.(*databasemocks.Plugin) - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, - }, - Namespace: "test-ns", - Name: "my-pool", - } - - mdi.On("GetTransactionByID", mock.Anything, mock.Anything).Return(nil, nil) - mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop")) - - info := fftypes.JSONObject{"some": "info"} - valid, err := am.persistTokenPoolTransaction(am.ctx, pool, "0x12345", "tx1", info) - assert.EqualError(t, err, "pop") - assert.False(t, valid) mdi.AssertExpectations(t) + mbm.AssertExpectations(t) } -func TestTokenPoolExistingTXHashMismatch(t *testing.T) { +func TestTokenPoolCreatedUpsertFail(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() mdi := am.database.(*databasemocks.Plugin) - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, - }, - Namespace: "test-ns", - Name: "my-pool", - } - - mdi.On("GetTransactionByID", mock.Anything, mock.Anything).Return(&fftypes.Transaction{ - Subject: fftypes.TransactionSubject{ - Type: fftypes.TransactionTypeTokenPool, - Namespace: pool.Namespace, - Signer: "0x12345", - Reference: pool.ID, - }, - }, nil) - mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(database.HashMismatch) - - info := fftypes.JSONObject{"some": "info"} - valid, err := am.persistTokenPoolTransaction(am.ctx, pool, "0x12345", "tx1", info) - assert.NoError(t, err) - assert.False(t, valid) - mdi.AssertExpectations(t) -} - -func TestTokenPoolIDMismatch(t *testing.T) { - em, cancel := newTestAssets(t) - defer cancel() - mdi := em.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, + mbm := am.broadcast.(*broadcastmocks.Manager) + + poolID := fftypes.NewUUID() + txID := fftypes.NewUUID() + operations := []*fftypes.Operation{ + { + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": poolID.String(), + "namespace": "test-ns", + "name": "my-pool", + }, }, - Namespace: "test-ns", - Name: "my-pool", } - mdi.On("GetTransactionByID", mock.Anything, pool.TX.ID).Return(nil, nil) - mdi.On("UpsertTransaction", em.ctx, mock.MatchedBy(func(tx *fftypes.Transaction) bool { + mti.On("Name").Return("mock-tokens") + mdi.On("GetOperations", am.ctx, mock.Anything).Return(operations, nil, nil) + mdi.On("GetTransactionByID", mock.Anything, txID).Return(nil, nil) + mdi.On("UpsertTransaction", am.ctx, mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool - }), false).Return(nil) - mdi.On("UpsertTokenPool", em.ctx, pool).Return(database.IDMismatch) - mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolRejected && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) + }), false).Return(database.HashMismatch) info := fftypes.JSONObject{"some": "info"} - err := em.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) + err := am.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x0", "tx1", info) assert.NoError(t, err) - mdi.AssertExpectations(t) -} -func TestTokenPoolUpsertFailAndRetry(t *testing.T) { - em, cancel := newTestAssets(t) - defer cancel() - mdi := em.database.(*databasemocks.Plugin) - mti := &tokenmocks.Plugin{} - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, - }, - Namespace: "test-ns", - Name: "my-pool", - } - - mdi.On("GetTransactionByID", mock.Anything, pool.TX.ID).Return(nil, nil) - mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(nil) - mdi.On("UpsertTokenPool", em.ctx, pool).Return(fmt.Errorf("pop")).Once() - mdi.On("UpsertTokenPool", em.ctx, pool).Return(nil).Once() - mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolConfirmed && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) - - info := fftypes.JSONObject{"some": "info"} - err := em.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) - assert.NoError(t, err) mdi.AssertExpectations(t) + mbm.AssertExpectations(t) } diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 36aaf8faa9..bf2f8dcc24 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -41,6 +41,7 @@ type Manager interface { BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) BroadcastMessageWithID(ctx context.Context, ns string, id *fftypes.UUID, unresolved *fftypes.MessageInOut, resolved *fftypes.Message, waitConfirm bool) (out *fftypes.Message, err error) BroadcastDefinition(ctx context.Context, def fftypes.Definition, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) + BroadcastTokenPool(ctx context.Context, ns string, pool *fftypes.TokenPoolAnnouncement, waitConfirm bool) (msg *fftypes.Message, err error) GetNodeSigningIdentity(ctx context.Context) (*fftypes.Identity, error) Start() error WaitStop() diff --git a/internal/broadcast/tokenpool.go b/internal/broadcast/tokenpool.go new file mode 100644 index 0000000000..43fe8d319a --- /dev/null +++ b/internal/broadcast/tokenpool.go @@ -0,0 +1,38 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package broadcast + +import ( + "context" + + "github.com/hyperledger/firefly/pkg/fftypes" +) + +func (bm *broadcastManager) BroadcastTokenPool(ctx context.Context, ns string, pool *fftypes.TokenPoolAnnouncement, waitConfirm bool) (msg *fftypes.Message, err error) { + if err := pool.Validate(ctx, false); err != nil { + return nil, err + } + if err := bm.data.VerifyNamespaceExists(ctx, pool.Namespace); err != nil { + return nil, err + } + + msg, err = bm.broadcastDefinitionAsNode(ctx, pool, fftypes.SystemTagDefinePool, waitConfirm) + if msg != nil { + pool.Message = msg.Header.ID + } + return msg, err +} diff --git a/internal/broadcast/tokenpool_test.go b/internal/broadcast/tokenpool_test.go new file mode 100644 index 0000000000..a9ede71cc5 --- /dev/null +++ b/internal/broadcast/tokenpool_test.go @@ -0,0 +1,137 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package broadcast + +import ( + "context" + "fmt" + "testing" + + "github.com/hyperledger/firefly/mocks/databasemocks" + "github.com/hyperledger/firefly/mocks/datamocks" + "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestBroadcastTokenPoolNSGetFail(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdm := bm.data.(*datamocks.Manager) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "mypool", + Type: fftypes.TokenTypeNonFungible, + ProtocolID: "N1", + Symbol: "COIN", + }, + ProtocolTxID: "tx123", + } + + mdm.On("VerifyNamespaceExists", mock.Anything, "ns1").Return(fmt.Errorf("pop")) + + _, err := bm.BroadcastTokenPool(context.Background(), "ns1", pool, false) + assert.EqualError(t, err, "pop") + + mdm.AssertExpectations(t) +} + +func TestBroadcastTokenPoolInvalid(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "", + Name: "", + Type: fftypes.TokenTypeNonFungible, + ProtocolID: "N1", + Symbol: "COIN", + }, + ProtocolTxID: "tx123", + } + + _, err := bm.BroadcastTokenPool(context.Background(), "ns1", pool, false) + assert.Regexp(t, "FF10131", err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestBroadcastTokenPoolBroadcastFail(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "mypool", + Type: fftypes.TokenTypeNonFungible, + ProtocolID: "N1", + Symbol: "COIN", + }, + ProtocolTxID: "tx123", + } + + mdm.On("VerifyNamespaceExists", mock.Anything, "ns1").Return(nil) + mdi.On("UpsertData", mock.Anything, mock.Anything, true, false).Return(nil) + mdi.On("InsertMessageLocal", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + + _, err := bm.BroadcastTokenPool(context.Background(), "ns1", pool, false) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestBroadcastTokenPoolOk(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "mypool", + Type: fftypes.TokenTypeNonFungible, + ProtocolID: "N1", + Symbol: "COIN", + }, + ProtocolTxID: "tx123", + } + + mdm.On("VerifyNamespaceExists", mock.Anything, "ns1").Return(nil) + mdi.On("UpsertData", mock.Anything, mock.Anything, true, false).Return(nil) + mdi.On("InsertMessageLocal", mock.Anything, mock.Anything).Return(nil) + + _, err := bm.BroadcastTokenPool(context.Background(), "ns1", pool, false) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} diff --git a/internal/orchestrator/bound_callbacks.go b/internal/orchestrator/bound_callbacks.go index 35d8d04d97..8db638655e 100644 --- a/internal/orchestrator/bound_callbacks.go +++ b/internal/orchestrator/bound_callbacks.go @@ -18,6 +18,7 @@ package orchestrator import ( "github.com/hyperledger/firefly/internal/assets" + "github.com/hyperledger/firefly/internal/events" "github.com/hyperledger/firefly/pkg/blockchain" "github.com/hyperledger/firefly/pkg/dataexchange" @@ -56,6 +57,6 @@ func (bc *boundCallbacks) MessageReceived(peerID string, data []byte) error { return bc.ei.MessageReceived(bc.dx, peerID, data) } -func (bc *boundCallbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { - return bc.am.TokenPoolCreated(plugin, pool, signingIdentity, protocolTxID, additionalInfo) +func (bc *boundCallbacks) TokenPoolCreated(plugin tokens.Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error { + return bc.am.TokenPoolCreated(plugin, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo) } diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index 893a5f3e5c..388a1516eb 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -40,9 +40,9 @@ func TestBoundCallbacks(t *testing.T) { info := fftypes.JSONObject{"hello": "world"} batch := &blockchain.BatchPin{TransactionID: fftypes.NewUUID()} - pool := &fftypes.TokenPool{} hash := fftypes.NewRandB32() opID := fftypes.NewUUID() + txID := fftypes.NewUUID() mei.On("BatchPinComplete", mbi, batch, "0x12345", "tx12345", info).Return(fmt.Errorf("pop")) err := bc.BatchPinComplete(batch, "0x12345", "tx12345", info) @@ -68,7 +68,7 @@ func TestBoundCallbacks(t *testing.T) { err = bc.MessageReceived("peer1", []byte{}) assert.EqualError(t, err, "pop") - mam.On("TokenPoolCreated", mti, pool, "0x12345", "tx12345", info).Return(fmt.Errorf("pop")) - err = bc.TokenPoolCreated(mti, pool, "0x12345", "tx12345", info) + mam.On("TokenPoolCreated", mti, fftypes.TokenTypeFungible, txID, "123", "0x12345", "tx12345", info).Return(fmt.Errorf("pop")) + err = bc.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x12345", "tx12345", info) assert.EqualError(t, err, "pop") } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index ee0e4ab66a..81a8eeb67c 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -391,7 +391,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } } - or.syshandlers = syshandlers.NewSystemHandlers(or.database, or.identity, or.dataexchange, or.data, or.broadcast, or.messaging) + or.syshandlers = syshandlers.NewSystemHandlers(or.database, or.identity, or.dataexchange, or.data, or.broadcast, or.messaging, or.assets) if or.events == nil { or.events, err = events.NewEventManager(ctx, or.publicstorage, or.database, or.identity, or.syshandlers, or.data) @@ -410,7 +410,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { or.syncasync.Init(or.events) if or.assets == nil { - or.assets, err = assets.NewAssetManager(ctx, or.database, or.identity, or.data, or.syncasync, or.tokens) + or.assets, err = assets.NewAssetManager(ctx, or.database, or.identity, or.data, or.syncasync, or.broadcast, or.tokens) if err != nil { return err } diff --git a/internal/syshandlers/syshandler.go b/internal/syshandlers/syshandler.go index 6386ac9da5..5aaa71e3b3 100644 --- a/internal/syshandlers/syshandler.go +++ b/internal/syshandlers/syshandler.go @@ -20,10 +20,12 @@ import ( "context" "encoding/json" + "github.com/hyperledger/firefly/internal/assets" "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/privatemessaging" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/dataexchange" "github.com/hyperledger/firefly/pkg/fftypes" @@ -45,9 +47,11 @@ type systemHandlers struct { data data.Manager broadcast broadcast.Manager messaging privatemessaging.Manager + assets assets.Manager + txhelper txcommon.Helper } -func NewSystemHandlers(di database.Plugin, ii identity.Plugin, dx dataexchange.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager) SystemHandlers { +func NewSystemHandlers(di database.Plugin, ii identity.Plugin, dx dataexchange.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager) SystemHandlers { return &systemHandlers{ database: di, identity: ii, @@ -55,6 +59,8 @@ func NewSystemHandlers(di database.Plugin, ii identity.Plugin, dx dataexchange.P data: dm, broadcast: bm, messaging: pm, + assets: am, + txhelper: txcommon.NewTransactionHelper(di), } } @@ -86,6 +92,8 @@ func (sh *systemHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftype return sh.handleOrganizationBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNode: return sh.handleNodeBroadcast(ctx, msg, data) + case fftypes.SystemTagDefinePool: + return sh.handleTokenPoolBroadcast(ctx, msg, data) default: l.Warnf("Unknown topic '%s' for system broadcast ID '%s'", msg.Header.Tag, msg.Header.ID) } diff --git a/internal/syshandlers/syshandler_test.go b/internal/syshandlers/syshandler_test.go index 17546b0717..e913f973bb 100644 --- a/internal/syshandlers/syshandler_test.go +++ b/internal/syshandlers/syshandler_test.go @@ -20,6 +20,7 @@ import ( "context" "testing" + "github.com/hyperledger/firefly/mocks/assetmocks" "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/dataexchangemocks" @@ -38,7 +39,8 @@ func newTestSystemHandlers(t *testing.T) *systemHandlers { mdm := &datamocks.Manager{} mbm := &broadcastmocks.Manager{} mpm := &privatemessagingmocks.Manager{} - return NewSystemHandlers(mdi, mii, mdx, mdm, mbm, mpm).(*systemHandlers) + mam := &assetmocks.Manager{} + return NewSystemHandlers(mdi, mii, mdx, mdm, mbm, mpm, mam).(*systemHandlers) } func TestHandleSystemBroadcastUnknown(t *testing.T) { diff --git a/internal/syshandlers/syshandler_tokenpool.go b/internal/syshandlers/syshandler_tokenpool.go new file mode 100644 index 0000000000..9f509f6507 --- /dev/null +++ b/internal/syshandlers/syshandler_tokenpool.go @@ -0,0 +1,128 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syshandlers + +import ( + "context" + + "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/pkg/database" + "github.com/hyperledger/firefly/pkg/fftypes" +) + +func (sh *systemHandlers) persistTokenPool(ctx context.Context, pool *fftypes.TokenPoolAnnouncement) (valid bool, err error) { + // Find a matching operation within this transaction + fb := database.OperationQueryFactory.NewFilter(ctx) + filter := fb.And( + fb.Eq("tx", pool.TX.ID), + fb.Eq("type", fftypes.OpTypeTokensAnnouncePool), + ) + operations, _, err := sh.database.GetOperations(ctx, filter) + if err != nil { + return false, err // retryable + } + + if len(operations) > 0 { + // Mark announce operation completed + update := database.OperationQueryFactory.NewUpdate(ctx). + Set("status", fftypes.OpStatusSucceeded) + if err := sh.database.UpdateOperation(ctx, operations[0].ID, update); err != nil { + return false, err // retryable + } + + // Validate received info matches the database + transaction, err := sh.database.GetTransactionByID(ctx, pool.TX.ID) + if err != nil { + return false, err // retryable + } + if transaction.ProtocolID != pool.ProtocolTxID { + log.L(ctx).Warnf("Ignoring token pool from transaction '%s' - unexpected protocol ID '%s'", pool.TX.ID, pool.ProtocolTxID) + return false, nil // not retryable + } + + // Mark transaction completed + transaction.Status = fftypes.OpStatusSucceeded + err = sh.database.UpsertTransaction(ctx, transaction, false) + if err != nil { + return false, err // retryable + } + } else { + // No local announce operation found (broadcast originated from another node) + log.L(ctx).Infof("Validating token pool transaction '%s' with protocol ID '%s'", pool.TX.ID, pool.ProtocolTxID) + err = sh.assets.ValidateTokenPoolTx(ctx, &pool.TokenPool, pool.ProtocolTxID) + if err != nil { + log.L(ctx).Errorf("Failed to validate token pool transaction '%s': %v", pool.TX.ID, err) + return false, err // retryable + } + transaction := &fftypes.Transaction{ + ID: pool.TX.ID, + Status: fftypes.OpStatusSucceeded, + Subject: fftypes.TransactionSubject{ + Namespace: pool.Namespace, + Type: fftypes.TransactionTypeTokenPool, + Signer: pool.Author, + Reference: pool.ID, + }, + ProtocolID: pool.ProtocolTxID, + } + valid, err = sh.txhelper.PersistTransaction(ctx, transaction) + if !valid || err != nil { + return valid, err + } + } + + err = sh.database.UpsertTokenPool(ctx, &pool.TokenPool) + if err != nil { + if err == database.IDMismatch { + log.L(ctx).Errorf("Invalid token pool '%s'. ID mismatch with existing record", pool.ID) + return false, nil // not retryable + } + log.L(ctx).Errorf("Failed to insert token pool '%s': %s", pool.ID, err) + return false, err // retryable + } + return true, nil +} + +func (sh *systemHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { + l := log.L(ctx) + + var pool fftypes.TokenPoolAnnouncement + valid = sh.getSystemBroadcastPayload(ctx, msg, data, &pool) + if valid { + if err = pool.Validate(ctx, true); err != nil { + l.Warnf("Unable to process token pool broadcast %s - validate failed: %s", msg.Header.ID, err) + valid = false + } else { + pool.Message = msg.Header.ID + valid, err = sh.persistTokenPool(ctx, &pool) + if err != nil { + return valid, err + } + } + } + + var event *fftypes.Event + if valid { + l.Infof("Token pool created id=%s author=%s", pool.ID, msg.Header.Author) + event = fftypes.NewEvent(fftypes.EventTypePoolConfirmed, pool.Namespace, pool.ID) + } else { + l.Warnf("Token pool rejected id=%s author=%s", pool.ID, msg.Header.Author) + event = fftypes.NewEvent(fftypes.EventTypePoolRejected, pool.Namespace, pool.ID) + } + err = sh.database.InsertEvent(ctx, event) + return valid, err +} diff --git a/internal/syshandlers/syshandler_tokenpool_test.go b/internal/syshandlers/syshandler_tokenpool_test.go new file mode 100644 index 0000000000..b316996d4d --- /dev/null +++ b/internal/syshandlers/syshandler_tokenpool_test.go @@ -0,0 +1,601 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syshandlers + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/hyperledger/firefly/mocks/assetmocks" + "github.com/hyperledger/firefly/mocks/databasemocks" + "github.com/hyperledger/firefly/pkg/database" + "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestHandleSystemBroadcastTokenPoolSelfOk(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + opID := fftypes.NewUUID() + operations := []*fftypes.Operation{{ID: opID}} + tx := &fftypes.Transaction{ProtocolID: "tx123"} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(tx, nil) + mdi.On("UpsertTransaction", context.Background(), tx, false).Return(nil) + mdi.On("UpsertTokenPool", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID && p.Message == msg.Header.ID + })).Return(nil) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolConfirmed + })).Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.True(t, valid) + assert.NoError(t, err) + assert.Equal(t, fftypes.OpStatusSucceeded, tx.Status) + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolSelfUpdateOpFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + opID := fftypes.NewUUID() + operations := []*fftypes.Operation{{ID: opID}} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(fmt.Errorf("pop")) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolSelfGetTXFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + opID := fftypes.NewUUID() + operations := []*fftypes.Operation{{ID: opID}} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(nil, fmt.Errorf("pop")) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolSelfTXMismatch(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + opID := fftypes.NewUUID() + operations := []*fftypes.Operation{{ID: opID}} + tx := &fftypes.Transaction{ProtocolID: "bad"} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(tx, nil) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolRejected + })).Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.NoError(t, err) + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolSelfUpdateTXFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + opID := fftypes.NewUUID() + operations := []*fftypes.Operation{{ID: opID}} + tx := &fftypes.Transaction{ProtocolID: "tx123"} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(tx, nil) + mdi.On("UpsertTransaction", context.Background(), tx, false).Return(fmt.Errorf("pop")) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolOk(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + operations := []*fftypes.Operation{} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(nil, nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(t *fftypes.Transaction) bool { + return t.Subject.Type == fftypes.TransactionTypeTokenPool && *t.Subject.Reference == *pool.ID + }), false).Return(nil) + mdi.On("UpsertTokenPool", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID && p.Message == msg.Header.ID + })).Return(nil) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolConfirmed + })).Return(nil) + + mam := sh.assets.(*assetmocks.Manager) + mam.On("ValidateTokenPoolTx", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID + }), "tx123").Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.True(t, valid) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mam.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolValidateTxFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + operations := []*fftypes.Operation{} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + + mam := sh.assets.(*assetmocks.Manager) + mam.On("ValidateTokenPoolTx", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID + }), "tx123").Return(fmt.Errorf("pop")) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) + mam.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolBadTX(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: nil, + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + operations := []*fftypes.Operation{} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolRejected + })).Return(nil) + + mam := sh.assets.(*assetmocks.Manager) + mam.On("ValidateTokenPoolTx", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID + }), "tx123").Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mam.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolIDMismatch(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + operations := []*fftypes.Operation{} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(nil, nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(t *fftypes.Transaction) bool { + return t.Subject.Type == fftypes.TransactionTypeTokenPool && *t.Subject.Reference == *pool.ID + }), false).Return(nil) + mdi.On("UpsertTokenPool", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID && p.Message == msg.Header.ID + })).Return(database.IDMismatch) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolRejected + })).Return(nil) + + mam := sh.assets.(*assetmocks.Manager) + mam.On("ValidateTokenPoolTx", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID + }), "tx123").Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mam.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolFailUpsert(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + operations := []*fftypes.Operation{} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(nil, nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(t *fftypes.Transaction) bool { + return t.Subject.Type == fftypes.TransactionTypeTokenPool && *t.Subject.Reference == *pool.ID + }), false).Return(nil) + mdi.On("UpsertTokenPool", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID && p.Message == msg.Header.ID + })).Return(fmt.Errorf("pop")) + + mam := sh.assets.(*assetmocks.Manager) + mam.On("ValidateTokenPoolTx", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID + }), "tx123").Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) + mam.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolOpsFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolValidateFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{}, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return event.Type == fftypes.EventTypePoolRejected + })).Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.NoError(t, err) + + mdi.AssertExpectations(t) +} diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index d6d97408b5..743578c8ed 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -53,16 +53,9 @@ const ( ) type createPool struct { - Type fftypes.TokenType `json:"type"` - RequestID string `json:"requestId"` - Data string `json:"data"` -} - -type createPoolData struct { - Namespace string `json:"namespace"` - Name string `json:"name"` - ID *fftypes.UUID `json:"id"` - TransactionID *fftypes.UUID `json:"transactionId"` + Type fftypes.TokenType `json:"type"` + RequestID string `json:"requestId"` + TrackingID string `json:"trackingId"` } func (h *FFTokens) Name() string { @@ -126,51 +119,30 @@ func (h *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject) e } func (h *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSONObject) (err error) { - packedData := data.GetString("data") tokenType := data.GetString("type") protocolID := data.GetString("poolId") + trackingID := data.GetString("trackingId") operatorAddress := data.GetString("operator") tx := data.GetObject("transaction") txHash := tx.GetString("transactionHash") - if packedData == "" || - tokenType == "" || + if tokenType == "" || protocolID == "" || + trackingID == "" || operatorAddress == "" || txHash == "" { log.L(ctx).Errorf("TokenPool event is not valid - missing data: %+v", data) return nil // move on } - unpackedData := createPoolData{} - err = json.Unmarshal([]byte(packedData), &unpackedData) + txID, err := fftypes.ParseUUID(ctx, trackingID) if err != nil { - log.L(ctx).Errorf("TokenPool event is not valid - could not unpack data (%s): %+v", err, data) - return nil // move on - } - if unpackedData.Namespace == "" || - unpackedData.Name == "" || - unpackedData.ID == nil || - unpackedData.TransactionID == nil { - log.L(ctx).Errorf("TokenPool event is not valid - missing packed data: %+v", unpackedData) + log.L(ctx).Errorf("TokenPool event is not valid - invalid transaction ID (%s): %+v", err, data) return nil // move on } - pool := &fftypes.TokenPool{ - ID: unpackedData.ID, - TX: fftypes.TransactionRef{ - ID: unpackedData.TransactionID, - Type: fftypes.TransactionTypeTokenPool, - }, - Namespace: unpackedData.Namespace, - Name: unpackedData.Name, - Type: fftypes.FFEnum(tokenType), - Connector: h.configuredName, - ProtocolID: protocolID, - } - // If there's an error dispatching the event, we must return the error and shutdown - return h.callbacks.TokenPoolCreated(h, pool, operatorAddress, txHash, tx) + return h.callbacks.TokenPoolCreated(h, fftypes.FFEnum(tokenType), txID, protocolID, operatorAddress, txHash, tx) } func (h *FFTokens) eventLoop() { @@ -224,23 +196,13 @@ func (h *FFTokens) eventLoop() { } func (h *FFTokens) CreateTokenPool(ctx context.Context, operationID *fftypes.UUID, identity *fftypes.Identity, pool *fftypes.TokenPool) error { - data := createPoolData{ - Namespace: pool.Namespace, - Name: pool.Name, - ID: pool.ID, - TransactionID: pool.TX.ID, - } - packedData, err := json.Marshal(data) - var res *resty.Response - if err == nil { - res, err = h.client.R().SetContext(ctx). - SetBody(&createPool{ - Type: pool.Type, - RequestID: operationID.String(), - Data: string(packedData), - }). - Post("/api/v1/pool") - } + res, err := h.client.R().SetContext(ctx). + SetBody(&createPool{ + Type: pool.Type, + RequestID: operationID.String(), + TrackingID: pool.TX.ID.String(), + }). + Post("/api/v1/pool") if err != nil || !res.IsSuccess() { return restclient.WrapRestErr(ctx, res, err, i18n.MsgTokensRESTErr) } diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index fe375ac7fa..30c2c2b55c 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -118,8 +118,8 @@ func TestCreateTokenPool(t *testing.T) { assert.Contains(t, body, "requestId") delete(body, "requestId") assert.Equal(t, fftypes.JSONObject{ - "data": "{\"namespace\":\"ns1\",\"name\":\"new-pool\",\"id\":\"" + pool.ID.String() + "\",\"transactionId\":\"" + pool.TX.ID.String() + "\"}", - "type": "fungible", + "trackingId": pool.TX.ID.String(), + "type": "fungible", }, body) res := &http.Response{ @@ -172,53 +172,33 @@ func TestEvents(t *testing.T) { opID := fftypes.NewUUID() fromServer <- `{"id":"2","event":"receipt","data":{}}` - fromServer <- `{"id":"2","event":"receipt","data":{"id":"abc"}}` + fromServer <- `{"id":"3","event":"receipt","data":{"id":"abc"}}` // receipt: success mcb.On("TokensOpUpdate", h, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(nil).Once() - fromServer <- `{"id":"3","event":"receipt","data":{"id":"` + opID.String() + `","success":true}}` + fromServer <- `{"id":"4","event":"receipt","data":{"id":"` + opID.String() + `","success":true}}` // receipt: failure mcb.On("TokensOpUpdate", h, opID, fftypes.OpStatusFailed, "", mock.Anything).Return(nil).Once() - fromServer <- `{"id":"4","event":"receipt","data":{"id":"` + opID.String() + `","success":false}}` + fromServer <- `{"id":"5","event":"receipt","data":{"id":"` + opID.String() + `","success":false}}` // token-pool: missing data - fromServer <- `{"id":"5","event":"token-pool"}` - msg = <-toServer - assert.Equal(t, `{"data":{"id":"5"},"event":"ack"}`, string(msg)) - - // token-pool: unparseable packed data - fromServer <- `{"id":"6","event":"token-pool","data":{"data":"!","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` + fromServer <- `{"id":"6","event":"token-pool"}` msg = <-toServer assert.Equal(t, `{"data":{"id":"6"},"event":"ack"}`, string(msg)) - // token-pool: missing packed data - fromServer <- `{"id":"7","event":"token-pool","data":{"data":"{}","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` + // token-pool: invalid uuid + fromServer <- `{"id":"7","event":"token-pool","data":{"trackingId":"bad","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` msg = <-toServer assert.Equal(t, `{"data":{"id":"7"},"event":"ack"}`, string(msg)) - // token-pool: invalid uuids - fromServer <- `{"id":"8","event":"token-pool","data":{"data":"{\"namespace\":\"ns1\",\"name\":\"new-pool\",\"id\":\"bad\",\"transactionId\":\"bad\"}","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` - msg = <-toServer - assert.Equal(t, `{"data":{"id":"8"},"event":"ack"}`, string(msg)) - - id1 := fftypes.NewUUID() - id2 := fftypes.NewUUID() + txID := fftypes.NewUUID() // token-pool: success - mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(pool *fftypes.TokenPool) bool { - assert.Equal(t, "ns1", pool.Namespace) - assert.Equal(t, "new-pool", pool.Name) - assert.Equal(t, fftypes.TokenTypeFungible, pool.Type) - assert.Equal(t, "F1", pool.ProtocolID) - assert.Equal(t, *id1, *pool.ID) - assert.Equal(t, *id2, *pool.TX.ID) - return true - }), "0x0", "abc", fftypes.JSONObject{"transactionHash": "abc"}, - ).Return(nil) - fromServer <- `{"id":"9","event":"token-pool","data":{"data":"{\"namespace\":\"ns1\",\"name\":\"new-pool\",\"id\":\"` + id1.String() + `\",\"transactionId\":\"` + id2.String() + `\"}","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` + mcb.On("TokenPoolCreated", h, fftypes.TokenTypeFungible, txID, "F1", "0x0", "abc", fftypes.JSONObject{"transactionHash": "abc"}).Return(nil) + fromServer <- `{"id":"8","event":"token-pool","data":{"trackingId":"` + txID.String() + `","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` msg = <-toServer - assert.Equal(t, `{"data":{"id":"9"},"event":"ack"}`, string(msg)) + assert.Equal(t, `{"data":{"id":"8"},"event":"ack"}`, string(msg)) mcb.AssertExpectations(t) } diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index b0f67b6b2e..7213fbab5b 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -165,13 +165,27 @@ func (_m *Manager) Start() error { return r0 } -// TokenPoolCreated provides a mock function with given fields: tk, pool, signingIdentity, protocolTxID, additionalInfo -func (_m *Manager) TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { - ret := _m.Called(tk, pool, signingIdentity, protocolTxID, additionalInfo) +// TokenPoolCreated provides a mock function with given fields: tk, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo +func (_m *Manager) TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.FFEnum, tx *fftypes.UUID, protocolID string, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { + ret := _m.Called(tk, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo) var r0 error - if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenPool, string, string, fftypes.JSONObject) error); ok { - r0 = rf(tk, pool, signingIdentity, protocolTxID, additionalInfo) + if rf, ok := ret.Get(0).(func(tokens.Plugin, fftypes.FFEnum, *fftypes.UUID, string, string, string, fftypes.JSONObject) error); ok { + r0 = rf(tk, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ValidateTokenPoolTx provides a mock function with given fields: ctx, pool, protocolTxID +func (_m *Manager) ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error { + ret := _m.Called(ctx, pool, protocolTxID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.TokenPool, string) error); ok { + r0 = rf(ctx, pool, protocolTxID) } else { r0 = ret.Error(0) } diff --git a/mocks/broadcastmocks/manager.go b/mocks/broadcastmocks/manager.go index 3ddbaf6af9..b518180ad2 100644 --- a/mocks/broadcastmocks/manager.go +++ b/mocks/broadcastmocks/manager.go @@ -129,6 +129,29 @@ func (_m *Manager) BroadcastNamespace(ctx context.Context, ns *fftypes.Namespace return r0, r1 } +// BroadcastTokenPool provides a mock function with given fields: ctx, ns, pool, waitConfirm +func (_m *Manager) BroadcastTokenPool(ctx context.Context, ns string, pool *fftypes.TokenPoolAnnouncement, waitConfirm bool) (*fftypes.Message, error) { + ret := _m.Called(ctx, ns, pool, waitConfirm) + + var r0 *fftypes.Message + if rf, ok := ret.Get(0).(func(context.Context, string, *fftypes.TokenPoolAnnouncement, bool) *fftypes.Message); ok { + r0 = rf(ctx, ns, pool, waitConfirm) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*fftypes.Message) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, *fftypes.TokenPoolAnnouncement, bool) error); ok { + r1 = rf(ctx, ns, pool, waitConfirm) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetNodeSigningIdentity provides a mock function with given fields: ctx func (_m *Manager) GetNodeSigningIdentity(ctx context.Context) (*fftypes.Identity, error) { ret := _m.Called(ctx) diff --git a/mocks/tokenmocks/callbacks.go b/mocks/tokenmocks/callbacks.go index 5761fab62c..5c34674a8b 100644 --- a/mocks/tokenmocks/callbacks.go +++ b/mocks/tokenmocks/callbacks.go @@ -14,13 +14,13 @@ type Callbacks struct { mock.Mock } -// TokenPoolCreated provides a mock function with given fields: plugin, pool, signingIdentity, protocolTxID, additionalInfo -func (_m *Callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { - ret := _m.Called(plugin, pool, signingIdentity, protocolTxID, additionalInfo) +// TokenPoolCreated provides a mock function with given fields: plugin, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo +func (_m *Callbacks) TokenPoolCreated(plugin tokens.Plugin, tokenType fftypes.FFEnum, tx *fftypes.UUID, protocolID string, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { + ret := _m.Called(plugin, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo) var r0 error - if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenPool, string, string, fftypes.JSONObject) error); ok { - r0 = rf(plugin, pool, signingIdentity, protocolTxID, additionalInfo) + if rf, ok := ret.Get(0).(func(tokens.Plugin, fftypes.FFEnum, *fftypes.UUID, string, string, string, fftypes.JSONObject) error); ok { + r0 = rf(plugin, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo) } else { r0 = ret.Error(0) } diff --git a/pkg/fftypes/constants.go b/pkg/fftypes/constants.go index d2afee11d0..0386dd46ed 100644 --- a/pkg/fftypes/constants.go +++ b/pkg/fftypes/constants.go @@ -40,4 +40,7 @@ const ( // SystemTagDefineGroup is the topic for messages that send the definition of a group, to all parties in that group SystemTagDefineGroup SystemTag = "ff_define_group" + + // SystemTagDefinePool is the topic for messages that broadcast data definitions + SystemTagDefinePool SystemTag = "ff_define_pool" ) diff --git a/pkg/fftypes/operation.go b/pkg/fftypes/operation.go index e20b4fa91c..96afc8d951 100644 --- a/pkg/fftypes/operation.go +++ b/pkg/fftypes/operation.go @@ -32,6 +32,8 @@ var ( OpTypeDataExchangeBlobSend OpType = ffEnum("optype", "dataexchange_blob_send") // OpTypeTokensCreatePool is a token pool creation OpTypeTokensCreatePool OpType = ffEnum("optype", "tokens_create_pool") + // OpTypeTokensAnnounce is a broadcast of token pool info + OpTypeTokensAnnouncePool OpType = ffEnum("optype", "tokens_announce_pool") ) // OpStatus is the current status of an operation diff --git a/pkg/fftypes/tokenpool.go b/pkg/fftypes/tokenpool.go index 40d911da5b..eff7111d0f 100644 --- a/pkg/fftypes/tokenpool.go +++ b/pkg/fftypes/tokenpool.go @@ -16,6 +16,10 @@ package fftypes +import ( + "context" +) + type TokenType = FFEnum var ( @@ -35,3 +39,26 @@ type TokenPool struct { Message *UUID `json:"message,omitempty"` TX TransactionRef `json:"tx,omitempty"` } + +type TokenPoolAnnouncement struct { + TokenPool + ProtocolTxID string `json:"protocolTxID"` +} + +func (t *TokenPool) Validate(ctx context.Context, existing bool) (err error) { + if err = ValidateFFNameField(ctx, t.Namespace, "namespace"); err != nil { + return err + } + if err = ValidateFFNameField(ctx, t.Name, "name"); err != nil { + return err + } + return nil +} + +func (t *TokenPool) Topic() string { + return namespaceTopic(t.Namespace) +} + +func (t *TokenPool) SetBroadcastMessage(msgID *UUID) { + t.Message = msgID +} diff --git a/pkg/fftypes/tokenpool_test.go b/pkg/fftypes/tokenpool_test.go new file mode 100644 index 0000000000..a21c568999 --- /dev/null +++ b/pkg/fftypes/tokenpool_test.go @@ -0,0 +1,60 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftypes + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTokenPoolValidation(t *testing.T) { + pool := &TokenPool{ + Namespace: "!wrong", + Name: "ok", + } + err := pool.Validate(context.Background(), false) + assert.Regexp(t, "FF10131.*'namespace'", err) + + pool = &TokenPool{ + Namespace: "ok", + Name: "!wrong", + } + err = pool.Validate(context.Background(), false) + assert.Regexp(t, "FF10131.*'name'", err) + + pool = &TokenPool{ + Namespace: "ok", + Name: "ok", + } + err = pool.Validate(context.Background(), false) + assert.NoError(t, err) +} + +func TestTokenPoolDefinition(t *testing.T) { + pool := &TokenPool{ + Namespace: "ok", + Name: "ok", + } + var def Definition = pool + assert.Equal(t, "ff_ns_ok", def.Topic()) + + id := NewUUID() + def.SetBroadcastMessage(id) + assert.Equal(t, id, pool.Message) +} diff --git a/pkg/tokens/plugin.go b/pkg/tokens/plugin.go index a509f67b85..6cdc0d6350 100644 --- a/pkg/tokens/plugin.go +++ b/pkg/tokens/plugin.go @@ -63,7 +63,7 @@ type Callbacks interface { // submitted by us, or by any other authorized party in the network. // // Error should will only be returned in shutdown scenarios - TokenPoolCreated(plugin Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error + TokenPoolCreated(plugin Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error } // Capabilities the supported featureset of the tokens From 8b77955027eeae0d5d1a48bebabacdf7d83d35ef Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Mon, 27 Sep 2021 15:48:57 -0400 Subject: [PATCH 2/6] Initialize AssetManager earlier Must now be initialized before syshandlers. Signed-off-by: Andrew Richardson --- internal/orchestrator/orchestrator.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 81a8eeb67c..bb45d1bbf4 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -391,6 +391,13 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } } + if or.assets == nil { + or.assets, err = assets.NewAssetManager(ctx, or.database, or.identity, or.data, or.syncasync, or.broadcast, or.tokens) + if err != nil { + return err + } + } + or.syshandlers = syshandlers.NewSystemHandlers(or.database, or.identity, or.dataexchange, or.data, or.broadcast, or.messaging, or.assets) if or.events == nil { @@ -409,13 +416,6 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { or.syncasync.Init(or.events) - if or.assets == nil { - or.assets, err = assets.NewAssetManager(ctx, or.database, or.identity, or.data, or.syncasync, or.broadcast, or.tokens) - if err != nil { - return err - } - } - return nil } From 7d2d4eacb40720f2472abbc01f212478235ac66b Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Wed, 29 Sep 2021 19:15:05 -0400 Subject: [PATCH 3/6] Add pass-through "config" parameter when creating pools This data is opaque to FireFly, but can be passed through to the token plugin to influence how the pool is created (reserved for future use). Signed-off-by: Andrew Richardson --- docs/swagger/swagger.yaml | 15 +++++++++++++++ internal/assets/manager.go | 2 ++ internal/tokens/fftokens/fftokens.go | 8 +++++--- internal/tokens/fftokens/fftokens_test.go | 12 +++++++++--- pkg/fftypes/tokenpool.go | 1 + 5 files changed, 32 insertions(+), 6 deletions(-) diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index f16189fdef..ae0042b35b 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -4301,6 +4301,9 @@ paths: properties: author: type: string + config: + additionalProperties: {} + type: object connector: type: string id: {} @@ -4362,6 +4365,9 @@ paths: properties: author: type: string + config: + additionalProperties: {} + type: object name: type: string symbol: @@ -4380,6 +4386,9 @@ paths: properties: author: type: string + config: + additionalProperties: {} + type: object connector: type: string id: {} @@ -4409,6 +4418,9 @@ paths: properties: author: type: string + config: + additionalProperties: {} + type: object connector: type: string id: {} @@ -4472,6 +4484,9 @@ paths: properties: author: type: string + config: + additionalProperties: {} + type: object connector: type: string id: {} diff --git a/internal/assets/manager.go b/internal/assets/manager.go index 68c8149466..f8bc855aab 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -96,6 +96,7 @@ func storeTokenOpInputs(op *fftypes.Operation, pool *fftypes.TokenPool) { "id": pool.ID.String(), "namespace": pool.Namespace, "name": pool.Name, + "config": pool.Config, } } @@ -110,6 +111,7 @@ func retrieveTokenOpInputs(ctx context.Context, op *fftypes.Operation, pool *fft if pool.Namespace == "" || pool.Name == "" { return fmt.Errorf("namespace or name missing from inputs") } + pool.Config = input.GetObject("config") return nil } diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 743578c8ed..67b31060ba 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -53,9 +53,10 @@ const ( ) type createPool struct { - Type fftypes.TokenType `json:"type"` - RequestID string `json:"requestId"` - TrackingID string `json:"trackingId"` + Type fftypes.TokenType `json:"type"` + RequestID string `json:"requestId"` + TrackingID string `json:"trackingId"` + Config fftypes.JSONObject `json:"config"` } func (h *FFTokens) Name() string { @@ -201,6 +202,7 @@ func (h *FFTokens) CreateTokenPool(ctx context.Context, operationID *fftypes.UUI Type: pool.Type, RequestID: operationID.String(), TrackingID: pool.TX.ID.String(), + Config: pool.Config, }). Post("/api/v1/pool") if err != nil || !res.IsSuccess() { diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index 30c2c2b55c..18d35c2cb8 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -99,6 +99,7 @@ func TestCreateTokenPool(t *testing.T) { h, _, _, httpURL, done := newTestFFTokens(t) defer done() + opID := fftypes.NewUUID() pool := &fftypes.TokenPool{ ID: fftypes.NewUUID(), TX: fftypes.TransactionRef{ @@ -108,6 +109,9 @@ func TestCreateTokenPool(t *testing.T) { Namespace: "ns1", Name: "new-pool", Type: "fungible", + Config: fftypes.JSONObject{ + "foo": "bar", + }, } httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/pool", httpURL), @@ -115,11 +119,13 @@ func TestCreateTokenPool(t *testing.T) { body := make(fftypes.JSONObject) err := json.NewDecoder(req.Body).Decode(&body) assert.NoError(t, err) - assert.Contains(t, body, "requestId") - delete(body, "requestId") assert.Equal(t, fftypes.JSONObject{ + "requestId": opID.String(), "trackingId": pool.TX.ID.String(), "type": "fungible", + "config": map[string]interface{}{ + "foo": "bar", + }, }, body) res := &http.Response{ @@ -132,7 +138,7 @@ func TestCreateTokenPool(t *testing.T) { return res, nil }) - err := h.CreateTokenPool(context.Background(), fftypes.NewUUID(), &fftypes.Identity{}, pool) + err := h.CreateTokenPool(context.Background(), opID, &fftypes.Identity{}, pool) assert.NoError(t, err) } diff --git a/pkg/fftypes/tokenpool.go b/pkg/fftypes/tokenpool.go index eff7111d0f..ae0565a3da 100644 --- a/pkg/fftypes/tokenpool.go +++ b/pkg/fftypes/tokenpool.go @@ -37,6 +37,7 @@ type TokenPool struct { Symbol string `json:"symbol,omitempty"` Connector string `json:"connector,omitempty"` Message *UUID `json:"message,omitempty"` + Config JSONObject `json:"config,omitempty"` TX TransactionRef `json:"tx,omitempty"` } From a20375d73a19f09da75e155f43dace573d1e2925 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Wed, 29 Sep 2021 19:21:44 -0400 Subject: [PATCH 4/6] Clean up some comments and method names Signed-off-by: Andrew Richardson --- internal/assets/manager.go | 6 +++--- internal/assets/token_pool_created.go | 10 +++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/assets/manager.go b/internal/assets/manager.go index f8bc855aab..162bfc3775 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -91,7 +91,7 @@ func (am *assetManager) selectTokenPlugin(ctx context.Context, name string) (tok return nil, i18n.NewError(ctx, i18n.MsgUnknownTokensPlugin, name) } -func storeTokenOpInputs(op *fftypes.Operation, pool *fftypes.TokenPool) { +func addTokenPoolCreateInputs(op *fftypes.Operation, pool *fftypes.TokenPool) { op.Input = fftypes.JSONObject{ "id": pool.ID.String(), "namespace": pool.Namespace, @@ -100,7 +100,7 @@ func storeTokenOpInputs(op *fftypes.Operation, pool *fftypes.TokenPool) { } } -func retrieveTokenOpInputs(ctx context.Context, op *fftypes.Operation, pool *fftypes.TokenPool) (err error) { +func retrieveTokenPoolCreateInputs(ctx context.Context, op *fftypes.Operation, pool *fftypes.TokenPool) (err error) { input := &op.Input pool.ID, err = fftypes.ParseUUID(ctx, input.GetString("id")) if err != nil { @@ -176,7 +176,7 @@ func (am *assetManager) CreateTokenPoolWithID(ctx context.Context, ns string, id fftypes.OpTypeTokensCreatePool, fftypes.OpStatusPending, author.Identifier) - storeTokenOpInputs(op, pool) + addTokenPoolCreateInputs(op, pool) err = am.database.UpsertOperation(ctx, op, false) if err != nil { return nil, err diff --git a/internal/assets/token_pool_created.go b/internal/assets/token_pool_created.go index eb1a0f6fc0..cc53e1e3d5 100644 --- a/internal/assets/token_pool_created.go +++ b/internal/assets/token_pool_created.go @@ -32,7 +32,7 @@ func (am *assetManager) TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.Tok ) operations, _, err := am.database.GetOperations(am.ctx, filter) if err != nil || len(operations) == 0 { - log.L(am.ctx).Debugf("Token pool transaction '%s' ignored, as it was not submitted by this node", tx) + log.L(am.ctx).Debugf("Token pool transaction '%s' ignored, as it did not match an operation submitted by this node", tx) return nil } @@ -44,13 +44,17 @@ func (am *assetManager) TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.Tok }, ProtocolTxID: protocolTxID, } - err = retrieveTokenOpInputs(am.ctx, operations[0], &pool.TokenPool) + err = retrieveTokenPoolCreateInputs(am.ctx, operations[0], &pool.TokenPool) if err != nil { log.L(am.ctx).Errorf("Error retrieving pool info from transaction '%s' (%s) - ignoring: %v", tx, err, operations[0].Input) return nil } - // Update the transaction with the info received (but leave transaction as "pending") + // Update the transaction with the info received (but leave transaction as "pending"). + // At this point we are the only node in the network that knows about this transaction object. + // Our local token connector has performed whatever actions it needs to perform, to give us + // enough information to distribute to all other token connectors in the network. + // (e.g. details of a newly created token instance or an existing one) transaction := &fftypes.Transaction{ ID: tx, Status: fftypes.OpStatusPending, From 16a77b836ecc33824e8f4f9e2d4fc470a1d1da86 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Thu, 30 Sep 2021 10:17:19 -0400 Subject: [PATCH 5/6] Store message ID on TokensAnnouncePool operation output Signed-off-by: Andrew Richardson --- internal/syshandlers/syshandler_tokenpool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/syshandlers/syshandler_tokenpool.go b/internal/syshandlers/syshandler_tokenpool.go index 9f509f6507..71885bd2d7 100644 --- a/internal/syshandlers/syshandler_tokenpool.go +++ b/internal/syshandlers/syshandler_tokenpool.go @@ -39,7 +39,8 @@ func (sh *systemHandlers) persistTokenPool(ctx context.Context, pool *fftypes.To if len(operations) > 0 { // Mark announce operation completed update := database.OperationQueryFactory.NewUpdate(ctx). - Set("status", fftypes.OpStatusSucceeded) + Set("status", fftypes.OpStatusSucceeded). + Set("output", fftypes.JSONObject{"message": pool.Message}) if err := sh.database.UpdateOperation(ctx, operations[0].ID, update); err != nil { return false, err // retryable } From cd8849f87f7ac804fe1634cce72db030d88f6f7a Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 1 Oct 2021 10:18:14 -0400 Subject: [PATCH 6/6] Add operations index by tx_id Also remove index on namespace, as it's less likely to impact real usage scenarios and there are a lot of indexes on this table. Signed-off-by: Andrew Richardson --- .../postgres/000008_create_operations_table.up.sql | 6 +++--- db/migrations/sqlite/000008_create_operations_table.up.sql | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/db/migrations/postgres/000008_create_operations_table.up.sql b/db/migrations/postgres/000008_create_operations_table.up.sql index 2562ade015..6f125b8284 100644 --- a/db/migrations/postgres/000008_create_operations_table.up.sql +++ b/db/migrations/postgres/000008_create_operations_table.up.sql @@ -6,7 +6,7 @@ CREATE TABLE operations ( tx_id UUID NOT NULL, optype VARCHAR(64) NOT NULL, opstatus VARCHAR(64) NOT NULL, - member VARCHAR(1024), + member VARCHAR(1024), plugin VARCHAR(64) NOT NULL, backend_id VARCHAR(256) NOT NULL, created BIGINT NOT NULL, @@ -18,6 +18,6 @@ CREATE TABLE operations ( CREATE UNIQUE INDEX operations_id ON operations(id); CREATE INDEX operations_created ON operations(created); CREATE INDEX operations_backend ON operations(backend_id); -CREATE INDEX operations_namespace ON operations(namespace); +CREATE INDEX operations_tx ON operations(tx_id); -COMMIT; \ No newline at end of file +COMMIT; diff --git a/db/migrations/sqlite/000008_create_operations_table.up.sql b/db/migrations/sqlite/000008_create_operations_table.up.sql index b62871d419..e7a60aac0b 100644 --- a/db/migrations/sqlite/000008_create_operations_table.up.sql +++ b/db/migrations/sqlite/000008_create_operations_table.up.sql @@ -5,7 +5,7 @@ CREATE TABLE operations ( tx_id UUID NOT NULL, optype VARCHAR(64) NOT NULL, opstatus VARCHAR(64) NOT NULL, - member VARCHAR(1024), + member VARCHAR(1024), plugin VARCHAR(64) NOT NULL, backend_id VARCHAR(256) NOT NULL, created BIGINT NOT NULL, @@ -17,5 +17,4 @@ CREATE TABLE operations ( CREATE UNIQUE INDEX operations_id ON operations(id); CREATE INDEX operations_created ON operations(created); CREATE INDEX operations_backend ON operations(backend_id); -CREATE INDEX operations_namespace ON operations(namespace); - +CREATE INDEX operations_tx ON operations(tx_id);