Skip to content

Commit e05e166

Browse files
Merge pull request #593 from kaleido-io/event-lookup-cache
Event lookup cache
2 parents 491c272 + ef3a20f commit e05e166

23 files changed

+302
-119
lines changed

internal/assets/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,14 @@ type assetManager struct {
8282
keyNormalization int
8383
}
8484

85-
func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin, mm metrics.Manager, om operations.Manager) (Manager, error) {
85+
func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin, mm metrics.Manager, om operations.Manager, txHelper txcommon.Helper) (Manager, error) {
8686
if di == nil || im == nil || sa == nil || bm == nil || pm == nil || ti == nil || mm == nil || om == nil {
8787
return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError)
8888
}
8989
am := &assetManager{
9090
ctx: ctx,
9191
database: di,
92-
txHelper: txcommon.NewTransactionHelper(di),
92+
txHelper: txHelper,
9393
identity: im,
9494
data: dm,
9595
syncasync: sa,

internal/assets/manager_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"testing"
2020

2121
"github.com/hyperledger/firefly/internal/config"
22+
"github.com/hyperledger/firefly/internal/txcommon"
2223
"github.com/hyperledger/firefly/mocks/broadcastmocks"
2324
"github.com/hyperledger/firefly/mocks/databasemocks"
2425
"github.com/hyperledger/firefly/mocks/datamocks"
@@ -47,11 +48,12 @@ func newTestAssets(t *testing.T) (*assetManager, func()) {
4748
mti := &tokenmocks.Plugin{}
4849
mm := &metricsmocks.Manager{}
4950
mom := &operationmocks.Manager{}
51+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
5052
mti.On("Name").Return("ut_tokens").Maybe()
5153
mm.On("IsMetricsEnabled").Return(false)
5254
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)
5355
ctx, cancel := context.WithCancel(context.Background())
54-
a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom)
56+
a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom, txHelper)
5557
rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe()
5658
rag.RunFn = func(a mock.Arguments) {
5759
rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))}
@@ -73,12 +75,13 @@ func newTestAssetsWithMetrics(t *testing.T) (*assetManager, func()) {
7375
mti := &tokenmocks.Plugin{}
7476
mm := &metricsmocks.Manager{}
7577
mom := &operationmocks.Manager{}
78+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
7679
mti.On("Name").Return("ut_tokens").Maybe()
7780
mm.On("IsMetricsEnabled").Return(true)
7881
mm.On("TransferSubmitted", mock.Anything)
7982
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)
8083
ctx, cancel := context.WithCancel(context.Background())
81-
a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom)
84+
a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom, txHelper)
8285
rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe()
8386
rag.RunFn = func(a mock.Arguments) {
8487
rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))}
@@ -90,7 +93,7 @@ func newTestAssetsWithMetrics(t *testing.T) (*assetManager, func()) {
9093
}
9194

9295
func TestInitFail(t *testing.T) {
93-
_, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil, nil)
96+
_, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
9497
assert.Regexp(t, "FF10128", err)
9598
}
9699

internal/batch/batch_manager.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ import (
2828
"github.com/hyperledger/firefly/internal/log"
2929
"github.com/hyperledger/firefly/internal/retry"
3030
"github.com/hyperledger/firefly/internal/sysmessaging"
31+
"github.com/hyperledger/firefly/internal/txcommon"
3132
"github.com/hyperledger/firefly/pkg/database"
3233
"github.com/hyperledger/firefly/pkg/fftypes"
3334
)
3435

35-
func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager) (Manager, error) {
36+
func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager, txHelper txcommon.Helper) (Manager, error) {
3637
if di == nil || dm == nil {
3738
return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError)
3839
}
@@ -44,11 +45,13 @@ func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di data
4445
ni: ni,
4546
database: di,
4647
data: dm,
48+
txHelper: txHelper,
4749
readOffset: -1, // On restart we trawl for all ready messages
4850
readPageSize: uint64(readPageSize),
4951
messagePollTimeout: config.GetDuration(config.BatchManagerReadPollTimeout),
5052
startupOffsetRetryAttempts: config.GetInt(config.OrchestratorStartupAttempts),
51-
dispatchers: make(map[string]*dispatcher),
53+
dispatcherMap: make(map[string]*dispatcher),
54+
allDispatchers: make([]*dispatcher, 0),
5255
newMessages: make(chan int64, 1),
5356
done: make(chan struct{}),
5457
retry: &retry.Retry{
@@ -85,8 +88,10 @@ type batchManager struct {
8588
ni sysmessaging.LocalNodeInfo
8689
database database.Plugin
8790
data data.Manager
91+
txHelper txcommon.Helper
8892
dispatcherMux sync.Mutex
89-
dispatchers map[string]*dispatcher
93+
dispatcherMap map[string]*dispatcher
94+
allDispatchers []*dispatcher
9095
newMessages chan int64
9196
done chan struct{}
9297
retry *retry.Retry
@@ -122,14 +127,18 @@ func (bm *batchManager) getDispatcherKey(txType fftypes.TransactionType, msgType
122127
}
123128

124129
func (bm *batchManager) RegisterDispatcher(name string, txType fftypes.TransactionType, msgTypes []fftypes.MessageType, handler DispatchHandler, options DispatcherOptions) {
130+
bm.dispatcherMux.Lock()
131+
defer bm.dispatcherMux.Unlock()
132+
125133
dispatcher := &dispatcher{
126134
name: name,
127135
handler: handler,
128136
options: options,
129137
processors: make(map[string]*batchProcessor),
130138
}
139+
bm.allDispatchers = append(bm.allDispatchers, dispatcher)
131140
for _, msgType := range msgTypes {
132-
bm.dispatchers[bm.getDispatcherKey(txType, msgType)] = dispatcher
141+
bm.dispatcherMap[bm.getDispatcherKey(txType, msgType)] = dispatcher
133142
}
134143
}
135144

@@ -147,7 +156,7 @@ func (bm *batchManager) getProcessor(txType fftypes.TransactionType, msgType fft
147156
defer bm.dispatcherMux.Unlock()
148157

149158
dispatcherKey := bm.getDispatcherKey(txType, msgType)
150-
dispatcher, ok := bm.dispatchers[dispatcherKey]
159+
dispatcher, ok := bm.dispatcherMap[dispatcherKey]
151160
if !ok {
152161
return nil, i18n.NewError(bm.ctx, i18n.MsgUnregisteredBatchType, dispatcherKey)
153162
}
@@ -170,6 +179,7 @@ func (bm *batchManager) getProcessor(txType fftypes.TransactionType, msgType fft
170179
dispatch: dispatcher.handler,
171180
},
172181
bm.retry,
182+
bm.txHelper,
173183
)
174184
dispatcher.processors[name] = processor
175185
}
@@ -316,7 +326,7 @@ func (bm *batchManager) dispatchMessage(processor *batchProcessor, msg *fftypes.
316326
func (bm *batchManager) reapQuiescing() {
317327
bm.dispatcherMux.Lock()
318328
var reaped []*batchProcessor
319-
for _, d := range bm.dispatchers {
329+
for _, d := range bm.allDispatchers {
320330
for k, p := range d.processors {
321331
select {
322332
case <-p.quescing:
@@ -344,7 +354,7 @@ func (bm *batchManager) getProcessors() []*batchProcessor {
344354
defer bm.dispatcherMux.Unlock()
345355

346356
var processors []*batchProcessor
347-
for _, d := range bm.dispatchers {
357+
for _, d := range bm.allDispatchers {
348358
for _, p := range d.processors {
349359
processors = append(processors, p)
350360
}

internal/batch/batch_manager_test.go

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/hyperledger/firefly/internal/config"
2626
"github.com/hyperledger/firefly/internal/data"
2727
"github.com/hyperledger/firefly/internal/log"
28+
"github.com/hyperledger/firefly/internal/txcommon"
2829
"github.com/hyperledger/firefly/mocks/databasemocks"
2930
"github.com/hyperledger/firefly/mocks/datamocks"
3031
"github.com/hyperledger/firefly/mocks/sysmessagingmocks"
@@ -41,6 +42,7 @@ func TestE2EDispatchBroadcast(t *testing.T) {
4142
mdi := &databasemocks.Plugin{}
4243
mdm := &datamocks.Manager{}
4344
mni := &sysmessagingmocks.LocalNodeInfo{}
45+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
4446
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID())
4547
readyForDispatch := make(chan bool)
4648
waitForDispatch := make(chan *DispatchState)
@@ -70,7 +72,7 @@ func TestE2EDispatchBroadcast(t *testing.T) {
7072
return nil
7173
}
7274
ctx, cancel := context.WithCancel(context.Background())
73-
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm)
75+
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
7476
bm := bmi.(*batchManager)
7577

7678
bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast}, handler, DispatcherOptions{
@@ -155,6 +157,7 @@ func TestE2EDispatchPrivateUnpinned(t *testing.T) {
155157
mdi := &databasemocks.Plugin{}
156158
mdm := &datamocks.Manager{}
157159
mni := &sysmessagingmocks.LocalNodeInfo{}
160+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
158161
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID())
159162
readyForDispatch := make(chan bool)
160163
waitForDispatch := make(chan *DispatchState)
@@ -187,7 +190,7 @@ func TestE2EDispatchPrivateUnpinned(t *testing.T) {
187190
return nil
188191
}
189192
ctx, cancel := context.WithCancel(context.Background())
190-
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm)
193+
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
191194
bm := bmi.(*batchManager)
192195

193196
bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypePrivate}, handler, DispatcherOptions{
@@ -268,8 +271,9 @@ func TestDispatchUnknownType(t *testing.T) {
268271
mdi := &databasemocks.Plugin{}
269272
mdm := &datamocks.Manager{}
270273
mni := &sysmessagingmocks.LocalNodeInfo{}
274+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
271275
ctx, cancel := context.WithCancel(context.Background())
272-
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm)
276+
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
273277
bm := bmi.(*batchManager)
274278

275279
msg := &fftypes.Message{}
@@ -285,7 +289,7 @@ func TestDispatchUnknownType(t *testing.T) {
285289
}
286290

287291
func TestInitFailNoPersistence(t *testing.T) {
288-
_, err := NewBatchManager(context.Background(), nil, nil, nil)
292+
_, err := NewBatchManager(context.Background(), nil, nil, nil, nil)
289293
assert.Error(t, err)
290294
}
291295

@@ -294,7 +298,8 @@ func TestGetInvalidBatchTypeMsg(t *testing.T) {
294298
mdi := &databasemocks.Plugin{}
295299
mdm := &datamocks.Manager{}
296300
mni := &sysmessagingmocks.LocalNodeInfo{}
297-
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
301+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
302+
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
298303
defer bm.Close()
299304
_, err := bm.(*batchManager).getProcessor(fftypes.BatchTypeBroadcast, "wrong", nil, "ns1", &fftypes.SignerRef{})
300305
assert.Regexp(t, "FF10126", err)
@@ -304,8 +309,9 @@ func TestMessageSequencerCancelledContext(t *testing.T) {
304309
mdi := &databasemocks.Plugin{}
305310
mdm := &datamocks.Manager{}
306311
mni := &sysmessagingmocks.LocalNodeInfo{}
312+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
307313
mdi.On("GetMessages", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop"))
308-
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
314+
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
309315
defer bm.Close()
310316
ctx, cancel := context.WithCancel(context.Background())
311317
cancel()
@@ -318,7 +324,8 @@ func TestMessageSequencerMissingMessageData(t *testing.T) {
318324
mdi := &databasemocks.Plugin{}
319325
mdm := &datamocks.Manager{}
320326
mni := &sysmessagingmocks.LocalNodeInfo{}
321-
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
327+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
328+
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
322329
bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeNone, []fftypes.MessageType{fftypes.MessageTypeBroadcast},
323330
func(c context.Context, state *DispatchState) error {
324331
return nil
@@ -359,9 +366,10 @@ func TestMessageSequencerUpdateMessagesFail(t *testing.T) {
359366
mdi := &databasemocks.Plugin{}
360367
mdm := &datamocks.Manager{}
361368
mni := &sysmessagingmocks.LocalNodeInfo{}
369+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
362370
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID())
363371
ctx, cancelCtx := context.WithCancel(context.Background())
364-
bm, _ := NewBatchManager(ctx, mni, mdi, mdm)
372+
bm, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
365373
bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast},
366374
func(c context.Context, state *DispatchState) error {
367375
return nil
@@ -414,8 +422,9 @@ func TestMessageSequencerDispatchFail(t *testing.T) {
414422
mdm := &datamocks.Manager{}
415423
mni := &sysmessagingmocks.LocalNodeInfo{}
416424
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID())
425+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
417426
ctx, cancelCtx := context.WithCancel(context.Background())
418-
bm, _ := NewBatchManager(ctx, mni, mdi, mdm)
427+
bm, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
419428
bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast},
420429
func(c context.Context, state *DispatchState) error {
421430
cancelCtx()
@@ -454,7 +463,8 @@ func TestMessageSequencerUpdateBatchFail(t *testing.T) {
454463
mni := &sysmessagingmocks.LocalNodeInfo{}
455464
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID())
456465
ctx, cancelCtx := context.WithCancel(context.Background())
457-
bm, _ := NewBatchManager(ctx, mni, mdi, mdm)
466+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
467+
bm, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
458468
bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast},
459469
func(c context.Context, state *DispatchState) error {
460470
return nil
@@ -504,7 +514,8 @@ func TestWaitForPollTimeout(t *testing.T) {
504514
mdi := &databasemocks.Plugin{}
505515
mdm := &datamocks.Manager{}
506516
mni := &sysmessagingmocks.LocalNodeInfo{}
507-
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
517+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
518+
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
508519
bm.(*batchManager).messagePollTimeout = 1 * time.Microsecond
509520
bm.(*batchManager).waitForNewMessages()
510521
}
@@ -513,7 +524,8 @@ func TestWaitForNewMessage(t *testing.T) {
513524
mdi := &databasemocks.Plugin{}
514525
mdm := &datamocks.Manager{}
515526
mni := &sysmessagingmocks.LocalNodeInfo{}
516-
bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
527+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
528+
bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
517529
bm := bmi.(*batchManager)
518530
bm.readOffset = 22222
519531
bm.NewMessages() <- 12345
@@ -525,7 +537,8 @@ func TestAssembleMessageDataNilData(t *testing.T) {
525537
mdi := &databasemocks.Plugin{}
526538
mdm := &datamocks.Manager{}
527539
mni := &sysmessagingmocks.LocalNodeInfo{}
528-
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
540+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
541+
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
529542
bm.Close()
530543
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, nil)
531544
_, err := bm.(*batchManager).assembleMessageData(fftypes.BatchTypePrivate, &fftypes.Message{
@@ -541,7 +554,8 @@ func TestGetMessageDataFail(t *testing.T) {
541554
mdi := &databasemocks.Plugin{}
542555
mdm := &datamocks.Manager{}
543556
mni := &sysmessagingmocks.LocalNodeInfo{}
544-
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
557+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
558+
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
545559
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, fmt.Errorf("pop"))
546560
bm.Close()
547561
_, _ = bm.(*batchManager).assembleMessageData(fftypes.BatchTypePrivate, &fftypes.Message{
@@ -559,7 +573,8 @@ func TestGetMessageNotFound(t *testing.T) {
559573
mdi := &databasemocks.Plugin{}
560574
mdm := &datamocks.Manager{}
561575
mni := &sysmessagingmocks.LocalNodeInfo{}
562-
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
576+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
577+
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
563578
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything, data.CRORequirePublicBlobRefs).Return(nil, false, nil)
564579
bm.Close()
565580
_, err := bm.(*batchManager).assembleMessageData(fftypes.BatchTypeBroadcast, &fftypes.Message{

internal/batch/batch_processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ type DispatchState struct {
101101

102102
const batchSizeEstimateBase = int64(512)
103103

104-
func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager, conf *batchProcessorConf, baseRetryConf *retry.Retry) *batchProcessor {
104+
func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager, conf *batchProcessorConf, baseRetryConf *retry.Retry, txHelper txcommon.Helper) *batchProcessor {
105105
pCtx := log.WithLogField(log.WithLogField(ctx, "d", conf.dispatcherName), "p", conf.name)
106106
pCtx, cancelCtx := context.WithCancel(pCtx)
107107
bp := &batchProcessor{
@@ -110,7 +110,7 @@ func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di da
110110
ni: ni,
111111
database: di,
112112
data: dm,
113-
txHelper: txcommon.NewTransactionHelper(di),
113+
txHelper: txHelper,
114114
newWork: make(chan *batchWork, conf.BatchMaxSize),
115115
quescing: make(chan bool, 1),
116116
done: make(chan struct{}),

internal/batch/batch_processor_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/hyperledger/firefly/internal/config"
2424
"github.com/hyperledger/firefly/internal/log"
2525
"github.com/hyperledger/firefly/internal/retry"
26+
"github.com/hyperledger/firefly/internal/txcommon"
2627
"github.com/hyperledger/firefly/mocks/databasemocks"
2728
"github.com/hyperledger/firefly/mocks/datamocks"
2829
"github.com/hyperledger/firefly/mocks/sysmessagingmocks"
@@ -36,6 +37,7 @@ func newTestBatchProcessor(dispatch DispatchHandler) (*databasemocks.Plugin, *ba
3637
mdi := &databasemocks.Plugin{}
3738
mni := &sysmessagingmocks.LocalNodeInfo{}
3839
mdm := &datamocks.Manager{}
40+
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
3941
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID()).Maybe()
4042
bp := newBatchProcessor(context.Background(), mni, mdi, mdm, &batchProcessorConf{
4143
namespace: "ns1",
@@ -51,7 +53,7 @@ func newTestBatchProcessor(dispatch DispatchHandler) (*databasemocks.Plugin, *ba
5153
}, &retry.Retry{
5254
InitialDelay: 1 * time.Microsecond,
5355
MaximumDelay: 1 * time.Microsecond,
54-
})
56+
}, txHelper)
5557
bp.txHelper = &txcommonmocks.Helper{}
5658
return mdi, bp
5759
}

internal/config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ var (
230230
SubscriptionsRetryMaxDelay = rootKey("subscription.retry.maxDelay")
231231
// SubscriptionsRetryFactor the backoff factor to use for retry of database operations
232232
SubscriptionsRetryFactor = rootKey("subscription.retry.factor")
233+
// TransactionCacheSize
234+
TransactionCacheSize = rootKey("transaction.cache.size")
235+
// TransactionCacheTTL
236+
TransactionCacheTTL = rootKey("transaction.cache.ttl")
233237
// AssetManagerKeyNormalization mechanism to normalize keys before using them. Valid options: "blockchain_plugin" - use blockchain plugin (default), "none" - do not attempt normalization
234238
AssetManagerKeyNormalization = rootKey("asset.manager.keyNormalization")
235239
// UIEnabled set to false to disable the UI (default is true, so UI will be enabled if ui.path is valid)
@@ -365,6 +369,8 @@ func Reset() {
365369
viper.SetDefault(string(SubscriptionsRetryInitialDelay), "250ms")
366370
viper.SetDefault(string(SubscriptionsRetryMaxDelay), "30s")
367371
viper.SetDefault(string(SubscriptionsRetryFactor), 2.0)
372+
viper.SetDefault(string(TransactionCacheSize), "1Mb")
373+
viper.SetDefault(string(TransactionCacheTTL), "5m")
368374
viper.SetDefault(string(UIEnabled), true)
369375
viper.SetDefault(string(ValidatorCacheSize), "1Mb")
370376
viper.SetDefault(string(ValidatorCacheTTL), "1h")

0 commit comments

Comments
 (0)