diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index b8ec1bb19b..ed4d8ba364 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -5102,14 +5102,120 @@ paths: schema: properties: amount: {} + created: {} from: type: string key: type: string + localId: {} + message: + properties: + batch: {} + confirmed: {} + data: + items: + properties: + blob: + properties: + hash: {} + public: + type: string + type: object + datatype: + properties: + name: + type: string + version: + type: string + type: object + hash: {} + id: {} + validator: + type: string + value: + format: byte + type: string + type: object + type: array + group: + properties: + ledger: {} + members: + items: + properties: + identity: + type: string + node: + type: string + type: object + type: array + name: + type: string + type: object + hash: {} + header: + properties: + author: + type: string + cid: {} + created: {} + datahash: {} + group: {} + id: {} + key: + type: string + namespace: + type: string + tag: + type: string + topics: + items: + type: string + type: array + txtype: + type: string + type: + enum: + - definition + - broadcast + - private + - groupinit + - transfer_broadcast + - transfer_private + type: string + type: object + local: + type: boolean + pending: + type: boolean + pins: + items: + type: string + type: array + rejected: + type: boolean + type: object + messageHash: {} + poolProtocolId: + type: string + protocolId: + type: string to: type: string tokenIndex: type: string + tx: + properties: + id: {} + type: + type: string + type: object + type: + enum: + - mint + - burn + - transfer + type: string type: object responses: "200": diff --git a/internal/apiserver/route_post_token_transfer.go b/internal/apiserver/route_post_token_transfer.go index 00a9df5474..970f7ec38f 100644 --- a/internal/apiserver/route_post_token_transfer.go +++ b/internal/apiserver/route_post_token_transfer.go @@ -40,13 +40,13 @@ var postTokenTransfer = &oapispec.Route{ }, FilterFactory: nil, Description: i18n.MsgTBD, - JSONInputValue: func() interface{} { return &fftypes.TokenTransfer{} }, + JSONInputValue: func() interface{} { return &fftypes.TokenTransferInput{} }, JSONInputMask: []string{"Type", "LocalID", "PoolProtocolID", "ProtocolID", "MessageHash", "TX", "Created"}, JSONOutputValue: func() interface{} { return &fftypes.TokenTransfer{} }, JSONOutputCodes: []int{http.StatusAccepted, http.StatusOK}, JSONHandler: func(r *oapispec.APIRequest) (output interface{}, err error) { waitConfirm := strings.EqualFold(r.QP["confirm"], "true") r.SuccessStatus = syncRetcode(waitConfirm) - return r.Or.Assets().TransferTokens(r.Ctx, r.PP["ns"], r.PP["type"], r.PP["name"], r.Input.(*fftypes.TokenTransfer), waitConfirm) + return r.Or.Assets().TransferTokens(r.Ctx, r.PP["ns"], r.PP["type"], r.PP["name"], r.Input.(*fftypes.TokenTransferInput), waitConfirm) }, } diff --git a/internal/apiserver/route_post_token_transfer_test.go b/internal/apiserver/route_post_token_transfer_test.go index caf1c8bef7..a9b2e195c0 100644 --- a/internal/apiserver/route_post_token_transfer_test.go +++ b/internal/apiserver/route_post_token_transfer_test.go @@ -32,14 +32,14 @@ func TestPostTokenTransfer(t *testing.T) { o, r := newTestAPIServer() mam := &assetmocks.Manager{} o.On("Assets").Return(mam) - input := fftypes.TokenTransfer{} + input := fftypes.TokenTransferInput{} var buf bytes.Buffer json.NewEncoder(&buf).Encode(&input) req := httptest.NewRequest("POST", "/api/v1/namespaces/ns1/tokens/tok1/pools/pool1/transfers", &buf) req.Header.Set("Content-Type", "application/json; charset=utf-8") res := httptest.NewRecorder() - mam.On("TransferTokens", mock.Anything, "ns1", "tok1", "pool1", mock.AnythingOfType("*fftypes.TokenTransfer"), false). + mam.On("TransferTokens", mock.Anything, "ns1", "tok1", "pool1", mock.AnythingOfType("*fftypes.TokenTransferInput"), false). Return(&fftypes.TokenTransfer{}, nil) r.ServeHTTP(res, req) diff --git a/internal/assets/manager.go b/internal/assets/manager.go index a0d3a22b57..ac892cbe44 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/identity" + "github.com/hyperledger/firefly/internal/privatemessaging" "github.com/hyperledger/firefly/internal/retry" "github.com/hyperledger/firefly/internal/syncasync" "github.com/hyperledger/firefly/internal/txcommon" @@ -42,11 +43,10 @@ type Manager interface { GetTokenTransfers(ctx context.Context, ns, typeName, poolName string, filter database.AndFilter) ([]*fftypes.TokenTransfer, *database.FilterResult, error) MintTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) BurnTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) - TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) + TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error) // Bound token callbacks TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, protocolTxID string, additionalInfo fftypes.JSONObject) error - TokensTransferred(tk tokens.Plugin, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) error Start() error WaitStop() @@ -59,13 +59,14 @@ type assetManager struct { data data.Manager syncasync syncasync.Bridge broadcast broadcast.Manager + messaging privatemessaging.Manager tokens map[string]tokens.Plugin retry retry.Retry txhelper txcommon.Helper } -func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, ti map[string]tokens.Plugin) (Manager, error) { - if di == nil || im == nil || sa == nil || bm == nil || ti == nil { +func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin) (Manager, error) { + if di == nil || im == nil || sa == nil || bm == nil || pm == nil || ti == nil { return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError) } am := &assetManager{ @@ -75,6 +76,7 @@ func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manage data: dm, syncasync: sa, broadcast: bm, + messaging: pm, tokens: ti, retry: retry.Retry{ InitialDelay: config.GetDuration(config.AssetManagerRetryInitialDelay), @@ -119,21 +121,13 @@ func retrieveTokenPoolCreateInputs(ctx context.Context, op *fftypes.Operation, p return nil } +// Note: the counterpart to below (retrieveTokenTransferInputs) lives in the events package func addTokenTransferInputs(op *fftypes.Operation, transfer *fftypes.TokenTransfer) { op.Input = fftypes.JSONObject{ "id": transfer.LocalID.String(), } } -func retrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { - input := &op.Input - transfer.LocalID, err = fftypes.ParseUUID(ctx, input.GetString("id")) - if err != nil { - return err - } - return nil -} - func (am *assetManager) CreateTokenPool(ctx context.Context, ns string, typeName string, pool *fftypes.TokenPool, waitConfirm bool) (*fftypes.TokenPool, error) { return am.createTokenPoolWithID(ctx, fftypes.NewUUID(), ns, typeName, pool, waitConfirm) } @@ -291,7 +285,25 @@ func (am *assetManager) BurnTokens(ctx context.Context, ns, typeName, poolName s return am.transferTokensWithID(ctx, fftypes.NewUUID(), ns, typeName, poolName, transfer, waitConfirm) } -func (am *assetManager) TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) { +func (am *assetManager) sendTransferMessage(ctx context.Context, ns string, in *fftypes.MessageInOut) (*fftypes.Message, error) { + allowedTypes := []fftypes.FFEnum{ + fftypes.MessageTypeTransferBroadcast, + fftypes.MessageTypeTransferPrivate, + } + if in.Header.Type == "" { + in.Header.Type = fftypes.MessageTypeTransferBroadcast + } + switch in.Header.Type { + case fftypes.MessageTypeTransferBroadcast: + return am.broadcast.BroadcastMessage(ctx, ns, in, false) + case fftypes.MessageTypeTransferPrivate: + return am.messaging.SendMessage(ctx, ns, in, false) + default: + return nil, i18n.NewError(ctx, i18n.MsgInvalidMessageType, allowedTypes) + } +} + +func (am *assetManager) TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error) { transfer.Type = fftypes.TokenTransferTypeTransfer if transfer.Key == "" { org, err := am.identity.GetLocalOrganization(ctx) @@ -309,7 +321,17 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns, typeName, poolNa if transfer.From == transfer.To { return nil, i18n.NewError(ctx, i18n.MsgCannotTransferToSelf) } - return am.transferTokensWithID(ctx, fftypes.NewUUID(), ns, typeName, poolName, transfer, waitConfirm) + + if transfer.Message != nil { + msg, err := am.sendTransferMessage(ctx, ns, transfer.Message) + if err != nil { + return nil, err + } + transfer.MessageHash = msg.Hash + } + + result, err := am.transferTokensWithID(ctx, fftypes.NewUUID(), ns, typeName, poolName, &transfer.TokenTransfer, waitConfirm) + return result, err } func (am *assetManager) transferTokensWithID(ctx context.Context, id *fftypes.UUID, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) { diff --git a/internal/assets/manager_test.go b/internal/assets/manager_test.go index dcb0a9eacf..577e779e4c 100644 --- a/internal/assets/manager_test.go +++ b/internal/assets/manager_test.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/identitymanagermocks" + "github.com/hyperledger/firefly/mocks/privatemessagingmocks" "github.com/hyperledger/firefly/mocks/syncasyncmocks" "github.com/hyperledger/firefly/mocks/tokenmocks" "github.com/hyperledger/firefly/pkg/database" @@ -41,10 +42,11 @@ func newTestAssets(t *testing.T) (*assetManager, func()) { mdm := &datamocks.Manager{} msa := &syncasyncmocks.Bridge{} mbm := &broadcastmocks.Manager{} + mpm := &privatemessagingmocks.Manager{} mti := &tokenmocks.Plugin{} mti.On("Name").Return("ut_tokens").Maybe() ctx, cancel := context.WithCancel(context.Background()) - a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, map[string]tokens.Plugin{"magic-tokens": mti}) + a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}) rag := mdi.On("RunAsGroup", ctx, mock.Anything).Maybe() rag.RunFn = func(a mock.Arguments) { rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} @@ -54,7 +56,7 @@ func newTestAssets(t *testing.T) (*assetManager, func()) { } func TestInitFail(t *testing.T) { - _, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil) + _, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) } @@ -538,9 +540,11 @@ func TestTransferTokensSuccess(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - transfer := &fftypes.TokenTransfer{ - From: "A", - To: "B", + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + From: "A", + To: "B", + }, } transfer.Amount.Int().SetInt64(5) @@ -549,7 +553,7 @@ func TestTransferTokensSuccess(t *testing.T) { mim := am.identity.(*identitymanagermocks.Manager) mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) - mti.On("TransferTokens", context.Background(), mock.Anything, transfer).Return(nil) + mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil) mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer @@ -567,9 +571,11 @@ func TestTransferTokensIdentityFail(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - transfer := &fftypes.TokenTransfer{ - From: "A", - To: "B", + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + From: "A", + To: "B", + }, } transfer.Amount.Int().SetInt64(5) @@ -586,8 +592,7 @@ func TestTransferTokensNoFromOrTo(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - transfer := &fftypes.TokenTransfer{} - transfer.Amount.Int().SetInt64(5) + transfer := &fftypes.TokenTransferInput{} mim := am.identity.(*identitymanagermocks.Manager) mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) @@ -626,9 +631,11 @@ func TestTransferTokensTransactionFail(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - transfer := &fftypes.TokenTransfer{ - From: "A", - To: "B", + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + From: "A", + To: "B", + }, } transfer.Amount.Int().SetInt64(5) @@ -646,3 +653,123 @@ func TestTransferTokensTransactionFail(t *testing.T) { mim.AssertExpectations(t) mdi.AssertExpectations(t) } + +func TestTransferTokensWithBroadcastMessage(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + From: "A", + To: "B", + }, + Message: &fftypes.MessageInOut{ + InlineData: fftypes.InlineData{ + { + Value: []byte("test data"), + }, + }, + }, + } + transfer.Amount.Int().SetInt64(5) + + mdi := am.database.(*databasemocks.Plugin) + mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) + mim := am.identity.(*identitymanagermocks.Manager) + mbm := am.broadcast.(*broadcastmocks.Manager) + mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) + mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) + mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil) + mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { + return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer + }), false).Return(nil) + mbm.On("BroadcastMessage", context.Background(), "ns1", transfer.Message, false).Return(&transfer.Message.Message, nil) + + _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) + assert.NoError(t, err) + + mbm.AssertExpectations(t) + mim.AssertExpectations(t) + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} + +func TestTransferTokensWithPrivateMessage(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + From: "A", + To: "B", + }, + Message: &fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypeTransferPrivate, + }, + }, + InlineData: fftypes.InlineData{ + { + Value: []byte("test data"), + }, + }, + }, + } + transfer.Amount.Int().SetInt64(5) + + mdi := am.database.(*databasemocks.Plugin) + mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) + mim := am.identity.(*identitymanagermocks.Manager) + mpm := am.messaging.(*privatemessagingmocks.Manager) + mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) + mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) + mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil) + mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { + return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer + }), false).Return(nil) + mpm.On("SendMessage", context.Background(), "ns1", transfer.Message, false).Return(&transfer.Message.Message, nil) + + _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) + assert.NoError(t, err) + + mpm.AssertExpectations(t) + mim.AssertExpectations(t) + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} + +func TestTransferTokensWithInvalidMessage(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + From: "A", + To: "B", + }, + Message: &fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypeDefinition, + }, + }, + InlineData: fftypes.InlineData{ + { + Value: []byte("test data"), + }, + }, + }, + } + transfer.Amount.Int().SetInt64(5) + + mim := am.identity.(*identitymanagermocks.Manager) + mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) + + _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) + assert.Regexp(t, "FF10287", err) + + mim.AssertExpectations(t) +} diff --git a/internal/assets/tokens_transferred.go b/internal/assets/tokens_transferred.go deleted file mode 100644 index 35ad0afaed..0000000000 --- a/internal/assets/tokens_transferred.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright © 2021 Kaleido, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package assets - -import ( - "context" - - "github.com/hyperledger/firefly/internal/log" - "github.com/hyperledger/firefly/pkg/database" - "github.com/hyperledger/firefly/pkg/fftypes" - "github.com/hyperledger/firefly/pkg/tokens" -) - -func (am *assetManager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) error { - return am.retry.Do(am.ctx, "persist token transfer", func(attempt int) (bool, error) { - err := am.database.RunAsGroup(am.ctx, func(ctx context.Context) error { - // Check that this is from a known pool - pool, err := am.database.GetTokenPoolByProtocolID(ctx, transfer.PoolProtocolID) - if err != nil { - return err - } - if pool == nil { - log.L(ctx).Warnf("Token transfer received for unknown pool '%s' - ignoring: %s", transfer.PoolProtocolID, protocolTxID) - return nil - } - - transfer.LocalID = nil - - if transfer.TX.ID != nil { - // Find a matching operation within this transaction - fb := database.OperationQueryFactory.NewFilter(ctx) - filter := fb.And( - fb.Eq("tx", transfer.TX.ID), - fb.Eq("type", fftypes.OpTypeTokenTransfer), - ) - operations, _, err := am.database.GetOperations(ctx, filter) - if err != nil { - return err - } - if len(operations) > 0 { - err = retrieveTokenTransferInputs(ctx, operations[0], transfer) - if err != nil { - log.L(ctx).Warnf("Failed to read operation inputs for token transfer '%s': %s", transfer.ProtocolID, err) - } - } - - transaction := &fftypes.Transaction{ - ID: transfer.TX.ID, - Status: fftypes.OpStatusSucceeded, - Subject: fftypes.TransactionSubject{ - Namespace: pool.Namespace, - Type: transfer.TX.Type, - Signer: transfer.Key, - Reference: transfer.LocalID, - }, - ProtocolID: protocolTxID, - Info: additionalInfo, - } - valid, err := am.txhelper.PersistTransaction(ctx, transaction) - if err != nil { - return err - } else if !valid { - return nil - } - } - - if transfer.LocalID == nil { - transfer.LocalID = fftypes.NewUUID() - } - - if err := am.database.UpsertTokenTransfer(ctx, transfer); err != nil { - log.L(ctx).Errorf("Failed to record token transfer '%s': %s", transfer.ProtocolID, err) - return err - } - - balance := &fftypes.TokenBalanceChange{ - PoolProtocolID: transfer.PoolProtocolID, - TokenIndex: transfer.TokenIndex, - } - if transfer.Type != fftypes.TokenTransferTypeMint { - balance.Identity = transfer.From - balance.Amount.Int().Neg(transfer.Amount.Int()) - if err := am.database.AddTokenAccountBalance(ctx, balance); err != nil { - log.L(ctx).Errorf("Failed to update account '%s' for token transfer '%s': %s", balance.Identity, transfer.ProtocolID, err) - return err - } - } - - if transfer.Type != fftypes.TokenTransferTypeBurn { - balance.Identity = transfer.To - balance.Amount.Int().Set(transfer.Amount.Int()) - if err := am.database.AddTokenAccountBalance(ctx, balance); err != nil { - log.L(ctx).Errorf("Failed to update account '%s for token transfer '%s': %s", balance.Identity, transfer.ProtocolID, err) - return err - } - } - - log.L(ctx).Infof("Token transfer recorded id=%s author=%s", transfer.ProtocolID, transfer.Key) - event := fftypes.NewEvent(fftypes.EventTypeTransferConfirmed, pool.Namespace, transfer.LocalID) - return am.database.InsertEvent(ctx, event) - }) - return err != nil, err // retry indefinitely (until context closes) - }) -} diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 62da0b8b2c..2cde037a4d 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -84,6 +84,7 @@ func NewBroadcastManager(ctx context.Context, di database.Plugin, im identity.Ma ba.RegisterDispatcher([]fftypes.MessageType{ fftypes.MessageTypeBroadcast, fftypes.MessageTypeDefinition, + fftypes.MessageTypeTransferBroadcast, }, bm.dispatchBatch, bo) return bm, nil } diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index f222ae6c63..c535423da9 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -48,7 +48,11 @@ func newTestBroadcast(t *testing.T) (*broadcastManager, func()) { msa := &syncasyncmocks.Bridge{} mbp := &batchpinmocks.Submitter{} mbi.On("Name").Return("ut_blockchain").Maybe() - mba.On("RegisterDispatcher", []fftypes.MessageType{fftypes.MessageTypeBroadcast, fftypes.MessageTypeDefinition}, mock.Anything, mock.Anything).Return() + mba.On("RegisterDispatcher", []fftypes.MessageType{ + fftypes.MessageTypeBroadcast, + fftypes.MessageTypeDefinition, + fftypes.MessageTypeTransferBroadcast, + }, mock.Anything, mock.Anything).Return() ctx, cancel := context.WithCancel(context.Background()) b, err := NewBroadcastManager(ctx, mdi, mim, mdm, mbi, mdx, mpi, mba, msa, mbp) assert.NoError(t, err) diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 4977115488..aa0ffd0975 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -412,6 +412,18 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M return false, err } + // For transfers, verify the transfer has come through + if msg.Header.Type == fftypes.MessageTypeTransferBroadcast || msg.Header.Type == fftypes.MessageTypeTransferPrivate { + fb := database.TokenTransferQueryFactory.NewFilter(ctx) + filter := fb.And( + fb.Eq("messagehash", msg.Hash), + ) + if transfers, _, err := ag.database.GetTokenTransfers(ctx, filter); err != nil || len(transfers) == 0 { + log.L(ctx).Debugf("Transfer for message %s not yet available", msg.Hash) + return false, err + } + } + // We're going to dispatch it at this point, but we need to validate the data first valid := true eventType := fftypes.EventTypeMessageConfirmed diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index 756e9b59a0..37b610f186 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -887,6 +887,56 @@ func TestAttemptMessageDispatchMissingBlobs(t *testing.T) { } +func TestAttemptMessageDispatchMissingTransfers(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) + + mdi := ag.database.(*databasemocks.Plugin) + mdi.On("GetTokenTransfers", ag.ctx, mock.Anything).Return([]*fftypes.TokenTransfer{}, nil, nil) + + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypeTransferBroadcast, + }, + } + msg.Hash = msg.Header.Hash() + dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg) + assert.NoError(t, err) + assert.False(t, dispatched) + + mdm.AssertExpectations(t) + mdi.AssertExpectations(t) +} + +func TestAttemptMessageDispatchGetTransfersFail(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) + + mdi := ag.database.(*databasemocks.Plugin) + mdi.On("GetTokenTransfers", ag.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypeTransferBroadcast, + }, + } + msg.Hash = msg.Header.Hash() + dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg) + assert.EqualError(t, err, "pop") + assert.False(t, dispatched) + + mdm.AssertExpectations(t) + mdi.AssertExpectations(t) +} + func TestAttemptMessageDispatchFailValidateBadSystem(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index b565cb8bd3..57ed08e910 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -38,6 +38,7 @@ import ( "github.com/hyperledger/firefly/pkg/dataexchange" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/hyperledger/firefly/pkg/publicstorage" + "github.com/hyperledger/firefly/pkg/tokens" ) type EventManager interface { @@ -61,6 +62,9 @@ type EventManager interface { BLOBReceived(dx dataexchange.Plugin, peerID string, hash fftypes.Bytes32, payloadRef string) error MessageReceived(dx dataexchange.Plugin, peerID string, data []byte) error + // Bound token callbacks + TokensTransferred(tk tokens.Plugin, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) error + // Internal events sysmessaging.SystemEvents } diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go new file mode 100644 index 0000000000..82d1966857 --- /dev/null +++ b/internal/events/tokens_transferred.go @@ -0,0 +1,166 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + + "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/pkg/database" + "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/hyperledger/firefly/pkg/tokens" +) + +func retrieveTokenTransferInputs(ctx context.Context, op *fftypes.Operation, transfer *fftypes.TokenTransfer) (err error) { + input := &op.Input + transfer.LocalID, err = fftypes.ParseUUID(ctx, input.GetString("id")) + if err != nil { + return err + } + return nil +} + +func (em *eventManager) persistTokenTransaction(ctx context.Context, ns string, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) { + transfer.LocalID = nil + + // Find a matching operation within this transaction + fb := database.OperationQueryFactory.NewFilter(ctx) + filter := fb.And( + fb.Eq("tx", transfer.TX.ID), + fb.Eq("type", fftypes.OpTypeTokenTransfer), + ) + operations, _, err := em.database.GetOperations(ctx, filter) + if err != nil { + return false, err + } + if len(operations) > 0 { + err = retrieveTokenTransferInputs(ctx, operations[0], transfer) + if err != nil { + log.L(ctx).Warnf("Failed to read operation inputs for token transfer '%s': %s", transfer.ProtocolID, err) + } + } + + if transfer.LocalID == nil { + transfer.LocalID = fftypes.NewUUID() + } + + transaction := &fftypes.Transaction{ + ID: transfer.TX.ID, + Status: fftypes.OpStatusSucceeded, + Subject: fftypes.TransactionSubject{ + Namespace: ns, + Type: transfer.TX.Type, + Signer: transfer.Key, + Reference: transfer.LocalID, + }, + ProtocolID: protocolTxID, + Info: additionalInfo, + } + return em.txhelper.PersistTransaction(ctx, transaction) +} + +func (em *eventManager) getBatchForTransfer(ctx context.Context, transfer *fftypes.TokenTransfer) (*fftypes.UUID, error) { + // Find the messages assocated with that data + var messages []*fftypes.Message + fb := database.MessageQueryFactory.NewFilter(ctx) + filter := fb.And( + fb.Eq("pending", true), + fb.Eq("hash", transfer.MessageHash), + ) + messages, _, err := em.database.GetMessages(ctx, filter) + if err != nil || len(messages) == 0 { + return nil, err + } + return messages[0].BatchID, nil +} + +func (em *eventManager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) error { + var batchID *fftypes.UUID + + err := em.retry.Do(em.ctx, "persist token transfer", func(attempt int) (bool, error) { + err := em.database.RunAsGroup(em.ctx, func(ctx context.Context) error { + // Check that this is from a known pool + pool, err := em.database.GetTokenPoolByProtocolID(ctx, transfer.PoolProtocolID) + if err != nil { + return err + } + if pool == nil { + log.L(ctx).Warnf("Token transfer received for unknown pool '%s' - ignoring: %s", transfer.PoolProtocolID, protocolTxID) + return nil + } + + if transfer.TX.ID != nil { + if valid, err := em.persistTokenTransaction(ctx, pool.Namespace, transfer, protocolTxID, additionalInfo); err != nil || !valid { + return err + } + } else { + transfer.LocalID = fftypes.NewUUID() + } + + if err := em.database.UpsertTokenTransfer(ctx, transfer); err != nil { + log.L(ctx).Errorf("Failed to record token transfer '%s': %s", transfer.ProtocolID, err) + return err + } + + balance := &fftypes.TokenBalanceChange{ + PoolProtocolID: transfer.PoolProtocolID, + TokenIndex: transfer.TokenIndex, + } + + if transfer.Type != fftypes.TokenTransferTypeMint { + balance.Identity = transfer.From + balance.Amount.Int().Neg(transfer.Amount.Int()) + if err := em.database.AddTokenAccountBalance(ctx, balance); err != nil { + log.L(ctx).Errorf("Failed to update account '%s' for token transfer '%s': %s", balance.Identity, transfer.ProtocolID, err) + return err + } + } + + if transfer.Type != fftypes.TokenTransferTypeBurn { + balance.Identity = transfer.To + balance.Amount.Int().Set(transfer.Amount.Int()) + if err := em.database.AddTokenAccountBalance(ctx, balance); err != nil { + log.L(ctx).Errorf("Failed to update account '%s for token transfer '%s': %s", balance.Identity, transfer.ProtocolID, err) + return err + } + } + + log.L(ctx).Infof("Token transfer recorded id=%s author=%s", transfer.ProtocolID, transfer.Key) + + if transfer.MessageHash != nil { + if batchID, err = em.getBatchForTransfer(ctx, transfer); err != nil { + log.L(ctx).Errorf("Failed to lookup batch for token transfer '%s': %s", transfer.ProtocolID, err) + return err + } + } + + event := fftypes.NewEvent(fftypes.EventTypeTransferConfirmed, pool.Namespace, transfer.LocalID) + return em.database.InsertEvent(ctx, event) + }) + return err != nil, err // retry indefinitely (until context closes) + }) + + if err == nil { + // Initiate a rewind if a batch was potentially completed by the arrival of this transfer + if batchID != nil { + log.L(em.ctx).Infof("Batch '%s' contains reference to received transfer. Transfer='%s' Message='%s'", batchID, transfer.ProtocolID, transfer.MessageHash) + em.aggregator.offchainBatches <- batchID + } + } + + return err +} diff --git a/internal/assets/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go similarity index 52% rename from internal/assets/tokens_transferred_test.go rename to internal/events/tokens_transferred_test.go index 2d224a3a9c..1ca2dc0987 100644 --- a/internal/assets/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package assets +package events import ( "fmt" @@ -28,10 +28,11 @@ import ( "github.com/stretchr/testify/mock" ) -func TestTokensTransferredAddBalanceSucceedWithRetries(t *testing.T) { - am, cancel := newTestAssets(t) +func TestTokensTransferredSucceedWithRetries(t *testing.T) { + em, cancel := newTestEventManager(t) defer cancel() - mdi := am.database.(*databasemocks.Plugin) + + mdi := em.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} transfer := &fftypes.TokenTransfer{ @@ -59,20 +60,20 @@ func TestTokensTransferredAddBalanceSucceedWithRetries(t *testing.T) { Namespace: "ns1", } - mdi.On("GetTokenPoolByProtocolID", am.ctx, "F1").Return(nil, fmt.Errorf("pop")).Once() - mdi.On("GetTokenPoolByProtocolID", am.ctx, "F1").Return(pool, nil).Times(4) - mdi.On("UpsertTokenTransfer", am.ctx, transfer).Return(fmt.Errorf("pop")).Once() - mdi.On("UpsertTokenTransfer", am.ctx, transfer).Return(nil).Times(3) - mdi.On("AddTokenAccountBalance", am.ctx, fromBalance).Return(fmt.Errorf("pop")).Once() - mdi.On("AddTokenAccountBalance", am.ctx, fromBalance).Return(nil).Times(2) - mdi.On("AddTokenAccountBalance", am.ctx, toBalance).Return(fmt.Errorf("pop")).Once() - mdi.On("AddTokenAccountBalance", am.ctx, toBalance).Return(nil).Once() - mdi.On("InsertEvent", am.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + mdi.On("GetTokenPoolByProtocolID", em.ctx, "F1").Return(nil, fmt.Errorf("pop")).Once() + mdi.On("GetTokenPoolByProtocolID", em.ctx, "F1").Return(pool, nil).Times(4) + mdi.On("UpsertTokenTransfer", em.ctx, transfer).Return(fmt.Errorf("pop")).Once() + mdi.On("UpsertTokenTransfer", em.ctx, transfer).Return(nil).Times(3) + mdi.On("AddTokenAccountBalance", em.ctx, fromBalance).Return(fmt.Errorf("pop")).Once() + mdi.On("AddTokenAccountBalance", em.ctx, fromBalance).Return(nil).Times(2) + mdi.On("AddTokenAccountBalance", em.ctx, toBalance).Return(fmt.Errorf("pop")).Once() + mdi.On("AddTokenAccountBalance", em.ctx, toBalance).Return(nil).Once() + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace })).Return(nil).Once() info := fftypes.JSONObject{"some": "info"} - err := am.TokensTransferred(mti, transfer, "tx1", info) + err := em.TokensTransferred(mti, transfer, "tx1", info) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -80,9 +81,10 @@ func TestTokensTransferredAddBalanceSucceedWithRetries(t *testing.T) { } func TestTokensTransferredWithTransactionRetries(t *testing.T) { - am, cancel := newTestAssets(t) + em, cancel := newTestEventManager(t) defer cancel() - mdi := am.database.(*databasemocks.Plugin) + + mdi := em.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} transfer := &fftypes.TokenTransfer{ @@ -112,18 +114,18 @@ func TestTokensTransferredWithTransactionRetries(t *testing.T) { }, }} - mdi.On("GetTokenPoolByProtocolID", am.ctx, "F1").Return(pool, nil).Times(3) - mdi.On("GetOperations", am.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")).Once() - mdi.On("GetOperations", am.ctx, mock.Anything).Return(operationsBad, nil, nil).Once() - mdi.On("GetOperations", am.ctx, mock.Anything).Return(operationsGood, nil, nil).Once() - mdi.On("GetTransactionByID", am.ctx, transfer.TX.ID).Return(nil, fmt.Errorf("pop")).Once() - mdi.On("GetTransactionByID", am.ctx, transfer.TX.ID).Return(nil, nil).Once() - mdi.On("UpsertTransaction", am.ctx, mock.MatchedBy(func(t *fftypes.Transaction) bool { + mdi.On("GetTokenPoolByProtocolID", em.ctx, "F1").Return(pool, nil).Times(3) + mdi.On("GetOperations", em.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")).Once() + mdi.On("GetOperations", em.ctx, mock.Anything).Return(operationsBad, nil, nil).Once() + mdi.On("GetOperations", em.ctx, mock.Anything).Return(operationsGood, nil, nil).Once() + mdi.On("GetTransactionByID", em.ctx, transfer.TX.ID).Return(nil, fmt.Errorf("pop")).Once() + mdi.On("GetTransactionByID", em.ctx, transfer.TX.ID).Return(nil, nil).Once() + mdi.On("UpsertTransaction", em.ctx, mock.MatchedBy(func(t *fftypes.Transaction) bool { return *t.ID == *transfer.TX.ID && t.Subject.Type == fftypes.TransactionTypeTokenTransfer && t.ProtocolID == "tx1" }), false).Return(database.HashMismatch).Once() info := fftypes.JSONObject{"some": "info"} - err := am.TokensTransferred(mti, transfer, "tx1", info) + err := em.TokensTransferred(mti, transfer, "tx1", info) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -131,9 +133,10 @@ func TestTokensTransferredWithTransactionRetries(t *testing.T) { } func TestTokensTransferredAddBalanceIgnore(t *testing.T) { - am, cancel := newTestAssets(t) + em, cancel := newTestEventManager(t) defer cancel() - mdi := am.database.(*databasemocks.Plugin) + + mdi := em.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} transfer := &fftypes.TokenTransfer{ @@ -146,10 +149,64 @@ func TestTokensTransferredAddBalanceIgnore(t *testing.T) { } transfer.Amount.Int().SetInt64(1) - mdi.On("GetTokenPoolByProtocolID", am.ctx, "F1").Return(nil, nil) + mdi.On("GetTokenPoolByProtocolID", em.ctx, "F1").Return(nil, nil) + + info := fftypes.JSONObject{"some": "info"} + err := em.TokensTransferred(mti, transfer, "tx1", info) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} + +func TestTokensTransferredWithMessage(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + + mdi := em.database.(*databasemocks.Plugin) + mti := &tokenmocks.Plugin{} + + transfer := &fftypes.TokenTransfer{ + Type: fftypes.TokenTransferTypeTransfer, + PoolProtocolID: "F1", + TokenIndex: "0", + Key: "0x12345", + From: "0x1", + To: "0x2", + MessageHash: fftypes.NewRandB32(), + } + transfer.Amount.Int().SetInt64(1) + fromBalance := &fftypes.TokenBalanceChange{ + PoolProtocolID: "F1", + TokenIndex: "0", + Identity: "0x1", + } + fromBalance.Amount.Int().SetInt64(-1) + toBalance := &fftypes.TokenBalanceChange{ + PoolProtocolID: "F1", + TokenIndex: "0", + Identity: "0x2", + } + toBalance.Amount.Int().SetInt64(1) + pool := &fftypes.TokenPool{ + Namespace: "ns1", + } + messages := []*fftypes.Message{{ + BatchID: fftypes.NewUUID(), + }} + + mdi.On("GetTokenPoolByProtocolID", em.ctx, "F1").Return(pool, nil).Times(2) + mdi.On("UpsertTokenTransfer", em.ctx, transfer).Return(nil).Times(2) + mdi.On("AddTokenAccountBalance", em.ctx, fromBalance).Return(nil).Times(2) + mdi.On("AddTokenAccountBalance", em.ctx, toBalance).Return(nil).Times(2) + mdi.On("GetMessages", em.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")).Once() + mdi.On("GetMessages", em.ctx, mock.Anything).Return(messages, nil, nil).Once() + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace + })).Return(nil).Once() info := fftypes.JSONObject{"some": "info"} - err := am.TokensTransferred(mti, transfer, "tx1", info) + err := em.TokensTransferred(mti, transfer, "tx1", info) assert.NoError(t, err) mdi.AssertExpectations(t) diff --git a/internal/i18n/en_translations.go b/internal/i18n/en_translations.go index 74f4997bab..9905330683 100644 --- a/internal/i18n/en_translations.go +++ b/internal/i18n/en_translations.go @@ -204,4 +204,5 @@ var ( MsgFabconnectRESTErr = ffm("FF10284", "Error from fabconnect: %s") MsgInvalidIdentity = ffm("FF10285", "Supplied Fabric signer identity is invalid", 400) MsgFailedToDecodeCertificate = ffm("FF10286", "Failed to decode certificate: %s", 500) + MsgInvalidMessageType = ffm("FF10287", "Invalid message type - allowed types are %s", 400) ) diff --git a/internal/orchestrator/bound_callbacks.go b/internal/orchestrator/bound_callbacks.go index 6777e7c389..891b1296b7 100644 --- a/internal/orchestrator/bound_callbacks.go +++ b/internal/orchestrator/bound_callbacks.go @@ -61,5 +61,5 @@ func (bc *boundCallbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.T } func (bc *boundCallbacks) TokensTransferred(plugin tokens.Plugin, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) error { - return bc.am.TokensTransferred(plugin, transfer, protocolTxID, additionalInfo) + return bc.ei.TokensTransferred(plugin, transfer, protocolTxID, additionalInfo) } diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index 4f76a20259..65e9fb7ab2 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -73,7 +73,7 @@ func TestBoundCallbacks(t *testing.T) { err = bc.TokenPoolCreated(mti, pool, "tx12345", info) assert.EqualError(t, err, "pop") - mam.On("TokensTransferred", mti, transfer, "tx12345", info).Return(fmt.Errorf("pop")) + mei.On("TokensTransferred", mti, transfer, "tx12345", info).Return(fmt.Errorf("pop")) err = bc.TokensTransferred(mti, transfer, "tx12345", info) assert.EqualError(t, err, "pop") } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 9ded940ad7..7eabbba4a3 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -401,7 +401,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } if or.assets == nil { - or.assets, err = assets.NewAssetManager(ctx, or.database, or.identity, or.data, or.syncasync, or.broadcast, or.tokens) + or.assets, err = assets.NewAssetManager(ctx, or.database, or.identity, or.data, or.syncasync, or.broadcast, or.messaging, or.tokens) if err != nil { return err } diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index 3b46317a5c..e9bf698c1e 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -105,6 +105,7 @@ func NewPrivateMessaging(ctx context.Context, di database.Plugin, im identity.Ma ba.RegisterDispatcher([]fftypes.MessageType{ fftypes.MessageTypeGroupInit, fftypes.MessageTypePrivate, + fftypes.MessageTypeTransferPrivate, }, pm.dispatchBatch, bo) return pm, nil diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index 8efa21f312..8b8cdf1bf1 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -51,7 +51,11 @@ func newTestPrivateMessaging(t *testing.T) (*privateMessaging, func()) { msa := &syncasyncmocks.Bridge{} mbp := &batchpinmocks.Submitter{} - mba.On("RegisterDispatcher", []fftypes.MessageType{fftypes.MessageTypeGroupInit, fftypes.MessageTypePrivate}, mock.Anything, mock.Anything).Return() + mba.On("RegisterDispatcher", []fftypes.MessageType{ + fftypes.MessageTypeGroupInit, + fftypes.MessageTypePrivate, + fftypes.MessageTypeTransferPrivate, + }, mock.Anything, mock.Anything).Return() ctx, cancel := context.WithCancel(context.Background()) pm, err := NewPrivateMessaging(ctx, mdi, mim, mdx, mbi, mba, mdm, msa, mbp) diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index d66330c3b5..fd353f3452 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -88,6 +88,7 @@ type transferTokens struct { Amount string `json:"amount"` RequestID string `json:"requestId,omitempty"` TrackingID string `json:"trackingId"` + Data string `json:"data,omitempty"` } func (h *FFTokens) Name() string { @@ -221,14 +222,22 @@ func (h *FFTokens) handleTokenTransfer(ctx context.Context, t fftypes.TokenTrans } // We want to process all transfers, even those not initiated by FireFly. - // The trackingID is an optional argument from the connector, so it's important not to - // fail if it's missing or malformed. + // The following are optional arguments from the connector, so it's important not to + // fail if they're missing or malformed. trackingID := data.GetString("trackingId") txID, err := fftypes.ParseUUID(ctx, trackingID) if err != nil { log.L(ctx).Infof("%s event contains invalid ID - continuing anyway (%s): %+v", eventName, err, data) txID = fftypes.NewUUID() } + transferData := data.GetString("data") + var messageHash fftypes.Bytes32 + if transferData != "" { + err = messageHash.UnmarshalText([]byte(transferData)) + if err != nil { + log.L(ctx).Errorf("%s event contains invalid message hash - continuing anyway (%s): %+v", eventName, err, data) + } + } transfer := &fftypes.TokenTransfer{ Type: t, @@ -238,6 +247,7 @@ func (h *FFTokens) handleTokenTransfer(ctx context.Context, t fftypes.TokenTrans To: toAddress, ProtocolID: txHash, Key: operatorAddress, + MessageHash: &messageHash, TX: fftypes.TransactionRef{ ID: txID, Type: fftypes.TransactionTypeTokenTransfer, @@ -368,6 +378,7 @@ func (h *FFTokens) TransferTokens(ctx context.Context, operationID *fftypes.UUID Amount: transfer.Amount.Int().String(), RequestID: operationID.String(), TrackingID: transfer.TX.ID.String(), + Data: transfer.MessageHash.String(), }). Post("/api/v1/transfer") if err != nil || !res.IsSuccess() { diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index 0070cdd012..bc3f6228d0 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -412,21 +412,29 @@ func TestEvents(t *testing.T) { msg = <-toServer assert.Equal(t, `{"data":{"id":"13"},"event":"ack"}`, string(msg)) - // token-transfer: success + // token-transfer: bad message hash (success) mcb.On("TokensTransferred", h, mock.MatchedBy(func(t *fftypes.TokenTransfer) bool { return t.PoolProtocolID == "F1" && t.Amount.Int().Int64() == 2 && t.From == "0x0" && t.To == "0x1" && t.TokenIndex == "" }), "abc", fftypes.JSONObject{"transactionHash": "abc"}).Return(nil) - fromServer <- `{"id":"14","event":"token-transfer","data":{"poolId":"F1","operator":"0x0","from":"0x0","to":"0x1","amount":"2","trackingId":"` + txID.String() + `","transaction":{"transactionHash":"abc"}}}` + fromServer <- `{"id":"14","event":"token-transfer","data":{"poolId":"F1","operator":"0x0","from":"0x0","to":"0x1","amount":"2","trackingId":"` + txID.String() + `","data":"bad","transaction":{"transactionHash":"abc"}}}` msg = <-toServer assert.Equal(t, `{"data":{"id":"14"},"event":"ack"}`, string(msg)) + // token-transfer: success + mcb.On("TokensTransferred", h, mock.MatchedBy(func(t *fftypes.TokenTransfer) bool { + return t.PoolProtocolID == "F1" && t.Amount.Int().Int64() == 2 && t.From == "0x0" && t.To == "0x1" && t.TokenIndex == "" + }), "abc", fftypes.JSONObject{"transactionHash": "abc"}).Return(nil) + fromServer <- `{"id":"15","event":"token-transfer","data":{"poolId":"F1","operator":"0x0","from":"0x0","to":"0x1","amount":"2","trackingId":"` + txID.String() + `","transaction":{"transactionHash":"abc"}}}` + msg = <-toServer + assert.Equal(t, `{"data":{"id":"15"},"event":"ack"}`, string(msg)) + // token-burn: success mcb.On("TokensTransferred", h, mock.MatchedBy(func(t *fftypes.TokenTransfer) bool { return t.PoolProtocolID == "F1" && t.Amount.Int().Int64() == 2 && t.From == "0x0" && t.TokenIndex == "0" }), "abc", fftypes.JSONObject{"transactionHash": "abc"}).Return(nil) - fromServer <- `{"id":"15","event":"token-burn","data":{"poolId":"F1","tokenIndex":"0","operator":"0x0","from":"0x0","amount":"2","trackingId":"` + txID.String() + `","transaction":{"transactionHash":"abc"}}}` + fromServer <- `{"id":"16","event":"token-burn","data":{"poolId":"F1","tokenIndex":"0","operator":"0x0","from":"0x0","amount":"2","trackingId":"` + txID.String() + `","transaction":{"transactionHash":"abc"}}}` msg = <-toServer - assert.Equal(t, `{"data":{"id":"15"},"event":"ack"}`, string(msg)) + assert.Equal(t, `{"data":{"id":"16"},"event":"ack"}`, string(msg)) mcb.AssertExpectations(t) } diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index e3672c7be6..12618baa24 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -234,26 +234,12 @@ func (_m *Manager) TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, p return r0 } -// TokensTransferred provides a mock function with given fields: tk, transfer, protocolTxID, additionalInfo -func (_m *Manager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) error { - ret := _m.Called(tk, transfer, protocolTxID, additionalInfo) - - var r0 error - if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenTransfer, string, fftypes.JSONObject) error); ok { - r0 = rf(tk, transfer, protocolTxID, additionalInfo) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // TransferTokens provides a mock function with given fields: ctx, ns, typeName, poolName, transfer, waitConfirm -func (_m *Manager) TransferTokens(ctx context.Context, ns string, typeName string, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) { +func (_m *Manager) TransferTokens(ctx context.Context, ns string, typeName string, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error) { ret := _m.Called(ctx, ns, typeName, poolName, transfer, waitConfirm) var r0 *fftypes.TokenTransfer - if rf, ok := ret.Get(0).(func(context.Context, string, string, string, *fftypes.TokenTransfer, bool) *fftypes.TokenTransfer); ok { + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, *fftypes.TokenTransferInput, bool) *fftypes.TokenTransfer); ok { r0 = rf(ctx, ns, typeName, poolName, transfer, waitConfirm) } else { if ret.Get(0) != nil { @@ -262,7 +248,7 @@ func (_m *Manager) TransferTokens(ctx context.Context, ns string, typeName strin } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, string, string, *fftypes.TokenTransfer, bool) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, string, string, string, *fftypes.TokenTransferInput, bool) error); ok { r1 = rf(ctx, ns, typeName, poolName, transfer, waitConfirm) } else { r1 = ret.Error(1) diff --git a/mocks/eventmocks/event_manager.go b/mocks/eventmocks/event_manager.go index 6c4a632a9e..2c570881ef 100644 --- a/mocks/eventmocks/event_manager.go +++ b/mocks/eventmocks/event_manager.go @@ -14,6 +14,8 @@ import ( mock "github.com/stretchr/testify/mock" system "github.com/hyperledger/firefly/internal/events/system" + + tokens "github.com/hyperledger/firefly/pkg/tokens" ) // EventManager is an autogenerated mock type for the EventManager type @@ -229,6 +231,20 @@ func (_m *EventManager) SubscriptionUpdates() chan<- *fftypes.UUID { return r0 } +// TokensTransferred provides a mock function with given fields: tk, transfer, protocolTxID, additionalInfo +func (_m *EventManager) TokensTransferred(tk tokens.Plugin, transfer *fftypes.TokenTransfer, protocolTxID string, additionalInfo fftypes.JSONObject) error { + ret := _m.Called(tk, transfer, protocolTxID, additionalInfo) + + var r0 error + if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenTransfer, string, fftypes.JSONObject) error); ok { + r0 = rf(tk, transfer, protocolTxID, additionalInfo) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // TransferResult provides a mock function with given fields: dx, trackingID, status, info, opOutput func (_m *EventManager) TransferResult(dx dataexchange.Plugin, trackingID string, status fftypes.OpStatus, info string, opOutput fftypes.JSONObject) error { ret := _m.Called(dx, trackingID, status, info, opOutput) diff --git a/pkg/fftypes/message.go b/pkg/fftypes/message.go index cd0870d399..3bb37452b1 100644 --- a/pkg/fftypes/message.go +++ b/pkg/fftypes/message.go @@ -41,6 +41,10 @@ var ( MessageTypePrivate MessageType = ffEnum("messagetype", "private") // MessageTypeGroupInit is a special private message that contains the definition of the group MessageTypeGroupInit MessageType = ffEnum("messagetype", "groupinit") + // MessageTypeTransferBroadcast is a broadcast message to accompany/annotate a token transfer + MessageTypeTransferBroadcast MessageType = ffEnum("messagetype", "transfer_broadcast") + // MessageTypeTransferPrivate is a private message to accompany/annotate a token transfer + MessageTypeTransferPrivate MessageType = ffEnum("messagetype", "transfer_private") ) // MessageHeader contains all fields that contribute to the hash diff --git a/pkg/fftypes/tokentransfer.go b/pkg/fftypes/tokentransfer.go index de5bc251f0..22136cf30a 100644 --- a/pkg/fftypes/tokentransfer.go +++ b/pkg/fftypes/tokentransfer.go @@ -38,3 +38,8 @@ type TokenTransfer struct { Created *FFTime `json:"created,omitempty"` TX TransactionRef `json:"tx,omitempty"` } + +type TokenTransferInput struct { + TokenTransfer + Message *MessageInOut `json:"message,omitempty"` +}