From 8f6b13c4ced80f3a691cf84fb034e8220fcd9007 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 8 Feb 2022 12:11:42 -0500 Subject: [PATCH 1/2] Rewind when messages appear behind offset Signed-off-by: Peter Broadhurst --- internal/batch/batch_manager.go | 27 ++++++++++++++++ internal/batch/batch_manager_test.go | 48 ++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 1b3b44df3c..56292ec95f 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -86,6 +86,8 @@ type batchManager struct { readPageSize uint64 messagePollTimeout time.Duration startupOffsetRetryAttempts int + rewindMux sync.Mutex + rewindTo int64 } type DispatchHandler func(context.Context, *fftypes.Batch, []*fftypes.Bytes32) error @@ -116,6 +118,8 @@ func (bm *batchManager) RegisterDispatcher(msgTypes []fftypes.MessageType, handl } func (bm *batchManager) Start() error { + bm.markRewind(-1) + if err := bm.restoreOffset(); err != nil { return err } @@ -219,7 +223,29 @@ func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (data []*fftyp return data, nil } +func (bm *batchManager) markRewind(rewindTo int64) { + bm.rewindMux.Lock() + bm.rewindTo = rewindTo + bm.rewindMux.Unlock() +} + +func (bm *batchManager) popRewind() int64 { + bm.rewindMux.Lock() + rewindTo := bm.rewindTo + bm.rewindTo = -1 + bm.rewindMux.Unlock() + return rewindTo +} + func (bm *batchManager) readPage() ([]*fftypes.Message, error) { + + rewindTo := bm.popRewind() + if rewindTo >= 0 && rewindTo < bm.offset { + if err := bm.updateOffset(true, rewindTo); err != nil { + return nil, err + } + } + var msgs []*fftypes.Message err := bm.retry.Do(bm.ctx, "retrieve messages", func(attempt int) (retry bool, err error) { fb := database.MessageQueryFactory.NewFilterLimit(bm.ctx, bm.readPageSize) @@ -305,6 +331,7 @@ func (bm *batchManager) newEventNotifications() { return } l.Debugf("New message sequence notification: %d", m) + bm.markRewind(m) case <-bm.ctx.Done(): l.Debugf("Exiting due to cancelled context") return diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index 09549721ba..b46dc2abaa 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -510,9 +510,57 @@ func TestWaitConsumesMessagesAndDoesNotBlock(t *testing.T) { } // And should generate a shoulder tap <-bm.(*batchManager).shoulderTap + // And a rewind + assert.Equal(t, int64(12345), bm.(*batchManager).popRewind()) bm.Close() } +func TestReadPageWithRewindSuccess(t *testing.T) { + config.Reset() + mdi := &databasemocks.Plugin{} + mdm := &datamocks.Manager{} + mni := &sysmessagingmocks.LocalNodeInfo{} + + msg := &fftypes.Message{Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}} + mdi.On("UpdateOffset", mock.Anything, int64(0), mock.Anything).Return(nil) + mdi.On("GetMessages", mock.Anything, mock.MatchedBy(func(filter database.Filter) bool { + f, _ := filter.Finalize() + assert.Contains(t, f.String(), "12345") + return true + })).Return([]*fftypes.Message{msg}, nil, nil) + + bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + bm := bmi.(*batchManager) + bm.offset = 22222 + bm.markRewind(12345) + msgs, err := bm.readPage() + assert.NoError(t, err) + assert.Len(t, msgs, 1) + bm.Close() + + mdi.AssertExpectations(t) +} + +func TestReadPageWithRewindFail(t *testing.T) { + config.Reset() + mdi := &databasemocks.Plugin{} + mdm := &datamocks.Manager{} + mni := &sysmessagingmocks.LocalNodeInfo{} + + mdi.On("UpdateOffset", mock.Anything, int64(0), mock.Anything).Return(fmt.Errorf("pop")) + + bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + bm := bmi.(*batchManager) + bm.Close() + + bm.offset = 22222 + bm.markRewind(12345) + _, err := bm.readPage() + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + func TestAssembleMessageDataNilData(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} From 326a8be4b49376e7b4d41969cb6051d6a3e56e0b Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 8 Feb 2022 12:56:19 -0500 Subject: [PATCH 2/2] Make sure we only move the offset backwards for multiple taps Signed-off-by: Peter Broadhurst --- internal/batch/batch_manager.go | 6 +++++- internal/batch/batch_manager_test.go | 17 +++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 56292ec95f..d1dbdfe9a0 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -225,7 +225,11 @@ func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (data []*fftyp func (bm *batchManager) markRewind(rewindTo int64) { bm.rewindMux.Lock() - bm.rewindTo = rewindTo + // Make sure we only rewind backwards - as we might get multiple shoulder taps + // for different message sequences during a single poll cycle. + if bm.rewindTo < 0 || rewindTo < bm.rewindTo { + bm.rewindTo = rewindTo + } bm.rewindMux.Unlock() } diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index b46dc2abaa..88462cb74a 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -503,15 +503,17 @@ func TestWaitConsumesMessagesAndDoesNotBlock(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} mni := &sysmessagingmocks.LocalNodeInfo{} - bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm) - go bm.(*batchManager).newEventNotifications() - for i := 0; i < int(bm.(*batchManager).readPageSize); i++ { + bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm) + bm := bmi.(*batchManager) + _ = bm.popRewind() + go bm.newEventNotifications() + for i := 0; i < int(bm.readPageSize); i++ { bm.NewMessages() <- 12345 } // And should generate a shoulder tap - <-bm.(*batchManager).shoulderTap + <-bm.shoulderTap // And a rewind - assert.Equal(t, int64(12345), bm.(*batchManager).popRewind()) + assert.Equal(t, int64(12345), bm.popRewind()) bm.Close() } @@ -531,8 +533,11 @@ func TestReadPageWithRewindSuccess(t *testing.T) { bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm) bm := bmi.(*batchManager) - bm.offset = 22222 + _ = bm.popRewind() + bm.offset = 44444 + bm.markRewind(22222) bm.markRewind(12345) + bm.markRewind(33333) msgs, err := bm.readPage() assert.NoError(t, err) assert.Len(t, msgs, 1)