From 0ac34bcee3c27e82650e1fde32dcc3c0eefa71a4 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 8 Feb 2022 13:53:23 -0500 Subject: [PATCH 1/2] Fix batch pin index calculation logic and improve logging Signed-off-by: Peter Broadhurst --- internal/batch/batch_manager.go | 9 ++-- internal/broadcast/message.go | 2 +- internal/events/aggregator.go | 45 +++++++++++-------- internal/events/aggregator_batch_state.go | 7 +-- .../events/aggregator_batch_state_test.go | 2 +- internal/events/aggregator_test.go | 2 + internal/privatemessaging/message.go | 2 +- 7 files changed, 41 insertions(+), 28 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index d1dbdfe9a0..1e0c7f19b6 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -51,7 +51,7 @@ func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di data startupOffsetRetryAttempts: config.GetInt(config.OrchestratorStartupAttempts), dispatchers: make(map[fftypes.MessageType]*dispatcher), shoulderTap: make(chan bool, 1), - newMessages: make(chan int64, readPageSize), + newMessages: make(chan int64), sequencerClosed: make(chan struct{}), retry: &retry.Retry{ InitialDelay: config.GetDuration(config.BatchRetryInitDelay), @@ -219,7 +219,7 @@ func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (data []*fftyp if !foundAll { return nil, i18n.NewError(bm.ctx, i18n.MsgDataNotFound, msg.Header.ID) } - log.L(bm.ctx).Infof("Detected new batch-pinned message %s", msg.Header.ID) + log.L(bm.ctx).Infof("Detected new batch-pinned message %s sequence=%d", msg.Header.ID, msg.Sequence) return data, nil } @@ -227,10 +227,12 @@ func (bm *batchManager) markRewind(rewindTo int64) { bm.rewindMux.Lock() // Make sure we only rewind backwards - as we might get multiple shoulder taps // for different message sequences during a single poll cycle. - if bm.rewindTo < 0 || rewindTo < bm.rewindTo { + previousRewind := bm.rewindTo + if previousRewind < 0 || rewindTo < previousRewind { bm.rewindTo = rewindTo } bm.rewindMux.Unlock() + log.L(bm.ctx).Debugf("Marking rewind to sequence=%d (previous=%d)", rewindTo, previousRewind) } func (bm *batchManager) popRewind() int64 { @@ -245,6 +247,7 @@ func (bm *batchManager) readPage() ([]*fftypes.Message, error) { rewindTo := bm.popRewind() if rewindTo >= 0 && rewindTo < bm.offset { + log.L(bm.ctx).Debugf("Rewinding to sequence=%d", rewindTo) if err := bm.updateOffset(true, rewindTo); err != nil { return nil, err } diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index 81a93eb919..673f91e0d4 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -166,7 +166,7 @@ func (s *broadcastSender) sendInternal(ctx context.Context, method sendMethod) ( if err := s.mgr.database.UpsertMessage(ctx, &s.msg.Message, database.UpsertOptimizationNew); err != nil { return err } - log.L(ctx).Infof("Sent broadcast message %s:%s", s.msg.Header.Namespace, s.msg.Header.ID) + log.L(ctx).Infof("Sent broadcast message %s:%s sequence=%d", s.msg.Header.Namespace, s.msg.Header.ID, s.msg.Sequence) return err } diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 0e1c231954..8a0bce0146 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -183,6 +183,20 @@ func (ag *aggregator) getPins(ctx context.Context, filter database.Filter) ([]ff return ls, err } +func (ag *aggregator) extractBatchMessagePin(batch *fftypes.Batch, requiredIndex int64) (totalBatchPins int64, msg *fftypes.Message, msgBaseIndex int64) { + for _, batchMsg := range batch.Payload.Messages { + batchMsgBaseIdx := totalBatchPins + for i := 0; i < len(batchMsg.Header.Topics); i++ { + if totalBatchPins == requiredIndex { + msg = batchMsg + msgBaseIndex = batchMsgBaseIdx + } + totalBatchPins++ + } + } + return totalBatchPins, msg, msgBaseIndex +} + func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, state *batchState) (err error) { l := log.L(ctx) @@ -192,7 +206,6 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat // We must check all the contexts in the message, and mark them dispatched together. dupMsgCheck := make(map[fftypes.UUID]bool) for _, pin := range pins { - l.Debugf("Aggregating pin %.10d batch=%s hash=%s masked=%t", pin.Sequence, pin.Batch, pin.Hash, pin.Masked) if batch == nil || *batch.ID != *pin.Batch { batch, err = ag.database.GetBatchByID(ctx, pin.Batch) @@ -206,23 +219,14 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat } // Extract the message from the batch - where the index is of a topic within a message - var msg *fftypes.Message - var i int64 = -1 - var msgBaseIndex int64 - for iM := 0; i < pin.Index && iM < len(batch.Payload.Messages); iM++ { - msg = batch.Payload.Messages[iM] - msgBaseIndex = i - for iT := 0; i < pin.Index && iT < len(msg.Header.Topics); iT++ { - i++ - } - } - - if i < pin.Index { - l.Errorf("Batch %s does not have message-topic index %d - pin %s is invalid", pin.Batch, pin.Index, pin.Hash) + batchPinCount, msg, msgBaseIndex := ag.extractBatchMessagePin(batch, pin.Index) + if msg == nil { + l.Errorf("Pin %.10d outside of range: batch=%s pinCount=%d pinIndex=%d hash=%s masked=%t", pin.Sequence, pin.Batch, batchPinCount, pin.Index, pin.Hash, pin.Masked) continue } - l.Tracef("Batch %s message %d: %+v", batch.ID, pin.Index, msg) - if msg == nil || msg.Header.ID == nil { + + l.Debugf("Aggregating pin %.10d batch=%s msg=%s pinIndex=%d msgBaseIndex=%d hash=%s masked=%t", pin.Sequence, pin.Batch, msg.Header.ID, pin.Index, msgBaseIndex, pin.Hash, pin.Masked) + if msg.Header.ID == nil { l.Errorf("null message entry %d in batch '%s'", pin.Index, batch.ID) continue } @@ -243,6 +247,8 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat } func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, pin *fftypes.Pin, msgBaseIndex int64, msg *fftypes.Message, state *batchState) (err error) { + l := log.L(ctx) + // Check if it's ready to be processed unmaskedContexts := make([]*fftypes.Bytes32, 0, len(msg.Header.Topics)) nextPins := make([]*nextPinState, 0, len(msg.Header.Topics)) @@ -250,14 +256,14 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, // Private messages have one or more masked "pin" hashes that allow us to work // out if it's the next message in the sequence, given the previous messages if msg.Header.Group == nil || len(msg.Pins) == 0 || len(msg.Header.Topics) != len(msg.Pins) { - log.L(ctx).Errorf("Message '%s' in batch '%s' has invalid pin data pins=%v topics=%v", msg.Header.ID, batch.ID, msg.Pins, msg.Header.Topics) + l.Errorf("Message '%s' in batch '%s' has invalid pin data pins=%v topics=%v", msg.Header.ID, batch.ID, msg.Pins, msg.Header.Topics) return nil } for i, pinStr := range msg.Pins { var msgContext fftypes.Bytes32 err := msgContext.UnmarshalText([]byte(pinStr)) if err != nil { - log.L(ctx).Errorf("Message '%s' in batch '%s' has invalid pin at index %d: '%s'", msg.Header.ID, batch.ID, i, pinStr) + l.Errorf("Message '%s' in batch '%s' has invalid pin at index %d: '%s'", msg.Header.ID, batch.ID, i, pinStr) return nil } nextPin, err := state.CheckMaskedContextReady(ctx, msg, msg.Header.Topics[i], pin.Sequence, &msgContext) @@ -272,7 +278,7 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, h.Write([]byte(topic)) msgContext := fftypes.HashResult(h) unmaskedContexts = append(unmaskedContexts, msgContext) - ready, err := state.CheckUnmaskedContextReady(ctx, *msgContext, msg, msg.Header.Topics[i], pin.Sequence) + ready, err := state.CheckUnmaskedContextReady(ctx, msgContext, msg, msg.Header.Topics[i], pin.Sequence) if err != nil || !ready { return err } @@ -280,6 +286,7 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, } + l.Debugf("Attempt dispatch msg=%s broadcastContexts=%v privatePins=%v", msg.Header.ID, unmaskedContexts, msg.Pins) dispatched, err := ag.attemptMessageDispatch(ctx, msg, state) if err != nil { return err diff --git a/internal/events/aggregator_batch_state.go b/internal/events/aggregator_batch_state.go index 287c5cd561..917dd6c39e 100644 --- a/internal/events/aggregator_batch_state.go +++ b/internal/events/aggregator_batch_state.go @@ -137,12 +137,12 @@ func (bs *batchState) RunFinalize(ctx context.Context) error { return bs.flushPins(ctx) } -func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnmasked fftypes.Bytes32, msg *fftypes.Message, topic string, firstMsgPinSequence int64) (bool, error) { +func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnmasked *fftypes.Bytes32, msg *fftypes.Message, topic string, firstMsgPinSequence int64) (bool, error) { - ucs, found := bs.unmaskedContexts[contextUnmasked] + ucs, found := bs.unmaskedContexts[*contextUnmasked] if !found { ucs = &contextState{blockedBy: -1} - bs.unmaskedContexts[contextUnmasked] = ucs + bs.unmaskedContexts[*contextUnmasked] = ucs // We need to check there's no earlier sequences with the same unmasked context fb := database.PinQueryFactory.NewFilterLimit(ctx, 1) // only need the first one @@ -261,6 +261,7 @@ func (bs *batchState) flushPins(ctx context.Context) error { fb.Gte("index", dm.firstPinIndex), fb.Lte("index", dm.lastPinIndex), ) + log.L(ctx).Debugf("Marking message dispatched batch=%s msg=%s firstIndex=%d lastIndex=%d", dm.batchID, dm.msgID, dm.firstPinIndex, dm.lastPinIndex) update := database.PinQueryFactory.NewUpdate(ctx).Set("dispatched", true) if err := bs.database.UpdatePins(ctx, filter, update); err != nil { return err diff --git a/internal/events/aggregator_batch_state_test.go b/internal/events/aggregator_batch_state_test.go index e846082b7c..c35ec69783 100644 --- a/internal/events/aggregator_batch_state_test.go +++ b/internal/events/aggregator_batch_state_test.go @@ -52,7 +52,7 @@ func TestSetContextBlockedByNoState(t *testing.T) { unmaskedContext := fftypes.NewRandB32() bs.SetContextBlockedBy(ag.ctx, *unmaskedContext, 10) - ready, err := bs.CheckUnmaskedContextReady(ag.ctx, *unmaskedContext, &fftypes.Message{}, "topic1", 1) + ready, err := bs.CheckUnmaskedContextReady(ag.ctx, unmaskedContext, &fftypes.Message{}, "topic1", 1) assert.NoError(t, err) assert.False(t, ready) } diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index fd9c0197b5..5ea592574f 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/definitions" + "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/definitionsmocks" @@ -166,6 +167,7 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { } func TestAggregationMaskedNextSequenceMatch(t *testing.T) { + log.SetLevel("debug") ag, cancel := newTestAggregator() defer cancel() diff --git a/internal/privatemessaging/message.go b/internal/privatemessaging/message.go index 3376b1265d..2cb7bef6a4 100644 --- a/internal/privatemessaging/message.go +++ b/internal/privatemessaging/message.go @@ -185,7 +185,7 @@ func (s *messageSender) sendInternal(ctx context.Context, method sendMethod) err if err := s.mgr.database.UpsertMessage(ctx, &s.msg.Message, database.UpsertOptimizationNew); err != nil { return err } - log.L(ctx).Infof("Sent private message %s:%s", s.msg.Header.Namespace, s.msg.Header.ID) + log.L(ctx).Infof("Sent private message %s:%s sequence=%d", s.msg.Header.Namespace, s.msg.Header.ID, s.msg.Sequence) if method == methodSendImmediate { if err := s.sendUnpinned(ctx); err != nil { From 8bd5fd3d47b0920b9d91481fbe6d707570660268 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 8 Feb 2022 19:59:25 -0500 Subject: [PATCH 2/2] Rewind offset to one before the message that popped Signed-off-by: Peter Broadhurst --- internal/batch/batch_manager.go | 2 +- internal/batch/batch_manager_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 1e0c7f19b6..a25520c4a5 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -338,7 +338,7 @@ func (bm *batchManager) newEventNotifications() { return } l.Debugf("New message sequence notification: %d", m) - bm.markRewind(m) + bm.markRewind(m - 1) case <-bm.ctx.Done(): l.Debugf("Exiting due to cancelled context") return diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index 88462cb74a..68f0adac50 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -512,8 +512,8 @@ func TestWaitConsumesMessagesAndDoesNotBlock(t *testing.T) { } // And should generate a shoulder tap <-bm.shoulderTap - // And a rewind - assert.Equal(t, int64(12345), bm.popRewind()) + // And a rewind to one before that sequence (as readPage is greater-than) + assert.Equal(t, int64(12344), bm.popRewind()) bm.Close() }