diff --git a/internal/assets/manager.go b/internal/assets/manager.go index e1f3633087..c37feda55f 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -82,14 +82,14 @@ type assetManager struct { keyNormalization int } -func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin, mm metrics.Manager, om operations.Manager) (Manager, error) { +func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin, mm metrics.Manager, om operations.Manager, txHelper txcommon.Helper) (Manager, error) { if di == nil || im == nil || sa == nil || bm == nil || pm == nil || ti == nil || mm == nil || om == nil { return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError) } am := &assetManager{ ctx: ctx, database: di, - txHelper: txcommon.NewTransactionHelper(di), + txHelper: txHelper, identity: im, data: dm, syncasync: sa, diff --git a/internal/assets/manager_test.go b/internal/assets/manager_test.go index b32da3d7e4..5eed3e17fa 100644 --- a/internal/assets/manager_test.go +++ b/internal/assets/manager_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/hyperledger/firefly/internal/config" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" @@ -47,11 +48,12 @@ func newTestAssets(t *testing.T) (*assetManager, func()) { mti := &tokenmocks.Plugin{} mm := &metricsmocks.Manager{} mom := &operationmocks.Manager{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mti.On("Name").Return("ut_tokens").Maybe() mm.On("IsMetricsEnabled").Return(false) mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything) ctx, cancel := context.WithCancel(context.Background()) - a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom) + a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom, txHelper) rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() rag.RunFn = func(a mock.Arguments) { rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} @@ -73,12 +75,13 @@ func newTestAssetsWithMetrics(t *testing.T) (*assetManager, func()) { mti := &tokenmocks.Plugin{} mm := &metricsmocks.Manager{} mom := &operationmocks.Manager{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mti.On("Name").Return("ut_tokens").Maybe() mm.On("IsMetricsEnabled").Return(true) mm.On("TransferSubmitted", mock.Anything) mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything) ctx, cancel := context.WithCancel(context.Background()) - a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom) + a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom, txHelper) rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() rag.RunFn = func(a mock.Arguments) { rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} @@ -90,7 +93,7 @@ func newTestAssetsWithMetrics(t *testing.T) (*assetManager, func()) { } func TestInitFail(t *testing.T) { - _, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil, nil) + _, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) } diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 1197b2c5a3..1f2ac337b7 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -28,11 +28,12 @@ import ( "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/retry" "github.com/hyperledger/firefly/internal/sysmessaging" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) -func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager) (Manager, error) { +func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager, txHelper txcommon.Helper) (Manager, error) { if di == nil || dm == nil { return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError) } @@ -44,11 +45,13 @@ func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di data ni: ni, database: di, data: dm, + txHelper: txHelper, readOffset: -1, // On restart we trawl for all ready messages readPageSize: uint64(readPageSize), messagePollTimeout: config.GetDuration(config.BatchManagerReadPollTimeout), startupOffsetRetryAttempts: config.GetInt(config.OrchestratorStartupAttempts), - dispatchers: make(map[string]*dispatcher), + dispatcherMap: make(map[string]*dispatcher), + allDispatchers: make([]*dispatcher, 0), newMessages: make(chan int64, 1), done: make(chan struct{}), retry: &retry.Retry{ @@ -85,8 +88,10 @@ type batchManager struct { ni sysmessaging.LocalNodeInfo database database.Plugin data data.Manager + txHelper txcommon.Helper dispatcherMux sync.Mutex - dispatchers map[string]*dispatcher + dispatcherMap map[string]*dispatcher + allDispatchers []*dispatcher newMessages chan int64 done chan struct{} retry *retry.Retry @@ -122,14 +127,18 @@ func (bm *batchManager) getDispatcherKey(txType fftypes.TransactionType, msgType } func (bm *batchManager) RegisterDispatcher(name string, txType fftypes.TransactionType, msgTypes []fftypes.MessageType, handler DispatchHandler, options DispatcherOptions) { + bm.dispatcherMux.Lock() + defer bm.dispatcherMux.Unlock() + dispatcher := &dispatcher{ name: name, handler: handler, options: options, processors: make(map[string]*batchProcessor), } + bm.allDispatchers = append(bm.allDispatchers, dispatcher) for _, msgType := range msgTypes { - bm.dispatchers[bm.getDispatcherKey(txType, msgType)] = dispatcher + bm.dispatcherMap[bm.getDispatcherKey(txType, msgType)] = dispatcher } } @@ -147,7 +156,7 @@ func (bm *batchManager) getProcessor(txType fftypes.TransactionType, msgType fft defer bm.dispatcherMux.Unlock() dispatcherKey := bm.getDispatcherKey(txType, msgType) - dispatcher, ok := bm.dispatchers[dispatcherKey] + dispatcher, ok := bm.dispatcherMap[dispatcherKey] if !ok { return nil, i18n.NewError(bm.ctx, i18n.MsgUnregisteredBatchType, dispatcherKey) } @@ -170,6 +179,7 @@ func (bm *batchManager) getProcessor(txType fftypes.TransactionType, msgType fft dispatch: dispatcher.handler, }, bm.retry, + bm.txHelper, ) dispatcher.processors[name] = processor } @@ -316,7 +326,7 @@ func (bm *batchManager) dispatchMessage(processor *batchProcessor, msg *fftypes. func (bm *batchManager) reapQuiescing() { bm.dispatcherMux.Lock() var reaped []*batchProcessor - for _, d := range bm.dispatchers { + for _, d := range bm.allDispatchers { for k, p := range d.processors { select { case <-p.quescing: @@ -344,7 +354,7 @@ func (bm *batchManager) getProcessors() []*batchProcessor { defer bm.dispatcherMux.Unlock() var processors []*batchProcessor - for _, d := range bm.dispatchers { + for _, d := range bm.allDispatchers { for _, p := range d.processors { processors = append(processors, p) } diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index 26c7155999..90eed813d1 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/sysmessagingmocks" @@ -41,6 +42,7 @@ func TestE2EDispatchBroadcast(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID()) readyForDispatch := make(chan bool) waitForDispatch := make(chan *DispatchState) @@ -70,7 +72,7 @@ func TestE2EDispatchBroadcast(t *testing.T) { return nil } ctx, cancel := context.WithCancel(context.Background()) - bmi, _ := NewBatchManager(ctx, mni, mdi, mdm) + bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper) bm := bmi.(*batchManager) bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast}, handler, DispatcherOptions{ @@ -155,6 +157,7 @@ func TestE2EDispatchPrivateUnpinned(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID()) readyForDispatch := make(chan bool) waitForDispatch := make(chan *DispatchState) @@ -187,7 +190,7 @@ func TestE2EDispatchPrivateUnpinned(t *testing.T) { return nil } ctx, cancel := context.WithCancel(context.Background()) - bmi, _ := NewBatchManager(ctx, mni, mdi, mdm) + bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper) bm := bmi.(*batchManager) bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypePrivate}, handler, DispatcherOptions{ @@ -268,8 +271,9 @@ func TestDispatchUnknownType(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) ctx, cancel := context.WithCancel(context.Background()) - bmi, _ := NewBatchManager(ctx, mni, mdi, mdm) + bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper) bm := bmi.(*batchManager) msg := &fftypes.Message{} @@ -285,7 +289,7 @@ func TestDispatchUnknownType(t *testing.T) { } func TestInitFailNoPersistence(t *testing.T) { - _, err := NewBatchManager(context.Background(), nil, nil, nil) + _, err := NewBatchManager(context.Background(), nil, nil, nil, nil) assert.Error(t, err) } @@ -294,7 +298,8 @@ func TestGetInvalidBatchTypeMsg(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} - bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + txHelper := txcommon.NewTransactionHelper(mdi, mdm) + bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) defer bm.Close() _, err := bm.(*batchManager).getProcessor(fftypes.BatchTypeBroadcast, "wrong", nil, "ns1", &fftypes.SignerRef{}) assert.Regexp(t, "FF10126", err) @@ -304,8 +309,9 @@ func TestMessageSequencerCancelledContext(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mdi.On("GetMessages", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) - bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) defer bm.Close() ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -318,7 +324,8 @@ func TestMessageSequencerMissingMessageData(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} - bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + txHelper := txcommon.NewTransactionHelper(mdi, mdm) + bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeNone, []fftypes.MessageType{fftypes.MessageTypeBroadcast}, func(c context.Context, state *DispatchState) error { return nil @@ -359,9 +366,10 @@ func TestMessageSequencerUpdateMessagesFail(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID()) ctx, cancelCtx := context.WithCancel(context.Background()) - bm, _ := NewBatchManager(ctx, mni, mdi, mdm) + bm, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper) bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast}, func(c context.Context, state *DispatchState) error { return nil @@ -414,8 +422,9 @@ func TestMessageSequencerDispatchFail(t *testing.T) { mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID()) + txHelper := txcommon.NewTransactionHelper(mdi, mdm) ctx, cancelCtx := context.WithCancel(context.Background()) - bm, _ := NewBatchManager(ctx, mni, mdi, mdm) + bm, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper) bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast}, func(c context.Context, state *DispatchState) error { cancelCtx() @@ -454,7 +463,8 @@ func TestMessageSequencerUpdateBatchFail(t *testing.T) { mni := &sysmessagingmocks.LocalNodeInfo{} mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID()) ctx, cancelCtx := context.WithCancel(context.Background()) - bm, _ := NewBatchManager(ctx, mni, mdi, mdm) + txHelper := txcommon.NewTransactionHelper(mdi, mdm) + bm, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper) bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast}, func(c context.Context, state *DispatchState) error { return nil @@ -504,7 +514,8 @@ func TestWaitForPollTimeout(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} - bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + txHelper := txcommon.NewTransactionHelper(mdi, mdm) + bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) bm.(*batchManager).messagePollTimeout = 1 * time.Microsecond bm.(*batchManager).waitForNewMessages() } @@ -513,7 +524,8 @@ func TestWaitForNewMessage(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} - bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + txHelper := txcommon.NewTransactionHelper(mdi, mdm) + bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) bm := bmi.(*batchManager) bm.readOffset = 22222 bm.NewMessages() <- 12345 @@ -525,7 +537,8 @@ func TestAssembleMessageDataNilData(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} - bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + txHelper := txcommon.NewTransactionHelper(mdi, mdm) + bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) bm.Close() mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, nil) _, err := bm.(*batchManager).assembleMessageData(fftypes.BatchTypePrivate, &fftypes.Message{ @@ -541,7 +554,8 @@ func TestGetMessageDataFail(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} - bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + txHelper := txcommon.NewTransactionHelper(mdi, mdm) + bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, fmt.Errorf("pop")) bm.Close() _, _ = bm.(*batchManager).assembleMessageData(fftypes.BatchTypePrivate, &fftypes.Message{ @@ -559,7 +573,8 @@ func TestGetMessageNotFound(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} - bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + txHelper := txcommon.NewTransactionHelper(mdi, mdm) + bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) mdm.On("GetMessageDataCached", mock.Anything, mock.Anything, data.CRORequirePublicBlobRefs).Return(nil, false, nil) bm.Close() _, err := bm.(*batchManager).assembleMessageData(fftypes.BatchTypeBroadcast, &fftypes.Message{ diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index 40ed6e7c27..9e521a001e 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -101,7 +101,7 @@ type DispatchState struct { const batchSizeEstimateBase = int64(512) -func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager, conf *batchProcessorConf, baseRetryConf *retry.Retry) *batchProcessor { +func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager, conf *batchProcessorConf, baseRetryConf *retry.Retry, txHelper txcommon.Helper) *batchProcessor { pCtx := log.WithLogField(log.WithLogField(ctx, "d", conf.dispatcherName), "p", conf.name) pCtx, cancelCtx := context.WithCancel(pCtx) bp := &batchProcessor{ @@ -110,7 +110,7 @@ func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di da ni: ni, database: di, data: dm, - txHelper: txcommon.NewTransactionHelper(di), + txHelper: txHelper, newWork: make(chan *batchWork, conf.BatchMaxSize), quescing: make(chan bool, 1), done: make(chan struct{}), diff --git a/internal/batch/batch_processor_test.go b/internal/batch/batch_processor_test.go index 136b89e310..54c5f6865c 100644 --- a/internal/batch/batch_processor_test.go +++ b/internal/batch/batch_processor_test.go @@ -23,6 +23,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/retry" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/sysmessagingmocks" @@ -36,6 +37,7 @@ func newTestBatchProcessor(dispatch DispatchHandler) (*databasemocks.Plugin, *ba mdi := &databasemocks.Plugin{} mni := &sysmessagingmocks.LocalNodeInfo{} mdm := &datamocks.Manager{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID()).Maybe() bp := newBatchProcessor(context.Background(), mni, mdi, mdm, &batchProcessorConf{ namespace: "ns1", @@ -51,7 +53,7 @@ func newTestBatchProcessor(dispatch DispatchHandler) (*databasemocks.Plugin, *ba }, &retry.Retry{ InitialDelay: 1 * time.Microsecond, MaximumDelay: 1 * time.Microsecond, - }) + }, txHelper) bp.txHelper = &txcommonmocks.Helper{} return mdi, bp } diff --git a/internal/config/config.go b/internal/config/config.go index 279f90f909..01d5c5d742 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -230,6 +230,10 @@ var ( SubscriptionsRetryMaxDelay = rootKey("subscription.retry.maxDelay") // SubscriptionsRetryFactor the backoff factor to use for retry of database operations SubscriptionsRetryFactor = rootKey("subscription.retry.factor") + // TransactionCacheSize + TransactionCacheSize = rootKey("transaction.cache.size") + // TransactionCacheTTL + TransactionCacheTTL = rootKey("transaction.cache.ttl") // AssetManagerKeyNormalization mechanism to normalize keys before using them. Valid options: "blockchain_plugin" - use blockchain plugin (default), "none" - do not attempt normalization AssetManagerKeyNormalization = rootKey("asset.manager.keyNormalization") // UIEnabled set to false to disable the UI (default is true, so UI will be enabled if ui.path is valid) @@ -365,6 +369,8 @@ func Reset() { viper.SetDefault(string(SubscriptionsRetryInitialDelay), "250ms") viper.SetDefault(string(SubscriptionsRetryMaxDelay), "30s") viper.SetDefault(string(SubscriptionsRetryFactor), 2.0) + viper.SetDefault(string(TransactionCacheSize), "1Mb") + viper.SetDefault(string(TransactionCacheTTL), "5m") viper.SetDefault(string(UIEnabled), true) viper.SetDefault(string(ValidatorCacheSize), "1Mb") viper.SetDefault(string(ValidatorCacheTTL), "1h") diff --git a/internal/contracts/manager.go b/internal/contracts/manager.go index 0b6d737348..8083538809 100644 --- a/internal/contracts/manager.go +++ b/internal/contracts/manager.go @@ -70,7 +70,7 @@ type contractManager struct { operations operations.Manager } -func NewContractManager(ctx context.Context, di database.Plugin, bm broadcast.Manager, im identity.Manager, bi blockchain.Plugin, om operations.Manager) (Manager, error) { +func NewContractManager(ctx context.Context, di database.Plugin, bm broadcast.Manager, im identity.Manager, bi blockchain.Plugin, om operations.Manager, txHelper txcommon.Helper) (Manager, error) { if di == nil || bm == nil || im == nil || bi == nil || om == nil { return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError) } @@ -81,7 +81,7 @@ func NewContractManager(ctx context.Context, di database.Plugin, bm broadcast.Ma cm := &contractManager{ database: di, - txHelper: txcommon.NewTransactionHelper(di), + txHelper: txHelper, broadcast: bm, identity: im, blockchain: bi, diff --git a/internal/contracts/manager_test.go b/internal/contracts/manager_test.go index aa28114736..3ac695daf9 100644 --- a/internal/contracts/manager_test.go +++ b/internal/contracts/manager_test.go @@ -23,9 +23,11 @@ import ( "github.com/hyperledger/firefly/internal/blockchain/ethereum" "github.com/hyperledger/firefly/internal/identity" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/mocks/blockchainmocks" "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" + "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/identitymanagermocks" "github.com/hyperledger/firefly/mocks/operationmocks" "github.com/hyperledger/firefly/mocks/txcommonmocks" @@ -37,29 +39,31 @@ import ( ) func newTestContractManager() *contractManager { - mdb := &databasemocks.Plugin{} + mdi := &databasemocks.Plugin{} + mdm := &datamocks.Manager{} mbm := &broadcastmocks.Manager{} mim := &identitymanagermocks.Manager{} mbi := &blockchainmocks.Plugin{} mom := &operationmocks.Manager{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mbi.On("GetFFIParamValidator", mock.Anything).Return(nil, nil) mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything) mbi.On("Name").Return("mockblockchain").Maybe() - rag := mdb.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() + rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe() rag.RunFn = func(a mock.Arguments) { rag.ReturnArguments = mock.Arguments{ a[1].(func(context.Context) error)(a[0].(context.Context)), } } - cm, _ := NewContractManager(context.Background(), mdb, mbm, mim, mbi, mom) + cm, _ := NewContractManager(context.Background(), mdi, mbm, mim, mbi, mom, txHelper) cm.(*contractManager).txHelper = &txcommonmocks.Helper{} return cm.(*contractManager) } func TestNewContractManagerFail(t *testing.T) { - _, err := NewContractManager(context.Background(), nil, nil, nil, nil, nil) + _, err := NewContractManager(context.Background(), nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) } @@ -69,25 +73,29 @@ func TestName(t *testing.T) { } func TestNewContractManagerFFISchemaLoaderFail(t *testing.T) { - mdb := &databasemocks.Plugin{} + mdi := &databasemocks.Plugin{} + mdm := &datamocks.Manager{} mbm := &broadcastmocks.Manager{} mim := &identitymanagermocks.Manager{} mbi := &blockchainmocks.Plugin{} mom := &operationmocks.Manager{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mbi.On("GetFFIParamValidator", mock.Anything).Return(nil, fmt.Errorf("pop")) - _, err := NewContractManager(context.Background(), mdb, mbm, mim, mbi, mom) + _, err := NewContractManager(context.Background(), mdi, mbm, mim, mbi, mom, txHelper) assert.Regexp(t, "pop", err) } func TestNewContractManagerFFISchemaLoader(t *testing.T) { - mdb := &databasemocks.Plugin{} + mdi := &databasemocks.Plugin{} + mdm := &datamocks.Manager{} mbm := &broadcastmocks.Manager{} mim := &identitymanagermocks.Manager{} mbi := &blockchainmocks.Plugin{} mom := &operationmocks.Manager{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mbi.On("GetFFIParamValidator", mock.Anything).Return(ðereum.FFIParamValidator{}, nil) mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything) - _, err := NewContractManager(context.Background(), mdb, mbm, mim, mbi, mom) + _, err := NewContractManager(context.Background(), mdi, mbm, mim, mbi, mom, txHelper) assert.NoError(t, err) } diff --git a/internal/events/event_dispatcher.go b/internal/events/event_dispatcher.go index 95adc12a38..7695ed1a24 100644 --- a/internal/events/event_dispatcher.go +++ b/internal/events/event_dispatcher.go @@ -66,7 +66,7 @@ type eventDispatcher struct { txHelper txcommon.Helper } -func newEventDispatcher(ctx context.Context, ei events.Plugin, di database.Plugin, dm data.Manager, sh definitions.DefinitionHandlers, connID string, sub *subscription, en *eventNotifier, cel *changeEventListener) *eventDispatcher { +func newEventDispatcher(ctx context.Context, ei events.Plugin, di database.Plugin, dm data.Manager, sh definitions.DefinitionHandlers, connID string, sub *subscription, en *eventNotifier, cel *changeEventListener, txHelper txcommon.Helper) *eventDispatcher { ctx, cancelCtx := context.WithCancel(ctx) readAhead := config.GetUint(config.SubscriptionDefaultsReadAhead) if sub.definition.Options.ReadAhead != nil { @@ -94,7 +94,7 @@ func newEventDispatcher(ctx context.Context, ei events.Plugin, di database.Plugi acksNacks: make(chan ackNack), closed: make(chan struct{}), cel: cel, - txHelper: txcommon.NewTransactionHelper(di), + txHelper: txHelper, } pollerConf := &eventPollerConf{ diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go index 55972e13df..5a43a2b90f 100644 --- a/internal/events/event_dispatcher_test.go +++ b/internal/events/event_dispatcher_test.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/definitionsmocks" @@ -42,8 +43,9 @@ func newTestEventDispatcher(sub *subscription) (*eventDispatcher, func()) { mei.On("Name").Return("ut").Maybe() mdm := &datamocks.Manager{} msh := &definitionsmocks.DefinitionHandlers{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) ctx, cancel := context.WithCancel(context.Background()) - return newEventDispatcher(ctx, mei, mdi, mdm, msh, fftypes.NewUUID().String(), sub, newEventNotifier(ctx, "ut"), newChangeEventListener(ctx)), func() { + return newEventDispatcher(ctx, mei, mdi, mdm, msh, fftypes.NewUUID().String(), sub, newEventNotifier(ctx, "ut"), newChangeEventListener(ctx), txHelper), func() { cancel() config.Reset() } @@ -157,6 +159,7 @@ func TestEventDispatcherReadAheadOutOfOrderAcks(t *testing.T) { go ed.deliverEvents() mdi := ed.database.(*databasemocks.Plugin) mei := ed.transport.(*eventsmocks.PluginAll) + mdm := ed.data.(*datamocks.Manager) eventDeliveries := make(chan *fftypes.EventDelivery) deliveryRequestMock := mei.On("DeliveryRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -184,18 +187,18 @@ func TestEventDispatcherReadAheadOutOfOrderAcks(t *testing.T) { offsetUpdates <- v.(int64) } // Setup enrichment - mdi.On("GetMessageByID", mock.Anything, ref1).Return(&fftypes.Message{ + mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&fftypes.Message{ Header: fftypes.MessageHeader{ID: ref1}, - }, nil) - mdi.On("GetMessageByID", mock.Anything, ref2).Return(&fftypes.Message{ + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&fftypes.Message{ Header: fftypes.MessageHeader{ID: ref2}, - }, nil) - mdi.On("GetMessageByID", mock.Anything, ref3).Return(&fftypes.Message{ + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&fftypes.Message{ Header: fftypes.MessageHeader{ID: ref3}, - }, nil) - mdi.On("GetMessageByID", mock.Anything, ref4).Return(&fftypes.Message{ + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&fftypes.Message{ Header: fftypes.MessageHeader{ID: ref4}, - }, nil) + }, nil, true, nil) // Deliver a batch of messages batch1Done := make(chan struct{}) @@ -240,6 +243,7 @@ func TestEventDispatcherReadAheadOutOfOrderAcks(t *testing.T) { mdi.AssertExpectations(t) mei.AssertExpectations(t) + mdm.AssertExpectations(t) } func TestEventDispatcherNoReadAheadInOrder(t *testing.T) { @@ -258,6 +262,7 @@ func TestEventDispatcherNoReadAheadInOrder(t *testing.T) { go ed.deliverEvents() mdi := ed.database.(*databasemocks.Plugin) + mdm := ed.data.(*datamocks.Manager) mei := ed.transport.(*eventsmocks.PluginAll) eventDeliveries := make(chan *fftypes.EventDelivery) @@ -277,18 +282,18 @@ func TestEventDispatcherNoReadAheadInOrder(t *testing.T) { ev4 := fftypes.NewUUID() // Setup enrichment - mdi.On("GetMessageByID", mock.Anything, ref1).Return(&fftypes.Message{ + mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&fftypes.Message{ Header: fftypes.MessageHeader{ID: ref1}, - }, nil) - mdi.On("GetMessageByID", mock.Anything, ref2).Return(&fftypes.Message{ + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&fftypes.Message{ Header: fftypes.MessageHeader{ID: ref2}, - }, nil) - mdi.On("GetMessageByID", mock.Anything, ref3).Return(&fftypes.Message{ + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&fftypes.Message{ Header: fftypes.MessageHeader{ID: ref3}, - }, nil) - mdi.On("GetMessageByID", mock.Anything, ref4).Return(&fftypes.Message{ + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&fftypes.Message{ Header: fftypes.MessageHeader{ID: ref4}, - }, nil) + }, nil, true, nil) // Deliver a batch of messages batch1Done := make(chan struct{}) @@ -329,6 +334,7 @@ func TestEventDispatcherNoReadAheadInOrder(t *testing.T) { mdi.AssertExpectations(t) mei.AssertExpectations(t) + mdm.AssertExpectations(t) } func TestEventDispatcherChangeEvents(t *testing.T) { @@ -405,8 +411,8 @@ func TestEnrichEventsFailGetMessages(t *testing.T) { ed, cancel := newTestEventDispatcher(sub) defer cancel() - mdi := ed.database.(*databasemocks.Plugin) - mdi.On("GetMessageByID", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) + mdm := ed.data.(*datamocks.Manager) + mdm.On("GetMessageWithDataCached", mock.Anything, mock.Anything).Return(nil, nil, false, fmt.Errorf("pop")) id1 := fftypes.NewUUID() _, err := ed.enrichEvents([]fftypes.LocallySequenced{&fftypes.Event{ID: id1, Type: fftypes.EventTypeMessageConfirmed}}) @@ -841,8 +847,8 @@ func TestBufferedDeliveryEnrichFail(t *testing.T) { ed, cancel := newTestEventDispatcher(sub) defer cancel() - mdi := ed.database.(*databasemocks.Plugin) - mdi.On("GetMessageByID", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) + mdm := ed.data.(*datamocks.Manager) + mdm.On("GetMessageWithDataCached", mock.Anything, mock.Anything).Return(nil, nil, false, fmt.Errorf("pop")) repoll, err := ed.bufferedDelivery([]fftypes.LocallySequenced{&fftypes.Event{ID: fftypes.NewUUID(), Type: fftypes.EventTypeMessageConfirmed}}) assert.False(t, repoll) diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 605ab03147..de46c3b4fe 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -99,7 +99,7 @@ type eventManager struct { metrics metrics.Manager } -func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, si sharedstorage.Plugin, di database.Plugin, bi blockchain.Plugin, im identity.Manager, dh definitions.DefinitionHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager, mm metrics.Manager) (EventManager, error) { +func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, si sharedstorage.Plugin, di database.Plugin, bi blockchain.Plugin, im identity.Manager, dh definitions.DefinitionHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager, mm metrics.Manager, txHelper txcommon.Helper) (EventManager, error) { if ni == nil || si == nil || di == nil || bi == nil || im == nil || dh == nil || dm == nil || bm == nil || pm == nil || am == nil { return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError) } @@ -110,7 +110,7 @@ func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, si shar ni: ni, sharedstorage: si, database: di, - txHelper: txcommon.NewTransactionHelper(di), + txHelper: txHelper, identity: im, definitions: dh, data: dm, @@ -133,7 +133,7 @@ func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, si shar em.internalEvents = ie.(*system.Events) var err error - if em.subManager, err = newSubscriptionManager(ctx, di, dm, newEventNotifier, dh); err != nil { + if em.subManager, err = newSubscriptionManager(ctx, di, dm, newEventNotifier, dh, txHelper); err != nil { return nil, err } diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index 59dfebb00b..b8b762fcee 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -23,6 +23,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/events/system" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/mocks/assetmocks" "github.com/hyperledger/firefly/mocks/blockchainmocks" "github.com/hyperledger/firefly/mocks/broadcastmocks" @@ -66,6 +67,7 @@ func newTestEventManagerCommon(t *testing.T, metrics bool) (*eventManager, func( mam := &assetmocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} mmi := &metricsmocks.Manager{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mmi.On("IsMetricsEnabled").Return(metrics) if metrics { mmi.On("TransferConfirmed", mock.Anything) @@ -73,7 +75,7 @@ func newTestEventManagerCommon(t *testing.T, metrics bool) (*eventManager, func( mni.On("GetNodeUUID", mock.Anything).Return(testNodeID).Maybe() met.On("Name").Return("ut").Maybe() mbi.On("VerifierType").Return(fftypes.VerifierTypeEthAddress).Maybe() - emi, err := NewEventManager(ctx, mni, mpi, mdi, mbi, mim, msh, mdm, mbm, mpm, mam, mmi) + emi, err := NewEventManager(ctx, mni, mpi, mdi, mbi, mim, msh, mdm, mbm, mpm, mam, mmi, txHelper) em := emi.(*eventManager) em.txHelper = &txcommonmocks.Helper{} rag := mdi.On("RunAsGroup", em.ctx, mock.Anything).Maybe() @@ -104,7 +106,7 @@ func TestStartStop(t *testing.T) { } func TestStartStopBadDependencies(t *testing.T) { - _, err := NewEventManager(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + _, err := NewEventManager(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) } @@ -123,8 +125,9 @@ func TestStartStopBadTransports(t *testing.T) { mni := &sysmessagingmocks.LocalNodeInfo{} mam := &assetmocks.Manager{} mm := &metricsmocks.Manager{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) mbi.On("VerifierType").Return(fftypes.VerifierTypeEthAddress) - _, err := NewEventManager(context.Background(), mni, mpi, mdi, mbi, mim, msh, mdm, mbm, mpm, mam, mm) + _, err := NewEventManager(context.Background(), mni, mpi, mdi, mbi, mim, msh, mdm, mbm, mpm, mam, mm, txHelper) assert.Regexp(t, "FF10172", err) } diff --git a/internal/events/subscription_manager.go b/internal/events/subscription_manager.go index 613c9057ae..b80a4931dc 100644 --- a/internal/events/subscription_manager.go +++ b/internal/events/subscription_manager.go @@ -29,6 +29,7 @@ import ( "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/retry" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/events" "github.com/hyperledger/firefly/pkg/fftypes" @@ -72,6 +73,7 @@ type subscriptionManager struct { ctx context.Context database database.Plugin data data.Manager + txHelper txcommon.Helper eventNotifier *eventNotifier definitions definitions.DefinitionHandlers transports map[string]events.Plugin @@ -86,7 +88,7 @@ type subscriptionManager struct { retry retry.Retry } -func newSubscriptionManager(ctx context.Context, di database.Plugin, dm data.Manager, en *eventNotifier, sh definitions.DefinitionHandlers) (*subscriptionManager, error) { +func newSubscriptionManager(ctx context.Context, di database.Plugin, dm data.Manager, en *eventNotifier, sh definitions.DefinitionHandlers, txHelper txcommon.Helper) (*subscriptionManager, error) { ctx, cancelCtx := context.WithCancel(ctx) sm := &subscriptionManager{ ctx: ctx, @@ -101,6 +103,7 @@ func newSubscriptionManager(ctx context.Context, di database.Plugin, dm data.Man cancelCtx: cancelCtx, eventNotifier: en, definitions: sh, + txHelper: txHelper, retry: retry.Retry{ InitialDelay: config.GetDuration(config.SubscriptionsRetryInitialDelay), MaximumDelay: config.GetDuration(config.SubscriptionsRetryMaxDelay), @@ -473,7 +476,7 @@ func (sm *subscriptionManager) matchSubToConnLocked(conn *connection, sub *subsc } if conn.transport == sub.definition.Transport && conn.matcher(sub.definition.SubscriptionRef) { if _, ok := conn.dispatchers[*sub.definition.ID]; !ok { - dispatcher := newEventDispatcher(sm.ctx, conn.ei, sm.database, sm.data, sm.definitions, conn.id, sub, sm.eventNotifier, sm.cel) + dispatcher := newEventDispatcher(sm.ctx, conn.ei, sm.database, sm.data, sm.definitions, conn.id, sub, sm.eventNotifier, sm.cel, sm.txHelper) conn.dispatchers[*sub.definition.ID] = dispatcher dispatcher.start() } @@ -510,7 +513,7 @@ func (sm *subscriptionManager) ephemeralSubscription(ei events.Plugin, connID, n } // Create the dispatcher, and start immediately - dispatcher := newEventDispatcher(sm.ctx, ei, sm.database, sm.data, sm.definitions, connID, newSub, sm.eventNotifier, sm.cel) + dispatcher := newEventDispatcher(sm.ctx, ei, sm.database, sm.data, sm.definitions, connID, newSub, sm.eventNotifier, sm.cel, sm.txHelper) dispatcher.start() conn.dispatchers[*subID] = dispatcher diff --git a/internal/events/subscription_manager_test.go b/internal/events/subscription_manager_test.go index b1dc611d71..0fd6d6331c 100644 --- a/internal/events/subscription_manager_test.go +++ b/internal/events/subscription_manager_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/hyperledger/firefly/internal/config" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/definitionsmocks" @@ -39,6 +40,7 @@ func newTestSubManager(t *testing.T, mei *eventsmocks.PluginAll) (*subscriptionM mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} msh := &definitionsmocks.DefinitionHandlers{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) ctx, cancel := context.WithCancel(context.Background()) mei.On("Name").Return("ut") @@ -47,7 +49,7 @@ func newTestSubManager(t *testing.T, mei *eventsmocks.PluginAll) (*subscriptionM mei.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) mdi.On("GetEvents", mock.Anything, mock.Anything, mock.Anything).Return([]*fftypes.Event{}, nil, nil).Maybe() mdi.On("GetOffset", mock.Anything, mock.Anything, mock.Anything).Return(&fftypes.Offset{RowID: 3333333, Current: 0}, nil).Maybe() - sm, err := newSubscriptionManager(ctx, mdi, mdm, newEventNotifier(ctx, "ut"), msh) + sm, err := newSubscriptionManager(ctx, mdi, mdm, newEventNotifier(ctx, "ut"), msh, txHelper) assert.NoError(t, err) sm.transports = map[string]events.Plugin{ "ut": mei, @@ -164,9 +166,10 @@ func TestRegisterEphemeralSubscriptionsFail(t *testing.T) { func TestSubManagerBadPlugin(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} + txHelper := txcommon.NewTransactionHelper(mdi, mdm) config.Reset() config.Set(config.EventTransportsEnabled, []string{"!unknown!"}) - _, err := newSubscriptionManager(context.Background(), mdi, mdm, newEventNotifier(context.Background(), "ut"), nil) + _, err := newSubscriptionManager(context.Background(), mdi, mdm, newEventNotifier(context.Background(), "ut"), nil, txHelper) assert.Regexp(t, "FF10172", err) } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index edd34c11a1..c7461584c3 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -197,7 +197,6 @@ func (or *orchestrator) Init(ctx context.Context, cancelCtx context.CancelFunc) if err == nil { err = or.initNamespaces(ctx) } - or.txHelper = txcommon.NewTransactionHelper(or.database) // Bind together the blockchain interface callbacks, with the events manager or.bc.bi = or.blockchain or.bc.ei = or.events @@ -460,6 +459,10 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } } + if or.txHelper == nil { + or.txHelper = txcommon.NewTransactionHelper(or.database, or.data) + } + if or.identity == nil { or.identity, err = identity.NewIdentityManager(ctx, or.database, or.identityPlugin, or.blockchain, or.data) if err != nil { @@ -468,7 +471,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } if or.batch == nil { - or.batch, err = batch.NewBatchManager(ctx, or, or.database, or.data) + or.batch, err = batch.NewBatchManager(ctx, or, or.database, or.data, or.txHelper) if err != nil { return err } @@ -501,14 +504,14 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } if or.assets == nil { - or.assets, err = assets.NewAssetManager(ctx, or.database, or.identity, or.data, or.syncasync, or.broadcast, or.messaging, or.tokens, or.metrics, or.operations) + or.assets, err = assets.NewAssetManager(ctx, or.database, or.identity, or.data, or.syncasync, or.broadcast, or.messaging, or.tokens, or.metrics, or.operations, or.txHelper) if err != nil { return err } } if or.contracts == nil { - or.contracts, err = contracts.NewContractManager(ctx, or.database, or.broadcast, or.identity, or.blockchain, or.operations) + or.contracts, err = contracts.NewContractManager(ctx, or.database, or.broadcast, or.identity, or.blockchain, or.operations, or.txHelper) if err != nil { return err } @@ -517,7 +520,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { or.definitions = definitions.NewDefinitionHandlers(or.database, or.blockchain, or.dataexchange, or.data, or.identity, or.broadcast, or.messaging, or.assets, or.contracts) if or.events == nil { - or.events, err = events.NewEventManager(ctx, or, or.sharedstorage, or.database, or.blockchain, or.identity, or.definitions, or.data, or.broadcast, or.messaging, or.assets, or.metrics) + or.events, err = events.NewEventManager(ctx, or, or.sharedstorage, or.database, or.blockchain, or.identity, or.definitions, or.data, or.broadcast, or.messaging, or.assets, or.metrics, or.txHelper) if err != nil { return err } diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index dc85133e12..98939f6ea1 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -516,6 +516,7 @@ func TestInitIdentityComponentFail(t *testing.T) { or := newTestOrchestrator() or.database = nil or.identity = nil + or.txHelper = nil err := or.initComponents(context.Background()) assert.Regexp(t, "FF10128", err) } diff --git a/internal/txcommon/event_enrich.go b/internal/txcommon/event_enrich.go index 8956f74688..145acbcdd2 100644 --- a/internal/txcommon/event_enrich.go +++ b/internal/txcommon/event_enrich.go @@ -29,13 +29,13 @@ func (t *transactionHelper) EnrichEvent(ctx context.Context, event *fftypes.Even switch event.Type { case fftypes.EventTypeTransactionSubmitted: - tx, err := t.database.GetTransactionByID(ctx, event.Reference) + tx, err := t.GetTransactionByIDCached(ctx, event.Reference) if err != nil { return nil, err } e.Transaction = tx case fftypes.EventTypeMessageConfirmed, fftypes.EventTypeMessageRejected: - msg, err := t.database.GetMessageByID(ctx, event.Reference) + msg, _, _, err := t.data.GetMessageWithDataCached(ctx, event.Reference) if err != nil { return nil, err } diff --git a/internal/txcommon/event_enrich_test.go b/internal/txcommon/event_enrich_test.go index 9cfff79a80..d0ee00f968 100644 --- a/internal/txcommon/event_enrich_test.go +++ b/internal/txcommon/event_enrich_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/hyperledger/firefly/mocks/databasemocks" + "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -29,7 +30,8 @@ import ( func TestEnrichMessageConfirmed(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() // Setup the IDs @@ -37,9 +39,9 @@ func TestEnrichMessageConfirmed(t *testing.T) { ev1 := fftypes.NewUUID() // Setup enrichment - mdi.On("GetMessageByID", mock.Anything, ref1).Return(&fftypes.Message{ + mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&fftypes.Message{ Header: fftypes.MessageHeader{ID: ref1}, - }, nil) + }, nil, true, nil) event := &fftypes.Event{ ID: ev1, @@ -54,7 +56,8 @@ func TestEnrichMessageConfirmed(t *testing.T) { func TestEnrichMessageFail(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() // Setup the IDs @@ -62,7 +65,7 @@ func TestEnrichMessageFail(t *testing.T) { ev1 := fftypes.NewUUID() // Setup enrichment - mdi.On("GetMessageByID", mock.Anything, ref1).Return(nil, fmt.Errorf("pop")) + mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(nil, nil, false, fmt.Errorf("pop")) event := &fftypes.Event{ ID: ev1, @@ -76,7 +79,8 @@ func TestEnrichMessageFail(t *testing.T) { func TestEnrichMessageRejected(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() // Setup the IDs @@ -84,9 +88,9 @@ func TestEnrichMessageRejected(t *testing.T) { ev1 := fftypes.NewUUID() // Setup enrichment - mdi.On("GetMessageByID", mock.Anything, ref1).Return(&fftypes.Message{ + mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&fftypes.Message{ Header: fftypes.MessageHeader{ID: ref1}, - }, nil) + }, nil, true, nil) event := &fftypes.Event{ ID: ev1, @@ -101,7 +105,8 @@ func TestEnrichMessageRejected(t *testing.T) { func TestEnrichTxSubmitted(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() // Setup the IDs @@ -126,7 +131,8 @@ func TestEnrichTxSubmitted(t *testing.T) { func TestEnrichTxFail(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() // Setup the IDs @@ -148,7 +154,8 @@ func TestEnrichTxFail(t *testing.T) { func TestEnrichBlockchainEventSubmitted(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() // Setup the IDs @@ -173,7 +180,8 @@ func TestEnrichBlockchainEventSubmitted(t *testing.T) { func TestEnrichBlockchainEventFail(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() // Setup the IDs diff --git a/internal/txcommon/txcommon.go b/internal/txcommon/txcommon.go index d0b7fac37b..592b3f6874 100644 --- a/internal/txcommon/txcommon.go +++ b/internal/txcommon/txcommon.go @@ -19,10 +19,14 @@ package txcommon import ( "context" "strings" + "time" + "github.com/hyperledger/firefly/internal/config" + "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/karlseguin/ccache" ) type Helper interface { @@ -30,16 +34,45 @@ type Helper interface { PersistTransaction(ctx context.Context, ns string, id *fftypes.UUID, txType fftypes.TransactionType, blockchainTXID string) (valid bool, err error) AddBlockchainTX(ctx context.Context, id *fftypes.UUID, blockchainTXID string) error EnrichEvent(ctx context.Context, event *fftypes.Event) (*fftypes.EnrichedEvent, error) + GetTransactionByIDCached(ctx context.Context, id *fftypes.UUID) (*fftypes.Transaction, error) } type transactionHelper struct { - database database.Plugin + database database.Plugin + data data.Manager + transactionCache *ccache.Cache + transactionCacheTTL time.Duration } -func NewTransactionHelper(di database.Plugin) Helper { - return &transactionHelper{ +func NewTransactionHelper(di database.Plugin, dm data.Manager) Helper { + t := &transactionHelper{ database: di, + data: dm, } + t.transactionCache = ccache.New( + // We use a LRU cache with a size-aware max + ccache.Configure(). + MaxSize(config.GetByteSize(config.TransactionCacheSize)), + ) + return t +} + +func (t *transactionHelper) updateTransactionsCache(tx *fftypes.Transaction) { + t.transactionCache.Set(tx.ID.String(), tx, t.transactionCacheTTL) +} + +func (t *transactionHelper) GetTransactionByIDCached(ctx context.Context, id *fftypes.UUID) (*fftypes.Transaction, error) { + cached := t.transactionCache.Get(id.String()) + if cached != nil { + cached.Extend(t.transactionCacheTTL) + return cached.Value().(*fftypes.Transaction), nil + } + tx, err := t.database.GetTransactionByID(ctx, id) + if err != nil || tx == nil { + return tx, err + } + t.updateTransactionsCache(tx) + return tx, nil } // SubmitNewTransaction is called when there is a new transaction being submitted by the local node @@ -65,6 +98,7 @@ func (t *transactionHelper) SubmitNewTransaction(ctx context.Context, ns string, // PersistTransaction is called when we need to ensure a transaction exists in the DB, and optionally associate a new BlockchainTXID to it func (t *transactionHelper) PersistTransaction(ctx context.Context, ns string, id *fftypes.UUID, txType fftypes.TransactionType, blockchainTXID string) (valid bool, err error) { + // TODO: Consider if this can exploit caching tx, err := t.database.GetTransactionByID(ctx, id) if err != nil { return false, err @@ -91,15 +125,20 @@ func (t *transactionHelper) PersistTransaction(ctx context.Context, ns string, i return false, err } - } else if err = t.database.InsertTransaction(ctx, &fftypes.Transaction{ - ID: id, - Namespace: ns, - Type: txType, - BlockchainIDs: fftypes.NewFFStringArray(strings.ToLower(blockchainTXID)), - }); err != nil { - return false, err + } else { + tx = &fftypes.Transaction{ + ID: id, + Namespace: ns, + Type: txType, + BlockchainIDs: fftypes.NewFFStringArray(strings.ToLower(blockchainTXID)), + } + if err = t.database.InsertTransaction(ctx, tx); err != nil { + return false, err + } } + t.updateTransactionsCache(tx) + return true, nil } diff --git a/internal/txcommon/txcommon_test.go b/internal/txcommon/txcommon_test.go index 8576358ebd..6fe1358a96 100644 --- a/internal/txcommon/txcommon_test.go +++ b/internal/txcommon/txcommon_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/hyperledger/firefly/mocks/databasemocks" + "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/assert" @@ -31,7 +32,8 @@ import ( func TestSubmitNewTransactionOK(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() var txidInserted *fftypes.UUID @@ -58,7 +60,8 @@ func TestSubmitNewTransactionOK(t *testing.T) { func TestSubmitNewTransactionFail(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() mdi.On("InsertTransaction", ctx, mock.Anything).Return(fmt.Errorf("pop")) @@ -73,7 +76,8 @@ func TestSubmitNewTransactionFail(t *testing.T) { func TestSubmitNewTransactionEventFail(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() mdi.On("InsertTransaction", ctx, mock.Anything).Return(nil) @@ -89,7 +93,8 @@ func TestSubmitNewTransactionEventFail(t *testing.T) { func TestPersistTransactionNew(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -113,7 +118,8 @@ func TestPersistTransactionNew(t *testing.T) { func TestPersistTransactionNewInserTFail(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -131,7 +137,8 @@ func TestPersistTransactionNewInserTFail(t *testing.T) { func TestPersistTransactionExistingAddBlockchainID(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -155,7 +162,8 @@ func TestPersistTransactionExistingAddBlockchainID(t *testing.T) { func TestPersistTransactionExistingUpdateFail(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -179,7 +187,8 @@ func TestPersistTransactionExistingUpdateFail(t *testing.T) { func TestPersistTransactionExistingNoChange(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -202,7 +211,8 @@ func TestPersistTransactionExistingNoChange(t *testing.T) { func TestPersistTransactionExistingNoBlockchainID(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -225,7 +235,8 @@ func TestPersistTransactionExistingNoBlockchainID(t *testing.T) { func TestPersistTransactionExistingLookupFail(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -242,7 +253,8 @@ func TestPersistTransactionExistingLookupFail(t *testing.T) { func TestPersistTransactionExistingMismatchNS(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -265,7 +277,8 @@ func TestPersistTransactionExistingMismatchNS(t *testing.T) { func TestPersistTransactionExistingMismatchType(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -288,7 +301,8 @@ func TestPersistTransactionExistingMismatchType(t *testing.T) { func TestAddBlockchainTX(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -318,7 +332,8 @@ func TestAddBlockchainTX(t *testing.T) { func TestAddBlockchainTXGetFail(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -334,7 +349,8 @@ func TestAddBlockchainTXGetFail(t *testing.T) { func TestAddBlockchainTXUpdateFail(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -357,7 +373,8 @@ func TestAddBlockchainTXUpdateFail(t *testing.T) { func TestAddBlockchainTXUnchanged(t *testing.T) { mdi := &databasemocks.Plugin{} - txHelper := NewTransactionHelper(mdi) + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) ctx := context.Background() txid := fftypes.NewUUID() @@ -375,3 +392,31 @@ func TestAddBlockchainTXUnchanged(t *testing.T) { mdi.AssertExpectations(t) } + +func TestGetTransactionByIDCached(t *testing.T) { + + mdi := &databasemocks.Plugin{} + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper(mdi, mdm) + ctx := context.Background() + + txid := fftypes.NewUUID() + mdi.On("GetTransactionByID", ctx, txid).Return(&fftypes.Transaction{ + ID: txid, + Namespace: "ns1", + Type: fftypes.TransactionTypeContractInvoke, + Created: fftypes.Now(), + BlockchainIDs: fftypes.FFStringArray{"0x111111"}, + }, nil).Once() + + tx, err := txHelper.GetTransactionByIDCached(ctx, txid) + assert.NoError(t, err) + assert.Equal(t, txid, tx.ID) + + tx, err = txHelper.GetTransactionByIDCached(ctx, txid) + assert.NoError(t, err) + assert.Equal(t, txid, tx.ID) + + mdi.AssertExpectations(t) + +} diff --git a/mocks/datamocks/manager.go b/mocks/datamocks/manager.go index a42b7f5992..f2e35335d6 100644 --- a/mocks/datamocks/manager.go +++ b/mocks/datamocks/manager.go @@ -32,6 +32,11 @@ func (_m *Manager) CheckDatatype(ctx context.Context, ns string, datatype *fftyp return r0 } +// Close provides a mock function with given fields: +func (_m *Manager) Close() { + _m.Called() +} + // CopyBlobPStoDX provides a mock function with given fields: ctx, _a1 func (_m *Manager) CopyBlobPStoDX(ctx context.Context, _a1 *fftypes.Data) (*fftypes.Blob, error) { ret := _m.Called(ctx, _a1) diff --git a/mocks/txcommonmocks/helper.go b/mocks/txcommonmocks/helper.go index e476e08d20..18c4a61333 100644 --- a/mocks/txcommonmocks/helper.go +++ b/mocks/txcommonmocks/helper.go @@ -51,6 +51,29 @@ func (_m *Helper) EnrichEvent(ctx context.Context, event *fftypes.Event) (*fftyp return r0, r1 } +// GetTransactionByIDCached provides a mock function with given fields: ctx, id +func (_m *Helper) GetTransactionByIDCached(ctx context.Context, id *fftypes.UUID) (*fftypes.Transaction, error) { + ret := _m.Called(ctx, id) + + var r0 *fftypes.Transaction + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.UUID) *fftypes.Transaction); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*fftypes.Transaction) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *fftypes.UUID) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // PersistTransaction provides a mock function with given fields: ctx, ns, id, txType, blockchainTXID func (_m *Helper) PersistTransaction(ctx context.Context, ns string, id *fftypes.UUID, txType fftypes.FFEnum, blockchainTXID string) (bool, error) { ret := _m.Called(ctx, ns, id, txType, blockchainTXID)