Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/firefly.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -95,7 +95,8 @@ func run() error {

// Set up logging after reading config (even if that failed), so the header is output correctly
ctx, cancelCtx := context.WithCancel(context.Background())
ctx = log.WithLogger(ctx, logrus.WithField("pid", os.Getpid()))
ctx = log.WithLogger(ctx, logrus.WithField("pid", fmt.Sprintf("%d", os.Getpid())))
ctx = log.WithLogger(ctx, logrus.WithField("prefix", config.GetString(config.NodeName)))

config.SetupLogging(ctx)
log.L(ctx).Infof("Project Firefly")
Expand Down
61 changes: 34 additions & 27 deletions internal/batch/batch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di data
readOffset: -1, // On restart we trawl for all ready messages
readPageSize: uint64(readPageSize),
messagePollTimeout: config.GetDuration(config.BatchManagerReadPollTimeout),
minimumPollTime: config.GetDuration(config.BatchManagerMinimumPollTime),
startupOffsetRetryAttempts: config.GetInt(config.OrchestratorStartupAttempts),
dispatcherMap: make(map[string]*dispatcher),
allDispatchers: make([]*dispatcher, 0),
Expand Down Expand Up @@ -98,6 +99,7 @@ type batchManager struct {
readOffset int64
readPageSize uint64
messagePollTimeout time.Duration
minimumPollTime time.Duration
startupOffsetRetryAttempts int
}

Expand Down Expand Up @@ -187,34 +189,34 @@ func (bm *batchManager) getProcessor(txType fftypes.TransactionType, msgType fft
return processor, nil
}

func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (retData fftypes.DataArray, err error) {
func (bm *batchManager) assembleMessageData(id *fftypes.UUID) (msg *fftypes.Message, retData fftypes.DataArray, err error) {
var foundAll = false
err = bm.retry.Do(bm.ctx, fmt.Sprintf("assemble message %s data", msg.Header.ID), func(attempt int) (retry bool, err error) {
retData, foundAll, err = bm.data.GetMessageDataCached(bm.ctx, msg)
err = bm.retry.Do(bm.ctx, "retrieve message", func(attempt int) (retry bool, err error) {
msg, retData, foundAll, err = bm.data.GetMessageWithDataCached(bm.ctx, id)
// continual retry for persistence error (distinct from not-found)
return true, err
})
if err != nil {
return nil, err
return nil, nil, err
}
if !foundAll {
return nil, i18n.NewError(bm.ctx, i18n.MsgDataNotFound, msg.Header.ID)
return nil, nil, i18n.NewError(bm.ctx, i18n.MsgDataNotFound, id)
}
return retData, nil
return msg, retData, nil
}

func (bm *batchManager) readPage() ([]*fftypes.Message, error) {
func (bm *batchManager) readPage() ([]*fftypes.IDAndSequence, error) {

var msgs []*fftypes.Message
var ids []*fftypes.IDAndSequence
err := bm.retry.Do(bm.ctx, "retrieve messages", func(attempt int) (retry bool, err error) {
fb := database.MessageQueryFactory.NewFilterLimit(bm.ctx, bm.readPageSize)
msgs, _, err = bm.database.GetMessages(bm.ctx, fb.And(
ids, err = bm.database.GetMessageIDs(bm.ctx, fb.And(
fb.Gt("sequence", bm.readOffset),
fb.Eq("state", fftypes.MessageStateReady),
).Sort("sequence").Limit(bm.readPageSize))
return true, err
})
return msgs, err
return ids, err
}

func (bm *batchManager) messageSequencer() {
Expand All @@ -223,40 +225,47 @@ func (bm *batchManager) messageSequencer() {
defer close(bm.done)

for {
// Set a timer for the minimum time to wait before the next poll
minimumPollDelay := time.NewTimer(bm.minimumPollTime)

// Each time round the loop we check for quiescing processors
bm.reapQuiescing()

// Read messages from the DB - in an error condition we retry until success, or a closed context
msgs, err := bm.readPage()
entries, err := bm.readPage()
if err != nil {
l.Debugf("Exiting: %s", err)
return
}
batchWasFull := (uint64(len(msgs)) == bm.readPageSize)
batchWasFull := (uint64(len(entries)) == bm.readPageSize)

if len(msgs) > 0 {
for _, msg := range msgs {
processor, err := bm.getProcessor(msg.Header.TxType, msg.Header.Type, msg.Header.Group, msg.Header.Namespace, &msg.Header.SignerRef)
if len(entries) > 0 {
for _, entry := range entries {
msg, data, err := bm.assembleMessageData(&entry.ID)
if err != nil {
l.Errorf("Failed to dispatch message %s: %s", msg.Header.ID, err)
l.Errorf("Failed to retrieve message data for %s (seq=%d): %s", entry.ID, entry.Sequence, err)
continue
}

data, err := bm.assembleMessageData(msg)
// We likely retrieved this message from the cache, which is written by the message-writer before
// the database store, meaning we cannot rely on the sequence having been set.
msg.Sequence = entry.Sequence

processor, err := bm.getProcessor(msg.Header.TxType, msg.Header.Type, msg.Header.Group, msg.Header.Namespace, &msg.Header.SignerRef)
if err != nil {
l.Errorf("Failed to retrieve message data for %s: %s", msg.Header.ID, err)
l.Errorf("Failed to dispatch message %s: %s", msg.Header.ID, err)
continue
}

bm.dispatchMessage(processor, msg, data)
}

// Next time round only read after the messages we just processed (unless we get a tap to rewind)
bm.readOffset = msgs[len(msgs)-1].Sequence
bm.readOffset = entries[len(entries)-1].Sequence
}

// Wait to be woken again
if !batchWasFull && !bm.drainNewMessages() {
if !batchWasFull && !bm.drainNewMessages(minimumPollDelay) {
if done := bm.waitForNewMessages(); done {
l.Debugf("Exiting: %s", err)
return
Expand All @@ -275,20 +284,18 @@ func (bm *batchManager) newMessageNotification(seq int64) {
}
}

func (bm *batchManager) drainNewMessages() bool {
func (bm *batchManager) drainNewMessages(minimumPollDelay *time.Timer) bool {
// Drain any new message notifications, moving back our readOffset as required
newMessages := false
checkingMessages := true
for checkingMessages {
for {
select {
case seq := <-bm.newMessages:
bm.newMessageNotification(seq)
newMessages = true
default:
checkingMessages = false
case <-minimumPollDelay.C:
return newMessages
}
}
return newMessages
}

func (bm *batchManager) waitForNewMessages() (done bool) {
Expand All @@ -311,7 +318,7 @@ func (bm *batchManager) waitForNewMessages() (done bool) {

func (bm *batchManager) dispatchMessage(processor *batchProcessor, msg *fftypes.Message, data fftypes.DataArray) {
l := log.L(bm.ctx)
l.Debugf("Dispatching message %s to %s batch processor %s", msg.Header.ID, msg.Header.Type, processor.conf.name)
l.Debugf("Dispatching message %s (seq=%d) to %s batch processor %s", msg.Header.ID, msg.Sequence, msg.Header.Type, processor.conf.name)
work := &batchWork{
msg: msg,
data: data,
Expand Down
Loading