diff --git a/Makefile b/Makefile index 5b467f941c..1494f3bf43 100644 --- a/Makefile +++ b/Makefile @@ -49,6 +49,7 @@ $(eval $(call makemock, pkg/wsclient, WSClient, wsmocks)) $(eval $(call makemock, internal/identity, Manager, identitymanagermocks)) $(eval $(call makemock, internal/batchpin, Submitter, batchpinmocks)) $(eval $(call makemock, internal/sysmessaging, SystemEvents, sysmessagingmocks)) +$(eval $(call makemock, internal/sysmessaging, MessageSender, sysmessagingmocks)) $(eval $(call makemock, internal/syncasync, Bridge, syncasyncmocks)) $(eval $(call makemock, internal/data, Manager, datamocks)) $(eval $(call makemock, internal/batch, Manager, batchmocks)) diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index ed4d8ba364..15f27e31d4 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -4743,12 +4743,120 @@ paths: schema: properties: amount: {} + created: {} from: type: string key: type: string + localId: {} + message: + properties: + batch: {} + confirmed: {} + data: + items: + properties: + blob: + properties: + hash: {} + public: + type: string + type: object + datatype: + properties: + name: + type: string + version: + type: string + type: object + hash: {} + id: {} + validator: + type: string + value: + format: byte + type: string + type: object + type: array + group: + properties: + ledger: {} + members: + items: + properties: + identity: + type: string + node: + type: string + type: object + type: array + name: + type: string + type: object + hash: {} + header: + properties: + author: + type: string + cid: {} + created: {} + datahash: {} + group: {} + id: {} + key: + type: string + namespace: + type: string + tag: + type: string + topics: + items: + type: string + type: array + txtype: + type: string + type: + enum: + - definition + - broadcast + - private + - groupinit + - transfer_broadcast + - transfer_private + type: string + type: object + local: + type: boolean + pending: + type: boolean + pins: + items: + type: string + type: array + rejected: + type: boolean + type: object + messageHash: {} + poolProtocolId: + type: string + protocolId: + type: string + to: + type: string tokenIndex: type: string + tx: + properties: + id: {} + type: + type: string + type: object + type: + enum: + - mint + - burn + - transfer + type: string type: object responses: "200": @@ -4857,10 +4965,120 @@ paths: schema: properties: amount: {} + created: {} + from: + type: string key: type: string + localId: {} + message: + properties: + batch: {} + confirmed: {} + data: + items: + properties: + blob: + properties: + hash: {} + public: + type: string + type: object + datatype: + properties: + name: + type: string + version: + type: string + type: object + hash: {} + id: {} + validator: + type: string + value: + format: byte + type: string + type: object + type: array + group: + properties: + ledger: {} + members: + items: + properties: + identity: + type: string + node: + type: string + type: object + type: array + name: + type: string + type: object + hash: {} + header: + properties: + author: + type: string + cid: {} + created: {} + datahash: {} + group: {} + id: {} + key: + type: string + namespace: + type: string + tag: + type: string + topics: + items: + type: string + type: array + txtype: + type: string + type: + enum: + - definition + - broadcast + - private + - groupinit + - transfer_broadcast + - transfer_private + type: string + type: object + local: + type: boolean + pending: + type: boolean + pins: + items: + type: string + type: array + rejected: + type: boolean + type: object + messageHash: {} + poolProtocolId: + type: string + protocolId: + type: string to: type: string + tokenIndex: + type: string + tx: + properties: + id: {} + type: + type: string + type: object + type: + enum: + - mint + - burn + - transfer + type: string type: object responses: "200": diff --git a/internal/apiserver/route_post_token_burn.go b/internal/apiserver/route_post_token_burn.go index 607a8a8b9f..35e579b55f 100644 --- a/internal/apiserver/route_post_token_burn.go +++ b/internal/apiserver/route_post_token_burn.go @@ -40,13 +40,13 @@ var postTokenBurn = &oapispec.Route{ }, FilterFactory: nil, Description: i18n.MsgTBD, - JSONInputValue: func() interface{} { return &fftypes.TokenTransfer{} }, + JSONInputValue: func() interface{} { return &fftypes.TokenTransferInput{} }, JSONInputMask: []string{"Type", "LocalID", "PoolProtocolID", "To", "ProtocolID", "MessageHash", "TX", "Created"}, JSONOutputValue: func() interface{} { return &fftypes.TokenTransfer{} }, JSONOutputCodes: []int{http.StatusAccepted, http.StatusOK}, JSONHandler: func(r *oapispec.APIRequest) (output interface{}, err error) { waitConfirm := strings.EqualFold(r.QP["confirm"], "true") r.SuccessStatus = syncRetcode(waitConfirm) - return r.Or.Assets().BurnTokens(r.Ctx, r.PP["ns"], r.PP["type"], r.PP["name"], r.Input.(*fftypes.TokenTransfer), waitConfirm) + return r.Or.Assets().BurnTokens(r.Ctx, r.PP["ns"], r.PP["type"], r.PP["name"], r.Input.(*fftypes.TokenTransferInput), waitConfirm) }, } diff --git a/internal/apiserver/route_post_token_burn_test.go b/internal/apiserver/route_post_token_burn_test.go index a2addefaf1..6b4de5ee98 100644 --- a/internal/apiserver/route_post_token_burn_test.go +++ b/internal/apiserver/route_post_token_burn_test.go @@ -32,14 +32,14 @@ func TestPostTokenBurn(t *testing.T) { o, r := newTestAPIServer() mam := &assetmocks.Manager{} o.On("Assets").Return(mam) - input := fftypes.TokenTransfer{} + input := fftypes.TokenTransferInput{} var buf bytes.Buffer json.NewEncoder(&buf).Encode(&input) req := httptest.NewRequest("POST", "/api/v1/namespaces/ns1/tokens/tok1/pools/pool1/burn", &buf) req.Header.Set("Content-Type", "application/json; charset=utf-8") res := httptest.NewRecorder() - mam.On("BurnTokens", mock.Anything, "ns1", "tok1", "pool1", mock.AnythingOfType("*fftypes.TokenTransfer"), false). + mam.On("BurnTokens", mock.Anything, "ns1", "tok1", "pool1", mock.AnythingOfType("*fftypes.TokenTransferInput"), false). Return(&fftypes.TokenTransfer{}, nil) r.ServeHTTP(res, req) diff --git a/internal/apiserver/route_post_token_mint.go b/internal/apiserver/route_post_token_mint.go index ed874752e8..b134aef51f 100644 --- a/internal/apiserver/route_post_token_mint.go +++ b/internal/apiserver/route_post_token_mint.go @@ -40,13 +40,13 @@ var postTokenMint = &oapispec.Route{ }, FilterFactory: nil, Description: i18n.MsgTBD, - JSONInputValue: func() interface{} { return &fftypes.TokenTransfer{} }, + JSONInputValue: func() interface{} { return &fftypes.TokenTransferInput{} }, JSONInputMask: []string{"Type", "LocalID", "PoolProtocolID", "TokenIndex", "From", "ProtocolID", "MessageHash", "TX", "Created"}, JSONOutputValue: func() interface{} { return &fftypes.TokenTransfer{} }, JSONOutputCodes: []int{http.StatusAccepted, http.StatusOK}, JSONHandler: func(r *oapispec.APIRequest) (output interface{}, err error) { waitConfirm := strings.EqualFold(r.QP["confirm"], "true") r.SuccessStatus = syncRetcode(waitConfirm) - return r.Or.Assets().MintTokens(r.Ctx, r.PP["ns"], r.PP["type"], r.PP["name"], r.Input.(*fftypes.TokenTransfer), waitConfirm) + return r.Or.Assets().MintTokens(r.Ctx, r.PP["ns"], r.PP["type"], r.PP["name"], r.Input.(*fftypes.TokenTransferInput), waitConfirm) }, } diff --git a/internal/apiserver/route_post_token_mint_test.go b/internal/apiserver/route_post_token_mint_test.go index 28af3cf91b..1b4eca0c64 100644 --- a/internal/apiserver/route_post_token_mint_test.go +++ b/internal/apiserver/route_post_token_mint_test.go @@ -32,14 +32,14 @@ func TestPostTokenMint(t *testing.T) { o, r := newTestAPIServer() mam := &assetmocks.Manager{} o.On("Assets").Return(mam) - input := fftypes.TokenTransfer{} + input := fftypes.TokenTransferInput{} var buf bytes.Buffer json.NewEncoder(&buf).Encode(&input) req := httptest.NewRequest("POST", "/api/v1/namespaces/ns1/tokens/tok1/pools/pool1/mint", &buf) req.Header.Set("Content-Type", "application/json; charset=utf-8") res := httptest.NewRecorder() - mam.On("MintTokens", mock.Anything, "ns1", "tok1", "pool1", mock.AnythingOfType("*fftypes.TokenTransfer"), false). + mam.On("MintTokens", mock.Anything, "ns1", "tok1", "pool1", mock.AnythingOfType("*fftypes.TokenTransferInput"), false). Return(&fftypes.TokenTransfer{}, nil) r.ServeHTTP(res, req) diff --git a/internal/assets/manager.go b/internal/assets/manager.go index ac892cbe44..33f9bd6ded 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -28,6 +28,7 @@ import ( "github.com/hyperledger/firefly/internal/privatemessaging" "github.com/hyperledger/firefly/internal/retry" "github.com/hyperledger/firefly/internal/syncasync" + "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -41,8 +42,9 @@ type Manager interface { GetTokenAccounts(ctx context.Context, ns, typeName, poolName string, filter database.AndFilter) ([]*fftypes.TokenAccount, *database.FilterResult, error) ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error GetTokenTransfers(ctx context.Context, ns, typeName, poolName string, filter database.AndFilter) ([]*fftypes.TokenTransfer, *database.FilterResult, error) - MintTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) - BurnTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) + NewTransfer(ns, typeName, poolName string, transfer *fftypes.TokenTransferInput) sysmessaging.MessageSender + MintTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error) + BurnTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error) TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error) // Bound token callbacks @@ -151,7 +153,8 @@ func (am *assetManager) createTokenPoolWithID(ctx context.Context, id *fftypes.U } if waitConfirm { - return am.syncasync.SendConfirmTokenPool(ctx, ns, func(requestID *fftypes.UUID) error { + requestID := fftypes.NewUUID() + return am.syncasync.SendConfirmTokenPool(ctx, ns, requestID, func(ctx context.Context) error { _, err := am.createTokenPoolWithID(ctx, requestID, ns, typeName, pool, false) return err }) @@ -176,10 +179,8 @@ func (am *assetManager) createTokenPoolWithID(ctx context.Context, id *fftypes.U pool.ID = id pool.Namespace = ns - pool.TX = fftypes.TransactionRef{ - ID: tx.ID, - Type: tx.Subject.Type, - } + pool.TX.ID = tx.ID + pool.TX.Type = tx.Subject.Type op := fftypes.NewTXOperation( plugin, @@ -253,7 +254,45 @@ func (am *assetManager) GetTokenTransfers(ctx context.Context, ns, typeName, nam return am.database.GetTokenTransfers(ctx, filter.Condition(filter.Builder().Eq("poolprotocolid", pool.ProtocolID))) } -func (am *assetManager) MintTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) { +func (am *assetManager) NewTransfer(ns, typeName, poolName string, transfer *fftypes.TokenTransferInput) sysmessaging.MessageSender { + sender := &transferSender{ + mgr: am, + namespace: ns, + typeName: typeName, + poolName: poolName, + transfer: transfer, + } + sender.setDefaults() + return sender +} + +type transferSender struct { + mgr *assetManager + namespace string + typeName string + poolName string + transfer *fftypes.TokenTransferInput + sendCallback sysmessaging.BeforeSendCallback +} + +func (s *transferSender) Send(ctx context.Context) error { + return s.resolveAndSend(ctx, false) +} + +func (s *transferSender) SendAndWait(ctx context.Context) error { + return s.resolveAndSend(ctx, true) +} + +func (s *transferSender) BeforeSend(cb sysmessaging.BeforeSendCallback) sysmessaging.MessageSender { + s.sendCallback = cb + return s +} + +func (s *transferSender) setDefaults() { + s.transfer.LocalID = fftypes.NewUUID() +} + +func (am *assetManager) MintTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (out *fftypes.TokenTransfer, err error) { transfer.Type = fftypes.TokenTransferTypeMint if transfer.Key == "" { org, err := am.identity.GetLocalOrganization(ctx) @@ -266,10 +305,17 @@ func (am *assetManager) MintTokens(ctx context.Context, ns, typeName, poolName s if transfer.To == "" { transfer.To = transfer.Key } - return am.transferTokensWithID(ctx, fftypes.NewUUID(), ns, typeName, poolName, transfer, waitConfirm) + + sender := am.NewTransfer(ns, typeName, poolName, transfer) + if waitConfirm { + err = sender.SendAndWait(ctx) + } else { + err = sender.Send(ctx) + } + return &transfer.TokenTransfer, err } -func (am *assetManager) BurnTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) { +func (am *assetManager) BurnTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (out *fftypes.TokenTransfer, err error) { transfer.Type = fftypes.TokenTransferTypeBurn if transfer.Key == "" { org, err := am.identity.GetLocalOrganization(ctx) @@ -282,28 +328,17 @@ func (am *assetManager) BurnTokens(ctx context.Context, ns, typeName, poolName s transfer.From = transfer.Key } transfer.To = "" - return am.transferTokensWithID(ctx, fftypes.NewUUID(), ns, typeName, poolName, transfer, waitConfirm) -} -func (am *assetManager) sendTransferMessage(ctx context.Context, ns string, in *fftypes.MessageInOut) (*fftypes.Message, error) { - allowedTypes := []fftypes.FFEnum{ - fftypes.MessageTypeTransferBroadcast, - fftypes.MessageTypeTransferPrivate, - } - if in.Header.Type == "" { - in.Header.Type = fftypes.MessageTypeTransferBroadcast - } - switch in.Header.Type { - case fftypes.MessageTypeTransferBroadcast: - return am.broadcast.BroadcastMessage(ctx, ns, in, false) - case fftypes.MessageTypeTransferPrivate: - return am.messaging.SendMessage(ctx, ns, in, false) - default: - return nil, i18n.NewError(ctx, i18n.MsgInvalidMessageType, allowedTypes) + sender := am.NewTransfer(ns, typeName, poolName, transfer) + if waitConfirm { + err = sender.SendAndWait(ctx) + } else { + err = sender.Send(ctx) } + return &transfer.TokenTransfer, err } -func (am *assetManager) TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error) { +func (am *assetManager) TransferTokens(ctx context.Context, ns, typeName, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (out *fftypes.TokenTransfer, err error) { transfer.Type = fftypes.TokenTransferTypeTransfer if transfer.Key == "" { org, err := am.identity.GetLocalOrganization(ctx) @@ -322,80 +357,128 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns, typeName, poolNa return nil, i18n.NewError(ctx, i18n.MsgCannotTransferToSelf) } - if transfer.Message != nil { - msg, err := am.sendTransferMessage(ctx, ns, transfer.Message) - if err != nil { - return nil, err - } - transfer.MessageHash = msg.Hash + sender := am.NewTransfer(ns, typeName, poolName, transfer) + if waitConfirm { + err = sender.SendAndWait(ctx) + } else { + err = sender.Send(ctx) } - - result, err := am.transferTokensWithID(ctx, fftypes.NewUUID(), ns, typeName, poolName, &transfer.TokenTransfer, waitConfirm) - return result, err + return &transfer.TokenTransfer, err } -func (am *assetManager) transferTokensWithID(ctx context.Context, id *fftypes.UUID, ns, typeName, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) { - plugin, err := am.selectTokenPlugin(ctx, typeName) +func (s *transferSender) resolveAndSend(ctx context.Context, waitConfirm bool) (err error) { + plugin, err := s.mgr.selectTokenPlugin(ctx, s.typeName) if err != nil { - return nil, err + return err } - pool, err := am.GetTokenPool(ctx, ns, typeName, poolName) + pool, err := s.mgr.GetTokenPool(ctx, s.namespace, s.typeName, s.poolName) if err != nil { - return nil, err + return err } + s.transfer.PoolProtocolID = pool.ProtocolID - if waitConfirm { - return am.syncasync.SendConfirmTokenTransfer(ctx, ns, func(requestID *fftypes.UUID) error { - _, err := am.transferTokensWithID(ctx, requestID, ns, typeName, poolName, transfer, false) + var messageSender sysmessaging.MessageSender + if s.transfer.Message != nil { + if messageSender, err = s.buildTransferMessage(ctx, s.namespace, s.transfer.Message); err != nil { return err - }) + } + } + + switch { + case waitConfirm && messageSender != nil: + // prepare the message, send the transfer async, then send the message and wait + return messageSender. + BeforeSend(func(ctx context.Context) error { + s.transfer.MessageHash = s.transfer.Message.Hash + s.transfer.Message = nil + return s.Send(ctx) + }). + SendAndWait(ctx) + case waitConfirm: + // no message - just send the transfer and wait + return s.sendSync(ctx) + case messageSender != nil: + // send the message async and then move on to the transfer + if err := messageSender.Send(ctx); err != nil { + return err + } + s.transfer.MessageHash = s.transfer.Message.Hash } tx := &fftypes.Transaction{ ID: fftypes.NewUUID(), Subject: fftypes.TransactionSubject{ - Namespace: ns, + Namespace: s.namespace, Type: fftypes.TransactionTypeTokenTransfer, - Signer: transfer.Key, - Reference: id, + Signer: s.transfer.Key, + Reference: s.transfer.LocalID, }, Created: fftypes.Now(), Status: fftypes.OpStatusPending, } tx.Hash = tx.Subject.Hash() - err = am.database.UpsertTransaction(ctx, tx, false /* should be new, or idempotent replay */) + err = s.mgr.database.UpsertTransaction(ctx, tx, false /* should be new, or idempotent replay */) if err != nil { - return nil, err + return err } - transfer.LocalID = id - transfer.PoolProtocolID = pool.ProtocolID - transfer.TX.ID = tx.ID - transfer.TX.Type = tx.Subject.Type + s.transfer.TX.ID = tx.ID + s.transfer.TX.Type = tx.Subject.Type op := fftypes.NewTXOperation( plugin, - ns, + s.namespace, tx.ID, "", fftypes.OpTypeTokenTransfer, fftypes.OpStatusPending, "") - addTokenTransferInputs(op, transfer) - err = am.database.UpsertOperation(ctx, op, false) - if err != nil { - return nil, err + addTokenTransferInputs(op, &s.transfer.TokenTransfer) + if err := s.mgr.database.UpsertOperation(ctx, op, false); err != nil { + return err + } + + if s.sendCallback != nil { + if err := s.sendCallback(ctx); err != nil { + return err + } } - switch transfer.Type { + switch s.transfer.Type { case fftypes.TokenTransferTypeMint: - return transfer, plugin.MintTokens(ctx, op.ID, transfer) + return plugin.MintTokens(ctx, op.ID, &s.transfer.TokenTransfer) case fftypes.TokenTransferTypeTransfer: - return transfer, plugin.TransferTokens(ctx, op.ID, transfer) + return plugin.TransferTokens(ctx, op.ID, &s.transfer.TokenTransfer) case fftypes.TokenTransferTypeBurn: - return transfer, plugin.BurnTokens(ctx, op.ID, transfer) + return plugin.BurnTokens(ctx, op.ID, &s.transfer.TokenTransfer) + default: + panic(fmt.Sprintf("unknown transfer type: %v", s.transfer.Type)) + } +} + +func (s *transferSender) sendSync(ctx context.Context) error { + out, err := s.mgr.syncasync.SendConfirmTokenTransfer(ctx, s.namespace, s.transfer.LocalID, s.Send) + if out != nil { + s.transfer.TokenTransfer = *out + } + return err +} + +func (s *transferSender) buildTransferMessage(ctx context.Context, ns string, in *fftypes.MessageInOut) (sysmessaging.MessageSender, error) { + allowedTypes := []fftypes.FFEnum{ + fftypes.MessageTypeTransferBroadcast, + fftypes.MessageTypeTransferPrivate, + } + if in.Header.Type == "" { + in.Header.Type = fftypes.MessageTypeTransferBroadcast + } + switch in.Header.Type { + case fftypes.MessageTypeTransferBroadcast: + return s.mgr.broadcast.NewBroadcast(ns, in), nil + case fftypes.MessageTypeTransferPrivate: + return s.mgr.messaging.NewMessage(ns, in), nil default: - panic(fmt.Sprintf("unknown transfer type: %v", transfer.Type)) + return nil, i18n.NewError(ctx, i18n.MsgInvalidMessageType, allowedTypes) } } diff --git a/internal/assets/manager_test.go b/internal/assets/manager_test.go index 577e779e4c..09ee71c88b 100644 --- a/internal/assets/manager_test.go +++ b/internal/assets/manager_test.go @@ -21,12 +21,14 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/syncasync" + "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/identitymanagermocks" "github.com/hyperledger/firefly/mocks/privatemessagingmocks" "github.com/hyperledger/firefly/mocks/syncasyncmocks" + "github.com/hyperledger/firefly/mocks/sysmessagingmocks" "github.com/hyperledger/firefly/mocks/tokenmocks" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -182,8 +184,6 @@ func TestCreateTokenPoolConfirm(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - requestID := fftypes.NewUUID() - mdi := am.database.(*databasemocks.Plugin) mdm := am.data.(*datamocks.Manager) msa := am.syncasync.(*syncasyncmocks.Bridge) @@ -191,17 +191,15 @@ func TestCreateTokenPoolConfirm(t *testing.T) { mim := am.identity.(*identitymanagermocks.Manager) mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) mdm.On("VerifyNamespaceExists", context.Background(), "ns1").Return(nil).Times(2) - mti.On("CreateTokenPool", context.Background(), mock.Anything, mock.MatchedBy(func(pool *fftypes.TokenPool) bool { - return pool.ID == requestID - })).Return(nil).Times(1) + mti.On("CreateTokenPool", context.Background(), mock.Anything, mock.Anything).Return(nil).Times(1) mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil).Times(1) - msa.On("SendConfirmTokenPool", context.Background(), "ns1", mock.Anything). + msa.On("SendConfirmTokenPool", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { - send := args[2].(syncasync.RequestSender) - send(requestID) + send := args[3].(syncasync.RequestSender) + send(context.Background()) }). Return(nil, nil) @@ -355,7 +353,7 @@ func TestMintTokensSuccess(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - mint := &fftypes.TokenTransfer{} + mint := &fftypes.TokenTransferInput{} mint.Amount.Int().SetInt64(5) mdi := am.database.(*databasemocks.Plugin) @@ -363,7 +361,7 @@ func TestMintTokensSuccess(t *testing.T) { mim := am.identity.(*identitymanagermocks.Manager) mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) - mti.On("MintTokens", context.Background(), mock.Anything, mint).Return(nil) + mti.On("MintTokens", context.Background(), mock.Anything, &mint.TokenTransfer).Return(nil) mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer @@ -380,7 +378,7 @@ func TestMintTokensBadPlugin(t *testing.T) { mim := am.identity.(*identitymanagermocks.Manager) mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) - _, err := am.MintTokens(context.Background(), "", "", "", &fftypes.TokenTransfer{}, false) + _, err := am.MintTokens(context.Background(), "", "", "", &fftypes.TokenTransferInput{}, false) assert.Regexp(t, "FF10272", err) } @@ -388,7 +386,7 @@ func TestMintTokensBadPool(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - mint := &fftypes.TokenTransfer{} + mint := &fftypes.TokenTransferInput{} mint.Amount.Int().SetInt64(5) mdi := am.database.(*databasemocks.Plugin) @@ -404,7 +402,7 @@ func TestMintTokensIdentityFail(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - mint := &fftypes.TokenTransfer{} + mint := &fftypes.TokenTransferInput{} mint.Amount.Int().SetInt64(5) mdi := am.database.(*databasemocks.Plugin) @@ -420,7 +418,7 @@ func TestMintTokensFail(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - mint := &fftypes.TokenTransfer{} + mint := &fftypes.TokenTransferInput{} mint.Amount.Int().SetInt64(5) mdi := am.database.(*databasemocks.Plugin) @@ -429,7 +427,7 @@ func TestMintTokensFail(t *testing.T) { mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) - mti.On("MintTokens", context.Background(), mock.Anything, mint).Return(fmt.Errorf("pop")) + mti.On("MintTokens", context.Background(), mock.Anything, &mint.TokenTransfer).Return(fmt.Errorf("pop")) mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) @@ -442,7 +440,7 @@ func TestMintTokensOperationFail(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - mint := &fftypes.TokenTransfer{} + mint := &fftypes.TokenTransferInput{} mint.Amount.Int().SetInt64(5) mdi := am.database.(*databasemocks.Plugin) @@ -462,8 +460,7 @@ func TestMintTokensConfirm(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - requestID := fftypes.NewUUID() - mint := &fftypes.TokenTransfer{} + mint := &fftypes.TokenTransferInput{} mint.Amount.Int().SetInt64(5) mdi := am.database.(*databasemocks.Plugin) @@ -473,17 +470,17 @@ func TestMintTokensConfirm(t *testing.T) { mim := am.identity.(*identitymanagermocks.Manager) mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) - mti.On("MintTokens", context.Background(), mock.Anything, mint).Return(nil) + mti.On("MintTokens", context.Background(), mock.Anything, &mint.TokenTransfer).Return(nil) mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - msa.On("SendConfirmTokenTransfer", context.Background(), "ns1", mock.Anything). + msa.On("SendConfirmTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { - send := args[2].(syncasync.RequestSender) - send(requestID) + send := args[3].(syncasync.RequestSender) + send(context.Background()) }). - Return(nil, nil) + Return(&fftypes.TokenTransfer{}, nil) _, err := am.MintTokens(context.Background(), "ns1", "magic-tokens", "pool1", mint, true) assert.NoError(t, err) @@ -498,7 +495,7 @@ func TestBurnTokensSuccess(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - burn := &fftypes.TokenTransfer{} + burn := &fftypes.TokenTransferInput{} burn.Amount.Int().SetInt64(5) mdi := am.database.(*databasemocks.Plugin) @@ -506,7 +503,7 @@ func TestBurnTokensSuccess(t *testing.T) { mim := am.identity.(*identitymanagermocks.Manager) mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) - mti.On("BurnTokens", context.Background(), mock.Anything, burn).Return(nil) + mti.On("BurnTokens", context.Background(), mock.Anything, &burn.TokenTransfer).Return(nil) mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer @@ -524,7 +521,7 @@ func TestBurnTokensIdentityFail(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - burn := &fftypes.TokenTransfer{} + burn := &fftypes.TokenTransferInput{} burn.Amount.Int().SetInt64(5) mdi := am.database.(*databasemocks.Plugin) @@ -536,6 +533,41 @@ func TestBurnTokensIdentityFail(t *testing.T) { assert.EqualError(t, err, "pop") } +func TestBurnTokensConfirm(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + burn := &fftypes.TokenTransferInput{} + burn.Amount.Int().SetInt64(5) + + mdi := am.database.(*databasemocks.Plugin) + mdm := am.data.(*datamocks.Manager) + msa := am.syncasync.(*syncasyncmocks.Bridge) + mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) + mim := am.identity.(*identitymanagermocks.Manager) + mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) + mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) + mti.On("BurnTokens", context.Background(), mock.Anything, &burn.TokenTransfer).Return(nil) + mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { + return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer + }), false).Return(nil) + msa.On("SendConfirmTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + send := args[3].(syncasync.RequestSender) + send(context.Background()) + }). + Return(&fftypes.TokenTransfer{}, nil) + + _, err := am.BurnTokens(context.Background(), "ns1", "magic-tokens", "pool1", burn, true) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) + msa.AssertExpectations(t) + mti.AssertExpectations(t) +} + func TestTransferTokensSuccess(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() @@ -607,24 +639,31 @@ func TestTransferTokensInvalidType(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() - transfer := &fftypes.TokenTransfer{ - From: "A", - To: "B", + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + From: "A", + To: "B", + }, } transfer.Amount.Int().SetInt64(5) mdi := am.database.(*databasemocks.Plugin) - mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) - mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { + mdi.On("GetTokenPool", am.ctx, "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) + mdi.On("UpsertTransaction", am.ctx, mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - - assert.Panics(t, func() { - am.transferTokensWithID(context.Background(), fftypes.NewUUID(), "ns1", "magic-tokens", "pool1", transfer, false) + mdi.On("UpsertOperation", am.ctx, mock.Anything, false).Return(nil) + + sender := &transferSender{ + mgr: am, + namespace: "ns1", + typeName: "magic-tokens", + poolName: "pool1", + transfer: transfer, + } + assert.PanicsWithValue(t, "unknown transfer type: ", func() { + sender.Send(am.ctx) }) - - mdi.AssertExpectations(t) } func TestTransferTokensTransactionFail(t *testing.T) { @@ -658,12 +697,16 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() + hash := fftypes.NewRandB32() transfer := &fftypes.TokenTransferInput{ TokenTransfer: fftypes.TokenTransfer{ From: "A", To: "B", }, Message: &fftypes.MessageInOut{ + Message: fftypes.Message{ + Hash: hash, + }, InlineData: fftypes.InlineData{ { Value: []byte("test data"), @@ -677,6 +720,7 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) { mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) mim := am.identity.(*identitymanagermocks.Manager) mbm := am.broadcast.(*broadcastmocks.Manager) + mms := &sysmessagingmocks.MessageSender{} mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil) @@ -684,21 +728,62 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mbm.On("BroadcastMessage", context.Background(), "ns1", transfer.Message, false).Return(&transfer.Message.Message, nil) + mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) + mms.On("Send", context.Background()).Return(nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) assert.NoError(t, err) + assert.Equal(t, *hash, *transfer.MessageHash) mbm.AssertExpectations(t) mim.AssertExpectations(t) mdi.AssertExpectations(t) mti.AssertExpectations(t) + mms.AssertExpectations(t) +} + +func TestTransferTokensWithBroadcastMessageFail(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + From: "A", + To: "B", + }, + Message: &fftypes.MessageInOut{ + InlineData: fftypes.InlineData{ + { + Value: []byte("test data"), + }, + }, + }, + } + transfer.Amount.Int().SetInt64(5) + + mdi := am.database.(*databasemocks.Plugin) + mim := am.identity.(*identitymanagermocks.Manager) + mbm := am.broadcast.(*broadcastmocks.Manager) + mms := &sysmessagingmocks.MessageSender{} + mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) + mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) + mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) + mms.On("Send", context.Background()).Return(fmt.Errorf("pop")) + + _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) + assert.EqualError(t, err, "pop") + + mbm.AssertExpectations(t) + mim.AssertExpectations(t) + mdi.AssertExpectations(t) + mms.AssertExpectations(t) } func TestTransferTokensWithPrivateMessage(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() + hash := fftypes.NewRandB32() transfer := &fftypes.TokenTransferInput{ TokenTransfer: fftypes.TokenTransfer{ From: "A", @@ -709,6 +794,7 @@ func TestTransferTokensWithPrivateMessage(t *testing.T) { Header: fftypes.MessageHeader{ Type: fftypes.MessageTypeTransferPrivate, }, + Hash: hash, }, InlineData: fftypes.InlineData{ { @@ -723,6 +809,7 @@ func TestTransferTokensWithPrivateMessage(t *testing.T) { mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) mim := am.identity.(*identitymanagermocks.Manager) mpm := am.messaging.(*privatemessagingmocks.Manager) + mms := &sysmessagingmocks.MessageSender{} mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil) @@ -730,15 +817,18 @@ func TestTransferTokensWithPrivateMessage(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mpm.On("SendMessage", context.Background(), "ns1", transfer.Message, false).Return(&transfer.Message.Message, nil) + mpm.On("NewMessage", "ns1", transfer.Message).Return(mms) + mms.On("Send", context.Background()).Return(nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) assert.NoError(t, err) + assert.Equal(t, *hash, *transfer.MessageHash) mpm.AssertExpectations(t) mim.AssertExpectations(t) mdi.AssertExpectations(t) mti.AssertExpectations(t) + mms.AssertExpectations(t) } func TestTransferTokensWithInvalidMessage(t *testing.T) { @@ -765,11 +855,189 @@ func TestTransferTokensWithInvalidMessage(t *testing.T) { } transfer.Amount.Int().SetInt64(5) + mdi := am.database.(*databasemocks.Plugin) mim := am.identity.(*identitymanagermocks.Manager) mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) + mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) assert.Regexp(t, "FF10287", err) mim.AssertExpectations(t) + mdi.AssertExpectations(t) +} + +func TestTransferTokensConfirm(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + From: "A", + To: "B", + }, + } + transfer.Amount.Int().SetInt64(5) + + mdi := am.database.(*databasemocks.Plugin) + mdm := am.data.(*datamocks.Manager) + msa := am.syncasync.(*syncasyncmocks.Bridge) + mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) + mim := am.identity.(*identitymanagermocks.Manager) + mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) + mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) + mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil) + mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { + return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer + }), false).Return(nil) + msa.On("SendConfirmTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + send := args[3].(syncasync.RequestSender) + send(context.Background()) + }). + Return(&fftypes.TokenTransfer{}, nil) + + _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, true) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) + msa.AssertExpectations(t) + mti.AssertExpectations(t) +} + +func TestTransferTokensBeforeSendCallback(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + Type: fftypes.TokenTransferTypeTransfer, + From: "A", + To: "B", + }, + } + transfer.Amount.Int().SetInt64(5) + + mdi := am.database.(*databasemocks.Plugin) + mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) + mim := am.identity.(*identitymanagermocks.Manager) + mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) + mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil) + mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { + return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer + }), false).Return(nil) + + sender := am.NewTransfer("ns1", "magic-tokens", "pool1", transfer) + + called := false + sender.BeforeSend(func(ctx context.Context) error { + called = true + return nil + }) + + err := sender.Send(context.Background()) + assert.NoError(t, err) + assert.True(t, called) + + mim.AssertExpectations(t) + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} + +func TestTransferTokensBeforeSendCallbackFail(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + Type: fftypes.TokenTransferTypeTransfer, + From: "A", + To: "B", + }, + } + transfer.Amount.Int().SetInt64(5) + + mdi := am.database.(*databasemocks.Plugin) + mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) + mim := am.identity.(*identitymanagermocks.Manager) + mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) + mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { + return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer + }), false).Return(nil) + + sender := am.NewTransfer("ns1", "magic-tokens", "pool1", transfer) + + called := false + sender.BeforeSend(func(ctx context.Context) error { + called = true + return fmt.Errorf("pop") + }) + + err := sender.Send(context.Background()) + assert.EqualError(t, err, "pop") + assert.True(t, called) + + mim.AssertExpectations(t) + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} + +func TestTransferTokensWithBroadcastMessageConfirm(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + hash := fftypes.NewRandB32() + transfer := &fftypes.TokenTransferInput{ + TokenTransfer: fftypes.TokenTransfer{ + From: "A", + To: "B", + }, + Message: &fftypes.MessageInOut{ + Message: fftypes.Message{ + Hash: hash, + }, + InlineData: fftypes.InlineData{ + { + Value: []byte("test data"), + }, + }, + }, + } + transfer.Amount.Int().SetInt64(5) + + mdi := am.database.(*databasemocks.Plugin) + mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin) + mim := am.identity.(*identitymanagermocks.Manager) + mbm := am.broadcast.(*broadcastmocks.Manager) + mms := &sysmessagingmocks.MessageSender{} + mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) + mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(&fftypes.TokenPool{}, nil) + mti.On("TransferTokens", context.Background(), mock.Anything, &transfer.TokenTransfer).Return(nil) + mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { + return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer + }), false).Return(nil) + mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) + mms.On("BeforeSend", mock.Anything). + Run(func(args mock.Arguments) { + cb := args[0].(sysmessaging.BeforeSendCallback) + cb(context.Background()) + }). + Return(mms) + mms.On("SendAndWait", context.Background()).Return(nil) + + _, err := am.TransferTokens(context.Background(), "ns1", "magic-tokens", "pool1", transfer, true) + assert.NoError(t, err) + assert.Nil(t, transfer.Message) + assert.Equal(t, *hash, *transfer.MessageHash) + + mbm.AssertExpectations(t) + mim.AssertExpectations(t) + mdi.AssertExpectations(t) + mti.AssertExpectations(t) + mms.AssertExpectations(t) } diff --git a/internal/broadcast/definition.go b/internal/broadcast/definition.go index 5d786456de..8337dbe1ca 100644 --- a/internal/broadcast/definition.go +++ b/internal/broadcast/definition.go @@ -68,21 +68,30 @@ func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, def f } // Create a broadcast message referring to the data - msg = &fftypes.Message{ - Header: fftypes.MessageHeader{ - Namespace: fftypes.SystemNamespace, - Type: fftypes.MessageTypeDefinition, - Identity: *signingIdentity, - Topics: fftypes.FFNameArray{def.Topic()}, - Tag: string(tag), - TxType: fftypes.TransactionTypeBatchPin, - }, - Data: fftypes.DataRefs{ - {ID: data.ID, Hash: data.Hash}, + in := &fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Namespace: fftypes.SystemNamespace, + Type: fftypes.MessageTypeDefinition, + Identity: *signingIdentity, + Topics: fftypes.FFNameArray{def.Topic()}, + Tag: string(tag), + TxType: fftypes.TransactionTypeBatchPin, + }, + Data: fftypes.DataRefs{ + {ID: data.ID, Hash: data.Hash}, + }, }, } // Broadcast the message - return bm.broadcastMessageCommon(ctx, msg, waitConfirm) - + sender := broadcastSender{ + mgr: bm, + namespace: fftypes.SystemNamespace, + msg: in, + resolved: true, + } + sender.setDefaults() + err = sender.sendInternal(ctx, waitConfirm) + return &in.Message, err } diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 2cde037a4d..13e713a7ed 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -27,7 +27,9 @@ import ( "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/identity" + "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/syncasync" + "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/pkg/blockchain" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/dataexchange" @@ -36,6 +38,7 @@ import ( ) type Manager interface { + NewBroadcast(ns string, in *fftypes.MessageInOut) sysmessaging.MessageSender BroadcastDatatype(ctx context.Context, ns string, datatype *fftypes.Datatype, waitConfirm bool) (msg *fftypes.Message, err error) BroadcastNamespace(ctx context.Context, ns *fftypes.Namespace, waitConfirm bool) (msg *fftypes.Message, err error) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) @@ -134,22 +137,32 @@ func (bm *broadcastManager) submitTXAndUpdateDB(ctx context.Context, batch *ffty return bm.batchpin.SubmitPinnedBatch(ctx, batch, contexts) } -func (bm *broadcastManager) broadcastMessageCommon(ctx context.Context, msg *fftypes.Message, waitConfirm bool) (*fftypes.Message, error) { - - if !waitConfirm { - // Seal the message - if err := msg.Seal(ctx); err != nil { - return nil, err +func (bm *broadcastManager) publishBlobs(ctx context.Context, dataToPublish []*fftypes.DataAndBlob) error { + for _, d := range dataToPublish { + // Stream from the local data exchange ... + reader, err := bm.exchange.DownloadBLOB(ctx, d.Blob.PayloadRef) + if err != nil { + return i18n.WrapError(ctx, err, i18n.MsgDownloadBlobFailed, d.Blob.PayloadRef) } + defer reader.Close() - // Store the message - this asynchronously triggers the next step in process - return msg, bm.database.InsertMessageLocal(ctx, msg) + // ... to the public storage + publicRef, err := bm.publicstorage.PublishData(ctx, reader) + if err != nil { + return err + } + log.L(ctx).Infof("Published blob with hash '%s' for data '%s' to public storage: '%s'", d.Data.Blob, d.Data.ID, publicRef) + + // Update the data in the database, with the public reference. + // We do this independently for each piece of data + update := database.DataQueryFactory.NewUpdate(ctx).Set("blob.public", publicRef) + err = bm.database.UpdateData(ctx, d.Data.ID, update) + if err != nil { + return err + } } - return bm.syncasync.SendConfirm(ctx, msg.Header.Namespace, func(requestID *fftypes.UUID) error { - _, err := bm.broadcastMessageWithID(ctx, msg.Header.Namespace, requestID, nil, msg, false) - return err - }) + return nil } func (bm *broadcastManager) Start() error { diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index c535423da9..89264cee70 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -17,8 +17,11 @@ package broadcast import ( + "bytes" "context" "fmt" + "io" + "io/ioutil" "testing" "github.com/hyperledger/firefly/internal/config" @@ -48,11 +51,20 @@ func newTestBroadcast(t *testing.T) (*broadcastManager, func()) { msa := &syncasyncmocks.Bridge{} mbp := &batchpinmocks.Submitter{} mbi.On("Name").Return("ut_blockchain").Maybe() + mpi.On("Name").Return("ut_publicstorage").Maybe() mba.On("RegisterDispatcher", []fftypes.MessageType{ fftypes.MessageTypeBroadcast, fftypes.MessageTypeDefinition, fftypes.MessageTypeTransferBroadcast, }, mock.Anything, mock.Anything).Return() + + rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() + rag.RunFn = func(a mock.Arguments) { + rag.ReturnArguments = mock.Arguments{ + a[1].(func(context.Context) error)(a[0].(context.Context)), + } + } + ctx, cancel := context.WithCancel(context.Background()) b, err := NewBroadcastManager(ctx, mdi, mim, mdm, mbi, mdx, mpi, mba, msa, mbp) assert.NoError(t, err) @@ -68,12 +80,15 @@ func TestBroadcastMessageGood(t *testing.T) { bm, cancel := newTestBroadcast(t) defer cancel() - msg := &fftypes.Message{} - bm.database.(*databasemocks.Plugin).On("InsertMessageLocal", mock.Anything, msg).Return(nil) + msg := &fftypes.MessageInOut{} + bm.database.(*databasemocks.Plugin).On("InsertMessageLocal", mock.Anything, &msg.Message).Return(nil) - msgRet, err := bm.broadcastMessageCommon(context.Background(), msg, false) + broadcast := broadcastSender{ + mgr: bm, + msg: msg, + } + err := broadcast.sendInternal(context.Background(), false) assert.NoError(t, err) - assert.Equal(t, msg, msgRet) bm.Start() bm.WaitStop() @@ -84,14 +99,20 @@ func TestBroadcastMessageBad(t *testing.T) { defer cancel() dupID := fftypes.NewUUID() - msg := &fftypes.Message{ - Data: fftypes.DataRefs{ - {ID: dupID /* missing hash */}, + msg := &fftypes.MessageInOut{ + Message: fftypes.Message{ + Data: fftypes.DataRefs{ + {ID: dupID /* missing hash */}, + }, }, } bm.database.(*databasemocks.Plugin).On("UpsertMessage", mock.Anything, msg, false).Return(nil) - _, err := bm.broadcastMessageCommon(context.Background(), msg, false) + broadcast := broadcastSender{ + mgr: bm, + msg: msg, + } + err := broadcast.sendInternal(context.Background(), false) assert.Regexp(t, "FF10144", err) } @@ -113,7 +134,6 @@ func TestDispatchBatchInvalidData(t *testing.T) { func TestDispatchBatchUploadFail(t *testing.T) { bm, cancel := newTestBroadcast(t) defer cancel() - bm.publicstorage.(*publicstoragemocks.Plugin).On("PublishData", mock.Anything, mock.Anything).Return("", fmt.Errorf("pop")) err := bm.dispatchBatch(context.Background(), &fftypes.Batch{}, []*fftypes.Bytes32{fftypes.NewRandB32()}) @@ -124,11 +144,19 @@ func TestDispatchBatchSubmitBatchPinSucceed(t *testing.T) { bm, cancel := newTestBroadcast(t) defer cancel() + batch := &fftypes.Batch{ + ID: fftypes.NewUUID(), + } + mdi := bm.database.(*databasemocks.Plugin) - mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil) - bm.publicstorage.(*publicstoragemocks.Plugin).On("PublishData", mock.Anything, mock.Anything).Return("id1", nil) + mps := bm.publicstorage.(*publicstoragemocks.Plugin) + mbp := bm.batchpin.(*batchpinmocks.Submitter) + mps.On("PublishData", mock.Anything, mock.Anything).Return("id1", nil) + mdi.On("UpdateBatch", mock.Anything, batch.ID, mock.Anything).Return(nil) + mdi.On("UpsertOperation", mock.Anything, mock.Anything, false).Return(nil) + mbp.On("SubmitPinnedBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) - err := bm.dispatchBatch(context.Background(), &fftypes.Batch{}, []*fftypes.Bytes32{fftypes.NewRandB32()}) + err := bm.dispatchBatch(context.Background(), batch, []*fftypes.Bytes32{fftypes.NewRandB32()}) assert.NoError(t, err) } @@ -139,19 +167,13 @@ func TestDispatchBatchSubmitBroadcastFail(t *testing.T) { mdi := bm.database.(*databasemocks.Plugin) mps := bm.publicstorage.(*publicstoragemocks.Plugin) mbp := bm.batchpin.(*batchpinmocks.Submitter) - mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil) mps.On("PublishData", mock.Anything, mock.Anything).Return("id1", nil) - mps.On("Name").Return("ut_publicstorage") - - err := bm.dispatchBatch(context.Background(), &fftypes.Batch{Identity: fftypes.Identity{Author: "wrong", Key: "wrong"}}, []*fftypes.Bytes32{fftypes.NewRandB32()}) - assert.NoError(t, err) - mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) mdi.On("UpsertOperation", mock.Anything, mock.Anything, false).Return(nil) mbp.On("SubmitPinnedBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - fn := mdi.Calls[0].Arguments[1].(func(ctx context.Context) error) - err = fn(context.Background()) - assert.Regexp(t, "pop", err) + + err := bm.dispatchBatch(context.Background(), &fftypes.Batch{Identity: fftypes.Identity{Author: "wrong", Key: "wrong"}}, []*fftypes.Bytes32{fftypes.NewRandB32()}) + assert.EqualError(t, err, "pop") } func TestSubmitTXAndUpdateDBUpdateBatchFail(t *testing.T) { @@ -178,7 +200,6 @@ func TestSubmitTXAndUpdateDBAddOp1Fail(t *testing.T) { mdi.On("UpsertOperation", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop")) mbi.On("SubmitBatchPin", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("txid", nil) mbi.On("Name").Return("unittest") - bm.publicstorage.(*publicstoragemocks.Plugin).On("Name").Return("ut_publicstorage") batch := &fftypes.Batch{ Identity: fftypes.Identity{Author: "org1", Key: "0x12345"}, @@ -208,8 +229,6 @@ func TestSubmitTXAndUpdateDBSucceed(t *testing.T) { mbi.On("SubmitBatchPin", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) mbp.On("SubmitPinnedBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) - bm.publicstorage.(*publicstoragemocks.Plugin).On("Name").Return("ut_publicstorage") - msgID := fftypes.NewUUID() batch := &fftypes.Batch{ Identity: fftypes.Identity{Author: "org1", Key: "0x12345"}, @@ -237,3 +256,117 @@ func TestSubmitTXAndUpdateDBSucceed(t *testing.T) { assert.Equal(t, fftypes.OpTypePublicStorageBatchBroadcast, op.Type) } + +func TestPublishBlobsUpdateDataFail(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + mdx := bm.exchange.(*dataexchangemocks.Plugin) + mps := bm.publicstorage.(*publicstoragemocks.Plugin) + mim := bm.identity.(*identitymanagermocks.Manager) + + blobHash := fftypes.NewRandB32() + dataID := fftypes.NewUUID() + + ctx := context.Background() + mdx.On("DownloadBLOB", ctx, "blob/1").Return(ioutil.NopCloser(bytes.NewReader([]byte(`some data`))), nil) + mps.On("PublishData", ctx, mock.MatchedBy(func(reader io.ReadCloser) bool { + b, err := ioutil.ReadAll(reader) + assert.NoError(t, err) + assert.Equal(t, "some data", string(b)) + return true + })).Return("payload-ref", nil) + mdi.On("UpdateData", ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + mim.On("ResolveInputIdentity", ctx, mock.Anything).Return(nil) + + err := bm.publishBlobs(ctx, []*fftypes.DataAndBlob{ + { + Data: &fftypes.Data{ + ID: dataID, + Blob: &fftypes.BlobRef{ + Hash: blobHash, + }, + }, + Blob: &fftypes.Blob{ + Hash: blobHash, + PayloadRef: "blob/1", + }, + }, + }) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestPublishBlobsPublishFail(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + mdx := bm.exchange.(*dataexchangemocks.Plugin) + mps := bm.publicstorage.(*publicstoragemocks.Plugin) + mim := bm.identity.(*identitymanagermocks.Manager) + + blobHash := fftypes.NewRandB32() + dataID := fftypes.NewUUID() + + ctx := context.Background() + mdx.On("DownloadBLOB", ctx, "blob/1").Return(ioutil.NopCloser(bytes.NewReader([]byte(`some data`))), nil) + mps.On("PublishData", ctx, mock.MatchedBy(func(reader io.ReadCloser) bool { + b, err := ioutil.ReadAll(reader) + assert.NoError(t, err) + assert.Equal(t, "some data", string(b)) + return true + })).Return("", fmt.Errorf("pop")) + mim.On("ResolveInputIdentity", ctx, mock.Anything).Return(nil) + + err := bm.publishBlobs(ctx, []*fftypes.DataAndBlob{ + { + Data: &fftypes.Data{ + ID: dataID, + Blob: &fftypes.BlobRef{ + Hash: blobHash, + }, + }, + Blob: &fftypes.Blob{ + Hash: blobHash, + PayloadRef: "blob/1", + }, + }, + }) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestPublishBlobsDownloadFail(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + mdx := bm.exchange.(*dataexchangemocks.Plugin) + mim := bm.identity.(*identitymanagermocks.Manager) + + blobHash := fftypes.NewRandB32() + dataID := fftypes.NewUUID() + + ctx := context.Background() + mdx.On("DownloadBLOB", ctx, "blob/1").Return(nil, fmt.Errorf("pop")) + mim.On("ResolveInputIdentity", ctx, mock.Anything).Return(nil) + + err := bm.publishBlobs(ctx, []*fftypes.DataAndBlob{ + { + Data: &fftypes.Data{ + ID: dataID, + Blob: &fftypes.BlobRef{ + Hash: blobHash, + }, + }, + Blob: &fftypes.Blob{ + Hash: blobHash, + PayloadRef: "blob/1", + }, + }, + }) + assert.Regexp(t, "FF10240", err) + + mdi.AssertExpectations(t) +} diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index 89febe7a71..15d978b77a 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -21,78 +21,139 @@ import ( "encoding/json" "github.com/hyperledger/firefly/internal/i18n" - "github.com/hyperledger/firefly/internal/log" - "github.com/hyperledger/firefly/pkg/database" + "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/pkg/fftypes" ) -func (bm *broadcastManager) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { - return bm.broadcastMessageWithID(ctx, ns, nil, in, nil, waitConfirm) +func (bm *broadcastManager) NewBroadcast(ns string, in *fftypes.MessageInOut) sysmessaging.MessageSender { + broadcast := &broadcastSender{ + mgr: bm, + namespace: ns, + msg: in, + } + broadcast.setDefaults() + return broadcast } -func (bm *broadcastManager) broadcastMessageWithID(ctx context.Context, ns string, id *fftypes.UUID, unresolved *fftypes.MessageInOut, resolved *fftypes.Message, waitConfirm bool) (out *fftypes.Message, err error) { - if unresolved != nil { - resolved = &unresolved.Message +func (bm *broadcastManager) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { + broadcast := bm.NewBroadcast(ns, in) + if waitConfirm { + err = broadcast.SendAndWait(ctx) + } else { + err = broadcast.Send(ctx) } - resolved.Header.ID = id - resolved.Header.Namespace = ns + return &in.Message, err +} - if resolved.Header.Type == "" { - resolved.Header.Type = fftypes.MessageTypeBroadcast - } +type broadcastSender struct { + mgr *broadcastManager + namespace string + msg *fftypes.MessageInOut + resolved bool + sendCallback sysmessaging.BeforeSendCallback +} - if resolved.Header.TxType == "" { - resolved.Header.TxType = fftypes.TransactionTypeBatchPin - } +func (s *broadcastSender) Send(ctx context.Context) error { + return s.resolveAndSend(ctx, false) +} - if !bm.isRootOrgBroadcast(ctx, resolved) { - // Resolve the sending identity - if err := bm.identity.ResolveInputIdentity(ctx, &resolved.Header.Identity); err != nil { - return nil, i18n.WrapError(ctx, err, i18n.MsgAuthorInvalid) - } +func (s *broadcastSender) SendAndWait(ctx context.Context) error { + return s.resolveAndSend(ctx, true) +} + +func (s *broadcastSender) BeforeSend(cb sysmessaging.BeforeSendCallback) sysmessaging.MessageSender { + s.sendCallback = cb + return s +} + +func (s *broadcastSender) setDefaults() { + s.msg.Header.ID = fftypes.NewUUID() + s.msg.Header.Namespace = s.namespace + if s.msg.Header.Type == "" { + s.msg.Header.Type = fftypes.MessageTypeBroadcast + } + if s.msg.Header.TxType == "" { + s.msg.Header.TxType = fftypes.TransactionTypeBatchPin } +} + +func (s *broadcastSender) resolveAndSend(ctx context.Context, waitConfirm bool) error { + sent := false - // We optimize the DB storage of all the parts of the message using transaction semantics (assuming those are supported by the DB plugin + // We optimize the DB storage of all the parts of the message using transaction semantics (assuming those are supported by the DB plugin) var dataToPublish []*fftypes.DataAndBlob - err = bm.database.RunAsGroup(ctx, func(ctx context.Context) error { - if unresolved != nil { - // The data manager is responsible for the heavy lifting of storing/validating all our in-line data elements - resolved.Data, dataToPublish, err = bm.data.ResolveInlineDataBroadcast(ctx, ns, unresolved.InlineData) - if err != nil { + err := s.mgr.database.RunAsGroup(ctx, func(ctx context.Context) (err error) { + if !s.resolved { + if dataToPublish, err = s.resolveMessage(ctx); err != nil { return err } + s.resolved = true } - // If we have data to publish, we break out of the DB transaction, do the publishes, then - // do the send later - as that could take a long time (multiple seconds) depending on the size - // - // Same for waiting for confirmation of the send from the blockchain - if len(dataToPublish) > 0 || waitConfirm { - return nil + // For the simple case where we have no data to publish and aren't waiting for blockchain confirmation, + // insert the local message immediately within the same DB transaction. + // Otherwise, break out of the DB transaction (since those operations could take multiple seconds). + if len(dataToPublish) == 0 && !waitConfirm { + sent = true + return s.sendInternal(ctx, waitConfirm) } + return nil + }) - out, err = bm.broadcastMessageCommon(ctx, resolved, false) + if err != nil || sent { return err - }) - if err != nil { - return nil, err } // Perform deferred processing if len(dataToPublish) > 0 { - return bm.publishBlobsAndSend(ctx, resolved, dataToPublish, waitConfirm) - } else if waitConfirm { - return bm.broadcastMessageCommon(ctx, resolved, true) + if err := s.mgr.publishBlobs(ctx, dataToPublish); err != nil { + return err + } } + return s.sendInternal(ctx, waitConfirm) +} - // The broadcastMessage function modifies the input message to create all the refs - return out, err +func (s *broadcastSender) resolveMessage(ctx context.Context) ([]*fftypes.DataAndBlob, error) { + // Resolve the sending identity + if !s.isRootOrgBroadcast(ctx) { + if err := s.mgr.identity.ResolveInputIdentity(ctx, &s.msg.Header.Identity); err != nil { + return nil, i18n.WrapError(ctx, err, i18n.MsgAuthorInvalid) + } + } + + // The data manager is responsible for the heavy lifting of storing/validating all our in-line data elements + dataRefs, dataToPublish, err := s.mgr.data.ResolveInlineDataBroadcast(ctx, s.namespace, s.msg.InlineData) + s.msg.Message.Data = dataRefs + return dataToPublish, err +} + +func (s *broadcastSender) sendInternal(ctx context.Context, waitConfirm bool) (err error) { + if waitConfirm { + out, err := s.mgr.syncasync.SendConfirm(ctx, s.namespace, s.msg.Header.ID, s.Send) + if out != nil { + s.msg.Message = *out + } + return err + } + + // Seal the message + if err := s.msg.Seal(ctx); err != nil { + return err + } + if s.sendCallback != nil { + if err := s.sendCallback(ctx); err != nil { + return err + } + } + + // Store the message - this asynchronously triggers the next step in process + return s.mgr.database.InsertMessageLocal(ctx, &s.msg.Message) } -func (bm *broadcastManager) isRootOrgBroadcast(ctx context.Context, message *fftypes.Message) bool { +func (s *broadcastSender) isRootOrgBroadcast(ctx context.Context) bool { // Look into message to see if it contains a data item that is a root organization definition - if message.Header.Type == fftypes.MessageTypeDefinition { - messageData, ok, err := bm.data.GetMessageData(ctx, message, true) + if s.msg.Header.Type == fftypes.MessageTypeDefinition { + messageData, ok, err := s.mgr.data.GetMessageData(ctx, &s.msg.Message, true) if ok && err == nil { if len(messageData) > 0 { dataItem := messageData[0] @@ -111,35 +172,3 @@ func (bm *broadcastManager) isRootOrgBroadcast(ctx context.Context, message *fft } return false } - -func (bm *broadcastManager) publishBlobsAndSend(ctx context.Context, msg *fftypes.Message, dataToPublish []*fftypes.DataAndBlob, waitConfirm bool) (*fftypes.Message, error) { - - for _, d := range dataToPublish { - - // Stream from the local data exchange ... - reader, err := bm.exchange.DownloadBLOB(ctx, d.Blob.PayloadRef) - if err != nil { - return nil, i18n.WrapError(ctx, err, i18n.MsgDownloadBlobFailed, d.Blob.PayloadRef) - } - defer reader.Close() - - // ... to the public storage - publicRef, err := bm.publicstorage.PublishData(ctx, reader) - if err != nil { - return nil, err - } - log.L(ctx).Infof("Published blob with hash '%s' for data '%s' to public storage: '%s'", d.Data.Blob, d.Data.ID, publicRef) - - // Update the data in the database, with the public reference. - // We do this independently for each piece of data - update := database.DataQueryFactory.NewUpdate(ctx).Set("blob.public", publicRef) - err = bm.database.UpdateData(ctx, d.Data.ID, update) - if err != nil { - return nil, err - } - - } - - // Now we broadcast the message, as all data has been published - return bm.broadcastMessageCommon(ctx, msg, waitConfirm) -} diff --git a/internal/broadcast/message_test.go b/internal/broadcast/message_test.go index f944a51e7a..e00999e2d3 100644 --- a/internal/broadcast/message_test.go +++ b/internal/broadcast/message_test.go @@ -201,22 +201,19 @@ func TestBroadcastMessageWaitConfirmOk(t *testing.T) { }, []*fftypes.DataAndBlob{}, nil) mim.On("ResolveInputIdentity", ctx, mock.Anything).Return(nil) - requestID := fftypes.NewUUID() replyMsg := &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", ID: fftypes.NewUUID(), }, } - msa.On("SendConfirm", ctx, "ns1", mock.Anything). + msa.On("SendConfirm", ctx, "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { - send := args[2].(syncasync.RequestSender) - send(requestID) + send := args[3].(syncasync.RequestSender) + send(ctx) }). Return(replyMsg, nil) - mdi.On("InsertMessageLocal", ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { - return msg.Header.ID == requestID - })).Return(nil) + mdi.On("InsertMessageLocal", ctx, mock.Anything).Return(nil) msg, err := bm.BroadcastMessage(ctx, "ns1", &fftypes.MessageInOut{ Message: fftypes.Message{ @@ -357,26 +354,23 @@ func TestPublishBlobsSendMessageFail(t *testing.T) { bm, cancel := newTestBroadcast(t) defer cancel() mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) mdx := bm.exchange.(*dataexchangemocks.Plugin) - mps := bm.publicstorage.(*publicstoragemocks.Plugin) mim := bm.identity.(*identitymanagermocks.Manager) blobHash := fftypes.NewRandB32() dataID := fftypes.NewUUID() ctx := context.Background() - mdx.On("DownloadBLOB", ctx, "blob/1").Return(ioutil.NopCloser(bytes.NewReader([]byte(`some data`))), nil) - mps.On("PublishData", ctx, mock.MatchedBy(func(reader io.ReadCloser) bool { - b, err := ioutil.ReadAll(reader) - assert.NoError(t, err) - assert.Equal(t, "some data", string(b)) - return true - })).Return("payload-ref", nil) - mdi.On("UpdateData", ctx, mock.Anything, mock.Anything).Return(nil) - mdi.On("InsertMessageLocal", ctx, mock.Anything).Return(fmt.Errorf("pop")) + rag := mdi.On("RunAsGroup", ctx, mock.Anything) + rag.RunFn = func(a mock.Arguments) { + var fn = a[1].(func(context.Context) error) + rag.ReturnArguments = mock.Arguments{fn(a[0].(context.Context))} + } mim.On("ResolveInputIdentity", ctx, mock.Anything).Return(nil) - - _, err := bm.publishBlobsAndSend(ctx, &fftypes.Message{}, []*fftypes.DataAndBlob{ + mdm.On("ResolveInlineDataBroadcast", ctx, "ns1", mock.Anything).Return(fftypes.DataRefs{ + {ID: dataID, Hash: fftypes.NewRandB32()}, + }, []*fftypes.DataAndBlob{ { Data: &fftypes.Data{ ID: dataID, @@ -389,122 +383,82 @@ func TestPublishBlobsSendMessageFail(t *testing.T) { PayloadRef: "blob/1", }, }, - }, false) - assert.EqualError(t, err, "pop") - - mdi.AssertExpectations(t) -} - -func TestPublishBlobsUpdateDataFail(t *testing.T) { - bm, cancel := newTestBroadcast(t) - defer cancel() - mdi := bm.database.(*databasemocks.Plugin) - mdx := bm.exchange.(*dataexchangemocks.Plugin) - mps := bm.publicstorage.(*publicstoragemocks.Plugin) - mim := bm.identity.(*identitymanagermocks.Manager) - - blobHash := fftypes.NewRandB32() - dataID := fftypes.NewUUID() - - ctx := context.Background() - mdx.On("DownloadBLOB", ctx, "blob/1").Return(ioutil.NopCloser(bytes.NewReader([]byte(`some data`))), nil) - mps.On("PublishData", ctx, mock.MatchedBy(func(reader io.ReadCloser) bool { - b, err := ioutil.ReadAll(reader) - assert.NoError(t, err) - assert.Equal(t, "some data", string(b)) - return true - })).Return("payload-ref", nil) - mdi.On("UpdateData", ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - mim.On("ResolveInputIdentity", ctx, mock.Anything).Return(nil) + }, nil) + mdx.On("DownloadBLOB", ctx, "blob/1").Return(nil, fmt.Errorf("pop")) - _, err := bm.publishBlobsAndSend(ctx, &fftypes.Message{}, []*fftypes.DataAndBlob{ - { - Data: &fftypes.Data{ - ID: dataID, - Blob: &fftypes.BlobRef{ - Hash: blobHash, + _, err := bm.BroadcastMessage(ctx, "ns1", &fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Identity: fftypes.Identity{ + Author: "did:firefly:org/abcd", + Key: "0x12345", }, }, - Blob: &fftypes.Blob{ - Hash: blobHash, - PayloadRef: "blob/1", - }, + }, + InlineData: fftypes.InlineData{ + {Blob: &fftypes.BlobRef{ + Hash: blobHash, + }}, }, }, false) - assert.EqualError(t, err, "pop") + assert.Regexp(t, "FF10240", err) mdi.AssertExpectations(t) + mdm.AssertExpectations(t) + mdx.AssertExpectations(t) + mim.AssertExpectations(t) } -func TestPublishBlobsPublishFail(t *testing.T) { +func TestBeforeSendCallback(t *testing.T) { bm, cancel := newTestBroadcast(t) defer cancel() - mdi := bm.database.(*databasemocks.Plugin) - mdx := bm.exchange.(*dataexchangemocks.Plugin) - mps := bm.publicstorage.(*publicstoragemocks.Plugin) - mim := bm.identity.(*identitymanagermocks.Manager) - - blobHash := fftypes.NewRandB32() - dataID := fftypes.NewUUID() - - ctx := context.Background() - mdx.On("DownloadBLOB", ctx, "blob/1").Return(ioutil.NopCloser(bytes.NewReader([]byte(`some data`))), nil) - mps.On("PublishData", ctx, mock.MatchedBy(func(reader io.ReadCloser) bool { - b, err := ioutil.ReadAll(reader) - assert.NoError(t, err) - assert.Equal(t, "some data", string(b)) - return true - })).Return("", fmt.Errorf("pop")) - mim.On("ResolveInputIdentity", ctx, mock.Anything).Return(nil) - _, err := bm.publishBlobsAndSend(ctx, &fftypes.Message{}, []*fftypes.DataAndBlob{ - { - Data: &fftypes.Data{ - ID: dataID, - Blob: &fftypes.BlobRef{ - Hash: blobHash, - }, - }, - Blob: &fftypes.Blob{ - Hash: blobHash, - PayloadRef: "blob/1", + id1 := fftypes.NewUUID() + message := bm.NewBroadcast("ns1", &fftypes.MessageInOut{ + Message: fftypes.Message{ + Data: fftypes.DataRefs{ + {ID: id1, Hash: fftypes.NewRandB32()}, }, }, - }, false) - assert.EqualError(t, err, "pop") + }) - mdi.AssertExpectations(t) -} + called := false + message.BeforeSend(func(ctx context.Context) error { + called = true + return nil + }) -func TestPublishBlobsDownloadFail(t *testing.T) { - bm, cancel := newTestBroadcast(t) - defer cancel() mdi := bm.database.(*databasemocks.Plugin) - mdx := bm.exchange.(*dataexchangemocks.Plugin) - mim := bm.identity.(*identitymanagermocks.Manager) + mdi.On("InsertMessageLocal", bm.ctx, mock.Anything).Return(nil) - blobHash := fftypes.NewRandB32() - dataID := fftypes.NewUUID() + err := message.(*broadcastSender).sendInternal(bm.ctx, false) + assert.NoError(t, err) + assert.True(t, called) +} - ctx := context.Background() - mdx.On("DownloadBLOB", ctx, "blob/1").Return(nil, fmt.Errorf("pop")) - mim.On("ResolveInputIdentity", ctx, mock.Anything).Return(nil) +func TestBeforeSendCallbackFail(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() - _, err := bm.publishBlobsAndSend(ctx, &fftypes.Message{}, []*fftypes.DataAndBlob{ - { - Data: &fftypes.Data{ - ID: dataID, - Blob: &fftypes.BlobRef{ - Hash: blobHash, - }, - }, - Blob: &fftypes.Blob{ - Hash: blobHash, - PayloadRef: "blob/1", + id1 := fftypes.NewUUID() + message := bm.NewBroadcast("ns1", &fftypes.MessageInOut{ + Message: fftypes.Message{ + Data: fftypes.DataRefs{ + {ID: id1, Hash: fftypes.NewRandB32()}, }, }, - }, false) - assert.Regexp(t, "FF10240", err) + }) - mdi.AssertExpectations(t) + called := false + message.BeforeSend(func(ctx context.Context) error { + called = true + return fmt.Errorf("pop") + }) + + mdi := bm.database.(*databasemocks.Plugin) + mdi.On("InsertMessageLocal", bm.ctx, mock.Anything).Return(nil) + + err := message.(*broadcastSender).sendInternal(bm.ctx, false) + assert.EqualError(t, err, "pop") + assert.True(t, called) } diff --git a/internal/privatemessaging/message.go b/internal/privatemessaging/message.go index e583138eb5..b9379434a2 100644 --- a/internal/privatemessaging/message.go +++ b/internal/privatemessaging/message.go @@ -21,126 +21,181 @@ import ( "encoding/json" "github.com/hyperledger/firefly/internal/i18n" + "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/pkg/fftypes" ) -func (pm *privateMessaging) SendMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { - return pm.sendMessageWithID(ctx, ns, nil, in, nil, waitConfirm) +func (pm *privateMessaging) NewMessage(ns string, in *fftypes.MessageInOut) sysmessaging.MessageSender { + message := &messageSender{ + mgr: pm, + namespace: ns, + msg: in, + } + message.setDefaults() + return message } -func (pm *privateMessaging) sendMessageWithID(ctx context.Context, ns string, id *fftypes.UUID, unresolved *fftypes.MessageInOut, resolved *fftypes.Message, waitConfirm bool) (*fftypes.Message, error) { - if unresolved != nil { - resolved = &unresolved.Message +func (pm *privateMessaging) SendMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { + message := pm.NewMessage(ns, in) + if waitConfirm { + err = message.SendAndWait(ctx) + } else { + err = message.Send(ctx) } + return &in.Message, err +} - resolved.Header.ID = id - resolved.Header.Namespace = ns - resolved.Header.Type = fftypes.MessageTypePrivate - if resolved.Header.TxType == "" { - resolved.Header.TxType = fftypes.TransactionTypeBatchPin +func (pm *privateMessaging) RequestReply(ctx context.Context, ns string, in *fftypes.MessageInOut) (*fftypes.MessageInOut, error) { + if in.Header.Tag == "" { + return nil, i18n.NewError(ctx, i18n.MsgRequestReplyTagRequired) + } + if in.Header.CID != nil { + return nil, i18n.NewError(ctx, i18n.MsgRequestCannotHaveCID) } + message := pm.NewMessage(ns, in) + return pm.syncasync.RequestReply(ctx, ns, in.Header.ID, message.Send) +} - // Resolve the sending identity - if err := pm.identity.ResolveInputIdentity(ctx, &resolved.Header.Identity); err != nil { - return nil, i18n.WrapError(ctx, err, i18n.MsgAuthorInvalid) +type messageSender struct { + mgr *privateMessaging + namespace string + msg *fftypes.MessageInOut + resolved bool + sendCallback sysmessaging.BeforeSendCallback +} + +func (s *messageSender) Send(ctx context.Context) error { + return s.resolveAndSend(ctx, false) +} + +func (s *messageSender) SendAndWait(ctx context.Context) error { + return s.resolveAndSend(ctx, true) +} + +func (s *messageSender) BeforeSend(cb sysmessaging.BeforeSendCallback) sysmessaging.MessageSender { + s.sendCallback = cb + return s +} + +func (s *messageSender) setDefaults() { + s.msg.Header.ID = fftypes.NewUUID() + s.msg.Header.Namespace = s.namespace + if s.msg.Header.Type == "" { + s.msg.Header.Type = fftypes.MessageTypePrivate + } + if s.msg.Header.TxType == "" { + s.msg.Header.TxType = fftypes.TransactionTypeBatchPin } +} - // We optimize the DB storage of all the parts of the message using transaction semantics (assuming those are supported by the DB plugin - var err error - err = pm.database.RunAsGroup(ctx, func(ctx context.Context) error { - if unresolved != nil { - err = pm.resolveMessage(ctx, unresolved) +func (s *messageSender) resolveAndSend(ctx context.Context, waitConfirm bool) error { + sent := false + + // We optimize the DB storage of all the parts of the message using transaction semantics (assuming those are supported by the DB plugin) + err := s.mgr.database.RunAsGroup(ctx, func(ctx context.Context) (err error) { + if !s.resolved { + if err := s.resolveMessage(ctx); err != nil { + return err + } + s.resolved = true } - if err == nil && !waitConfirm { - // We can safely optimize the send into the same DB transaction - resolved, err = pm.sendOrWaitMessage(ctx, resolved, false) + + // If we aren't waiting for blockchain confirmation, insert the local message immediately within the same DB transaction. + if !waitConfirm { + err = s.sendInternal(ctx, waitConfirm) + sent = true } return err }) - if err != nil { - return nil, err - } - if waitConfirm { - // perform the send and wait for the confirmation after closing the original DB transaction - return pm.sendOrWaitMessage(ctx, resolved, true) + + if err != nil || sent { + return err } - return resolved, err + + return s.sendInternal(ctx, waitConfirm) } -func (pm *privateMessaging) resolveMessage(ctx context.Context, in *fftypes.MessageInOut) (err error) { +func (s *messageSender) resolveMessage(ctx context.Context) error { + // Resolve the sending identity + if err := s.mgr.identity.ResolveInputIdentity(ctx, &s.msg.Header.Identity); err != nil { + return i18n.WrapError(ctx, err, i18n.MsgAuthorInvalid) + } // Resolve the member list into a group - if err = pm.resolveReceipientList(ctx, in); err != nil { + if err := s.mgr.resolveRecipientList(ctx, s.msg); err != nil { return err } // The data manager is responsible for the heavy lifting of storing/validating all our in-line data elements - in.Message.Data, err = pm.data.ResolveInlineDataPrivate(ctx, in.Header.Namespace, in.InlineData) + dataRefs, err := s.mgr.data.ResolveInlineDataPrivate(ctx, s.namespace, s.msg.InlineData) + s.msg.Message.Data = dataRefs return err } -func (pm *privateMessaging) sendOrWaitMessage(ctx context.Context, msg *fftypes.Message, waitConfirm bool) (*fftypes.Message, error) { - - immediateConfirm := msg.Header.TxType == fftypes.TransactionTypeNone +func (s *messageSender) sendInternal(ctx context.Context, waitConfirm bool) error { + immediateConfirm := s.msg.Header.TxType == fftypes.TransactionTypeNone - if immediateConfirm || !waitConfirm { - - // Seal the message - if err := msg.Seal(ctx); err != nil { - return nil, err + if waitConfirm && !immediateConfirm { + // Pass it to the sync-async handler to wait for the confirmation to come back in. + // NOTE: Our caller makes sure we are not in a RunAsGroup (which would be bad) + out, err := s.mgr.syncasync.SendConfirm(ctx, s.namespace, s.msg.Header.ID, s.Send) + if out != nil { + s.msg.Message = *out } + return err + } - if immediateConfirm { - msg.Confirmed = fftypes.Now() - msg.Pending = false - // msg.Header.Key = "" // there is no on-chain signing assurance with this message + // Seal the message + if err := s.msg.Seal(ctx); err != nil { + return err + } + if s.sendCallback != nil { + if err := s.sendCallback(ctx); err != nil { + return err } + } - // Store the message - this asynchronously triggers the next step in process - if err := pm.database.InsertMessageLocal(ctx, msg); err != nil { - return nil, err - } + if immediateConfirm { + s.msg.Confirmed = fftypes.Now() + s.msg.Pending = false + // msg.Header.Key = "" // there is no on-chain signing assurance with this message + } - if immediateConfirm { - if err := pm.sendUnpinnedMessage(ctx, msg); err != nil { - return nil, err - } + // Store the message - this asynchronously triggers the next step in process + if err := s.mgr.database.InsertMessageLocal(ctx, &s.msg.Message); err != nil { + return err + } - // Emit a confirmation event locally immediately - event := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, msg.Header.Namespace, msg.Header.ID) - if err := pm.database.InsertEvent(ctx, event); err != nil { - return nil, err - } + if immediateConfirm { + if err := s.sendUnpinned(ctx); err != nil { + return err } - return msg, nil + // Emit a confirmation event locally immediately + event := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, s.namespace, s.msg.Header.ID) + if err := s.mgr.database.InsertEvent(ctx, event); err != nil { + return err + } } - // Pass it to the sync-async handler to wait for the confirmation to come back in. - // NOTE: Our caller makes sure we are not in a RunAsGroup (which would be bad) - return pm.syncasync.SendConfirm(ctx, msg.Header.Namespace, func(requestID *fftypes.UUID) error { - _, err := pm.sendMessageWithID(ctx, msg.Header.Namespace, requestID, nil, msg, false) - return err - }) - + return nil } -func (pm *privateMessaging) sendUnpinnedMessage(ctx context.Context, message *fftypes.Message) (err error) { - +func (s *messageSender) sendUnpinned(ctx context.Context) (err error) { // Retrieve the group - group, nodes, err := pm.groupManager.getGroupNodes(ctx, message.Header.Group) + group, nodes, err := s.mgr.groupManager.getGroupNodes(ctx, s.msg.Header.Group) if err != nil { return err } - data, _, err := pm.data.GetMessageData(ctx, message, true) + data, _, err := s.mgr.data.GetMessageData(ctx, &s.msg.Message, true) if err != nil { return err } payload, err := json.Marshal(&fftypes.TransportWrapper{ Type: fftypes.TransportPayloadTypeMessage, - Message: message, + Message: &s.msg.Message, Data: data, Group: group, }) @@ -148,5 +203,5 @@ func (pm *privateMessaging) sendUnpinnedMessage(ctx context.Context, message *ff return i18n.WrapError(ctx, err, i18n.MsgSerializationFailed) } - return pm.sendData(ctx, "message", message.Header.ID, message.Header.Group, message.Header.Namespace, nodes, payload, nil, data) + return s.mgr.sendData(ctx, "message", s.msg.Header.ID, s.msg.Header.Group, s.namespace, nodes, payload, nil, data) } diff --git a/internal/privatemessaging/message_test.go b/internal/privatemessaging/message_test.go index 4bbb3deb52..eec5df50ea 100644 --- a/internal/privatemessaging/message_test.go +++ b/internal/privatemessaging/message_test.go @@ -49,11 +49,6 @@ func TestSendConfirmMessageE2EOk(t *testing.T) { }, nil) mdi := pm.database.(*databasemocks.Plugin) - rag := mdi.On("RunAsGroup", pm.ctx, mock.Anything).Return(nil) - rag.RunFn = func(a mock.Arguments) { - err := a[1].(func(context.Context) error)(a[0].(context.Context)) - rag.ReturnArguments = mock.Arguments{err} - } mdi.On("GetOrganizationByName", pm.ctx, "localorg").Return(&fftypes.Organization{ ID: fftypes.NewUUID(), }, nil) @@ -70,22 +65,19 @@ func TestSendConfirmMessageE2EOk(t *testing.T) { {Hash: fftypes.NewRandB32()}, }, nil, nil).Once() - requestID := fftypes.NewUUID() retMsg := &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), }, } msa := pm.syncasync.(*syncasyncmocks.Bridge) - msa.On("SendConfirm", pm.ctx, "ns1", mock.Anything). + msa.On("SendConfirm", pm.ctx, "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { - send := args[2].(syncasync.RequestSender) - send(requestID) + send := args[3].(syncasync.RequestSender) + send(pm.ctx) }). Return(retMsg, nil).Once() - mdi.On("InsertMessageLocal", pm.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { - return msg.Header.ID == requestID - })).Return(nil).Once() + mdi.On("InsertMessageLocal", pm.ctx, mock.Anything).Return(nil).Once() msg, err := pm.SendMessage(pm.ctx, "ns1", &fftypes.MessageInOut{ InlineData: fftypes.InlineData{ @@ -128,11 +120,6 @@ func TestSendUnpinnedMessageE2EOk(t *testing.T) { }, true, nil).Once() mdi := pm.database.(*databasemocks.Plugin) - rag := mdi.On("RunAsGroup", pm.ctx, mock.Anything).Return(nil) - rag.RunFn = func(a mock.Arguments) { - err := a[1].(func(context.Context) error)(a[0].(context.Context)) - rag.ReturnArguments = mock.Arguments{err} - } mdi.On("GetGroupByHash", pm.ctx, groupID).Return(&fftypes.Group{ Hash: groupID, GroupIdentity: fftypes.GroupIdentity{ @@ -180,6 +167,26 @@ func TestSendUnpinnedMessageE2EOk(t *testing.T) { } +func TestSendMessageBadGroup(t *testing.T) { + + pm, cancel := newTestPrivateMessaging(t) + defer cancel() + + mim := pm.identity.(*identitymanagermocks.Manager) + mim.On("ResolveInputIdentity", pm.ctx, mock.Anything).Return(nil) + + _, err := pm.SendMessage(pm.ctx, "ns1", &fftypes.MessageInOut{ + InlineData: fftypes.InlineData{ + {Value: fftypes.Byteable(`{"some": "data"}`)}, + }, + Group: &fftypes.InputGroup{}, + }, true) + assert.Regexp(t, "FF10219", err) + + mim.AssertExpectations(t) + +} + func TestSendMessageBadIdentity(t *testing.T) { pm, cancel := newTestPrivateMessaging(t) @@ -210,47 +217,47 @@ func TestSendMessageFail(t *testing.T) { defer cancel() mim := pm.identity.(*identitymanagermocks.Manager) + mim.On("ResolveLocalOrgDID", pm.ctx).Return("localorg", nil) + mim.On("GetLocalOrganization", pm.ctx).Return(&fftypes.Organization{Identity: "localorg"}, nil) mim.On("ResolveInputIdentity", pm.ctx, mock.Anything).Run(func(args mock.Arguments) { identity := args[1].(*fftypes.Identity) identity.Author = "localorg" identity.Key = "localkey" }).Return(nil) + mdi := pm.database.(*databasemocks.Plugin) + mdi.On("GetOrganizationByName", pm.ctx, "localorg").Return(&fftypes.Organization{ + ID: fftypes.NewUUID(), + }, nil) + mdi.On("GetNodes", pm.ctx, mock.Anything).Return([]*fftypes.Node{ + {ID: fftypes.NewUUID(), Name: "node1", Owner: "localorg"}, + }, nil, nil) + mdi.On("GetGroups", pm.ctx, mock.Anything).Return([]*fftypes.Group{ + {Hash: fftypes.NewRandB32()}, + }, nil, nil) + mdi.On("InsertMessageLocal", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")) + dataID := fftypes.NewUUID() mdm := pm.data.(*datamocks.Manager) mdm.On("ResolveInlineDataPrivate", pm.ctx, "ns1", mock.Anything).Return(fftypes.DataRefs{ {ID: dataID, Hash: fftypes.NewRandB32()}, }, nil) - mdi := pm.database.(*databasemocks.Plugin) - mdi.On("RunAsGroup", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")) _, err := pm.SendMessage(pm.ctx, "ns1", &fftypes.MessageInOut{ InlineData: fftypes.InlineData{ {Value: fftypes.Byteable(`{"some": "data"}`)}, }, Group: &fftypes.InputGroup{ Members: []fftypes.MemberInput{ - {Identity: "org1"}, + {Identity: "localorg"}, }, }, }, false) assert.EqualError(t, err, "pop") mim.AssertExpectations(t) - -} - -func TestResolveAndSendBadMembers(t *testing.T) { - - pm, cancel := newTestPrivateMessaging(t) - defer cancel() - - err := pm.resolveMessage(pm.ctx, &fftypes.MessageInOut{ - InlineData: fftypes.InlineData{ - {Value: fftypes.Byteable(`{"some": "data"}`)}, - }, - }) - assert.Regexp(t, "FF10219", err) + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) } @@ -262,6 +269,11 @@ func TestResolveAndSendBadInlineData(t *testing.T) { mim := pm.identity.(*identitymanagermocks.Manager) mim.On("ResolveLocalOrgDID", pm.ctx).Return("localorg", nil) mim.On("GetLocalOrganization", pm.ctx).Return(&fftypes.Organization{Identity: "localorg"}, nil) + mim.On("ResolveInputIdentity", pm.ctx, mock.Anything).Run(func(args mock.Arguments) { + identity := args[1].(*fftypes.Identity) + identity.Author = "localorg" + identity.Key = "localkey" + }).Return(nil) mdi := pm.database.(*databasemocks.Plugin) @@ -278,16 +290,26 @@ func TestResolveAndSendBadInlineData(t *testing.T) { mdm := pm.data.(*datamocks.Manager) mdm.On("ResolveInlineDataPrivate", pm.ctx, "ns1", mock.Anything).Return(nil, fmt.Errorf("pop")) - err := pm.resolveMessage(pm.ctx, &fftypes.MessageInOut{ - Message: fftypes.Message{Header: fftypes.MessageHeader{Namespace: "ns1"}}, - Group: &fftypes.InputGroup{ - Members: []fftypes.MemberInput{ - {Identity: "localorg"}, + message := &messageSender{ + mgr: pm, + namespace: "ns1", + msg: &fftypes.MessageInOut{ + Message: fftypes.Message{Header: fftypes.MessageHeader{Namespace: "ns1"}}, + Group: &fftypes.InputGroup{ + Members: []fftypes.MemberInput{ + {Identity: "localorg"}, + }, }, }, - }) + } + + err := message.resolveMessage(pm.ctx) assert.Regexp(t, "pop", err) + mim.AssertExpectations(t) + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) + } func TestSealFail(t *testing.T) { @@ -296,14 +318,75 @@ func TestSealFail(t *testing.T) { defer cancel() id1 := fftypes.NewUUID() - _, err := pm.sendOrWaitMessage(pm.ctx, &fftypes.Message{ - Header: fftypes.MessageHeader{Namespace: "ns1"}, - Data: fftypes.DataRefs{ - {ID: id1}, - {ID: id1}, // duplicate + message := pm.NewMessage("ns1", &fftypes.MessageInOut{ + Message: fftypes.Message{ + Data: fftypes.DataRefs{ + {ID: id1, Hash: fftypes.NewRandB32()}, + {ID: id1, Hash: fftypes.NewRandB32()}, // duplicate ID + }, }, - }, false) - assert.Regexp(t, "FF10144", err) + }) + + err := message.(*messageSender).sendInternal(pm.ctx, false) + assert.Regexp(t, "FF10145", err) + +} + +func TestBeforeSendCallback(t *testing.T) { + + pm, cancel := newTestPrivateMessaging(t) + defer cancel() + + id1 := fftypes.NewUUID() + message := pm.NewMessage("ns1", &fftypes.MessageInOut{ + Message: fftypes.Message{ + Data: fftypes.DataRefs{ + {ID: id1, Hash: fftypes.NewRandB32()}, + }, + }, + }) + + called := false + message.BeforeSend(func(ctx context.Context) error { + called = true + return nil + }) + + mdi := pm.database.(*databasemocks.Plugin) + mdi.On("InsertMessageLocal", pm.ctx, mock.Anything).Return(nil) + + err := message.(*messageSender).sendInternal(pm.ctx, false) + assert.NoError(t, err) + assert.True(t, called) + +} + +func TestBeforeSendCallbackFail(t *testing.T) { + + pm, cancel := newTestPrivateMessaging(t) + defer cancel() + + id1 := fftypes.NewUUID() + message := pm.NewMessage("ns1", &fftypes.MessageInOut{ + Message: fftypes.Message{ + Data: fftypes.DataRefs{ + {ID: id1, Hash: fftypes.NewRandB32()}, + }, + }, + }) + + called := false + message.BeforeSend(func(ctx context.Context) error { + called = true + return fmt.Errorf("pop") + }) + + mdi := pm.database.(*databasemocks.Plugin) + mdi.On("InsertMessageLocal", pm.ctx, mock.Anything).Return(nil) + + err := message.(*messageSender).sendInternal(pm.ctx, false) + assert.EqualError(t, err, "pop") + assert.True(t, called) } @@ -343,15 +426,22 @@ func TestSendUnpinnedMessageMarshalFail(t *testing.T) { ID: nodeID2, Name: "node2", Owner: "org1", DX: fftypes.DXInfo{Peer: "peer2-remote"}, }, nil).Once() - err := pm.sendUnpinnedMessage(pm.ctx, &fftypes.Message{ - Header: fftypes.MessageHeader{ - Identity: fftypes.Identity{ - Author: "localorg", + message := &messageSender{ + mgr: pm, + msg: &fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Identity: fftypes.Identity{ + Author: "localorg", + }, + TxType: fftypes.TransactionTypeNone, + Group: groupID, + }, }, - TxType: fftypes.TransactionTypeNone, - Group: groupID, }, - }) + } + + err := message.sendUnpinned(pm.ctx) assert.Regexp(t, "FF10137", err) mdm.AssertExpectations(t) @@ -387,15 +477,22 @@ func TestSendUnpinnedMessageGetDataFail(t *testing.T) { ID: nodeID2, Name: "node2", Owner: "org1", DX: fftypes.DXInfo{Peer: "peer2-remote"}, }, nil).Once() - err := pm.sendUnpinnedMessage(pm.ctx, &fftypes.Message{ - Header: fftypes.MessageHeader{ - Identity: fftypes.Identity{ - Author: "localorg", + message := &messageSender{ + mgr: pm, + msg: &fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Identity: fftypes.Identity{ + Author: "localorg", + }, + TxType: fftypes.TransactionTypeNone, + Group: groupID, + }, }, - TxType: fftypes.TransactionTypeNone, - Group: groupID, }, - }) + } + + err := message.sendUnpinned(pm.ctx) assert.Regexp(t, "pop", err) mdm.AssertExpectations(t) @@ -412,15 +509,22 @@ func TestSendUnpinnedMessageGroupLookupFail(t *testing.T) { mdi := pm.database.(*databasemocks.Plugin) mdi.On("GetGroupByHash", pm.ctx, groupID).Return(nil, fmt.Errorf("pop")).Once() - err := pm.sendUnpinnedMessage(pm.ctx, &fftypes.Message{ - Header: fftypes.MessageHeader{ - Identity: fftypes.Identity{ - Author: "org1", + message := &messageSender{ + mgr: pm, + msg: &fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Identity: fftypes.Identity{ + Author: "org1", + }, + TxType: fftypes.TransactionTypeNone, + Group: groupID, + }, }, - TxType: fftypes.TransactionTypeNone, - Group: groupID, }, - }) + } + + err := message.sendUnpinned(pm.ctx) assert.Regexp(t, "pop", err) mdi.AssertExpectations(t) @@ -446,11 +550,6 @@ func TestSendUnpinnedMessageInsertFail(t *testing.T) { }, nil) mdi := pm.database.(*databasemocks.Plugin) - rag := mdi.On("RunAsGroup", pm.ctx, mock.Anything).Return(nil) - rag.RunFn = func(a mock.Arguments) { - err := a[1].(func(context.Context) error)(a[0].(context.Context)) - rag.ReturnArguments = mock.Arguments{err} - } mdi.On("InsertMessageLocal", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")).Once() _, err := pm.SendMessage(pm.ctx, "ns1", &fftypes.MessageInOut{ @@ -493,11 +592,6 @@ func TestSendUnpinnedMessageResolveGroupFail(t *testing.T) { }, nil) mdi := pm.database.(*databasemocks.Plugin) - rag := mdi.On("RunAsGroup", pm.ctx, mock.Anything).Return(nil) - rag.RunFn = func(a mock.Arguments) { - err := a[1].(func(context.Context) error)(a[0].(context.Context)) - rag.ReturnArguments = mock.Arguments{err} - } mdi.On("GetGroupByHash", pm.ctx, groupID).Return(nil, fmt.Errorf("pop")).Once() mdi.On("InsertMessageLocal", pm.ctx, mock.Anything).Return(nil).Once() @@ -550,11 +644,6 @@ func TestSendUnpinnedMessageEventFail(t *testing.T) { }, true, nil).Once() mdi := pm.database.(*databasemocks.Plugin) - rag := mdi.On("RunAsGroup", pm.ctx, mock.Anything).Return(nil) - rag.RunFn = func(a mock.Arguments) { - err := a[1].(func(context.Context) error)(a[0].(context.Context)) - rag.ReturnArguments = mock.Arguments{err} - } mdi.On("GetGroupByHash", pm.ctx, groupID).Return(&fftypes.Group{ Hash: groupID, GroupIdentity: fftypes.GroupIdentity{ @@ -599,3 +688,70 @@ func TestSendUnpinnedMessageEventFail(t *testing.T) { mim.AssertExpectations(t) } + +func TestRequestReplyMissingTag(t *testing.T) { + pm, cancel := newTestPrivateMessaging(t) + defer cancel() + + msa := pm.syncasync.(*syncasyncmocks.Bridge) + msa.On("RequestReply", pm.ctx, "ns1", mock.Anything).Return(nil, nil) + + _, err := pm.RequestReply(pm.ctx, "ns1", &fftypes.MessageInOut{}) + assert.Regexp(t, "FF10261", err) +} + +func TestRequestReplyInvalidCID(t *testing.T) { + pm, cancel := newTestPrivateMessaging(t) + defer cancel() + + msa := pm.syncasync.(*syncasyncmocks.Bridge) + msa.On("RequestReply", pm.ctx, "ns1", mock.Anything).Return(nil, nil) + + _, err := pm.RequestReply(pm.ctx, "ns1", &fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Tag: "mytag", + CID: fftypes.NewUUID(), + Group: fftypes.NewRandB32(), + }, + }, + }) + assert.Regexp(t, "FF10262", err) +} + +func TestRequestReplySuccess(t *testing.T) { + pm, cancel := newTestPrivateMessaging(t) + defer cancel() + + mim := pm.identity.(*identitymanagermocks.Manager) + mim.On("ResolveInputIdentity", pm.ctx, mock.Anything).Return(nil) + + msa := pm.syncasync.(*syncasyncmocks.Bridge) + msa.On("RequestReply", pm.ctx, "ns1", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + send := args[3].(syncasync.RequestSender) + send(pm.ctx) + }). + Return(nil, nil) + + mdm := pm.data.(*datamocks.Manager) + mdm.On("ResolveInlineDataPrivate", pm.ctx, "ns1", mock.Anything).Return(fftypes.DataRefs{ + {ID: fftypes.NewUUID(), Hash: fftypes.NewRandB32()}, + }, nil) + + mdi := pm.database.(*databasemocks.Plugin) + mdi.On("InsertMessageLocal", pm.ctx, mock.Anything).Return(nil).Once() + + _, err := pm.RequestReply(pm.ctx, "ns1", &fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Tag: "mytag", + Group: fftypes.NewRandB32(), + Identity: fftypes.Identity{ + Author: "org1", + }, + }, + }, + }) + assert.NoError(t, err) +} diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index e9bf698c1e..1ca3ccdedc 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -29,6 +29,7 @@ import ( "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/retry" "github.com/hyperledger/firefly/internal/syncasync" + "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/pkg/blockchain" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/dataexchange" @@ -40,6 +41,7 @@ type Manager interface { GroupManager Start() error + NewMessage(ns string, msg *fftypes.MessageInOut) sysmessaging.MessageSender SendMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) RequestReply(ctx context.Context, ns string, request *fftypes.MessageInOut) (reply *fftypes.MessageInOut, err error) } @@ -231,16 +233,3 @@ func (pm *privateMessaging) sendAndSubmitBatch(ctx context.Context, batch *fftyp func (pm *privateMessaging) writeTransaction(ctx context.Context, batch *fftypes.Batch, contexts []*fftypes.Bytes32) error { return pm.batchpin.SubmitPinnedBatch(ctx, batch, contexts) } - -func (pm *privateMessaging) RequestReply(ctx context.Context, ns string, unresolved *fftypes.MessageInOut) (*fftypes.MessageInOut, error) { - if unresolved.Header.Tag == "" { - return nil, i18n.NewError(ctx, i18n.MsgRequestReplyTagRequired) - } - if unresolved.Header.CID != nil { - return nil, i18n.NewError(ctx, i18n.MsgRequestCannotHaveCID) - } - return pm.syncasync.RequestReply(ctx, ns, func(requestID *fftypes.UUID) error { - _, err := pm.sendMessageWithID(ctx, ns, requestID, unresolved, &unresolved.Message, false) - return err - }) -} diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index 8b8cdf1bf1..c1412018d2 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -22,7 +22,6 @@ import ( "testing" "github.com/hyperledger/firefly/internal/config" - "github.com/hyperledger/firefly/internal/syncasync" "github.com/hyperledger/firefly/mocks/batchmocks" "github.com/hyperledger/firefly/mocks/batchpinmocks" "github.com/hyperledger/firefly/mocks/blockchainmocks" @@ -57,6 +56,13 @@ func newTestPrivateMessaging(t *testing.T) (*privateMessaging, func()) { fftypes.MessageTypeTransferPrivate, }, mock.Anything, mock.Anything).Return() + rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() + rag.RunFn = func(a mock.Arguments) { + rag.ReturnArguments = mock.Arguments{ + a[1].(func(context.Context) error)(a[0].(context.Context)), + } + } + ctx, cancel := context.WithCancel(context.Background()) pm, err := NewPrivateMessaging(ctx, mdi, mim, mdx, mbi, mba, mdm, msa, mbp) assert.NoError(t, err) @@ -89,13 +95,6 @@ func TestDispatchBatchWithBlobs(t *testing.T) { mdx := pm.exchange.(*dataexchangemocks.Plugin) mim := pm.identity.(*identitymanagermocks.Manager) - rag := mdi.On("RunAsGroup", pm.ctx, mock.Anything).Maybe() - rag.RunFn = func(a mock.Arguments) { - rag.ReturnArguments = mock.Arguments{ - a[1].(func(context.Context) error)(a[0].(context.Context)), - } - } - mim.On("ResolveInputIdentity", pm.ctx, mock.Anything).Run(func(args mock.Arguments) { identity := args[1].(*fftypes.Identity) assert.Equal(t, "org1", identity.Author) @@ -397,68 +396,6 @@ func TestTransferBlobsOpInsertFail(t *testing.T) { assert.Regexp(t, "pop", err) } -func TestRequestReplyMissingTag(t *testing.T) { - pm, cancel := newTestPrivateMessaging(t) - defer cancel() - - msa := pm.syncasync.(*syncasyncmocks.Bridge) - msa.On("RequestReply", pm.ctx, "ns1", mock.Anything).Return(nil, nil) - - _, err := pm.RequestReply(pm.ctx, "ns1", &fftypes.MessageInOut{}) - assert.Regexp(t, "FF10261", err) -} - -func TestRequestReplyInvalidCID(t *testing.T) { - pm, cancel := newTestPrivateMessaging(t) - defer cancel() - - msa := pm.syncasync.(*syncasyncmocks.Bridge) - msa.On("RequestReply", pm.ctx, "ns1", mock.Anything).Return(nil, nil) - - _, err := pm.RequestReply(pm.ctx, "ns1", &fftypes.MessageInOut{ - Message: fftypes.Message{ - Header: fftypes.MessageHeader{ - Tag: "mytag", - CID: fftypes.NewUUID(), - Group: fftypes.NewRandB32(), - }, - }, - }) - assert.Regexp(t, "FF10262", err) -} - -func TestRequestReplySuccess(t *testing.T) { - pm, cancel := newTestPrivateMessaging(t) - defer cancel() - - mim := pm.identity.(*identitymanagermocks.Manager) - mim.On("ResolveInputIdentity", pm.ctx, mock.Anything).Return(nil) - - msa := pm.syncasync.(*syncasyncmocks.Bridge) - msa.On("RequestReply", pm.ctx, "ns1", mock.Anything). - Run(func(args mock.Arguments) { - send := args[2].(syncasync.RequestSender) - send(fftypes.NewUUID()) - }). - Return(nil, nil) - - mdi := pm.database.(*databasemocks.Plugin) - mdi.On("RunAsGroup", pm.ctx, mock.Anything).Return(nil) - - _, err := pm.RequestReply(pm.ctx, "ns1", &fftypes.MessageInOut{ - Message: fftypes.Message{ - Header: fftypes.MessageHeader{ - Tag: "mytag", - Group: fftypes.NewRandB32(), - Identity: fftypes.Identity{ - Author: "org1", - }, - }, - }, - }) - assert.NoError(t, err) -} - func TestStart(t *testing.T) { pm, cancel := newTestPrivateMessaging(t) defer cancel() diff --git a/internal/privatemessaging/recipients.go b/internal/privatemessaging/recipients.go index 604818aad4..6092cb2dee 100644 --- a/internal/privatemessaging/recipients.go +++ b/internal/privatemessaging/recipients.go @@ -26,7 +26,7 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" ) -func (pm *privateMessaging) resolveReceipientList(ctx context.Context, in *fftypes.MessageInOut) error { +func (pm *privateMessaging) resolveRecipientList(ctx context.Context, in *fftypes.MessageInOut) error { if in.Header.Group != nil { log.L(ctx).Debugf("Group '%s' specified for message", in.Header.Group) return nil // validity of existing group checked later diff --git a/internal/privatemessaging/recipients_test.go b/internal/privatemessaging/recipients_test.go index 93a619370b..ed0beb9c0a 100644 --- a/internal/privatemessaging/recipients_test.go +++ b/internal/privatemessaging/recipients_test.go @@ -91,7 +91,7 @@ func TestResolveMemberListNewGroupE2E(t *testing.T) { assert.Equal(t, *dataID, *msg.Data[0].ID) } - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{ + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", @@ -126,7 +126,7 @@ func TestResolveMemberListExistingGroup(t *testing.T) { mim.On("ResolveLocalOrgDID", pm.ctx).Return("localorg", nil) mim.On("GetLocalOrganization", pm.ctx).Return(&fftypes.Organization{Identity: "localorg"}, nil) - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{ + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ Identity: fftypes.Identity{ @@ -158,7 +158,7 @@ func TestResolveMemberListGetGroupsFail(t *testing.T) { mim.On("ResolveLocalOrgDID", pm.ctx).Return("localorg", nil) mim.On("GetLocalOrganization", pm.ctx).Return(&fftypes.Organization{Identity: "localorg"}, nil) - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{ + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ Identity: fftypes.Identity{ @@ -185,7 +185,7 @@ func TestResolveMemberListLocalOrgUnregistered(t *testing.T) { mim := pm.identity.(*identitymanagermocks.Manager) mim.On("ResolveLocalOrgDID", pm.ctx).Return("", fmt.Errorf("pop")) - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{ + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ Identity: fftypes.Identity{ @@ -212,7 +212,7 @@ func TestResolveMemberListLocalOrgLookupFailed(t *testing.T) { mim.On("ResolveLocalOrgDID", pm.ctx).Return("", nil) mim.On("GetLocalOrganization", pm.ctx).Return(nil, fmt.Errorf("pop")) - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{ + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ Identity: fftypes.Identity{ @@ -243,7 +243,7 @@ func TestResolveMemberListMissingLocalMemberLookupFailed(t *testing.T) { mim.On("ResolveLocalOrgDID", pm.ctx).Return("localorg", nil) mim.On("GetLocalOrganization", pm.ctx).Return(&fftypes.Organization{Identity: "localorg"}, nil) - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{ + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ Identity: fftypes.Identity{ @@ -274,7 +274,7 @@ func TestResolveMemberListNodeNotFound(t *testing.T) { mim.On("ResolveLocalOrgDID", pm.ctx).Return("localorg", nil) mim.On("GetLocalOrganization", pm.ctx).Return(&fftypes.Organization{Identity: "localorg"}, nil) - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{ + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ Identity: fftypes.Identity{ @@ -305,7 +305,7 @@ func TestResolveMemberOrgNameNotFound(t *testing.T) { mim.On("ResolveLocalOrgDID", pm.ctx).Return("localorg", nil) mim.On("GetLocalOrganization", pm.ctx).Return(&fftypes.Organization{Identity: "localorg"}, nil) - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{ + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ Identity: fftypes.Identity{ @@ -340,7 +340,7 @@ func TestResolveMemberNodeOwnedParentOrg(t *testing.T) { mim.On("ResolveLocalOrgDID", pm.ctx).Return("localorg", nil) mim.On("GetLocalOrganization", pm.ctx).Return(&fftypes.Organization{Identity: "localorg"}, nil) - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{ + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ Identity: fftypes.Identity{ @@ -417,7 +417,7 @@ func TestResolveReceipientListExisting(t *testing.T) { pm, cancel := newTestPrivateMessaging(t) defer cancel() - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{ + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{ Message: fftypes.Message{ Header: fftypes.MessageHeader{ Group: fftypes.NewRandB32(), @@ -431,7 +431,7 @@ func TestResolveReceipientListEmptyList(t *testing.T) { pm, cancel := newTestPrivateMessaging(t) defer cancel() - err := pm.resolveReceipientList(pm.ctx, &fftypes.MessageInOut{}) + err := pm.resolveRecipientList(pm.ctx, &fftypes.MessageInOut{}) assert.Regexp(t, "FF10219", err) } diff --git a/internal/syncasync/sync_async_bridge.go b/internal/syncasync/sync_async_bridge.go index 9ca4a291e6..391647c243 100644 --- a/internal/syncasync/sync_async_bridge.go +++ b/internal/syncasync/sync_async_bridge.go @@ -36,16 +36,16 @@ type Bridge interface { Init(sysevents sysmessaging.SystemEvents) // Request performs a request/reply exchange taking a message as input, and returning a message as a response // The input message must have a tag, and a group, to be routed appropriately. - RequestReply(ctx context.Context, ns string, send RequestSender) (*fftypes.MessageInOut, error) + RequestReply(ctx context.Context, ns string, id *fftypes.UUID, send RequestSender) (*fftypes.MessageInOut, error) // SendConfirm blocks until the message is confirmed (or rejected), but does not look for a reply. - SendConfirm(ctx context.Context, ns string, send RequestSender) (*fftypes.Message, error) + SendConfirm(ctx context.Context, ns string, id *fftypes.UUID, send RequestSender) (*fftypes.Message, error) // SendConfirmTokenPool blocks until the token pool is confirmed (or rejected) - SendConfirmTokenPool(ctx context.Context, ns string, send RequestSender) (*fftypes.TokenPool, error) + SendConfirmTokenPool(ctx context.Context, ns string, id *fftypes.UUID, send RequestSender) (*fftypes.TokenPool, error) // SendConfirmTokenTransfer blocks until the token transfer is confirmed - SendConfirmTokenTransfer(ctx context.Context, ns string, send RequestSender) (*fftypes.TokenTransfer, error) + SendConfirmTokenTransfer(ctx context.Context, ns string, id *fftypes.UUID, send RequestSender) (*fftypes.TokenTransfer, error) } -type RequestSender func(requestID *fftypes.UUID) error +type RequestSender func(ctx context.Context) error type requestType int @@ -94,9 +94,9 @@ func (sa *syncAsyncBridge) Init(sysevents sysmessaging.SystemEvents) { sa.sysevents = sysevents } -func (sa *syncAsyncBridge) addInFlight(ns string, reqType requestType) (*inflightRequest, error) { +func (sa *syncAsyncBridge) addInFlight(ns string, id *fftypes.UUID, reqType requestType) (*inflightRequest, error) { inflight := &inflightRequest{ - id: fftypes.NewUUID(), + id: id, startTime: time.Now(), response: make(chan inflightResponse), reqType: reqType, @@ -298,8 +298,8 @@ func (sa *syncAsyncBridge) resolveConfirmedTokenTransfer(inflight *inflightReque inflight.response <- inflightResponse{id: transfer.LocalID, data: transfer} } -func (sa *syncAsyncBridge) sendAndWait(ctx context.Context, ns string, reqType requestType, send RequestSender) (interface{}, error) { - inflight, err := sa.addInFlight(ns, reqType) +func (sa *syncAsyncBridge) sendAndWait(ctx context.Context, ns string, id *fftypes.UUID, reqType requestType, send RequestSender) (interface{}, error) { + inflight, err := sa.addInFlight(ns, id, reqType) if err != nil { return nil, err } @@ -314,7 +314,7 @@ func (sa *syncAsyncBridge) sendAndWait(ctx context.Context, ns string, reqType r } }() - err = send(inflight.id) + err = send(ctx) if err != nil { return nil, err } @@ -328,32 +328,32 @@ func (sa *syncAsyncBridge) sendAndWait(ctx context.Context, ns string, reqType r } } -func (sa *syncAsyncBridge) RequestReply(ctx context.Context, ns string, send RequestSender) (*fftypes.MessageInOut, error) { - reply, err := sa.sendAndWait(ctx, ns, messageReply, send) +func (sa *syncAsyncBridge) RequestReply(ctx context.Context, ns string, id *fftypes.UUID, send RequestSender) (*fftypes.MessageInOut, error) { + reply, err := sa.sendAndWait(ctx, ns, id, messageReply, send) if err != nil { return nil, err } return reply.(*fftypes.MessageInOut), err } -func (sa *syncAsyncBridge) SendConfirm(ctx context.Context, ns string, send RequestSender) (*fftypes.Message, error) { - reply, err := sa.sendAndWait(ctx, ns, messageConfirm, send) +func (sa *syncAsyncBridge) SendConfirm(ctx context.Context, ns string, id *fftypes.UUID, send RequestSender) (*fftypes.Message, error) { + reply, err := sa.sendAndWait(ctx, ns, id, messageConfirm, send) if err != nil { return nil, err } return reply.(*fftypes.Message), err } -func (sa *syncAsyncBridge) SendConfirmTokenPool(ctx context.Context, ns string, send RequestSender) (*fftypes.TokenPool, error) { - reply, err := sa.sendAndWait(ctx, ns, tokenPoolConfirm, send) +func (sa *syncAsyncBridge) SendConfirmTokenPool(ctx context.Context, ns string, id *fftypes.UUID, send RequestSender) (*fftypes.TokenPool, error) { + reply, err := sa.sendAndWait(ctx, ns, id, tokenPoolConfirm, send) if err != nil { return nil, err } return reply.(*fftypes.TokenPool), err } -func (sa *syncAsyncBridge) SendConfirmTokenTransfer(ctx context.Context, ns string, send RequestSender) (*fftypes.TokenTransfer, error) { - reply, err := sa.sendAndWait(ctx, ns, tokenTransferConfirm, send) +func (sa *syncAsyncBridge) SendConfirmTokenTransfer(ctx context.Context, ns string, id *fftypes.UUID, send RequestSender) (*fftypes.TokenTransfer, error) { + reply, err := sa.sendAndWait(ctx, ns, id, tokenTransferConfirm, send) if err != nil { return nil, err } diff --git a/internal/syncasync/sync_async_bridge_test.go b/internal/syncasync/sync_async_bridge_test.go index cf074270dd..a0f8508295 100644 --- a/internal/syncasync/sync_async_bridge_test.go +++ b/internal/syncasync/sync_async_bridge_test.go @@ -44,7 +44,7 @@ func TestRequestReplyOk(t *testing.T) { sa, cancel := newTestSyncAsyncBridge(t) defer cancel() - var requestID *fftypes.UUID + requestID := fftypes.NewUUID() replyID := fftypes.NewUUID() dataID := fftypes.NewUUID() @@ -73,8 +73,7 @@ func TestRequestReplyOk(t *testing.T) { {ID: dataID, Value: fftypes.Byteable(`"response data"`)}, }, true, nil) - reply, err := sa.RequestReply(sa.ctx, "ns1", func(id *fftypes.UUID) error { - requestID = id + reply, err := sa.RequestReply(sa.ctx, "ns1", requestID, func(ctx context.Context) error { go func() { sa.eventCallback(&fftypes.EventDelivery{ Event: fftypes.Event{ @@ -98,7 +97,7 @@ func TestAwaitConfirmationOk(t *testing.T) { sa, cancel := newTestSyncAsyncBridge(t) defer cancel() - var requestID *fftypes.UUID + requestID := fftypes.NewUUID() dataID := fftypes.NewUUID() mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) @@ -122,8 +121,7 @@ func TestAwaitConfirmationOk(t *testing.T) { {ID: dataID, Value: fftypes.Byteable(`"response data"`)}, }, true, nil) - reply, err := sa.SendConfirm(sa.ctx, "ns1", func(id *fftypes.UUID) error { - requestID = id + reply, err := sa.SendConfirm(sa.ctx, "ns1", requestID, func(ctx context.Context) error { go func() { sa.eventCallback(&fftypes.EventDelivery{ Event: fftypes.Event{ @@ -146,7 +144,7 @@ func TestAwaitConfirmationRejected(t *testing.T) { sa, cancel := newTestSyncAsyncBridge(t) defer cancel() - var requestID *fftypes.UUID + requestID := fftypes.NewUUID() dataID := fftypes.NewUUID() mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) @@ -170,8 +168,7 @@ func TestAwaitConfirmationRejected(t *testing.T) { {ID: dataID, Value: fftypes.Byteable(`"response data"`)}, }, true, nil) - _, err := sa.SendConfirm(sa.ctx, "ns1", func(id *fftypes.UUID) error { - requestID = id + _, err := sa.SendConfirm(sa.ctx, "ns1", requestID, func(ctx context.Context) error { go func() { sa.eventCallback(&fftypes.EventDelivery{ Event: fftypes.Event{ @@ -195,7 +192,7 @@ func TestRequestReplyTimeout(t *testing.T) { mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil) - _, err := sa.RequestReply(sa.ctx, "ns1", func(requestID *fftypes.UUID) error { + _, err := sa.RequestReply(sa.ctx, "ns1", fftypes.NewUUID(), func(ctx context.Context) error { return nil }) assert.Regexp(t, "FF10260", err) @@ -209,7 +206,7 @@ func TestRequestSetupSystemListenerFail(t *testing.T) { mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(fmt.Errorf("pop")) - _, err := sa.RequestReply(sa.ctx, "ns1", func(requestID *fftypes.UUID) error { + _, err := sa.RequestReply(sa.ctx, "ns1", fftypes.NewUUID(), func(ctx context.Context) error { return nil }) assert.Regexp(t, "pop", err) @@ -501,7 +498,7 @@ func TestAwaitTokenPoolConfirmation(t *testing.T) { sa, cancel := newTestSyncAsyncBridge(t) defer cancel() - var requestID *fftypes.UUID + requestID := fftypes.NewUUID() mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil) @@ -519,8 +516,7 @@ func TestAwaitTokenPoolConfirmation(t *testing.T) { } } - reply, err := sa.SendConfirmTokenPool(sa.ctx, "ns1", func(id *fftypes.UUID) error { - requestID = id + reply, err := sa.SendConfirmTokenPool(sa.ctx, "ns1", requestID, func(ctx context.Context) error { go func() { sa.eventCallback(&fftypes.EventDelivery{ Event: fftypes.Event{ @@ -546,7 +542,7 @@ func TestAwaitTokenPoolConfirmationSendFail(t *testing.T) { mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil) - _, err := sa.SendConfirmTokenPool(sa.ctx, "ns1", func(id *fftypes.UUID) error { + _, err := sa.SendConfirmTokenPool(sa.ctx, "ns1", fftypes.NewUUID(), func(ctx context.Context) error { return fmt.Errorf("pop") }) assert.EqualError(t, err, "pop") @@ -557,7 +553,7 @@ func TestAwaitTokenPoolConfirmationRejected(t *testing.T) { sa, cancel := newTestSyncAsyncBridge(t) defer cancel() - var requestID *fftypes.UUID + requestID := fftypes.NewUUID() mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil) @@ -575,8 +571,7 @@ func TestAwaitTokenPoolConfirmationRejected(t *testing.T) { } } - _, err := sa.SendConfirmTokenPool(sa.ctx, "ns1", func(id *fftypes.UUID) error { - requestID = id + _, err := sa.SendConfirmTokenPool(sa.ctx, "ns1", requestID, func(ctx context.Context) error { go func() { sa.eventCallback(&fftypes.EventDelivery{ Event: fftypes.Event{ @@ -597,7 +592,7 @@ func TestAwaitTokenTransferConfirmation(t *testing.T) { sa, cancel := newTestSyncAsyncBridge(t) defer cancel() - var requestID *fftypes.UUID + requestID := fftypes.NewUUID() mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil) @@ -615,8 +610,7 @@ func TestAwaitTokenTransferConfirmation(t *testing.T) { } } - reply, err := sa.SendConfirmTokenTransfer(sa.ctx, "ns1", func(id *fftypes.UUID) error { - requestID = id + reply, err := sa.SendConfirmTokenTransfer(sa.ctx, "ns1", requestID, func(ctx context.Context) error { go func() { sa.eventCallback(&fftypes.EventDelivery{ Event: fftypes.Event{ @@ -642,7 +636,7 @@ func TestAwaitTokenTransferConfirmationSendFail(t *testing.T) { mse := sa.sysevents.(*sysmessagingmocks.SystemEvents) mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil) - _, err := sa.SendConfirmTokenTransfer(sa.ctx, "ns1", func(id *fftypes.UUID) error { + _, err := sa.SendConfirmTokenTransfer(sa.ctx, "ns1", fftypes.NewUUID(), func(ctx context.Context) error { return fmt.Errorf("pop") }) assert.EqualError(t, err, "pop") diff --git a/internal/syshandlers/reply_sender.go b/internal/syshandlers/reply_sender.go index 655a6abeaf..040796c662 100644 --- a/internal/syshandlers/reply_sender.go +++ b/internal/syshandlers/reply_sender.go @@ -25,15 +25,14 @@ import ( func (sh *systemHandlers) SendReply(ctx context.Context, event *fftypes.Event, reply *fftypes.MessageInOut) { var err error - var msg *fftypes.Message if reply.Header.Group != nil { - msg, err = sh.messaging.SendMessage(ctx, event.Namespace, reply, false) + err = sh.messaging.NewMessage(event.Namespace, reply).Send(ctx) } else { - msg, err = sh.broadcast.BroadcastMessage(ctx, event.Namespace, reply, false) + err = sh.broadcast.NewBroadcast(event.Namespace, reply).Send(ctx) } if err != nil { log.L(ctx).Errorf("Failed to send reply: %s", err) } else { - log.L(ctx).Infof("Sent reply %s:%s (%s) cid=%s to event '%s'", msg.Header.Namespace, msg.Header.ID, msg.Header.Type, msg.Header.CID, event.ID) + log.L(ctx).Infof("Sent reply %s:%s (%s) cid=%s to event '%s'", reply.Header.Namespace, reply.Header.ID, reply.Header.Type, reply.Header.CID, event.ID) } } diff --git a/internal/syshandlers/reply_sender_test.go b/internal/syshandlers/reply_sender_test.go index b2f23a7ee9..775a829d1a 100644 --- a/internal/syshandlers/reply_sender_test.go +++ b/internal/syshandlers/reply_sender_test.go @@ -23,25 +23,34 @@ import ( "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/privatemessagingmocks" + "github.com/hyperledger/firefly/mocks/sysmessagingmocks" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/mock" ) func TestSendReplyBroadcastFail(t *testing.T) { sh := newTestSystemHandlers(t) + mms := &sysmessagingmocks.MessageSender{} mbm := sh.broadcast.(*broadcastmocks.Manager) - mbm.On("BroadcastMessage", mock.Anything, "ns1", mock.Anything, false).Return(nil, fmt.Errorf("pop")) + mbm.On("NewBroadcast", "ns1", mock.Anything).Return(mms) + mms.On("Send", context.Background()).Return(fmt.Errorf("pop")) + sh.SendReply(context.Background(), &fftypes.Event{ ID: fftypes.NewUUID(), Namespace: "ns1", }, &fftypes.MessageInOut{}) + mbm.AssertExpectations(t) + mms.AssertExpectations(t) } -func TestSendReplyPrivatetFail(t *testing.T) { +func TestSendReplyPrivateFail(t *testing.T) { sh := newTestSystemHandlers(t) + mms := &sysmessagingmocks.MessageSender{} mpm := sh.messaging.(*privatemessagingmocks.Manager) - mpm.On("SendMessage", mock.Anything, "ns1", mock.Anything, false).Return(nil, fmt.Errorf("pop")) + mpm.On("NewMessage", "ns1", mock.Anything).Return(mms) + mms.On("Send", context.Background()).Return(fmt.Errorf("pop")) + sh.SendReply(context.Background(), &fftypes.Event{ ID: fftypes.NewUUID(), Namespace: "ns1", @@ -52,10 +61,12 @@ func TestSendReplyPrivatetFail(t *testing.T) { }, }, }) + mpm.AssertExpectations(t) + mms.AssertExpectations(t) } -func TestSendReplyPrivatetOk(t *testing.T) { +func TestSendReplyPrivateOk(t *testing.T) { sh := newTestSystemHandlers(t) msg := &fftypes.Message{ @@ -64,13 +75,18 @@ func TestSendReplyPrivatetOk(t *testing.T) { }, } + mms := &sysmessagingmocks.MessageSender{} mpm := sh.messaging.(*privatemessagingmocks.Manager) - mpm.On("SendMessage", mock.Anything, "ns1", mock.Anything, false).Return(msg, nil) + mpm.On("NewMessage", "ns1", mock.Anything).Return(mms) + mms.On("Send", context.Background()).Return(nil) + sh.SendReply(context.Background(), &fftypes.Event{ ID: fftypes.NewUUID(), Namespace: "ns1", }, &fftypes.MessageInOut{ Message: *msg, }) + mpm.AssertExpectations(t) + mms.AssertExpectations(t) } diff --git a/internal/sysmessaging/message_sender.go b/internal/sysmessaging/message_sender.go new file mode 100644 index 0000000000..603e266fab --- /dev/null +++ b/internal/sysmessaging/message_sender.go @@ -0,0 +1,27 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sysmessaging + +import "context" + +type BeforeSendCallback func(ctx context.Context) error + +type MessageSender interface { + Send(ctx context.Context) error + SendAndWait(ctx context.Context) error + BeforeSend(cb BeforeSendCallback) MessageSender +} diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index 12618baa24..850689dd60 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -10,6 +10,8 @@ import ( mock "github.com/stretchr/testify/mock" + sysmessaging "github.com/hyperledger/firefly/internal/sysmessaging" + tokens "github.com/hyperledger/firefly/pkg/tokens" ) @@ -19,11 +21,11 @@ type Manager struct { } // BurnTokens provides a mock function with given fields: ctx, ns, typeName, poolName, transfer, waitConfirm -func (_m *Manager) BurnTokens(ctx context.Context, ns string, typeName string, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) { +func (_m *Manager) BurnTokens(ctx context.Context, ns string, typeName string, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error) { ret := _m.Called(ctx, ns, typeName, poolName, transfer, waitConfirm) var r0 *fftypes.TokenTransfer - if rf, ok := ret.Get(0).(func(context.Context, string, string, string, *fftypes.TokenTransfer, bool) *fftypes.TokenTransfer); ok { + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, *fftypes.TokenTransferInput, bool) *fftypes.TokenTransfer); ok { r0 = rf(ctx, ns, typeName, poolName, transfer, waitConfirm) } else { if ret.Get(0) != nil { @@ -32,7 +34,7 @@ func (_m *Manager) BurnTokens(ctx context.Context, ns string, typeName string, p } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, string, string, *fftypes.TokenTransfer, bool) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, string, string, string, *fftypes.TokenTransferInput, bool) error); ok { r1 = rf(ctx, ns, typeName, poolName, transfer, waitConfirm) } else { r1 = ret.Error(1) @@ -184,11 +186,11 @@ func (_m *Manager) GetTokenTransfers(ctx context.Context, ns string, typeName st } // MintTokens provides a mock function with given fields: ctx, ns, typeName, poolName, transfer, waitConfirm -func (_m *Manager) MintTokens(ctx context.Context, ns string, typeName string, poolName string, transfer *fftypes.TokenTransfer, waitConfirm bool) (*fftypes.TokenTransfer, error) { +func (_m *Manager) MintTokens(ctx context.Context, ns string, typeName string, poolName string, transfer *fftypes.TokenTransferInput, waitConfirm bool) (*fftypes.TokenTransfer, error) { ret := _m.Called(ctx, ns, typeName, poolName, transfer, waitConfirm) var r0 *fftypes.TokenTransfer - if rf, ok := ret.Get(0).(func(context.Context, string, string, string, *fftypes.TokenTransfer, bool) *fftypes.TokenTransfer); ok { + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, *fftypes.TokenTransferInput, bool) *fftypes.TokenTransfer); ok { r0 = rf(ctx, ns, typeName, poolName, transfer, waitConfirm) } else { if ret.Get(0) != nil { @@ -197,7 +199,7 @@ func (_m *Manager) MintTokens(ctx context.Context, ns string, typeName string, p } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, string, string, *fftypes.TokenTransfer, bool) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, string, string, string, *fftypes.TokenTransferInput, bool) error); ok { r1 = rf(ctx, ns, typeName, poolName, transfer, waitConfirm) } else { r1 = ret.Error(1) @@ -206,6 +208,22 @@ func (_m *Manager) MintTokens(ctx context.Context, ns string, typeName string, p return r0, r1 } +// NewTransfer provides a mock function with given fields: ns, typeName, poolName, transfer +func (_m *Manager) NewTransfer(ns string, typeName string, poolName string, transfer *fftypes.TokenTransferInput) sysmessaging.MessageSender { + ret := _m.Called(ns, typeName, poolName, transfer) + + var r0 sysmessaging.MessageSender + if rf, ok := ret.Get(0).(func(string, string, string, *fftypes.TokenTransferInput) sysmessaging.MessageSender); ok { + r0 = rf(ns, typeName, poolName, transfer) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(sysmessaging.MessageSender) + } + } + + return r0 +} + // Start provides a mock function with given fields: func (_m *Manager) Start() error { ret := _m.Called() diff --git a/mocks/broadcastmocks/manager.go b/mocks/broadcastmocks/manager.go index b3d1da3ddf..816b4d61c9 100644 --- a/mocks/broadcastmocks/manager.go +++ b/mocks/broadcastmocks/manager.go @@ -7,6 +7,8 @@ import ( fftypes "github.com/hyperledger/firefly/pkg/fftypes" mock "github.com/stretchr/testify/mock" + + sysmessaging "github.com/hyperledger/firefly/internal/sysmessaging" ) // Manager is an autogenerated mock type for the Manager type @@ -175,6 +177,22 @@ func (_m *Manager) BroadcastTokenPool(ctx context.Context, ns string, pool *ffty return r0, r1 } +// NewBroadcast provides a mock function with given fields: ns, in +func (_m *Manager) NewBroadcast(ns string, in *fftypes.MessageInOut) sysmessaging.MessageSender { + ret := _m.Called(ns, in) + + var r0 sysmessaging.MessageSender + if rf, ok := ret.Get(0).(func(string, *fftypes.MessageInOut) sysmessaging.MessageSender); ok { + r0 = rf(ns, in) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(sysmessaging.MessageSender) + } + } + + return r0 +} + // Start provides a mock function with given fields: func (_m *Manager) Start() error { ret := _m.Called() diff --git a/mocks/privatemessagingmocks/manager.go b/mocks/privatemessagingmocks/manager.go index ec78181ae2..4634da107f 100644 --- a/mocks/privatemessagingmocks/manager.go +++ b/mocks/privatemessagingmocks/manager.go @@ -9,6 +9,8 @@ import ( fftypes "github.com/hyperledger/firefly/pkg/fftypes" mock "github.com/stretchr/testify/mock" + + sysmessaging "github.com/hyperledger/firefly/internal/sysmessaging" ) // Manager is an autogenerated mock type for the Manager type @@ -92,6 +94,22 @@ func (_m *Manager) GetGroups(ctx context.Context, filter database.AndFilter) ([] return r0, r1, r2 } +// NewMessage provides a mock function with given fields: ns, msg +func (_m *Manager) NewMessage(ns string, msg *fftypes.MessageInOut) sysmessaging.MessageSender { + ret := _m.Called(ns, msg) + + var r0 sysmessaging.MessageSender + if rf, ok := ret.Get(0).(func(string, *fftypes.MessageInOut) sysmessaging.MessageSender); ok { + r0 = rf(ns, msg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(sysmessaging.MessageSender) + } + } + + return r0 +} + // RequestReply provides a mock function with given fields: ctx, ns, request func (_m *Manager) RequestReply(ctx context.Context, ns string, request *fftypes.MessageInOut) (*fftypes.MessageInOut, error) { ret := _m.Called(ctx, ns, request) diff --git a/mocks/syncasyncmocks/bridge.go b/mocks/syncasyncmocks/bridge.go index c07c18675f..02b5773b79 100644 --- a/mocks/syncasyncmocks/bridge.go +++ b/mocks/syncasyncmocks/bridge.go @@ -23,13 +23,13 @@ func (_m *Bridge) Init(sysevents sysmessaging.SystemEvents) { _m.Called(sysevents) } -// RequestReply provides a mock function with given fields: ctx, ns, send -func (_m *Bridge) RequestReply(ctx context.Context, ns string, send syncasync.RequestSender) (*fftypes.MessageInOut, error) { - ret := _m.Called(ctx, ns, send) +// RequestReply provides a mock function with given fields: ctx, ns, id, send +func (_m *Bridge) RequestReply(ctx context.Context, ns string, id *fftypes.UUID, send syncasync.RequestSender) (*fftypes.MessageInOut, error) { + ret := _m.Called(ctx, ns, id, send) var r0 *fftypes.MessageInOut - if rf, ok := ret.Get(0).(func(context.Context, string, syncasync.RequestSender) *fftypes.MessageInOut); ok { - r0 = rf(ctx, ns, send) + if rf, ok := ret.Get(0).(func(context.Context, string, *fftypes.UUID, syncasync.RequestSender) *fftypes.MessageInOut); ok { + r0 = rf(ctx, ns, id, send) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*fftypes.MessageInOut) @@ -37,8 +37,8 @@ func (_m *Bridge) RequestReply(ctx context.Context, ns string, send syncasync.Re } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, syncasync.RequestSender) error); ok { - r1 = rf(ctx, ns, send) + if rf, ok := ret.Get(1).(func(context.Context, string, *fftypes.UUID, syncasync.RequestSender) error); ok { + r1 = rf(ctx, ns, id, send) } else { r1 = ret.Error(1) } @@ -46,13 +46,13 @@ func (_m *Bridge) RequestReply(ctx context.Context, ns string, send syncasync.Re return r0, r1 } -// SendConfirm provides a mock function with given fields: ctx, ns, send -func (_m *Bridge) SendConfirm(ctx context.Context, ns string, send syncasync.RequestSender) (*fftypes.Message, error) { - ret := _m.Called(ctx, ns, send) +// SendConfirm provides a mock function with given fields: ctx, ns, id, send +func (_m *Bridge) SendConfirm(ctx context.Context, ns string, id *fftypes.UUID, send syncasync.RequestSender) (*fftypes.Message, error) { + ret := _m.Called(ctx, ns, id, send) var r0 *fftypes.Message - if rf, ok := ret.Get(0).(func(context.Context, string, syncasync.RequestSender) *fftypes.Message); ok { - r0 = rf(ctx, ns, send) + if rf, ok := ret.Get(0).(func(context.Context, string, *fftypes.UUID, syncasync.RequestSender) *fftypes.Message); ok { + r0 = rf(ctx, ns, id, send) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*fftypes.Message) @@ -60,8 +60,8 @@ func (_m *Bridge) SendConfirm(ctx context.Context, ns string, send syncasync.Req } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, syncasync.RequestSender) error); ok { - r1 = rf(ctx, ns, send) + if rf, ok := ret.Get(1).(func(context.Context, string, *fftypes.UUID, syncasync.RequestSender) error); ok { + r1 = rf(ctx, ns, id, send) } else { r1 = ret.Error(1) } @@ -69,13 +69,13 @@ func (_m *Bridge) SendConfirm(ctx context.Context, ns string, send syncasync.Req return r0, r1 } -// SendConfirmTokenPool provides a mock function with given fields: ctx, ns, send -func (_m *Bridge) SendConfirmTokenPool(ctx context.Context, ns string, send syncasync.RequestSender) (*fftypes.TokenPool, error) { - ret := _m.Called(ctx, ns, send) +// SendConfirmTokenPool provides a mock function with given fields: ctx, ns, id, send +func (_m *Bridge) SendConfirmTokenPool(ctx context.Context, ns string, id *fftypes.UUID, send syncasync.RequestSender) (*fftypes.TokenPool, error) { + ret := _m.Called(ctx, ns, id, send) var r0 *fftypes.TokenPool - if rf, ok := ret.Get(0).(func(context.Context, string, syncasync.RequestSender) *fftypes.TokenPool); ok { - r0 = rf(ctx, ns, send) + if rf, ok := ret.Get(0).(func(context.Context, string, *fftypes.UUID, syncasync.RequestSender) *fftypes.TokenPool); ok { + r0 = rf(ctx, ns, id, send) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*fftypes.TokenPool) @@ -83,8 +83,8 @@ func (_m *Bridge) SendConfirmTokenPool(ctx context.Context, ns string, send sync } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, syncasync.RequestSender) error); ok { - r1 = rf(ctx, ns, send) + if rf, ok := ret.Get(1).(func(context.Context, string, *fftypes.UUID, syncasync.RequestSender) error); ok { + r1 = rf(ctx, ns, id, send) } else { r1 = ret.Error(1) } @@ -92,13 +92,13 @@ func (_m *Bridge) SendConfirmTokenPool(ctx context.Context, ns string, send sync return r0, r1 } -// SendConfirmTokenTransfer provides a mock function with given fields: ctx, ns, send -func (_m *Bridge) SendConfirmTokenTransfer(ctx context.Context, ns string, send syncasync.RequestSender) (*fftypes.TokenTransfer, error) { - ret := _m.Called(ctx, ns, send) +// SendConfirmTokenTransfer provides a mock function with given fields: ctx, ns, id, send +func (_m *Bridge) SendConfirmTokenTransfer(ctx context.Context, ns string, id *fftypes.UUID, send syncasync.RequestSender) (*fftypes.TokenTransfer, error) { + ret := _m.Called(ctx, ns, id, send) var r0 *fftypes.TokenTransfer - if rf, ok := ret.Get(0).(func(context.Context, string, syncasync.RequestSender) *fftypes.TokenTransfer); ok { - r0 = rf(ctx, ns, send) + if rf, ok := ret.Get(0).(func(context.Context, string, *fftypes.UUID, syncasync.RequestSender) *fftypes.TokenTransfer); ok { + r0 = rf(ctx, ns, id, send) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*fftypes.TokenTransfer) @@ -106,8 +106,8 @@ func (_m *Bridge) SendConfirmTokenTransfer(ctx context.Context, ns string, send } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, syncasync.RequestSender) error); ok { - r1 = rf(ctx, ns, send) + if rf, ok := ret.Get(1).(func(context.Context, string, *fftypes.UUID, syncasync.RequestSender) error); ok { + r1 = rf(ctx, ns, id, send) } else { r1 = ret.Error(1) } diff --git a/mocks/sysmessagingmocks/message_sender.go b/mocks/sysmessagingmocks/message_sender.go new file mode 100644 index 0000000000..96f3b1ee02 --- /dev/null +++ b/mocks/sysmessagingmocks/message_sender.go @@ -0,0 +1,59 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package sysmessagingmocks + +import ( + context "context" + + sysmessaging "github.com/hyperledger/firefly/internal/sysmessaging" + mock "github.com/stretchr/testify/mock" +) + +// MessageSender is an autogenerated mock type for the MessageSender type +type MessageSender struct { + mock.Mock +} + +// BeforeSend provides a mock function with given fields: cb +func (_m *MessageSender) BeforeSend(cb sysmessaging.BeforeSendCallback) sysmessaging.MessageSender { + ret := _m.Called(cb) + + var r0 sysmessaging.MessageSender + if rf, ok := ret.Get(0).(func(sysmessaging.BeforeSendCallback) sysmessaging.MessageSender); ok { + r0 = rf(cb) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(sysmessaging.MessageSender) + } + } + + return r0 +} + +// Send provides a mock function with given fields: ctx +func (_m *MessageSender) Send(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendAndWait provides a mock function with given fields: ctx +func (_m *MessageSender) SendAndWait(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +}