Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions internal/batch/batch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type batchManager struct {
readPageSize uint64
messagePollTimeout time.Duration
startupOffsetRetryAttempts int
rewindMux sync.Mutex
rewindTo int64
}

type DispatchHandler func(context.Context, *fftypes.Batch, []*fftypes.Bytes32) error
Expand Down Expand Up @@ -116,6 +118,8 @@ func (bm *batchManager) RegisterDispatcher(msgTypes []fftypes.MessageType, handl
}

func (bm *batchManager) Start() error {
bm.markRewind(-1)

if err := bm.restoreOffset(); err != nil {
return err
}
Expand Down Expand Up @@ -219,7 +223,33 @@ func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (data []*fftyp
return data, nil
}

func (bm *batchManager) markRewind(rewindTo int64) {
bm.rewindMux.Lock()
// Make sure we only rewind backwards - as we might get multiple shoulder taps
// for different message sequences during a single poll cycle.
if bm.rewindTo < 0 || rewindTo < bm.rewindTo {
bm.rewindTo = rewindTo
}
bm.rewindMux.Unlock()
}

func (bm *batchManager) popRewind() int64 {
bm.rewindMux.Lock()
rewindTo := bm.rewindTo
bm.rewindTo = -1
bm.rewindMux.Unlock()
return rewindTo
}

func (bm *batchManager) readPage() ([]*fftypes.Message, error) {

rewindTo := bm.popRewind()
if rewindTo >= 0 && rewindTo < bm.offset {
if err := bm.updateOffset(true, rewindTo); err != nil {
return nil, err
}
}

var msgs []*fftypes.Message
err := bm.retry.Do(bm.ctx, "retrieve messages", func(attempt int) (retry bool, err error) {
fb := database.MessageQueryFactory.NewFilterLimit(bm.ctx, bm.readPageSize)
Expand Down Expand Up @@ -305,6 +335,7 @@ func (bm *batchManager) newEventNotifications() {
return
}
l.Debugf("New message sequence notification: %d", m)
bm.markRewind(m)
case <-bm.ctx.Done():
l.Debugf("Exiting due to cancelled context")
return
Expand Down
61 changes: 57 additions & 4 deletions internal/batch/batch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,67 @@ func TestWaitConsumesMessagesAndDoesNotBlock(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
go bm.(*batchManager).newEventNotifications()
for i := 0; i < int(bm.(*batchManager).readPageSize); i++ {
bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
bm := bmi.(*batchManager)
_ = bm.popRewind()
go bm.newEventNotifications()
for i := 0; i < int(bm.readPageSize); i++ {
bm.NewMessages() <- 12345
}
// And should generate a shoulder tap
<-bm.(*batchManager).shoulderTap
<-bm.shoulderTap
// And a rewind
assert.Equal(t, int64(12345), bm.popRewind())
bm.Close()
}

func TestReadPageWithRewindSuccess(t *testing.T) {
config.Reset()
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}

msg := &fftypes.Message{Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}}
mdi.On("UpdateOffset", mock.Anything, int64(0), mock.Anything).Return(nil)
mdi.On("GetMessages", mock.Anything, mock.MatchedBy(func(filter database.Filter) bool {
f, _ := filter.Finalize()
assert.Contains(t, f.String(), "12345")
return true
})).Return([]*fftypes.Message{msg}, nil, nil)

bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
bm := bmi.(*batchManager)
_ = bm.popRewind()
bm.offset = 44444
bm.markRewind(22222)
bm.markRewind(12345)
bm.markRewind(33333)
msgs, err := bm.readPage()
assert.NoError(t, err)
assert.Len(t, msgs, 1)
bm.Close()

mdi.AssertExpectations(t)
}

func TestReadPageWithRewindFail(t *testing.T) {
config.Reset()
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}

mdi.On("UpdateOffset", mock.Anything, int64(0), mock.Anything).Return(fmt.Errorf("pop"))

bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
bm := bmi.(*batchManager)
bm.Close()

bm.offset = 22222
bm.markRewind(12345)
_, err := bm.readPage()
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
}

func TestAssembleMessageDataNilData(t *testing.T) {
Expand Down