Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 86 additions & 21 deletions internal/batch/batch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di data
txHelper: txHelper,
readOffset: -1, // On restart we trawl for all ready messages
readPageSize: uint64(readPageSize),
minimumPollDelay: config.GetDuration(config.BatchManagerMinimumPollDelay),
messagePollTimeout: config.GetDuration(config.BatchManagerReadPollTimeout),
startupOffsetRetryAttempts: config.GetInt(config.OrchestratorStartupAttempts),
dispatcherMap: make(map[string]*dispatcher),
allDispatchers: make([]*dispatcher, 0),
newMessages: make(chan int64, readPageSize),
inflightSequences: make(map[int64]*batchProcessor),
shoulderTap: make(chan bool, 1),
rewindOffset: -1,
done: make(chan struct{}),
Expand Down Expand Up @@ -100,8 +102,12 @@ type batchManager struct {
readOffset int64
rewindOffsetMux sync.Mutex
rewindOffset int64
inflightMux sync.Mutex
inflightSequences map[int64]*batchProcessor
inflightFlushed []int64
shoulderTap chan bool
readPageSize uint64
minimumPollDelay time.Duration
messagePollTimeout time.Duration
startupOffsetRetryAttempts int
}
Expand Down Expand Up @@ -171,10 +177,7 @@ func (bm *batchManager) getProcessor(txType fftypes.TransactionType, msgType fft
processor, ok := dispatcher.processors[name]
if !ok {
processor = newBatchProcessor(
bm.ctx, // Background context, not the call context
bm.ni,
bm.database,
bm.data,
bm,
&batchProcessorConf{
DispatcherOptions: dispatcher.options,
name: name,
Expand Down Expand Up @@ -210,17 +213,56 @@ func (bm *batchManager) assembleMessageData(id *fftypes.UUID) (msg *fftypes.Mess
return msg, retData, nil
}

func (bm *batchManager) readPage() ([]*fftypes.IDAndSequence, error) {

// Pop out a rewind offset if there is one and it's behind the cursor
// popRewind is called just before reading a page, to pop out a rewind offset if there is one and it's behind the cursor
func (bm *batchManager) popRewind() {
bm.rewindOffsetMux.Lock()
rewindOffset := bm.rewindOffset
if rewindOffset >= 0 && rewindOffset < bm.readOffset {
bm.readOffset = rewindOffset
if bm.rewindOffset >= 0 && bm.rewindOffset < bm.readOffset {
bm.readOffset = bm.rewindOffset
}
bm.rewindOffset = -1
bm.rewindOffsetMux.Unlock()
}

// filterFlushed runs after a page has been read. It returns the subset of entries
// whose sequence is not currently tracked in the in-flight map, and then removes
// from that map every sequence that processors have reported via notifyFlushed.
//
// The filtering deliberately happens BEFORE the flushed list is drained, so a
// sequence reported as flushed is still excluded from this page and only becomes
// eligible again on a later read.
// NOTE(review): presumably this covers a page that was read before the processor's
// database update landed - confirm against the processor flush path.
func (bm *batchManager) filterFlushed(entries []*fftypes.IDAndSequence) []*fftypes.IDAndSequence {
	bm.inflightMux.Lock()
	defer bm.inflightMux.Unlock()

	// Keep only the entries that have not already been dispatched to a processor
	remaining := make([]*fftypes.IDAndSequence, 0, len(entries))
	for _, candidate := range entries {
		if _, dispatched := bm.inflightSequences[candidate.Sequence]; dispatched {
			continue
		}
		remaining = append(remaining, candidate)
	}

	// Forget the sequences that processors have told us are flushed, then
	// truncate the notification list in place so its capacity is reused
	for _, flushedSeq := range bm.inflightFlushed {
		delete(bm.inflightSequences, flushedSeq)
	}
	bm.inflightFlushed = bm.inflightFlushed[:0]

	return remaining
}

// notifyFlushed is called by a processor once it has finished updating the database
// to record a set of messages as sent. The sequences are only queued here, under the
// in-flight lock; they are removed from the in-flight map on the next page read.
func (bm *batchManager) notifyFlushed(sequences []int64) {
	bm.inflightMux.Lock()
	defer bm.inflightMux.Unlock()

	bm.inflightFlushed = append(bm.inflightFlushed, sequences...)
}

func (bm *batchManager) readPage(lastPageFull bool) ([]*fftypes.IDAndSequence, bool, error) {

// Pop out any rewind that has been queued - but only when the previous page was not full, so we always read through to the front before we rewind
if !lastPageFull {
bm.popRewind()
}

// Read a page from the DB
var ids []*fftypes.IDAndSequence
err := bm.retry.Do(bm.ctx, "retrieve messages", func(attempt int) (retry bool, err error) {
fb := database.MessageQueryFactory.NewFilterLimit(bm.ctx, bm.readPageSize)
Expand All @@ -230,25 +272,34 @@ func (bm *batchManager) readPage() ([]*fftypes.IDAndSequence, error) {
).Sort("sequence").Limit(bm.readPageSize))
return true, err
})
return ids, err

// Calculate if this was a full page we read (so should immediately re-poll) before we remove flushed IDs
pageReadLength := len(ids)
fullPage := (pageReadLength == int(bm.readPageSize))

// Remove any flushed IDs from the list, and then update our flushed map
ids = bm.filterFlushed(ids)

log.L(bm.ctx).Debugf("Read %d records from offset %d. filtered=%d fullPage=%t", pageReadLength, bm.readOffset, len(ids), fullPage)
return ids, fullPage, err
}

func (bm *batchManager) messageSequencer() {
l := log.L(bm.ctx)
l.Debugf("Started batch assembly message sequencer")
defer close(bm.done)

lastPageFull := false
for {
// Each time round the loop we check for quiescing processors
bm.reapQuiescing()

// Read messages from the DB - in an error condition we retry until success, or a closed context
entries, err := bm.readPage()
entries, fullPage, err := bm.readPage(lastPageFull)
if err != nil {
l.Debugf("Exiting: %s", err)
return
}
batchWasFull := (uint64(len(entries)) == bm.readPageSize)

if len(entries) > 0 {
for _, entry := range entries {
Expand Down Expand Up @@ -276,28 +327,34 @@ func (bm *batchManager) messageSequencer() {
}

// Wait to be woken again
if !batchWasFull {
if !fullPage {
if done := bm.waitForNewMessages(); done {
l.Debugf("Exiting: %s", err)
return
}
}
lastPageFull = fullPage
}
}

func (bm *batchManager) newMessageNotification(seq int64) {
// Determine if we need to queue q rewind
rewindToQueue := int64(-1)

// Determine if we need to queue a rewind
bm.rewindOffsetMux.Lock()
lastSequenceBeforeMsg := seq - 1
if bm.rewindOffset == -1 || lastSequenceBeforeMsg < bm.rewindOffset {
rewindToQueue = lastSequenceBeforeMsg
bm.rewindOffset = lastSequenceBeforeMsg
}
bm.rewindOffsetMux.Unlock()
// Shoulder tap that there is a new message, regardless of whether we rewound
// the cursor. As we need to wake up the poll.
select {
case bm.shoulderTap <- true:
default:

if rewindToQueue >= 0 {
log.L(bm.ctx).Debugf("Notifying batch manager of rewind to %d", rewindToQueue)
select {
case bm.shoulderTap <- true:
default:
}
}
}

Expand All @@ -317,7 +374,10 @@ func (bm *batchManager) newMessageNotifier() {
func (bm *batchManager) waitForNewMessages() (done bool) {
l := log.L(bm.ctx)

timeout := time.NewTimer(bm.messagePollTimeout)
// We have a short minimum timeout, to stop us thrashing the DB
time.Sleep(bm.minimumPollDelay)

timeout := time.NewTimer(bm.messagePollTimeout - bm.minimumPollDelay)
select {
case <-bm.shoulderTap:
timeout.Stop()
Expand All @@ -334,6 +394,11 @@ func (bm *batchManager) waitForNewMessages() (done bool) {
func (bm *batchManager) dispatchMessage(processor *batchProcessor, msg *fftypes.Message, data fftypes.DataArray) {
l := log.L(bm.ctx)
l.Debugf("Dispatching message %s (seq=%d) to %s batch processor %s", msg.Header.ID, msg.Sequence, msg.Header.Type, processor.conf.name)

bm.inflightMux.Lock()
bm.inflightSequences[msg.Sequence] = processor
bm.inflightMux.Unlock()

work := &batchWork{
msg: msg,
data: data,
Expand Down
21 changes: 19 additions & 2 deletions internal/batch/batch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

// testConfigReset resets the configuration for a test run, then sets the batch
// manager minimum poll delay to "0" and the log level to "debug".
func testConfigReset() {
config.Reset()
// The batch manager sleeps for this delay each time it waits for new messages,
// so zero it to keep the tests from pausing on every poll
config.Set(config.BatchManagerMinimumPollDelay, "0")
log.SetLevel("debug")
}

Expand Down Expand Up @@ -87,6 +88,7 @@ func TestE2EDispatchBroadcast(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
bm := bmi.(*batchManager)
bm.readOffset = 1000

bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast}, handler, DispatcherOptions{
BatchMaxSize: 2,
Expand All @@ -108,6 +110,7 @@ func TestE2EDispatchBroadcast(t *testing.T) {
Data: fftypes.DataRefs{
{ID: dataID1, Hash: dataHash},
},
Sequence: 500,
}
data := &fftypes.Data{
ID: dataID1,
Expand Down Expand Up @@ -155,7 +158,7 @@ func TestE2EDispatchBroadcast(t *testing.T) {
// Wait for the reaping
for len(bm.getProcessors()) > 0 {
time.Sleep(1 * time.Millisecond)
bm.NewMessages() <- msg.Sequence
bm.shoulderTap <- true
}

cancel()
Expand Down Expand Up @@ -553,7 +556,7 @@ func TestRewindForNewMessage(t *testing.T) {
assert.Equal(t, int64(12344), v)
return true
})).Return(nil, nil)
_, err := bm.readPage()
_, _, err := bm.readPage(false)
assert.NoError(t, err)
}

Expand Down Expand Up @@ -593,3 +596,17 @@ func TestGetMessageNotFound(t *testing.T) {
_, _, err := bm.(*batchManager).assembleMessageData(fftypes.NewUUID())
assert.Regexp(t, "FF10133", err)
}

// TestDoubleTap checks that when two new-message notifications arrive out of order
// (sequence 2000 followed by 1000), the queued rewind offset settles on the lower
// of the two: 999, one before sequence 1000.
func TestDoubleTap(t *testing.T) {
	bm, cancel := newTestBatchManager(t)
	defer cancel()
	bm.readOffset = 3000
	go bm.newMessageNotifier()

	bm.NewMessages() <- 2000
	bm.NewMessages() <- 1000

	// rewindOffset is written under rewindOffsetMux by newMessageNotification, so it
	// must be read under the same lock to avoid a data race with the notifier
	// goroutine. The wait is bounded so a regression fails the test rather than
	// hanging it forever.
	deadline := time.Now().Add(5 * time.Second)
	for {
		bm.rewindOffsetMux.Lock()
		rewindOffset := bm.rewindOffset
		bm.rewindOffsetMux.Unlock()

		if rewindOffset == int64(999) {
			return
		}
		if time.Now().After(deadline) {
			t.Fatalf("timed out waiting for rewind offset 999, last saw %d", rewindOffset)
		}
		time.Sleep(1 * time.Microsecond)
	}
}
Loading