From 0fc6490259463d13564a1fd3e3865948a65c6af6 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 24 Jun 2022 10:31:08 -0400 Subject: [PATCH 1/3] Collapse batchpin.Submitter into multiparty.Manager Signed-off-by: Andrew Richardson --- Makefile | 3 +- internal/batchpin/batchpin.go | 91 --------- internal/batchpin/batchpin_test.go | 180 ------------------ internal/broadcast/manager.go | 10 +- internal/broadcast/manager_test.go | 18 +- internal/multiparty/manager.go | 40 +++- internal/multiparty/manager_test.go | 149 ++++++++++++--- .../{batchpin => multiparty}/operations.go | 14 +- .../operations_test.go | 61 +++--- internal/orchestrator/orchestrator.go | 14 +- internal/orchestrator/orchestrator_test.go | 14 -- internal/privatemessaging/privatemessaging.go | 10 +- .../privatemessaging/privatemessaging_test.go | 21 +- mocks/batchpinmocks/submitter.go | 98 ---------- mocks/multipartymocks/manager.go | 65 +++++-- 15 files changed, 271 insertions(+), 517 deletions(-) delete mode 100644 internal/batchpin/batchpin.go delete mode 100644 internal/batchpin/batchpin_test.go rename internal/{batchpin => multiparty}/operations.go (90%) rename internal/{batchpin => multiparty}/operations_test.go (64%) delete mode 100644 mocks/batchpinmocks/submitter.go diff --git a/Makefile b/Makefile index 89091bdc3f..cce56c32ed 100644 --- a/Makefile +++ b/Makefile @@ -52,7 +52,6 @@ $(eval $(call makemock, pkg/tokens, Plugin, tokenmock $(eval $(call makemock, pkg/tokens, Callbacks, tokenmocks)) $(eval $(call makemock, internal/txcommon, Helper, txcommonmocks)) $(eval $(call makemock, internal/identity, Manager, identitymanagermocks)) -$(eval $(call makemock, internal/batchpin, Submitter, batchpinmocks)) $(eval $(call makemock, internal/sysmessaging, SystemEvents, sysmessagingmocks)) $(eval $(call makemock, internal/sysmessaging, MessageSender, sysmessagingmocks)) $(eval $(call makemock, internal/sysmessaging, LocalNodeInfo, sysmessagingmocks)) @@ -101,4 +100,4 @@ manifest: docker: ./docker_build.sh $(DOCKER_ARGS) docs: .ALWAYS - cd docs && bundle install && bundle exec jekyll build && bundle exec htmlproofer --disable-external --allow-hash-href --assume-extension ./_site --url-swap '^/firefly/:/' --url-ignore /127.0.0.1/,/localhost/ \ No newline at end of file + cd docs && bundle install && bundle exec jekyll build && bundle exec htmlproofer --disable-external --allow-hash-href --assume-extension ./_site --url-swap '^/firefly/:/' --url-ignore /127.0.0.1/,/localhost/ diff --git a/internal/batchpin/batchpin.go b/internal/batchpin/batchpin.go deleted file mode 100644 index 247e23dbf9..0000000000 --- a/internal/batchpin/batchpin.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright © 2022 Kaleido, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package batchpin - -import ( - "context" - - "github.com/hyperledger/firefly-common/pkg/fftypes" - "github.com/hyperledger/firefly-common/pkg/i18n" - "github.com/hyperledger/firefly/internal/coremsgs" - "github.com/hyperledger/firefly/internal/identity" - "github.com/hyperledger/firefly/internal/metrics" - "github.com/hyperledger/firefly/internal/multiparty" - "github.com/hyperledger/firefly/internal/operations" - "github.com/hyperledger/firefly/pkg/core" - "github.com/hyperledger/firefly/pkg/database" -) - -type Submitter interface { - core.Named - - SubmitPinnedBatch(ctx context.Context, batch *core.BatchPersisted, contexts []*fftypes.Bytes32, payloadRef string) error - - // From operations.OperationHandler - PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error) - RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, complete bool, err error) -} - -type batchPinSubmitter struct { - namespace string - database database.Plugin - identity identity.Manager - multiparty multiparty.Manager - metrics metrics.Manager - operations operations.Manager -} - -func NewBatchPinSubmitter(ctx context.Context, ns string, di database.Plugin, im identity.Manager, multiparty multiparty.Manager, mm metrics.Manager, om operations.Manager) (Submitter, error) { - if di == nil || im == nil || mm == nil || om == nil { - return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "BatchPinSubmitter") - } - bp := &batchPinSubmitter{ - namespace: ns, - database: di, - identity: im, - multiparty: multiparty, - metrics: mm, - operations: om, - } - om.RegisterHandler(ctx, bp, []core.OpType{ - core.OpTypeBlockchainPinBatch, - }) - return bp, nil -} - -func (bp *batchPinSubmitter) Name() string { - return "BatchPinSubmitter" -} - -func (bp *batchPinSubmitter) SubmitPinnedBatch(ctx context.Context, batch *core.BatchPersisted, contexts []*fftypes.Bytes32, payloadRef string) error { - // The pending blockchain transaction - op := core.NewOperation( - bp.multiparty, - batch.Namespace, - batch.TX.ID, - core.OpTypeBlockchainPinBatch) - addBatchPinInputs(op, batch.ID, contexts, payloadRef) - if err := bp.operations.AddOrReuseOperation(ctx, op); err != nil { - return err - } - - if bp.metrics.IsMetricsEnabled() { - bp.metrics.CountBatchPin() - } - _, err := bp.operations.RunOperation(ctx, opBatchPin(op, batch, contexts, payloadRef)) - return err -} diff --git a/internal/batchpin/batchpin_test.go b/internal/batchpin/batchpin_test.go deleted file mode 100644 index 8e095694d8..0000000000 --- a/internal/batchpin/batchpin_test.go +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright © 2021 Kaleido, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package batchpin - -import ( - "context" - "fmt" - "testing" - - "github.com/hyperledger/firefly-common/pkg/config" - "github.com/hyperledger/firefly-common/pkg/fftypes" - "github.com/hyperledger/firefly/internal/coreconfig" - "github.com/hyperledger/firefly/mocks/databasemocks" - "github.com/hyperledger/firefly/mocks/identitymanagermocks" - "github.com/hyperledger/firefly/mocks/metricsmocks" - "github.com/hyperledger/firefly/mocks/multipartymocks" - "github.com/hyperledger/firefly/mocks/operationmocks" - "github.com/hyperledger/firefly/pkg/core" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -var utConfig = config.RootSection("metrics") - -func newTestBatchPinSubmitter(t *testing.T, enableMetrics bool) *batchPinSubmitter { - coreconfig.Reset() - - mdi := &databasemocks.Plugin{} - mim := &identitymanagermocks.Manager{} - mmp := &multipartymocks.Manager{} - mmi := &metricsmocks.Manager{} - mom := &operationmocks.Manager{} - mmi.On("IsMetricsEnabled").Return(enableMetrics) - mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything) - if enableMetrics { - mmi.On("CountBatchPin").Return() - } - mmp.On("Name").Return("ut").Maybe() - bps, err := NewBatchPinSubmitter(context.Background(), "ns1", mdi, mim, mmp, mmi, mom) - assert.NoError(t, err) - return bps.(*batchPinSubmitter) -} - -func TestInitFail(t *testing.T) { - _, err := NewBatchPinSubmitter(context.Background(), "", nil, nil, nil, nil, nil) - assert.Regexp(t, "FF10128", err) -} - -func TestName(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) - assert.Equal(t, "BatchPinSubmitter", bp.Name()) -} - -func TestSubmitPinnedBatchOk(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) - ctx := context.Background() - - mdi := bp.database.(*databasemocks.Plugin) - mmi := bp.metrics.(*metricsmocks.Manager) - mom := bp.operations.(*operationmocks.Manager) - - batch := &core.BatchPersisted{ - BatchHeader: core.BatchHeader{ - ID: fftypes.NewUUID(), - SignerRef: core.SignerRef{ - Author: "id1", - Key: "0x12345", - }, - }, - TX: core.TransactionRef{ - ID: fftypes.NewUUID(), - }, - } - contexts := []*fftypes.Bytes32{} - - mom.On("AddOrReuseOperation", ctx, mock.MatchedBy(func(op *core.Operation) bool { - assert.Equal(t, core.OpTypeBlockchainPinBatch, op.Type) - assert.Equal(t, "ut", op.Plugin) - assert.Equal(t, *batch.TX.ID, *op.Transaction) - assert.Equal(t, "payload1", op.Input.GetString("payloadRef")) - return true - })).Return(nil) - mmi.On("IsMetricsEnabled").Return(false) - mom.On("RunOperation", mock.Anything, mock.MatchedBy(func(op *core.PreparedOperation) bool { - data := op.Data.(batchPinData) - return op.Type == core.OpTypeBlockchainPinBatch && data.Batch == batch - })).Return(nil, nil) - - err := bp.SubmitPinnedBatch(ctx, batch, contexts, "payload1") - assert.NoError(t, err) - - mdi.AssertExpectations(t) - mmi.AssertExpectations(t) - mom.AssertExpectations(t) -} - -func TestSubmitPinnedBatchWithMetricsOk(t *testing.T) { - bp := newTestBatchPinSubmitter(t, true) - ctx := context.Background() - - mdi := bp.database.(*databasemocks.Plugin) - mmi := bp.metrics.(*metricsmocks.Manager) - mom := bp.operations.(*operationmocks.Manager) - - batch := &core.BatchPersisted{ - BatchHeader: core.BatchHeader{ - ID: fftypes.NewUUID(), - SignerRef: core.SignerRef{ - Author: "id1", - Key: "0x12345", - }, - }, - TX: core.TransactionRef{ - ID: fftypes.NewUUID(), - }, - } - contexts := []*fftypes.Bytes32{} - - mom.On("AddOrReuseOperation", ctx, mock.MatchedBy(func(op *core.Operation) bool { - assert.Equal(t, core.OpTypeBlockchainPinBatch, op.Type) - assert.Equal(t, "ut", op.Plugin) - assert.Equal(t, *batch.TX.ID, *op.Transaction) - assert.Equal(t, "payload1", op.Input.GetString("payloadRef")) - return true - })).Return(nil) - mmi.On("IsMetricsEnabled").Return(true) - mom.On("RunOperation", mock.Anything, mock.MatchedBy(func(op *core.PreparedOperation) bool { - data := op.Data.(batchPinData) - return op.Type == core.OpTypeBlockchainPinBatch && data.Batch == batch - })).Return(nil, nil) - - err := bp.SubmitPinnedBatch(ctx, batch, contexts, "payload1") - assert.NoError(t, err) - - mdi.AssertExpectations(t) - mmi.AssertExpectations(t) - mom.AssertExpectations(t) -} - -func TestSubmitPinnedBatchOpFail(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) - ctx := context.Background() - - mom := bp.operations.(*operationmocks.Manager) - mmi := bp.metrics.(*metricsmocks.Manager) - - batch := &core.BatchPersisted{ - BatchHeader: core.BatchHeader{ - ID: fftypes.NewUUID(), - SignerRef: core.SignerRef{ - Author: "id1", - Key: "0x12345", - }, - }, - TX: core.TransactionRef{ - ID: fftypes.NewUUID(), - }, - } - contexts := []*fftypes.Bytes32{} - - mom.On("AddOrReuseOperation", ctx, mock.Anything).Return(fmt.Errorf("pop")) - mmi.On("IsMetricsEnabled").Return(false) - err := bp.SubmitPinnedBatch(ctx, batch, contexts, "payload1") - assert.Regexp(t, "pop", err) - -} diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 13dc0c7de4..656d98baa5 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -24,12 +24,12 @@ import ( "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly/internal/batch" - "github.com/hyperledger/firefly/internal/batchpin" "github.com/hyperledger/firefly/internal/coreconfig" "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/identity" "github.com/hyperledger/firefly/internal/metrics" + "github.com/hyperledger/firefly/internal/multiparty" "github.com/hyperledger/firefly/internal/operations" "github.com/hyperledger/firefly/internal/syncasync" "github.com/hyperledger/firefly/internal/sysmessaging" @@ -70,13 +70,13 @@ type broadcastManager struct { exchange dataexchange.Plugin sharedstorage sharedstorage.Plugin syncasync syncasync.Bridge - batchpin batchpin.Submitter + multiparty multiparty.Manager maxBatchPayloadLength int64 metrics metrics.Manager operations operations.Manager } -func NewBroadcastManager(ctx context.Context, ns string, di database.Plugin, bi blockchain.Plugin, dx dataexchange.Plugin, si sharedstorage.Plugin, im identity.Manager, dm data.Manager, ba batch.Manager, sa syncasync.Bridge, bp batchpin.Submitter, mm metrics.Manager, om operations.Manager) (Manager, error) { +func NewBroadcastManager(ctx context.Context, ns string, di database.Plugin, bi blockchain.Plugin, dx dataexchange.Plugin, si sharedstorage.Plugin, im identity.Manager, dm data.Manager, ba batch.Manager, sa syncasync.Bridge, mult multiparty.Manager, mm metrics.Manager, om operations.Manager) (Manager, error) { if di == nil || im == nil || dm == nil || bi == nil || dx == nil || si == nil || ba == nil || mm == nil || om == nil { return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "BroadcastManager") } @@ -90,7 +90,7 @@ func NewBroadcastManager(ctx context.Context, ns string, di database.Plugin, bi exchange: dx, sharedstorage: si, syncasync: sa, - batchpin: bp, + multiparty: mult, maxBatchPayloadLength: config.GetByteSize(coreconfig.BroadcastBatchPayloadLimit), metrics: mm, operations: om, @@ -152,7 +152,7 @@ func (bm *broadcastManager) dispatchBatch(ctx context.Context, state *batch.Disp } payloadRef := outputs.GetString("payloadRef") log.L(ctx).Infof("Pinning broadcast batch %s with author=%s key=%s payloadRef=%s", batch.ID, batch.Author, batch.Key, payloadRef) - return bm.batchpin.SubmitPinnedBatch(ctx, &state.Persisted, state.Pins, payloadRef) + return bm.multiparty.SubmitBatchPin(ctx, &state.Persisted, state.Pins, payloadRef) } func (bm *broadcastManager) uploadBlobs(ctx context.Context, tx *fftypes.UUID, data core.DataArray) error { diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index a8eff6734a..6ef2cbe248 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -27,13 +27,13 @@ import ( "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/operations" "github.com/hyperledger/firefly/mocks/batchmocks" - "github.com/hyperledger/firefly/mocks/batchpinmocks" "github.com/hyperledger/firefly/mocks/blockchainmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/dataexchangemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/identitymanagermocks" "github.com/hyperledger/firefly/mocks/metricsmocks" + "github.com/hyperledger/firefly/mocks/multipartymocks" "github.com/hyperledger/firefly/mocks/operationmocks" "github.com/hyperledger/firefly/mocks/sharedstoragemocks" "github.com/hyperledger/firefly/mocks/syncasyncmocks" @@ -52,7 +52,7 @@ func newTestBroadcastCommon(t *testing.T, metricsEnabled bool) (*broadcastManage mba := &batchmocks.Manager{} mdx := &dataexchangemocks.Plugin{} msa := &syncasyncmocks.Bridge{} - mbp := &batchpinmocks.Submitter{} + mmp := &multipartymocks.Manager{} mmi := &metricsmocks.Manager{} mom := &operationmocks.Manager{} mmi.On("IsMetricsEnabled").Return(metricsEnabled) @@ -76,7 +76,7 @@ func newTestBroadcastCommon(t *testing.T, metricsEnabled bool) (*broadcastManage } ctx, cancel := context.WithCancel(context.Background()) - b, err := NewBroadcastManager(ctx, "ns1", mdi, mbi, mdx, mpi, mim, mdm, mba, msa, mbp, mmi, mom) + b, err := NewBroadcastManager(ctx, "ns1", mdi, mbi, mdx, mpi, mim, mdm, mba, msa, mmp, mmi, mom) assert.NoError(t, err) return b.(*broadcastManager), cancel } @@ -247,10 +247,10 @@ func TestDispatchBatchSubmitBatchPinSucceed(t *testing.T) { } mdi := bm.database.(*databasemocks.Plugin) - mbp := bm.batchpin.(*batchpinmocks.Submitter) + mmp := bm.multiparty.(*multipartymocks.Manager) mom := bm.operations.(*operationmocks.Manager) mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil) - mbp.On("SubmitPinnedBatch", mock.Anything, mock.Anything, mock.Anything, "payload1").Return(nil) + mmp.On("SubmitBatchPin", mock.Anything, mock.Anything, mock.Anything, "payload1").Return(nil) mom.On("RunOperation", mock.Anything, mock.MatchedBy(func(op *core.PreparedOperation) bool { data := op.Data.(uploadBatchData) return op.Type == core.OpTypeSharedStorageUploadBatch && data.Batch.ID.Equals(state.Persisted.ID) @@ -260,7 +260,7 @@ func TestDispatchBatchSubmitBatchPinSucceed(t *testing.T) { assert.NoError(t, err) mdi.AssertExpectations(t) - mbp.AssertExpectations(t) + mmp.AssertExpectations(t) mom.AssertExpectations(t) } @@ -279,10 +279,10 @@ func TestDispatchBatchSubmitBroadcastFail(t *testing.T) { } mdi := bm.database.(*databasemocks.Plugin) - mbp := bm.batchpin.(*batchpinmocks.Submitter) + mmp := bm.multiparty.(*multipartymocks.Manager) mom := bm.operations.(*operationmocks.Manager) mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil) - mbp.On("SubmitPinnedBatch", mock.Anything, mock.Anything, mock.Anything, "payload1").Return(fmt.Errorf("pop")) + mmp.On("SubmitBatchPin", mock.Anything, mock.Anything, mock.Anything, "payload1").Return(fmt.Errorf("pop")) mom.On("RunOperation", mock.Anything, mock.MatchedBy(func(op *core.PreparedOperation) bool { data := op.Data.(uploadBatchData) return op.Type == core.OpTypeSharedStorageUploadBatch && data.Batch.ID.Equals(state.Persisted.ID) @@ -292,7 +292,7 @@ func TestDispatchBatchSubmitBroadcastFail(t *testing.T) { assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) - mbp.AssertExpectations(t) + mmp.AssertExpectations(t) mom.AssertExpectations(t) } diff --git a/internal/multiparty/manager.go b/internal/multiparty/manager.go index 60521d4854..4f47cf2af8 100644 --- a/internal/multiparty/manager.go +++ b/internal/multiparty/manager.go @@ -24,8 +24,11 @@ import ( "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly/internal/coremsgs" + "github.com/hyperledger/firefly/internal/metrics" + "github.com/hyperledger/firefly/internal/operations" "github.com/hyperledger/firefly/pkg/blockchain" "github.com/hyperledger/firefly/pkg/core" + "github.com/hyperledger/firefly/pkg/database" ) type Manager interface { @@ -44,12 +47,14 @@ type Manager interface { GetNetworkVersion() int // SubmitBatchPin sequences a batch of message globally to all viewers of a given ledger - SubmitBatchPin(ctx context.Context, nsOpID string, signingKey string, batch *blockchain.BatchPin) error + SubmitBatchPin(ctx context.Context, batch *core.BatchPersisted, contexts []*fftypes.Bytes32, payloadRef string) error // SubmitNetworkAction writes a special "BatchPin" event which signals the plugin to take an action SubmitNetworkAction(ctx context.Context, nsOpID string, signingKey string, action core.NetworkActionType) error - core.Named + // From operations.OperationHandler + PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error) + RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, complete bool, err error) } type Contract struct { @@ -59,9 +64,12 @@ type Contract struct { type multipartyManager struct { ctx context.Context + namespace string + database database.Plugin blockchain blockchain.Plugin + operations operations.Manager + metrics metrics.Manager contracts []Contract - namespace string activeContract struct { location *fftypes.JSONAny firstEvent string @@ -70,12 +78,15 @@ type multipartyManager struct { } } -func NewMultipartyManager(ctx context.Context, ns string, contracts []Contract, bi blockchain.Plugin) Manager { +func NewMultipartyManager(ctx context.Context, ns string, contracts []Contract, di database.Plugin, bi blockchain.Plugin, om operations.Manager, mm metrics.Manager) Manager { return &multipartyManager{ ctx: ctx, namespace: ns, contracts: contracts, + database: di, blockchain: bi, + operations: om, + metrics: mm, } } @@ -145,10 +156,21 @@ func (mm *multipartyManager) SubmitNetworkAction(ctx context.Context, nsOpID str return mm.blockchain.SubmitNetworkAction(ctx, nsOpID, signingKey, action, mm.activeContract.location) } -func (mm *multipartyManager) SubmitBatchPin(ctx context.Context, nsOpID string, signingKey string, batch *blockchain.BatchPin) error { - return mm.blockchain.SubmitBatchPin(ctx, nsOpID, signingKey, batch, mm.activeContract.location) -} +func (mm *multipartyManager) SubmitBatchPin(ctx context.Context, batch *core.BatchPersisted, contexts []*fftypes.Bytes32, payloadRef string) error { + // The pending blockchain transaction + op := core.NewOperation( + mm.blockchain, + batch.Namespace, + batch.TX.ID, + core.OpTypeBlockchainPinBatch) + addBatchPinInputs(op, batch.ID, contexts, payloadRef) + if err := mm.operations.AddOrReuseOperation(ctx, op); err != nil { + return err + } -func (mm *multipartyManager) Name() string { - return mm.blockchain.Name() + if mm.metrics.IsMetricsEnabled() { + mm.metrics.CountBatchPin() + } + _, err := mm.operations.RunOperation(ctx, opBatchPin(op, batch, contexts, payloadRef)) + return err } diff --git a/internal/multiparty/manager_test.go b/internal/multiparty/manager_test.go index 5e32fc1054..93398c768a 100644 --- a/internal/multiparty/manager_test.go +++ b/internal/multiparty/manager_test.go @@ -23,6 +23,9 @@ import ( "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly/mocks/blockchainmocks" + "github.com/hyperledger/firefly/mocks/databasemocks" + "github.com/hyperledger/firefly/mocks/metricsmocks" + "github.com/hyperledger/firefly/mocks/operationmocks" "github.com/hyperledger/firefly/pkg/blockchain" "github.com/hyperledger/firefly/pkg/core" "github.com/stretchr/testify/assert" @@ -31,28 +34,45 @@ import ( type testMultipartyManager struct { multipartyManager + mdi *databasemocks.Plugin mbi *blockchainmocks.Plugin + mom *operationmocks.Manager + mmi *metricsmocks.Manager +} + +func (mp *testMultipartyManager) cleanup(t *testing.T) { + mp.mdi.AssertExpectations(t) + mp.mbi.AssertExpectations(t) + mp.mom.AssertExpectations(t) + mp.mmi.AssertExpectations(t) } func newTestMultipartyManager() *testMultipartyManager { nm := &testMultipartyManager{ + mdi: &databasemocks.Plugin{}, mbi: &blockchainmocks.Plugin{}, + mom: &operationmocks.Manager{}, + mmi: &metricsmocks.Manager{}, multipartyManager: multipartyManager{ - ctx: context.Background(), + ctx: context.Background(), + namespace: "ns1", }, } + nm.multipartyManager.database = nm.mdi nm.multipartyManager.blockchain = nm.mbi - nm.mbi.On("Name").Return("ethereum") - nm.mbi.On("SubmitBatchPin", context.Background(), mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - nm.mbi.On("SubmitNetworkAction", context.Background(), mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + nm.multipartyManager.operations = nm.mom + nm.multipartyManager.metrics = nm.mmi return nm } func TestNewMultipartyManager(t *testing.T) { + mdi := &databasemocks.Plugin{} mbi := &blockchainmocks.Plugin{} + mom := &operationmocks.Manager{} + mmi := &metricsmocks.Manager{} contracts := make([]Contract, 0) - nm := NewMultipartyManager(context.Background(), "namespace", contracts, mbi) + nm := NewMultipartyManager(context.Background(), "namespace", contracts, mdi, mbi, mom, mmi) assert.NotNil(t, nm) } @@ -220,12 +240,6 @@ func TestConfigureContractNetworkVersionFail(t *testing.T) { assert.Regexp(t, "pop", err) } -func TestGetBlockchainName(t *testing.T) { - mp := newTestMultipartyManager() - name := mp.Name() - assert.Equal(t, "ethereum", name) -} - func TestSubmitNetworkAction(t *testing.T) { contracts := make([]Contract, 1) location := fftypes.JSONAnyPtr(fftypes.JSONObject{ @@ -240,6 +254,7 @@ func TestSubmitNetworkAction(t *testing.T) { mp := newTestMultipartyManager() mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) + mp.mbi.On("SubmitNetworkAction", mock.Anything, "test", "0x123", core.NetworkActionTerminate, mock.Anything).Return(nil) mp.multipartyManager.contracts = contracts cf := &core.FireFlyContracts{ @@ -252,32 +267,104 @@ func TestSubmitNetworkAction(t *testing.T) { assert.Nil(t, err) } -func TestSubmitBatchPin(t *testing.T) { - contracts := make([]Contract, 1) - location := fftypes.JSONAnyPtr(fftypes.JSONObject{ - "address": "0x123", - }.String()) - contract := Contract{ - FirstEvent: "0", - Location: location, +func TestSubmitBatchPinOk(t *testing.T) { + mp := newTestMultipartyManager() + defer mp.cleanup(t) + ctx := context.Background() + + batch := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: fftypes.NewUUID(), + SignerRef: core.SignerRef{ + Author: "id1", + Key: "0x12345", + }, + }, + TX: core.TransactionRef{ + ID: fftypes.NewUUID(), + }, } + contexts := []*fftypes.Bytes32{} + + mp.mbi.On("Name").Return("ut") + mp.mom.On("AddOrReuseOperation", ctx, mock.MatchedBy(func(op *core.Operation) bool { + assert.Equal(t, core.OpTypeBlockchainPinBatch, op.Type) + assert.Equal(t, "ut", op.Plugin) + assert.Equal(t, *batch.TX.ID, *op.Transaction) + assert.Equal(t, "payload1", op.Input.GetString("payloadRef")) + return true + })).Return(nil) + mp.mmi.On("IsMetricsEnabled").Return(false) + mp.mom.On("RunOperation", mock.Anything, mock.MatchedBy(func(op *core.PreparedOperation) bool { + data := op.Data.(batchPinData) + return op.Type == core.OpTypeBlockchainPinBatch && data.Batch == batch + })).Return(nil, nil) + + err := mp.SubmitBatchPin(ctx, batch, contexts, "payload1") + assert.NoError(t, err) +} - contracts[0] = contract +func TestSubmitPinnedBatchWithMetricsOk(t *testing.T) { mp := newTestMultipartyManager() - mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) - mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) - mp.multipartyManager.contracts = contracts - - cf := &core.FireFlyContracts{ - Active: core.FireFlyContractInfo{Index: 0}, + defer mp.cleanup(t) + ctx := context.Background() + + batch := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: fftypes.NewUUID(), + SignerRef: core.SignerRef{ + Author: "id1", + Key: "0x12345", + }, + }, + TX: core.TransactionRef{ + ID: fftypes.NewUUID(), + }, } + contexts := []*fftypes.Bytes32{} + + mp.mbi.On("Name").Return("ut") + mp.mom.On("AddOrReuseOperation", ctx, mock.MatchedBy(func(op *core.Operation) bool { + assert.Equal(t, core.OpTypeBlockchainPinBatch, op.Type) + assert.Equal(t, "ut", op.Plugin) + assert.Equal(t, *batch.TX.ID, *op.Transaction) + assert.Equal(t, "payload1", op.Input.GetString("payloadRef")) + return true + })).Return(nil) + mp.mmi.On("IsMetricsEnabled").Return(true) + mp.mmi.On("CountBatchPin").Return() + mp.mom.On("RunOperation", mock.Anything, mock.MatchedBy(func(op *core.PreparedOperation) bool { + data := op.Data.(batchPinData) + return op.Type == core.OpTypeBlockchainPinBatch && data.Batch == batch + })).Return(nil, nil) + + err := mp.SubmitBatchPin(ctx, batch, contexts, "payload1") + assert.NoError(t, err) +} - batchPin := &blockchain.BatchPin{} +func TestSubmitPinnedBatchOpFail(t *testing.T) { + mp := newTestMultipartyManager() + defer mp.cleanup(t) + ctx := context.Background() + + batch := &core.BatchPersisted{ + BatchHeader: core.BatchHeader{ + ID: fftypes.NewUUID(), + SignerRef: core.SignerRef{ + Author: "id1", + Key: "0x12345", + }, + }, + TX: core.TransactionRef{ + ID: fftypes.NewUUID(), + }, + } + contexts := []*fftypes.Bytes32{} - err := mp.ConfigureContract(context.Background(), cf) - assert.NoError(t, err) - err = mp.SubmitBatchPin(context.Background(), "ns1", "0x123", batchPin) - assert.Nil(t, err) + mp.mbi.On("Name").Return("ut") + mp.mom.On("AddOrReuseOperation", ctx, mock.Anything).Return(fmt.Errorf("pop")) + err := mp.SubmitBatchPin(ctx, batch, contexts, "payload1") + assert.Regexp(t, "pop", err) } func TestGetNetworkVersion(t *testing.T) { diff --git a/internal/batchpin/operations.go b/internal/multiparty/operations.go similarity index 90% rename from internal/batchpin/operations.go rename to internal/multiparty/operations.go index 6ad8165fa4..ed4dfd5c18 100644 --- a/internal/batchpin/operations.go +++ b/internal/multiparty/operations.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package batchpin +package multiparty import ( "context" @@ -62,14 +62,14 @@ func retrieveBatchPinInputs(ctx context.Context, op *core.Operation) (batchID *f return batchID, contexts, payloadRef, nil } -func (bp *batchPinSubmitter) PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error) { +func (mm *multipartyManager) PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error) { switch op.Type { case core.OpTypeBlockchainPinBatch: batchID, contexts, payloadRef, err := retrieveBatchPinInputs(ctx, op) if err != nil { return nil, err } - batch, err := bp.database.GetBatchByID(ctx, bp.namespace, batchID) + batch, err := mm.database.GetBatchByID(ctx, mm.namespace, batchID) if err != nil { return nil, err } else if batch == nil { @@ -82,25 +82,25 @@ func (bp *batchPinSubmitter) PrepareOperation(ctx context.Context, op *core.Oper } } -func (bp *batchPinSubmitter) RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, complete bool, err error) { +func (mm *multipartyManager) RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, complete bool, err error) { switch data := op.Data.(type) { case batchPinData: batch := data.Batch - return nil, false, bp.multiparty.SubmitBatchPin(ctx, op.NamespacedIDString(), batch.Key, &blockchain.BatchPin{ + return nil, false, mm.blockchain.SubmitBatchPin(ctx, op.NamespacedIDString(), batch.Key, &blockchain.BatchPin{ Namespace: batch.Namespace, TransactionID: batch.TX.ID, BatchID: batch.ID, BatchHash: batch.Hash, BatchPayloadRef: data.PayloadRef, Contexts: data.Contexts, - }) + }, mm.activeContract.location) default: return nil, false, i18n.NewError(ctx, coremsgs.MsgOperationDataIncorrect, op.Data) } } -func (bp *batchPinSubmitter) OnOperationUpdate(ctx context.Context, op *core.Operation, update *operations.OperationUpdate) error { +func (mm *multipartyManager) OnOperationUpdate(ctx context.Context, op *core.Operation, update *operations.OperationUpdate) error { return nil } diff --git a/internal/batchpin/operations_test.go b/internal/multiparty/operations_test.go similarity index 64% rename from internal/batchpin/operations_test.go rename to internal/multiparty/operations_test.go index 5ba65ff89e..40d4db315b 100644 --- a/internal/batchpin/operations_test.go +++ b/internal/multiparty/operations_test.go @@ -13,7 +13,8 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package batchpin + +package multiparty import ( "context" @@ -21,15 +22,14 @@ import ( "testing" "github.com/hyperledger/firefly-common/pkg/fftypes" - "github.com/hyperledger/firefly/mocks/databasemocks" - "github.com/hyperledger/firefly/mocks/multipartymocks" "github.com/hyperledger/firefly/pkg/core" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) func TestPrepareAndRunBatchPin(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) + mp := newTestMultipartyManager() + defer mp.cleanup(t) op := &core.Operation{ Type: core.OpTypeBlockchainPinBatch, @@ -50,46 +50,45 @@ func TestPrepareAndRunBatchPin(t *testing.T) { } addBatchPinInputs(op, batch.ID, contexts, "payload1") - mmp := bp.multiparty.(*multipartymocks.Manager) - mdi := bp.database.(*databasemocks.Plugin) - mdi.On("GetBatchByID", context.Background(), "ns1", batch.ID).Return(batch, nil) - mmp.On("SubmitBatchPin", context.Background(), "ns1:"+op.ID.String(), "0x123", mock.Anything).Return(nil) + mp.mdi.On("GetBatchByID", context.Background(), "ns1", batch.ID).Return(batch, nil) + mp.mbi.On("SubmitBatchPin", context.Background(), "ns1:"+op.ID.String(), "0x123", mock.Anything, mock.Anything).Return(nil) - po, err := bp.PrepareOperation(context.Background(), op) + po, err := mp.PrepareOperation(context.Background(), op) assert.NoError(t, err) assert.Equal(t, batch, po.Data.(batchPinData).Batch) - _, complete, err := bp.RunOperation(context.Background(), opBatchPin(op, batch, contexts, "payload1")) + _, complete, err := mp.RunOperation(context.Background(), opBatchPin(op, batch, contexts, "payload1")) assert.False(t, complete) assert.NoError(t, err) - - mdi.AssertExpectations(t) } func TestPrepareOperationNotSupported(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) + mp := newTestMultipartyManager() + defer mp.cleanup(t) - po, err := bp.PrepareOperation(context.Background(), &core.Operation{}) + po, err := mp.PrepareOperation(context.Background(), &core.Operation{}) assert.Nil(t, po) assert.Regexp(t, "FF10371", err) } func TestPrepareOperationBatchPinBadBatch(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) + mp := newTestMultipartyManager() + defer mp.cleanup(t) op := &core.Operation{ Type: core.OpTypeBlockchainPinBatch, Input: fftypes.JSONObject{"batch": "bad"}, } - _, err := bp.PrepareOperation(context.Background(), op) + _, err := mp.PrepareOperation(context.Background(), op) assert.Regexp(t, "FF00138", err) } func TestPrepareOperationBatchPinBadContext(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) + mp := newTestMultipartyManager() + defer mp.cleanup(t) op := &core.Operation{ Type: core.OpTypeBlockchainPinBatch, @@ -99,21 +98,23 @@ func TestPrepareOperationBatchPinBadContext(t *testing.T) { }, } - _, err := bp.PrepareOperation(context.Background(), op) + _, err := mp.PrepareOperation(context.Background(), op) assert.Regexp(t, "FF00107", err) } func TestRunOperationNotSupported(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) + mp := newTestMultipartyManager() + defer mp.cleanup(t) - _, complete, err := bp.RunOperation(context.Background(), &core.PreparedOperation{}) + _, complete, err := mp.RunOperation(context.Background(), &core.PreparedOperation{}) assert.False(t, complete) assert.Regexp(t, "FF10378", err) } func TestPrepareOperationBatchPinError(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) + mp := newTestMultipartyManager() + defer mp.cleanup(t) batchID := fftypes.NewUUID() op := &core.Operation{ @@ -124,15 +125,15 @@ func TestPrepareOperationBatchPinError(t *testing.T) { }, } - mdi := bp.database.(*databasemocks.Plugin) - mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(nil, fmt.Errorf("pop")) + mp.mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(nil, fmt.Errorf("pop")) - _, err := bp.PrepareOperation(context.Background(), op) + _, err := mp.PrepareOperation(context.Background(), op) assert.EqualError(t, err, "pop") } func TestPrepareOperationBatchPinNotFound(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) + mp := newTestMultipartyManager() + defer mp.cleanup(t) batchID := fftypes.NewUUID() op := &core.Operation{ @@ -143,14 +144,14 @@ func TestPrepareOperationBatchPinNotFound(t *testing.T) { }, } - mdi := bp.database.(*databasemocks.Plugin) - mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(nil, nil) + mp.mdi.On("GetBatchByID", context.Background(), "ns1", batchID).Return(nil, nil) - _, err := bp.PrepareOperation(context.Background(), op) + _, err := mp.PrepareOperation(context.Background(), op) assert.Regexp(t, "FF10109", err) } func TestOperationUpdate(t *testing.T) { - bp := newTestBatchPinSubmitter(t, false) - assert.NoError(t, bp.OnOperationUpdate(context.Background(), nil, nil)) + mp := newTestMultipartyManager() + defer mp.cleanup(t) + assert.NoError(t, mp.OnOperationUpdate(context.Background(), nil, nil)) } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 468540e96e..ab2909196b 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -24,7 +24,6 @@ import ( "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly/internal/assets" "github.com/hyperledger/firefly/internal/batch" - "github.com/hyperledger/firefly/internal/batchpin" "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/contracts" "github.com/hyperledger/firefly/internal/coremsgs" @@ -184,7 +183,6 @@ type orchestrator struct { definitions definitions.DefinitionHandler data data.Manager syncasync syncasync.Bridge - batchpin batchpin.Submitter assets assets.Manager bc boundCallbacks contracts contracts.Manager @@ -392,7 +390,7 @@ func (or *orchestrator) initPlugins(ctx context.Context) (err error) { func (or *orchestrator) initComponents(ctx context.Context) (err error) { if or.multiparty == nil { - or.multiparty = multiparty.NewMultipartyManager(or.ctx, or.namespace, or.config.Multiparty.Contracts, or.blockchain()) + or.multiparty = multiparty.NewMultipartyManager(or.ctx, or.namespace, or.config.Multiparty.Contracts, or.database(), or.blockchain(), or.operations, or.metrics) } if or.identity == nil { @@ -428,20 +426,14 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } } - if or.batchpin == nil { - if or.batchpin, err = batchpin.NewBatchPinSubmitter(ctx, or.namespace, or.database(), or.identity, or.multiparty, or.metrics, or.operations); err != nil { - return err - } - } - if or.messaging == nil { - if or.messaging, err = privatemessaging.NewPrivateMessaging(ctx, or.namespace, or.database(), or.dataexchange(), or.blockchain(), or.identity, or.batch, or.data, or.syncasync, or.batchpin, or.metrics, or.operations); err != nil { + if or.messaging, err = privatemessaging.NewPrivateMessaging(ctx, or.namespace, or.database(), or.dataexchange(), or.blockchain(), or.identity, or.batch, or.data, or.syncasync, or.multiparty, or.metrics, or.operations); err != nil { return err } } if or.broadcast == nil { - if or.broadcast, err = broadcast.NewBroadcastManager(ctx, or.namespace, or.database(), or.blockchain(), or.dataexchange(), or.sharedstorage(), or.identity, or.data, or.batch, or.syncasync, or.batchpin, or.metrics, or.operations); err != nil { + if or.broadcast, err = broadcast.NewBroadcastManager(ctx, or.namespace, or.database(), or.blockchain(), or.dataexchange(), or.sharedstorage(), or.identity, or.data, or.batch, or.syncasync, or.multiparty, or.metrics, or.operations); err != nil { return err } } diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 4cb9b99621..c04025e61d 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -25,7 +25,6 @@ import ( "github.com/hyperledger/firefly/internal/identity" "github.com/hyperledger/firefly/mocks/assetmocks" "github.com/hyperledger/firefly/mocks/batchmocks" - "github.com/hyperledger/firefly/mocks/batchpinmocks" "github.com/hyperledger/firefly/mocks/blockchainmocks" "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/contractmocks" @@ -73,7 +72,6 @@ type testOrchestrator struct { mcm *contractmocks.Manager mmi *metricsmocks.Manager mom *operationmocks.Manager - mbp *batchpinmocks.Submitter mth *txcommonmocks.Helper msd *shareddownloadmocks.Manager mae *spieventsmocks.Manager @@ -99,7 +97,6 @@ func (tor *testOrchestrator) cleanup(t *testing.T) { tor.mcm.AssertExpectations(t) tor.mmi.AssertExpectations(t) tor.mom.AssertExpectations(t) - tor.mbp.AssertExpectations(t) tor.mth.AssertExpectations(t) tor.msd.AssertExpectations(t) tor.mae.AssertExpectations(t) @@ -133,7 +130,6 @@ func newTestOrchestrator() *testOrchestrator { mcm: &contractmocks.Manager{}, mmi: &metricsmocks.Manager{}, mom: &operationmocks.Manager{}, - mbp: &batchpinmocks.Submitter{}, mth: &txcommonmocks.Helper{}, msd: &shareddownloadmocks.Manager{}, mae: &spieventsmocks.Manager{}, @@ -152,7 +148,6 @@ func newTestOrchestrator() *testOrchestrator { tor.orchestrator.contracts = tor.mcm tor.orchestrator.metrics = tor.mmi tor.orchestrator.operations = tor.mom - tor.orchestrator.batchpin = tor.mbp tor.orchestrator.sharedDownload = tor.msd tor.orchestrator.txHelper = tor.mth tor.orchestrator.definitions = tor.mdh @@ -347,15 +342,6 @@ func TestInitDefinitionsComponentFail(t *testing.T) { assert.Regexp(t, "FF10128", err) } -func TestInitBatchPinComponentFail(t *testing.T) { - or := newTestOrchestrator() - defer or.cleanup(t) - or.plugins.Database.Plugin = nil - or.batchpin = nil - err := or.initComponents(context.Background()) - assert.Regexp(t, "FF10128", err) -} - func TestInitOperationsComponentFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index 33be7d7e92..d7ce7b46b4 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -26,12 +26,12 @@ import ( "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly-common/pkg/retry" "github.com/hyperledger/firefly/internal/batch" - "github.com/hyperledger/firefly/internal/batchpin" "github.com/hyperledger/firefly/internal/coreconfig" "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/identity" "github.com/hyperledger/firefly/internal/metrics" + "github.com/hyperledger/firefly/internal/multiparty" "github.com/hyperledger/firefly/internal/operations" "github.com/hyperledger/firefly/internal/syncasync" "github.com/hyperledger/firefly/internal/sysmessaging" @@ -70,7 +70,7 @@ type privateMessaging struct { blockchain blockchain.Plugin data data.Manager syncasync syncasync.Bridge - batchpin batchpin.Submitter + multiparty multiparty.Manager retry retry.Retry localNodeName string localNodeID *fftypes.UUID // lookup and cached on first use, as might not be registered at startup @@ -86,7 +86,7 @@ type blobTransferTracker struct { op *core.PreparedOperation } -func NewPrivateMessaging(ctx context.Context, ns string, di database.Plugin, dx dataexchange.Plugin, bi blockchain.Plugin, im identity.Manager, ba batch.Manager, dm data.Manager, sa syncasync.Bridge, bp batchpin.Submitter, mm metrics.Manager, om operations.Manager) (Manager, error) { +func NewPrivateMessaging(ctx context.Context, ns string, di database.Plugin, dx dataexchange.Plugin, bi blockchain.Plugin, im identity.Manager, ba batch.Manager, dm data.Manager, sa syncasync.Bridge, mult multiparty.Manager, mm metrics.Manager, om operations.Manager) (Manager, error) { if di == nil || im == nil || dx == nil || bi == nil || ba == nil || dm == nil || mm == nil || om == nil { return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "PrivateMessaging") } @@ -100,7 +100,7 @@ func NewPrivateMessaging(ctx context.Context, ns string, di database.Plugin, dx blockchain: bi, data: dm, syncasync: sa, - batchpin: bp, + multiparty: mult, localNodeName: config.GetString(coreconfig.NodeName), groupManager: groupManager{ namespace: ns, @@ -172,7 +172,7 @@ func (pm *privateMessaging) dispatchPinnedBatch(ctx context.Context, state *batc } log.L(ctx).Infof("Pinning private batch %s with author=%s key=%s group=%s", state.Persisted.ID, state.Persisted.Author, state.Persisted.Key, state.Persisted.Group) - return pm.batchpin.SubmitPinnedBatch(ctx, &state.Persisted, state.Pins, "" /* no payloadRef for private */) + return pm.multiparty.SubmitBatchPin(ctx, &state.Persisted, state.Pins, "" /* no payloadRef for private */) } func (pm *privateMessaging) dispatchUnpinnedBatch(ctx context.Context, state *batch.DispatchState) error { diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index 118bb379b6..86ff52fe9d 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -26,13 +26,13 @@ import ( "github.com/hyperledger/firefly/internal/batch" "github.com/hyperledger/firefly/internal/coreconfig" "github.com/hyperledger/firefly/mocks/batchmocks" - "github.com/hyperledger/firefly/mocks/batchpinmocks" "github.com/hyperledger/firefly/mocks/blockchainmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/dataexchangemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/identitymanagermocks" "github.com/hyperledger/firefly/mocks/metricsmocks" + "github.com/hyperledger/firefly/mocks/multipartymocks" "github.com/hyperledger/firefly/mocks/operationmocks" "github.com/hyperledger/firefly/mocks/syncasyncmocks" "github.com/hyperledger/firefly/pkg/core" @@ -53,7 +53,7 @@ func newTestPrivateMessagingCommon(t *testing.T, metricsEnabled bool) (*privateM mba := &batchmocks.Manager{} mdm := &datamocks.Manager{} msa := &syncasyncmocks.Bridge{} - mbp := &batchpinmocks.Submitter{} + mmp := &multipartymocks.Manager{} mmi := &metricsmocks.Manager{} mom := &operationmocks.Manager{} mockRunAsGroupPassthrough(mdi) @@ -77,7 +77,7 @@ func newTestPrivateMessagingCommon(t *testing.T, metricsEnabled bool) (*privateM mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything) ctx, cancel := context.WithCancel(context.Background()) - pm, err := NewPrivateMessaging(ctx, "ns1", mdi, mdx, mbi, mim, mba, mdm, msa, mbp, mmi, mom) + pm, err := NewPrivateMessaging(ctx, "ns1", mdi, mdx, mbi, mim, mba, mdm, msa, mmp, mmi, mom) assert.NoError(t, err) // Default mocks to save boilerplate in the tests @@ -130,7 +130,7 @@ func TestDispatchBatchWithBlobs(t *testing.T) { blob1 := fftypes.NewRandB32() mdi := pm.database.(*databasemocks.Plugin) - mbp := pm.batchpin.(*batchpinmocks.Submitter) + mmp := pm.multiparty.(*multipartymocks.Manager) mdx := pm.exchange.(*dataexchangemocks.Plugin) mim := pm.identity.(*identitymanagermocks.Manager) mom := pm.operations.(*operationmocks.Manager) @@ -179,7 +179,7 @@ func TestDispatchBatchWithBlobs(t *testing.T) { return *data.Node.ID == *node2.ID })).Return(nil, nil) - mbp.On("SubmitPinnedBatch", pm.ctx, mock.Anything, mock.Anything, "").Return(nil) + mmp.On("SubmitBatchPin", pm.ctx, mock.Anything, mock.Anything, "").Return(nil) err := pm.dispatchPinnedBatch(pm.ctx, &batch.DispatchState{ Persisted: core.BatchPersisted{ @@ -205,7 +205,7 @@ func TestDispatchBatchWithBlobs(t *testing.T) { assert.NoError(t, err) mdi.AssertExpectations(t) - mbp.AssertExpectations(t) + mmp.AssertExpectations(t) mdx.AssertExpectations(t) mim.AssertExpectations(t) mom.AssertExpectations(t) @@ -234,9 +234,6 @@ func TestSendAndSubmitBatchBadID(t *testing.T) { mdi := pm.database.(*databasemocks.Plugin) mdi.On("GetGroupByHash", pm.ctx, "ns1", mock.Anything).Return(nil, fmt.Errorf("pop")) - mbp := pm.batchpin.(*batchpinmocks.Submitter) - mbp.On("SubmitPinnedBatch", pm.ctx, mock.Anything, mock.Anything, "").Return(fmt.Errorf("pop")) - err := pm.dispatchPinnedBatch(pm.ctx, &batch.DispatchState{ Persisted: core.BatchPersisted{ BatchHeader: core.BatchHeader{ @@ -468,8 +465,8 @@ func TestWriteTransactionSubmitBatchPinFail(t *testing.T) { PayloadRef: "/blob/1", }, nil) - mbp := pm.batchpin.(*batchpinmocks.Submitter) - mbp.On("SubmitPinnedBatch", pm.ctx, mock.Anything, mock.Anything, "").Return(fmt.Errorf("pop")) + mmp := pm.multiparty.(*multipartymocks.Manager) + mmp.On("SubmitBatchPin", pm.ctx, mock.Anything, mock.Anything, "").Return(fmt.Errorf("pop")) err := pm.dispatchPinnedBatch(pm.ctx, &batch.DispatchState{ Persisted: core.BatchPersisted{ @@ -489,7 +486,7 @@ func TestWriteTransactionSubmitBatchPinFail(t *testing.T) { mdi.AssertExpectations(t) mim.AssertExpectations(t) - mbp.AssertExpectations(t) + mmp.AssertExpectations(t) mom.AssertExpectations(t) } diff --git a/mocks/batchpinmocks/submitter.go b/mocks/batchpinmocks/submitter.go deleted file mode 100644 index 8c0fbbfdc6..0000000000 --- a/mocks/batchpinmocks/submitter.go +++ /dev/null @@ -1,98 +0,0 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. - -package batchpinmocks - -import ( - context "context" - - fftypes "github.com/hyperledger/firefly-common/pkg/fftypes" - core "github.com/hyperledger/firefly/pkg/core" - - mock "github.com/stretchr/testify/mock" -) - -// Submitter is an autogenerated mock type for the Submitter type -type Submitter struct { - mock.Mock -} - -// Name provides a mock function with given fields: -func (_m *Submitter) Name() string { - ret := _m.Called() - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// PrepareOperation provides a mock function with given fields: ctx, op -func (_m *Submitter) PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error) { - ret := _m.Called(ctx, op) - - var r0 *core.PreparedOperation - if rf, ok := ret.Get(0).(func(context.Context, *core.Operation) *core.PreparedOperation); ok { - r0 = rf(ctx, op) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*core.PreparedOperation) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, *core.Operation) error); ok { - r1 = rf(ctx, op) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// RunOperation provides a mock function with given fields: ctx, op -func (_m *Submitter) RunOperation(ctx context.Context, op *core.PreparedOperation) (fftypes.JSONObject, bool, error) { - ret := _m.Called(ctx, op) - - var r0 fftypes.JSONObject - if rf, ok := ret.Get(0).(func(context.Context, *core.PreparedOperation) fftypes.JSONObject); ok { - r0 = rf(ctx, op) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(fftypes.JSONObject) - } - } - - var r1 bool - if rf, ok := ret.Get(1).(func(context.Context, *core.PreparedOperation) bool); ok { - r1 = rf(ctx, op) - } else { - r1 = ret.Get(1).(bool) - } - - var r2 error - if rf, ok := ret.Get(2).(func(context.Context, *core.PreparedOperation) error); ok { - r2 = rf(ctx, op) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// SubmitPinnedBatch provides a mock function with given fields: ctx, batch, contexts, payloadRef -func (_m *Submitter) SubmitPinnedBatch(ctx context.Context, batch *core.BatchPersisted, contexts []*fftypes.Bytes32, payloadRef string) error { - ret := _m.Called(ctx, batch, contexts, payloadRef) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *core.BatchPersisted, []*fftypes.Bytes32, string) error); ok { - r0 = rf(ctx, batch, contexts, payloadRef) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/mocks/multipartymocks/manager.go b/mocks/multipartymocks/manager.go index 154835ed5b..7722553bdc 100644 --- a/mocks/multipartymocks/manager.go +++ b/mocks/multipartymocks/manager.go @@ -47,27 +47,66 @@ func (_m *Manager) GetNetworkVersion() int { return r0 } -// Name provides a mock function with given fields: -func (_m *Manager) Name() string { - ret := _m.Called() +// PrepareOperation provides a mock function with given fields: ctx, op +func (_m *Manager) PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error) { + ret := _m.Called(ctx, op) - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() + var r0 *core.PreparedOperation + if rf, ok := ret.Get(0).(func(context.Context, *core.Operation) *core.PreparedOperation); ok { + r0 = rf(ctx, op) } else { - r0 = ret.Get(0).(string) + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.PreparedOperation) + } } - return r0 + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *core.Operation) error); ok { + r1 = rf(ctx, op) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RunOperation provides a mock function with given fields: ctx, op +func (_m *Manager) RunOperation(ctx context.Context, op *core.PreparedOperation) (fftypes.JSONObject, bool, error) { + ret := _m.Called(ctx, op) + + var r0 fftypes.JSONObject + if rf, ok := ret.Get(0).(func(context.Context, *core.PreparedOperation) fftypes.JSONObject); ok { + r0 = rf(ctx, op) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(fftypes.JSONObject) + } + } + + var r1 bool + if rf, ok := ret.Get(1).(func(context.Context, *core.PreparedOperation) bool); ok { + r1 = rf(ctx, op) + } else { + r1 = ret.Get(1).(bool) + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, *core.PreparedOperation) error); ok { + r2 = rf(ctx, op) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 } -// SubmitBatchPin provides a mock function with given fields: ctx, nsOpID, signingKey, batch -func (_m *Manager) SubmitBatchPin(ctx context.Context, nsOpID string, signingKey string, batch *blockchain.BatchPin) error { - ret := _m.Called(ctx, nsOpID, signingKey, batch) +// SubmitBatchPin provides a mock function with given fields: ctx, batch, contexts, payloadRef +func (_m *Manager) SubmitBatchPin(ctx context.Context, batch *core.BatchPersisted, contexts []*fftypes.Bytes32, payloadRef string) error { + ret := _m.Called(ctx, batch, contexts, payloadRef) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, *blockchain.BatchPin) error); ok { - r0 = rf(ctx, nsOpID, signingKey, batch) + if rf, ok := ret.Get(0).(func(context.Context, *core.BatchPersisted, []*fftypes.Bytes32, string) error); ok { + r0 = rf(ctx, batch, contexts, payloadRef) } else { r0 = ret.Error(0) } From acc7369a72e6c4158da7ddde6474c56f34e4841b Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 24 Jun 2022 13:13:28 -0400 Subject: [PATCH 2/3] Fix manager initialization order Signed-off-by: Andrew Richardson --- internal/broadcast/manager.go | 2 +- internal/multiparty/manager.go | 7 ++++-- internal/multiparty/manager_test.go | 9 ++++++- internal/orchestrator/orchestrator.go | 25 +++++++++++-------- internal/orchestrator/orchestrator_test.go | 5 ++-- internal/privatemessaging/privatemessaging.go | 2 +- 6 files changed, 31 insertions(+), 19 deletions(-) diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 656d98baa5..e71aaf2382 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -77,7 +77,7 @@ type broadcastManager struct { } func NewBroadcastManager(ctx context.Context, ns string, di database.Plugin, bi blockchain.Plugin, dx dataexchange.Plugin, si sharedstorage.Plugin, im identity.Manager, dm data.Manager, ba batch.Manager, sa syncasync.Bridge, mult multiparty.Manager, mm metrics.Manager, om operations.Manager) (Manager, error) { - if di == nil || im == nil || dm == nil || bi == nil || dx == nil || si == nil || ba == nil || mm == nil || om == nil { + if di == nil || im == nil || dm == nil || bi == nil || dx == nil || si == nil || ba == nil || mm == nil || om == nil || mult == nil { return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "BroadcastManager") } bm := &broadcastManager{ diff --git a/internal/multiparty/manager.go b/internal/multiparty/manager.go index 4f47cf2af8..5bfa3de334 100644 --- a/internal/multiparty/manager.go +++ b/internal/multiparty/manager.go @@ -78,7 +78,10 @@ type multipartyManager struct { } } -func NewMultipartyManager(ctx context.Context, ns string, contracts []Contract, di database.Plugin, bi blockchain.Plugin, om operations.Manager, mm metrics.Manager) Manager { +func NewMultipartyManager(ctx context.Context, ns string, contracts []Contract, di database.Plugin, bi blockchain.Plugin, om operations.Manager, mm metrics.Manager) (Manager, error) { + if di == nil || bi == nil || mm == nil || om == nil { + return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "MultipartyManager") + } return &multipartyManager{ ctx: ctx, namespace: ns, @@ -87,7 +90,7 @@ func NewMultipartyManager(ctx context.Context, ns string, contracts []Contract, blockchain: bi, operations: om, metrics: mm, - } + }, nil } func (mm *multipartyManager) ConfigureContract(ctx context.Context, contracts *core.FireFlyContracts) (err error) { diff --git a/internal/multiparty/manager_test.go b/internal/multiparty/manager_test.go index 93398c768a..8c4632a678 100644 --- a/internal/multiparty/manager_test.go +++ b/internal/multiparty/manager_test.go @@ -72,8 +72,15 @@ func TestNewMultipartyManager(t *testing.T) { mom := &operationmocks.Manager{} mmi := &metricsmocks.Manager{} contracts := make([]Contract, 0) - nm := NewMultipartyManager(context.Background(), "namespace", contracts, mdi, mbi, mom, mmi) + nm, err := NewMultipartyManager(context.Background(), "namespace", contracts, mdi, mbi, mom, mmi) assert.NotNil(t, nm) + assert.NoError(t, err) +} + +func TestInitFail(t *testing.T) { + contracts := make([]Contract, 0) + _, err := NewMultipartyManager(context.Background(), "namespace", contracts, nil, nil, nil, nil) + assert.Regexp(t, "FF10128", err) } func TestConfigureContract(t *testing.T) { diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index ab2909196b..069731ae41 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -389,17 +389,6 @@ func (or *orchestrator) initPlugins(ctx context.Context) (err error) { func (or *orchestrator) initComponents(ctx context.Context) (err error) { - if or.multiparty == nil { - or.multiparty = multiparty.NewMultipartyManager(or.ctx, or.namespace, or.config.Multiparty.Contracts, or.database(), or.blockchain(), or.operations, or.metrics) - } - - if or.identity == nil { - or.identity, err = identity.NewIdentityManager(ctx, or.namespace, or.config.DefaultKey, or.config.Multiparty.OrgName, or.config.Multiparty.OrgKey, or.database(), or.blockchain(), or.multiparty) - if err != nil { - return err - } - } - if or.data == nil { or.data, err = data.NewDataManager(ctx, or.namespace, or.database(), or.sharedstorage(), or.dataexchange()) if err != nil { @@ -417,6 +406,20 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } } + if or.multiparty == nil { + or.multiparty, err = multiparty.NewMultipartyManager(or.ctx, or.namespace, or.config.Multiparty.Contracts, or.database(), or.blockchain(), or.operations, or.metrics) + if err != nil { + return err + } + } + + if or.identity == nil { + or.identity, err = identity.NewIdentityManager(ctx, or.namespace, or.config.DefaultKey, or.config.Multiparty.OrgName, or.config.Multiparty.OrgKey, or.database(), or.blockchain(), or.multiparty) + if err != nil { + return err + } + } + or.syncasync = syncasync.NewSyncAsyncBridge(ctx, or.namespace, or.database(), or.data) if or.batch == nil { diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index c04025e61d..38e4d92974 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -260,11 +260,11 @@ func TestInitNetworkMapComponentFail(t *testing.T) { assert.Regexp(t, "FF10128", err) } -func TestInitOperationComponentFail(t *testing.T) { +func TestInitMultipartyComponentFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) or.plugins.Database.Plugin = nil - or.operations = nil + or.multiparty = nil err := or.initComponents(context.Background()) assert.Regexp(t, "FF10128", err) } @@ -310,7 +310,6 @@ func TestInitIdentityComponentFail(t *testing.T) { defer or.cleanup(t) or.plugins.Database.Plugin = nil or.identity = nil - or.multiparty = nil err := or.initComponents(context.Background()) assert.Regexp(t, "FF10128", err) } diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index d7ce7b46b4..8aa8b675fa 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -87,7 +87,7 @@ type blobTransferTracker struct { } func NewPrivateMessaging(ctx context.Context, ns string, di database.Plugin, dx dataexchange.Plugin, bi blockchain.Plugin, im identity.Manager, ba batch.Manager, dm data.Manager, sa syncasync.Bridge, mult multiparty.Manager, mm metrics.Manager, om operations.Manager) (Manager, error) { - if di == nil || im == nil || dx == nil || bi == nil || ba == nil || dm == nil || mm == nil || om == nil { + if di == nil || im == nil || dx == nil || bi == nil || ba == nil || dm == nil || mm == nil || om == nil || mult == nil { return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "PrivateMessaging") } From 714855c3b6ab5e51a472f598eb17c21c3783dcf4 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 24 Jun 2022 13:18:10 -0400 Subject: [PATCH 3/3] Add missing RegisterHandler call Signed-off-by: Andrew Richardson --- internal/multiparty/manager.go | 14 ++++++++++++-- internal/multiparty/manager_test.go | 4 ++++ mocks/multipartymocks/manager.go | 14 ++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/internal/multiparty/manager.go b/internal/multiparty/manager.go index 5bfa3de334..7616b0a4d8 100644 --- a/internal/multiparty/manager.go +++ b/internal/multiparty/manager.go @@ -32,6 +32,8 @@ import ( ) type Manager interface { + core.Named + // ConfigureContract initializes the subscription to the FireFly contract // - Checks the provided contract info against the plugin's configuration, and updates it as needed // - Initializes the contract info for performing BatchPin transactions, and initializes subscriptions for BatchPin events @@ -82,7 +84,7 @@ func NewMultipartyManager(ctx context.Context, ns string, contracts []Contract, if di == nil || bi == nil || mm == nil || om == nil { return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "MultipartyManager") } - return &multipartyManager{ + mp := &multipartyManager{ ctx: ctx, namespace: ns, contracts: contracts, @@ -90,7 +92,15 @@ func NewMultipartyManager(ctx context.Context, ns string, contracts []Contract, blockchain: bi, operations: om, metrics: mm, - }, nil + } + om.RegisterHandler(ctx, mp, []core.OpType{ + core.OpTypeBlockchainPinBatch, + }) + return mp, nil +} + +func (mm *multipartyManager) Name() string { + return "MultipartyManager" } func (mm *multipartyManager) ConfigureContract(ctx context.Context, contracts *core.FireFlyContracts) (err error) { diff --git a/internal/multiparty/manager_test.go b/internal/multiparty/manager_test.go index 8c4632a678..0557361e98 100644 --- a/internal/multiparty/manager_test.go +++ b/internal/multiparty/manager_test.go @@ -72,9 +72,13 @@ func TestNewMultipartyManager(t *testing.T) { mom := &operationmocks.Manager{} mmi := &metricsmocks.Manager{} contracts := make([]Contract, 0) + mom.On("RegisterHandler", mock.Anything, mock.Anything, []core.OpType{ + core.OpTypeBlockchainPinBatch, + }).Return() nm, err := NewMultipartyManager(context.Background(), "namespace", contracts, mdi, mbi, mom, mmi) assert.NotNil(t, nm) assert.NoError(t, err) + assert.Equal(t, "MultipartyManager", nm.Name()) } func TestInitFail(t *testing.T) { diff --git a/mocks/multipartymocks/manager.go b/mocks/multipartymocks/manager.go index 7722553bdc..3b584dd664 100644 --- a/mocks/multipartymocks/manager.go +++ b/mocks/multipartymocks/manager.go @@ -47,6 +47,20 @@ func (_m *Manager) GetNetworkVersion() int { return r0 } +// Name provides a mock function with given fields: +func (_m *Manager) Name() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + // PrepareOperation provides a mock function with given fields: ctx, op func (_m *Manager) PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error) { ret := _m.Called(ctx, op)