Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions internal/batch/batch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di data
startupOffsetRetryAttempts: config.GetInt(config.OrchestratorStartupAttempts),
dispatchers: make(map[fftypes.MessageType]*dispatcher),
shoulderTap: make(chan bool, 1),
newMessages: make(chan int64, readPageSize),
Copy link
Contributor Author

@peterbroadhurst peterbroadhurst Feb 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seemed arbitrary that we had a page size here, as we have a dedicated goroutine reading from it

newMessages: make(chan int64),
sequencerClosed: make(chan struct{}),
retry: &retry.Retry{
InitialDelay: config.GetDuration(config.BatchRetryInitDelay),
Expand Down Expand Up @@ -219,18 +219,20 @@ func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (data []*fftyp
if !foundAll {
return nil, i18n.NewError(bm.ctx, i18n.MsgDataNotFound, msg.Header.ID)
}
log.L(bm.ctx).Infof("Detected new batch-pinned message %s", msg.Header.ID)
log.L(bm.ctx).Infof("Detected new batch-pinned message %s sequence=%d", msg.Header.ID, msg.Sequence)
return data, nil
}

func (bm *batchManager) markRewind(rewindTo int64) {
bm.rewindMux.Lock()
// Make sure we only rewind backwards - as we might get multiple shoulder taps
// for different message sequences during a single poll cycle.
if bm.rewindTo < 0 || rewindTo < bm.rewindTo {
previousRewind := bm.rewindTo
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to allow logging

if previousRewind < 0 || rewindTo < previousRewind {
bm.rewindTo = rewindTo
}
bm.rewindMux.Unlock()
log.L(bm.ctx).Debugf("Marking rewind to sequence=%d (previous=%d)", rewindTo, previousRewind)
}

func (bm *batchManager) popRewind() int64 {
Expand All @@ -245,6 +247,7 @@ func (bm *batchManager) readPage() ([]*fftypes.Message, error) {

rewindTo := bm.popRewind()
if rewindTo >= 0 && rewindTo < bm.offset {
log.L(bm.ctx).Debugf("Rewinding to sequence=%d", rewindTo)
if err := bm.updateOffset(true, rewindTo); err != nil {
return nil, err
}
Expand Down Expand Up @@ -335,7 +338,7 @@ func (bm *batchManager) newEventNotifications() {
return
}
l.Debugf("New message sequence notification: %d", m)
bm.markRewind(m)
bm.markRewind(m - 1)
case <-bm.ctx.Done():
l.Debugf("Exiting due to cancelled context")
return
Expand Down
4 changes: 2 additions & 2 deletions internal/batch/batch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,8 @@ func TestWaitConsumesMessagesAndDoesNotBlock(t *testing.T) {
}
// And should generate a shoulder tap
<-bm.shoulderTap
// And a rewind
assert.Equal(t, int64(12345), bm.popRewind())
// And a rewind to one before that sequence (as readPage is greater-than)
assert.Equal(t, int64(12344), bm.popRewind())
bm.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/broadcast/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *broadcastSender) sendInternal(ctx context.Context, method sendMethod) (
if err := s.mgr.database.UpsertMessage(ctx, &s.msg.Message, database.UpsertOptimizationNew); err != nil {
return err
}
log.L(ctx).Infof("Sent broadcast message %s:%s", s.msg.Header.Namespace, s.msg.Header.ID)
log.L(ctx).Infof("Sent broadcast message %s:%s sequence=%d", s.msg.Header.Namespace, s.msg.Header.ID, s.msg.Sequence)

return err
}
Expand Down
45 changes: 26 additions & 19 deletions internal/events/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,20 @@ func (ag *aggregator) getPins(ctx context.Context, filter database.Filter) ([]ff
return ls, err
}

func (ag *aggregator) extractBatchMessagePin(batch *fftypes.Batch, requiredIndex int64) (totalBatchPins int64, msg *fftypes.Message, msgBaseIndex int64) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The re-write of this logic, is the main thing here.

We trigger from the pin - which references the batch and the index, but we need two other bits of information:

for _, batchMsg := range batch.Payload.Messages {
batchMsgBaseIdx := totalBatchPins
for i := 0; i < len(batchMsg.Header.Topics); i++ {
if totalBatchPins == requiredIndex {
msg = batchMsg
msgBaseIndex = batchMsgBaseIdx
}
totalBatchPins++
}
}
return totalBatchPins, msg, msgBaseIndex
}

func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, state *batchState) (err error) {
l := log.L(ctx)

Expand All @@ -192,7 +206,6 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat
// We must check all the contexts in the message, and mark them dispatched together.
dupMsgCheck := make(map[fftypes.UUID]bool)
for _, pin := range pins {
l.Debugf("Aggregating pin %.10d batch=%s hash=%s masked=%t", pin.Sequence, pin.Batch, pin.Hash, pin.Masked)

if batch == nil || *batch.ID != *pin.Batch {
batch, err = ag.database.GetBatchByID(ctx, pin.Batch)
Expand All @@ -206,23 +219,14 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat
}

// Extract the message from the batch - where the index is of a topic within a message
var msg *fftypes.Message
var i int64 = -1
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of pin.Index == 0 this logic was broken. msg and msgBaseIndex were incorrect.

var msgBaseIndex int64
for iM := 0; i < pin.Index && iM < len(batch.Payload.Messages); iM++ {
msg = batch.Payload.Messages[iM]
msgBaseIndex = i
for iT := 0; i < pin.Index && iT < len(msg.Header.Topics); iT++ {
i++
}
}

if i < pin.Index {
l.Errorf("Batch %s does not have message-topic index %d - pin %s is invalid", pin.Batch, pin.Index, pin.Hash)
batchPinCount, msg, msgBaseIndex := ag.extractBatchMessagePin(batch, pin.Index)
if msg == nil {
l.Errorf("Pin %.10d outside of range: batch=%s pinCount=%d pinIndex=%d hash=%s masked=%t", pin.Sequence, pin.Batch, batchPinCount, pin.Index, pin.Hash, pin.Masked)
continue
}
l.Tracef("Batch %s message %d: %+v", batch.ID, pin.Index, msg)
if msg == nil || msg.Header.ID == nil {

l.Debugf("Aggregating pin %.10d batch=%s msg=%s pinIndex=%d msgBaseIndex=%d hash=%s masked=%t", pin.Sequence, pin.Batch, msg.Header.ID, pin.Index, msgBaseIndex, pin.Hash, pin.Masked)
if msg.Header.ID == nil {
l.Errorf("null message entry %d in batch '%s'", pin.Index, batch.ID)
continue
}
Expand All @@ -243,21 +247,23 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat
}

func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, pin *fftypes.Pin, msgBaseIndex int64, msg *fftypes.Message, state *batchState) (err error) {
l := log.L(ctx)

// Check if it's ready to be processed
unmaskedContexts := make([]*fftypes.Bytes32, 0, len(msg.Header.Topics))
nextPins := make([]*nextPinState, 0, len(msg.Header.Topics))
if pin.Masked {
// Private messages have one or more masked "pin" hashes that allow us to work
// out if it's the next message in the sequence, given the previous messages
if msg.Header.Group == nil || len(msg.Pins) == 0 || len(msg.Header.Topics) != len(msg.Pins) {
log.L(ctx).Errorf("Message '%s' in batch '%s' has invalid pin data pins=%v topics=%v", msg.Header.ID, batch.ID, msg.Pins, msg.Header.Topics)
l.Errorf("Message '%s' in batch '%s' has invalid pin data pins=%v topics=%v", msg.Header.ID, batch.ID, msg.Pins, msg.Header.Topics)
return nil
}
for i, pinStr := range msg.Pins {
var msgContext fftypes.Bytes32
err := msgContext.UnmarshalText([]byte(pinStr))
if err != nil {
log.L(ctx).Errorf("Message '%s' in batch '%s' has invalid pin at index %d: '%s'", msg.Header.ID, batch.ID, i, pinStr)
l.Errorf("Message '%s' in batch '%s' has invalid pin at index %d: '%s'", msg.Header.ID, batch.ID, i, pinStr)
return nil
}
nextPin, err := state.CheckMaskedContextReady(ctx, msg, msg.Header.Topics[i], pin.Sequence, &msgContext)
Expand All @@ -272,14 +278,15 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch,
h.Write([]byte(topic))
msgContext := fftypes.HashResult(h)
unmaskedContexts = append(unmaskedContexts, msgContext)
ready, err := state.CheckUnmaskedContextReady(ctx, *msgContext, msg, msg.Header.Topics[i], pin.Sequence)
ready, err := state.CheckUnmaskedContextReady(ctx, msgContext, msg, msg.Header.Topics[i], pin.Sequence)
if err != nil || !ready {
return err
}
}

}

l.Debugf("Attempt dispatch msg=%s broadcastContexts=%v privatePins=%v", msg.Header.ID, unmaskedContexts, msg.Pins)
dispatched, err := ag.attemptMessageDispatch(ctx, msg, state)
if err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions internal/events/aggregator_batch_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ func (bs *batchState) RunFinalize(ctx context.Context) error {
return bs.flushPins(ctx)
}

func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnmasked fftypes.Bytes32, msg *fftypes.Message, topic string, firstMsgPinSequence int64) (bool, error) {
func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnmasked *fftypes.Bytes32, msg *fftypes.Message, topic string, firstMsgPinSequence int64) (bool, error) {

ucs, found := bs.unmaskedContexts[contextUnmasked]
ucs, found := bs.unmaskedContexts[*contextUnmasked]
if !found {
ucs = &contextState{blockedBy: -1}
bs.unmaskedContexts[contextUnmasked] = ucs
bs.unmaskedContexts[*contextUnmasked] = ucs

// We need to check there's no earlier sequences with the same unmasked context
fb := database.PinQueryFactory.NewFilterLimit(ctx, 1) // only need the first one
Expand Down Expand Up @@ -261,6 +261,7 @@ func (bs *batchState) flushPins(ctx context.Context) error {
fb.Gte("index", dm.firstPinIndex),
fb.Lte("index", dm.lastPinIndex),
)
log.L(ctx).Debugf("Marking message dispatched batch=%s msg=%s firstIndex=%d lastIndex=%d", dm.batchID, dm.msgID, dm.firstPinIndex, dm.lastPinIndex)
update := database.PinQueryFactory.NewUpdate(ctx).Set("dispatched", true)
if err := bs.database.UpdatePins(ctx, filter, update); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/events/aggregator_batch_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestSetContextBlockedByNoState(t *testing.T) {
unmaskedContext := fftypes.NewRandB32()
bs.SetContextBlockedBy(ag.ctx, *unmaskedContext, 10)

ready, err := bs.CheckUnmaskedContextReady(ag.ctx, *unmaskedContext, &fftypes.Message{}, "topic1", 1)
ready, err := bs.CheckUnmaskedContextReady(ag.ctx, unmaskedContext, &fftypes.Message{}, "topic1", 1)
assert.NoError(t, err)
assert.False(t, ready)
}
2 changes: 2 additions & 0 deletions internal/events/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/definitions"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/datamocks"
"github.com/hyperledger/firefly/mocks/definitionsmocks"
Expand Down Expand Up @@ -166,6 +167,7 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) {
}

func TestAggregationMaskedNextSequenceMatch(t *testing.T) {
log.SetLevel("debug")

ag, cancel := newTestAggregator()
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion internal/privatemessaging/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (s *messageSender) sendInternal(ctx context.Context, method sendMethod) err
if err := s.mgr.database.UpsertMessage(ctx, &s.msg.Message, database.UpsertOptimizationNew); err != nil {
return err
}
log.L(ctx).Infof("Sent private message %s:%s", s.msg.Header.Namespace, s.msg.Header.ID)
log.L(ctx).Infof("Sent private message %s:%s sequence=%d", s.msg.Header.Namespace, s.msg.Header.ID, s.msg.Sequence)

if method == methodSendImmediate {
if err := s.sendUnpinned(ctx); err != nil {
Expand Down