From 8626d37eb40a8454fa2019a6267da1faeb7e4b7f Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 25 Mar 2022 15:25:25 -0400 Subject: [PATCH 1/6] Update batch manager dipatch to track inflight Signed-off-by: Peter Broadhurst --- internal/batch/batch_manager.go | 106 ++++++++++++++++++++----- internal/batch/batch_manager_test.go | 7 +- internal/batch/batch_processor.go | 72 ++++++----------- internal/batch/batch_processor_test.go | 91 +++++++-------------- internal/config/config.go | 3 + 5 files changed, 144 insertions(+), 135 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index a1dd9d37f9..201d17fdc1 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -48,11 +48,13 @@ func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di data txHelper: txHelper, readOffset: -1, // On restart we trawl for all ready messages readPageSize: uint64(readPageSize), + minimumPollDelay: config.GetDuration(config.BatchManagerMinimumPollDelay), messagePollTimeout: config.GetDuration(config.BatchManagerReadPollTimeout), startupOffsetRetryAttempts: config.GetInt(config.OrchestratorStartupAttempts), dispatcherMap: make(map[string]*dispatcher), allDispatchers: make([]*dispatcher, 0), newMessages: make(chan int64, readPageSize), + inflightSequences: make(map[int64]*batchProcessor), shoulderTap: make(chan bool, 1), rewindOffset: -1, done: make(chan struct{}), @@ -100,8 +102,12 @@ type batchManager struct { readOffset int64 rewindOffsetMux sync.Mutex rewindOffset int64 + inflightMux sync.Mutex + inflightSequences map[int64]*batchProcessor + inflightFlushed []int64 shoulderTap chan bool readPageSize uint64 + minimumPollDelay time.Duration messagePollTimeout time.Duration startupOffsetRetryAttempts int } @@ -171,10 +177,7 @@ func (bm *batchManager) getProcessor(txType fftypes.TransactionType, msgType fft processor, ok := dispatcher.processors[name] if !ok { processor = newBatchProcessor( - bm.ctx, // Background context, not the call context - bm.ni, - bm.database, - bm.data, + bm, &batchProcessorConf{ DispatcherOptions: dispatcher.options, name: name, @@ -210,17 +213,56 @@ func (bm *batchManager) assembleMessageData(id *fftypes.UUID) (msg *fftypes.Mess return msg, retData, nil } -func (bm *batchManager) readPage() ([]*fftypes.IDAndSequence, error) { - - // Pop out a rewind offset if there is one and it's behind the cursor +// popRewind is called just before reading a page, to po out a rewind offset if there is one and it's behind the cursor +func (bm *batchManager) popRewind() { bm.rewindOffsetMux.Lock() - rewindOffset := bm.rewindOffset - if rewindOffset >= 0 && rewindOffset < bm.readOffset { - bm.readOffset = rewindOffset + if bm.rewindOffset >= 0 && bm.rewindOffset < bm.readOffset { + bm.readOffset = bm.rewindOffset } bm.rewindOffset = -1 bm.rewindOffsetMux.Unlock() +} + +// filterFlushed is called after we read a page, to remove in-flight IDs, and clean up our flush map +func (bm *batchManager) filterFlushed(entries []*fftypes.IDAndSequence) []*fftypes.IDAndSequence { + bm.inflightMux.Lock() + + // Remove inflight entries + unflushedEntries := make([]*fftypes.IDAndSequence, 0, len(entries)) + for _, entry := range entries { + if _, inflight := bm.inflightSequences[entry.Sequence]; !inflight { + unflushedEntries = append(unflushedEntries, entry) + } + } + + // Drain the list of recently flushed entries that processors have notified us about + for _, seq := range bm.inflightFlushed { + delete(bm.inflightSequences, seq) + } + bm.inflightFlushed = bm.inflightFlushed[:0] + + bm.inflightMux.Unlock() + + return unflushedEntries +} + +// nofifyFlushed is called by a processor, when it's finished updating the database to record a set +// of messages as sent. So it's safe to remove these sequences from the inflight map on the next +// page read. +func (bm *batchManager) notifyFlushed(sequences []int64) { + bm.inflightMux.Lock() + bm.inflightFlushed = append(bm.inflightFlushed, sequences...) + bm.inflightMux.Unlock() +} +func (bm *batchManager) readPage(lastPageFull bool) ([]*fftypes.IDAndSequence, bool, error) { + + // Pop out any rewind that has been queued, but each time we read to the front before we rewind + if !lastPageFull { + bm.popRewind() + } + + // Read a page from the DB var ids []*fftypes.IDAndSequence err := bm.retry.Do(bm.ctx, "retrieve messages", func(attempt int) (retry bool, err error) { fb := database.MessageQueryFactory.NewFilterLimit(bm.ctx, bm.readPageSize) @@ -230,7 +272,16 @@ func (bm *batchManager) readPage() ([]*fftypes.IDAndSequence, error) { ).Sort("sequence").Limit(bm.readPageSize)) return true, err }) - return ids, err + + // Calculate if this was a full page we read (so should immediately re-poll) before we remove flushed IDs + pageReadLength := len(ids) + fullPage := (pageReadLength == int(bm.readPageSize)) + + // Remove any flushed IDs from the list, and then update our flushed map + ids = bm.filterFlushed(ids) + + log.L(bm.ctx).Debugf("Read %d records from offset %d. filtered=%d fullPage=%t", pageReadLength, bm.readOffset, len(ids), fullPage) + return ids, fullPage, err } func (bm *batchManager) messageSequencer() { @@ -238,17 +289,17 @@ func (bm *batchManager) messageSequencer() { l.Debugf("Started batch assembly message sequencer") defer close(bm.done) + lastPageFull := false for { // Each time round the loop we check for quiescing processors bm.reapQuiescing() // Read messages from the DB - in an error condition we retry until success, or a closed context - entries, err := bm.readPage() + entries, fullPage, err := bm.readPage(lastPageFull) if err != nil { l.Debugf("Exiting: %s", err) return } - batchWasFull := (uint64(len(entries)) == bm.readPageSize) if len(entries) > 0 { for _, entry := range entries { @@ -276,28 +327,34 @@ func (bm *batchManager) messageSequencer() { } // Wait to be woken again - if !batchWasFull { + if !fullPage { if done := bm.waitForNewMessages(); done { l.Debugf("Exiting: %s", err) return } } + lastPageFull = fullPage } } func (bm *batchManager) newMessageNotification(seq int64) { - // Determine if we need to queue q rewind + rewindToQueue := int64(-1) + + // Determine if we need to queue a rewind bm.rewindOffsetMux.Lock() lastSequenceBeforeMsg := seq - 1 if bm.rewindOffset == -1 || lastSequenceBeforeMsg < bm.rewindOffset { + rewindToQueue = lastSequenceBeforeMsg bm.rewindOffset = lastSequenceBeforeMsg } bm.rewindOffsetMux.Unlock() - // Shoulder tap that there is a new message, regardless of whether we rewound - // the cursor. As we need to wake up the poll. - select { - case bm.shoulderTap <- true: - default: + + if rewindToQueue >= 0 { + log.L(bm.ctx).Debugf("Notifying batch manager of rewind to %d", rewindToQueue) + select { + case bm.shoulderTap <- true: + default: + } } } @@ -317,7 +374,10 @@ func (bm *batchManager) newMessageNotifier() { func (bm *batchManager) waitForNewMessages() (done bool) { l := log.L(bm.ctx) - timeout := time.NewTimer(bm.messagePollTimeout) + // We have a short minimum timeout, to stop us thrashing the DB + time.Sleep(bm.minimumPollDelay) + + timeout := time.NewTimer(bm.messagePollTimeout - bm.minimumPollDelay) select { case <-bm.shoulderTap: timeout.Stop() @@ -339,6 +399,10 @@ func (bm *batchManager) dispatchMessage(processor *batchProcessor, msg *fftypes. data: data, } processor.newWork <- work + + bm.inflightMux.Lock() + bm.inflightSequences[msg.Sequence] = processor + bm.inflightMux.Unlock() } func (bm *batchManager) reapQuiescing() { diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index 5eccb47cbe..c44e1d4c9c 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -36,6 +36,7 @@ import ( func testConfigReset() { config.Reset() + config.Set(config.BatchManagerMinimumPollDelay, "0") log.SetLevel("debug") } @@ -87,6 +88,7 @@ func TestE2EDispatchBroadcast(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper) bm := bmi.(*batchManager) + bm.readOffset = 1000 bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast}, handler, DispatcherOptions{ BatchMaxSize: 2, @@ -108,6 +110,7 @@ func TestE2EDispatchBroadcast(t *testing.T) { Data: fftypes.DataRefs{ {ID: dataID1, Hash: dataHash}, }, + Sequence: 500, } data := &fftypes.Data{ ID: dataID1, @@ -155,7 +158,7 @@ func TestE2EDispatchBroadcast(t *testing.T) { // Wait for the reaping for len(bm.getProcessors()) > 0 { time.Sleep(1 * time.Millisecond) - bm.NewMessages() <- msg.Sequence + bm.shoulderTap <- true } cancel() @@ -553,7 +556,7 @@ func TestRewindForNewMessage(t *testing.T) { assert.Equal(t, int64(12344), v) return true })).Return(nil, nil) - _, err := bm.readPage() + _, _, err := bm.readPage(false) assert.NoError(t, err) } diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index 7d1bdedfec..ade9dc8f71 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -75,6 +75,7 @@ type FlushStatus struct { type batchProcessor struct { ctx context.Context + bm *batchManager ni sysmessaging.LocalNodeInfo data data.Manager database database.Plugin @@ -86,7 +87,6 @@ type batchProcessor struct { assemblyID *fftypes.UUID assemblyQueue []*batchWork assemblyQueueBytes int64 - flushedSequences []int64 statusMux sync.Mutex flushStatus FlushStatus retry *retry.Retry @@ -102,15 +102,16 @@ type DispatchState struct { const batchSizeEstimateBase = int64(512) -func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager, conf *batchProcessorConf, baseRetryConf *retry.Retry, txHelper txcommon.Helper) *batchProcessor { - pCtx := log.WithLogField(log.WithLogField(ctx, "d", conf.dispatcherName), "p", conf.name) +func newBatchProcessor(bm *batchManager, conf *batchProcessorConf, baseRetryConf *retry.Retry, txHelper txcommon.Helper) *batchProcessor { + pCtx := log.WithLogField(log.WithLogField(bm.ctx, "d", conf.dispatcherName), "p", conf.name) pCtx, cancelCtx := context.WithCancel(pCtx) bp := &batchProcessor{ ctx: pCtx, cancelCtx: cancelCtx, - ni: ni, - database: di, - data: dm, + bm: bm, + ni: bm.ni, + database: bm.database, + data: bm.data, txHelper: txHelper, newWork: make(chan *batchWork, conf.BatchMaxSize), quescing: make(chan bool, 1), @@ -120,8 +121,7 @@ func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di da MaximumDelay: baseRetryConf.MaximumDelay, Factor: baseRetryConf.Factor, }, - conf: conf, - flushedSequences: []int64{}, + conf: conf, flushStatus: FlushStatus{ LastFlushTime: fftypes.Now(), }, @@ -166,19 +166,8 @@ func (bp *batchProcessor) newAssembly(initalWork ...*batchWork) { func (bp *batchProcessor) addWork(newWork *batchWork) (full, overflow bool) { newQueue := make([]*batchWork, 0, len(bp.assemblyQueue)+1) added := false - // Check it's not in the recently flushed list - for _, flushedSequence := range bp.flushedSequences { - if newWork.msg.Sequence == flushedSequence { - log.L(bp.ctx).Debugf("Ignoring add of recently flushed message %s sequence=%d to in-flight batch assembly %s", newWork.msg.Header.ID, newWork.msg.Sequence, bp.assemblyID) - return false, false - } - } - // Build the new sorted work list, checking there for duplicates too + // Build the new sorted work list for _, work := range bp.assemblyQueue { - if newWork.msg.Sequence == work.msg.Sequence { - log.L(bp.ctx).Debugf("Ignoring duplicate add of message %s sequence=%d to in-flight batch assembly %s", newWork.msg.Header.ID, newWork.msg.Sequence, bp.assemblyID) - return false, false - } if !added && newWork.msg.Sequence < work.msg.Sequence { newQueue = append(newQueue, newWork) added = true @@ -196,32 +185,6 @@ func (bp *batchProcessor) addWork(newWork *batchWork) (full, overflow bool) { return full, overflow } -func (bp *batchProcessor) addFlushedSequences(flushAssembly []*batchWork) { - // We need to keep track of the sequences we're flushing, because until we finish our flush - // the batch processor might be re-queuing the same messages to us due to rewinds. - - // We keep twice the batch size, which might be made up of multiple batches - maxFlushedSeqLen := int(2 * bp.conf.BatchMaxSize) - - // We keep as much of the END of the existing set as we can, and shift it forwards. - // Then we add the whole of the current flush set after that - combinedLen := len(flushAssembly) + len(bp.flushedSequences) - newLength := combinedLen - if combinedLen > maxFlushedSeqLen { - newLength = maxFlushedSeqLen - } - dropLength := combinedLen - newLength - retainLength := len(bp.flushedSequences) - dropLength - newFlushedSequences := make([]int64, newLength) - for i := 0; i < retainLength; i++ { - newFlushedSequences[i] = bp.flushedSequences[dropLength+i] - } - for i := 0; i < len(flushAssembly); i++ { - newFlushedSequences[retainLength+i] = flushAssembly[i].msg.Sequence - } - bp.flushedSequences = newFlushedSequences -} - func (bp *batchProcessor) startFlush(overflow bool) (id *fftypes.UUID, flushAssembly []*batchWork, byteSize int64) { bp.statusMux.Lock() defer bp.statusMux.Unlock() @@ -237,7 +200,6 @@ func (bp *batchProcessor) startFlush(overflow bool) (id *fftypes.UUID, flushAsse } else { flushAssembly = bp.assemblyQueue } - bp.addFlushedSequences(flushAssembly) // Cycle to the next assembly id = bp.assemblyID byteSize = bp.assemblyQueueBytes @@ -246,7 +208,15 @@ func (bp *batchProcessor) startFlush(overflow bool) (id *fftypes.UUID, flushAsse return id, flushAssembly, byteSize } -func (bp *batchProcessor) endFlush(state *DispatchState, byteSize int64) { +func (bp *batchProcessor) notifyFlushComplete(flushWork []*batchWork) { + sequences := make([]int64, len(flushWork)) + for i, work := range flushWork { + sequences[i] = work.msg.Sequence + } + bp.bm.notifyFlushed(sequences) +} + +func (bp *batchProcessor) updateFlushStats(state *DispatchState, byteSize int64) { bp.statusMux.Lock() defer bp.statusMux.Unlock() fs := &bp.flushStatus @@ -387,7 +357,11 @@ func (bp *batchProcessor) flush(overflow bool) error { } log.L(bp.ctx).Debugf("Finalized batch %s", id) - bp.endFlush(state, byteSize) + // Notify the manager that we've flushed these sequences + bp.notifyFlushComplete(flushWork) + + // Update our stats + bp.updateFlushStats(state, byteSize) return nil } diff --git a/internal/batch/batch_processor_test.go b/internal/batch/batch_processor_test.go index c5111a4e40..04511d0771 100644 --- a/internal/batch/batch_processor_test.go +++ b/internal/batch/batch_processor_test.go @@ -34,13 +34,14 @@ import ( "github.com/stretchr/testify/mock" ) -func newTestBatchProcessor(dispatch DispatchHandler) (*databasemocks.Plugin, *batchProcessor) { - mdi := &databasemocks.Plugin{} - mni := &sysmessagingmocks.LocalNodeInfo{} - mdm := &datamocks.Manager{} +func newTestBatchProcessor(t *testing.T, dispatch DispatchHandler) (func(), *databasemocks.Plugin, *batchProcessor) { + bm, cancel := newTestBatchManager(t) + mdi := bm.database.(*databasemocks.Plugin) + mni := bm.ni.(*sysmessagingmocks.LocalNodeInfo) + mdm := bm.data.(*datamocks.Manager) txHelper := txcommon.NewTransactionHelper(mdi, mdm) mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID()).Maybe() - bp := newBatchProcessor(context.Background(), mni, mdi, mdm, &batchProcessorConf{ + bp := newBatchProcessor(bm, &batchProcessorConf{ namespace: "ns1", txType: fftypes.TransactionTypeBatchPin, signer: fftypes.SignerRef{Author: "did:firefly:org/abcd", Key: "0x12345"}, @@ -56,7 +57,7 @@ func newTestBatchProcessor(dispatch DispatchHandler) (*databasemocks.Plugin, *ba MaximumDelay: 1 * time.Microsecond, }, txHelper) bp.txHelper = &txcommonmocks.Helper{} - return mdi, bp + return cancel, mdi, bp } func mockRunAsGroupPassthrough(mdi *databasemocks.Plugin) { @@ -72,10 +73,11 @@ func TestUnfilledBatch(t *testing.T) { config.Reset() dispatched := make(chan *DispatchState) - mdi, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { + cancel, mdi, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchState) error { dispatched <- state return nil }) + defer cancel() mockRunAsGroupPassthrough(mdi) mdi.On("UpdateMessages", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -116,10 +118,11 @@ func TestBatchSizeOverflow(t *testing.T) { config.Reset() dispatched := make(chan *DispatchState) - mdi, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { + cancel, mdi, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchState) error { dispatched <- state return nil }) + defer cancel() bp.conf.BatchMaxBytes = batchSizeEstimateBase + (&fftypes.Message{}).EstimateSize(false) + 100 mockRunAsGroupPassthrough(mdi) mdi.On("UpdateMessages", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -160,9 +163,10 @@ func TestBatchSizeOverflow(t *testing.T) { } func TestCloseToUnblockDispatch(t *testing.T) { - _, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { + cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchState) error { return fmt.Errorf("pop") }) + defer cancel() bp.cancelCtx() bp.dispatchBatch(&DispatchState{}) <-bp.done @@ -170,9 +174,10 @@ func TestCloseToUnblockDispatch(t *testing.T) { func TestCloseToUnblockUpsertBatch(t *testing.T) { - mdi, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { + cancel, mdi, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchState) error { return nil }) + defer cancel() bp.retry.MaximumDelay = 1 * time.Microsecond bp.conf.BatchMaxSize = 1 bp.conf.BatchTimeout = 100 * time.Second @@ -204,9 +209,10 @@ func TestCloseToUnblockUpsertBatch(t *testing.T) { } func TestCalcPinsFail(t *testing.T) { - _, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { + cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchState) error { return nil }) + defer cancel() bp.cancelCtx() mdi := bp.database.(*databasemocks.Plugin) mdi.On("UpsertNonceNext", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) @@ -233,33 +239,17 @@ func TestCalcPinsFail(t *testing.T) { mdi.AssertExpectations(t) } -func TestAddWorkInRecentlyFlushed(t *testing.T) { - _, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { - return nil - }) - bp.flushedSequences = []int64{100, 500, 400, 900, 200, 700} - _, _ = bp.addWork(&batchWork{ - msg: &fftypes.Message{ - Sequence: 200, - }, - }) - assert.Empty(t, bp.assemblyQueue) - -} - -func TestAddWorkInSortDeDup(t *testing.T) { - _, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { +func TestAddWorkInSort(t *testing.T) { + cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchState) error { return nil }) + defer cancel() bp.assemblyQueue = []*batchWork{ {msg: &fftypes.Message{Sequence: 200}}, {msg: &fftypes.Message{Sequence: 201}}, {msg: &fftypes.Message{Sequence: 202}}, {msg: &fftypes.Message{Sequence: 204}}, } - _, _ = bp.addWork(&batchWork{ - msg: &fftypes.Message{Sequence: 200}, - }) _, _ = bp.addWork(&batchWork{ msg: &fftypes.Message{Sequence: 203}, }) @@ -272,39 +262,11 @@ func TestAddWorkInSortDeDup(t *testing.T) { }, bp.assemblyQueue) } -func TestStartFlushOverflow(t *testing.T) { - _, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { - return nil - }) - batchID := fftypes.NewUUID() - bp.assemblyID = batchID - bp.flushedSequences = []int64{100, 101, 102, 103, 104} - bp.assemblyQueue = []*batchWork{ - {msg: &fftypes.Message{Sequence: 200}}, - {msg: &fftypes.Message{Sequence: 201}}, - {msg: &fftypes.Message{Sequence: 202}}, - {msg: &fftypes.Message{Sequence: 203}}, - } - bp.conf.BatchMaxSize = 3 - - flushBatchID, flushAssembly, _ := bp.startFlush(true) - assert.Equal(t, batchID, flushBatchID) - assert.Equal(t, []int64{102, 103, 104, 200, 201, 202}, bp.flushedSequences) - assert.Equal(t, []*batchWork{ - {msg: &fftypes.Message{Sequence: 200}}, - {msg: &fftypes.Message{Sequence: 201}}, - {msg: &fftypes.Message{Sequence: 202}}, - }, flushAssembly) - assert.Equal(t, []*batchWork{ - {msg: &fftypes.Message{Sequence: 203}}, - }, bp.assemblyQueue) - assert.NotEqual(t, batchID, bp.assemblyID) -} - func TestStartQuiesceNonBlocking(t *testing.T) { - _, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { + cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchState) error { return nil }) + defer cancel() bp.startQuiesce() bp.startQuiesce() // we're just checking this doesn't hang } @@ -314,10 +276,11 @@ func TestMarkMessageDispatchedUnpinnedOK(t *testing.T) { config.Reset() dispatched := make(chan *DispatchState) - mdi, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { + cancel, mdi, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchState) error { dispatched <- state return nil }) + defer cancel() bp.conf.txType = fftypes.TransactionTypeUnpinned mockRunAsGroupPassthrough(mdi) @@ -361,10 +324,11 @@ func TestMaskContextsDuplicate(t *testing.T) { config.Reset() dispatched := make(chan *DispatchState) - mdi, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { + cancel, mdi, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchState) error { dispatched <- state return nil }) + defer cancel() mdi.On("UpsertNonceNext", mock.Anything, mock.Anything).Return(nil).Once() mdi.On("UpdateMessage", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() @@ -398,10 +362,11 @@ func TestMaskContextsUpdataMessageFail(t *testing.T) { config.Reset() dispatched := make(chan *DispatchState) - mdi, bp := newTestBatchProcessor(func(c context.Context, state *DispatchState) error { + cancel, mdi, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchState) error { dispatched <- state return nil }) + defer cancel() mdi.On("UpsertNonceNext", mock.Anything, mock.Anything).Return(nil).Once() mdi.On("UpdateMessage", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() diff --git a/internal/config/config.go b/internal/config/config.go index 3a38724495..67723d3315 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -62,6 +62,8 @@ var ( BatchManagerReadPageSize = rootKey("batch.manager.readPageSize") // BatchManagerReadPollTimeout is how long without any notifications of new messages to wait, before doing a page query BatchManagerReadPollTimeout = rootKey("batch.manager.pollTimeout") + // BatchManagerMinimumPollDelay is the minimum time the batch manager waits between polls on the DB - to prevent thrashing + BatchManagerMinimumPollDelay = rootKey("batch.manager.minimumPollDelay") // BatchRetryFactor is the retry backoff factor for database operations performed by the batch manager BatchRetryFactor = rootKey("batch.retry.factor") // BatchRetryInitDelay is the retry initial delay for database operations @@ -327,6 +329,7 @@ func Reset() { viper.SetDefault(string(BatchCacheTTL), "5m") viper.SetDefault(string(BatchManagerReadPageSize), 100) viper.SetDefault(string(BatchManagerReadPollTimeout), "30s") + viper.SetDefault(string(BatchManagerMinimumPollDelay), "100ms") viper.SetDefault(string(BatchRetryFactor), 2.0) viper.SetDefault(string(BatchRetryFactor), 2.0) viper.SetDefault(string(BatchRetryInitDelay), "250ms") From c510e69edd9916b74f4c18cb596851f13ce98aca Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Sun, 27 Mar 2022 14:55:33 -0400 Subject: [PATCH 2/6] Address intermittent coverage gap Signed-off-by: Peter Broadhurst --- internal/batch/batch_manager_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index c44e1d4c9c..59cfc716a8 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -596,3 +596,17 @@ func TestGetMessageNotFound(t *testing.T) { _, _, err := bm.(*batchManager).assembleMessageData(fftypes.NewUUID()) assert.Regexp(t, "FF10133", err) } + +func TestDoubleTap(t *testing.T) { + bm, cancel := newTestBatchManager(t) + defer cancel() + bm.readOffset = 3000 + go bm.newMessageNotifier() + + bm.NewMessages() <- 2000 + bm.NewMessages() <- 1000 + + for bm.rewindOffset != int64(999) { + time.Sleep(1 * time.Microsecond) + } +} From 74093fe65965d439e6d7bc5a97c6e6660a3527a9 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Sun, 27 Mar 2022 14:58:15 -0400 Subject: [PATCH 3/6] Put into map before dispatch Signed-off-by: Peter Broadhurst --- internal/batch/batch_manager.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 201d17fdc1..a6b3d29796 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -394,15 +394,16 @@ func (bm *batchManager) waitForNewMessages() (done bool) { func (bm *batchManager) dispatchMessage(processor *batchProcessor, msg *fftypes.Message, data fftypes.DataArray) { l := log.L(bm.ctx) l.Debugf("Dispatching message %s (seq=%d) to %s batch processor %s", msg.Header.ID, msg.Sequence, msg.Header.Type, processor.conf.name) + + bm.inflightMux.Lock() + bm.inflightSequences[msg.Sequence] = processor + bm.inflightMux.Unlock() + work := &batchWork{ msg: msg, data: data, } processor.newWork <- work - - bm.inflightMux.Lock() - bm.inflightSequences[msg.Sequence] = processor - bm.inflightMux.Unlock() } func (bm *batchManager) reapQuiescing() { From 75f6911420243dfee71da977236fbae207f0b451 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Sun, 27 Mar 2022 16:03:15 -0400 Subject: [PATCH 4/6] Remove disabling of send on public ref set, and add more debug Signed-off-by: Peter Broadhurst --- internal/i18n/en_translations.go | 1 + internal/privatemessaging/privatemessaging.go | 8 ++++++-- internal/privatemessaging/privatemessaging_test.go | 11 +++++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/internal/i18n/en_translations.go b/internal/i18n/en_translations.go index 7d609b767a..c2342d9519 100644 --- a/internal/i18n/en_translations.go +++ b/internal/i18n/en_translations.go @@ -294,4 +294,5 @@ var ( MsgDownloadSharedFailed = ffm("FF10376", "Error downloading data with reference '%s' from shared storage") MsgDownloadBatchMaxBytes = ffm("FF10377", "Error downloading batch with reference '%s' from shared storage - maximum size limit reached") MsgOperationDataIncorrect = ffm("FF10378", "Operation data type incorrect: %T", 400) + MsgDataMissingBlobHash = ffm("FF10379", "Blob for data %s cannot be transferred as it is missing a hash", 500) ) diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index 14475def33..3c020f3cd0 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -193,8 +193,11 @@ func (pm *privateMessaging) dispatchBatchCommon(ctx context.Context, state *batc func (pm *privateMessaging) transferBlobs(ctx context.Context, data fftypes.DataArray, txid *fftypes.UUID, node *fftypes.Identity) error { // Send all the blobs associated with this batch for _, d := range data { - // We only need to send a blob if there is one, and it's not been uploaded to the shared storage - if d.Blob != nil && d.Blob.Hash != nil && d.Blob.Public == "" { + if d.Blob != nil { + if d.Blob.Hash == nil { + return i18n.NewError(ctx, i18n.MsgDataMissingBlobHash, d.ID) + } + blob, err := pm.database.GetBlobMatchingHash(ctx, d.Blob.Hash) if err != nil { return err @@ -203,6 +206,7 @@ func (pm *privateMessaging) transferBlobs(ctx context.Context, data fftypes.Data return i18n.NewError(ctx, i18n.MsgBlobNotFound, d.Blob) } + log.L(ctx).Debugf("Transferring blob %s for data %s", d.Blob.Hash, d.ID) op := fftypes.NewOperation( pm.exchange, d.Namespace, diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index 6516ba0783..dcc5ab7ee0 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -478,6 +478,17 @@ func TestWriteTransactionSubmitBatchPinFail(t *testing.T) { mom.AssertExpectations(t) } +func TestTransferBlobsNoHash(t *testing.T) { + pm, cancel := newTestPrivateMessaging(t) + defer cancel() + + err := pm.transferBlobs(pm.ctx, fftypes.DataArray{ + {ID: fftypes.NewUUID(), Hash: fftypes.NewRandB32(), Blob: &fftypes.BlobRef{}}, + }, fftypes.NewUUID(), newTestNode("node1", newTestOrg("org1"))) + assert.Regexp(t, "FF10379", err) + +} + func TestTransferBlobsNotFound(t *testing.T) { pm, cancel := newTestPrivateMessaging(t) defer cancel() From e0cec91f4a75f71dfaae2e111acb477e8d90677c Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Sun, 27 Mar 2022 16:18:15 -0400 Subject: [PATCH 5/6] Invalid early return when dispatching multiple blobs in a single batch Signed-off-by: Peter Broadhurst --- internal/privatemessaging/privatemessaging.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index 3c020f3cd0..891d5925b4 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -206,18 +206,20 @@ func (pm *privateMessaging) transferBlobs(ctx context.Context, data fftypes.Data return i18n.NewError(ctx, i18n.MsgBlobNotFound, d.Blob) } - log.L(ctx).Debugf("Transferring blob %s for data %s", d.Blob.Hash, d.ID) op := fftypes.NewOperation( pm.exchange, d.Namespace, txid, fftypes.OpTypeDataExchangeSendBlob) addTransferBlobInputs(op, node.ID, blob.Hash) + log.L(ctx).Debugf("Transferring blob %s for data %s in operation %s", d.Blob.Hash, d.ID, op.ID) if err = pm.operations.AddOrReuseOperation(ctx, op); err != nil { return err } - return pm.operations.RunOperation(ctx, opSendBlob(op, node, blob)) + if err = pm.operations.RunOperation(ctx, opSendBlob(op, node, blob)); err != nil { + return err + } } } return nil From 4062d62c445e0286d520fe3bfa17b0158bd27131 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Sun, 27 Mar 2022 18:51:33 -0400 Subject: [PATCH 6/6] Fix typo Signed-off-by: Peter Broadhurst --- internal/batch/batch_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index a6b3d29796..3350a1539f 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -213,7 +213,7 @@ func (bm *batchManager) assembleMessageData(id *fftypes.UUID) (msg *fftypes.Mess return msg, retData, nil } -// popRewind is called just before reading a page, to po out a rewind offset if there is one and it's behind the cursor +// popRewind is called just before reading a page, to pop out a rewind offset if there is one and it's behind the cursor func (bm *batchManager) popRewind() { bm.rewindOffsetMux.Lock() if bm.rewindOffset >= 0 && bm.rewindOffset < bm.readOffset {