diff --git a/db/migrations/postgres/000059_create_pins_batch_index.down.sql b/db/migrations/postgres/000059_create_pins_batch_index.down.sql new file mode 100644 index 0000000000..12c785fe49 --- /dev/null +++ b/db/migrations/postgres/000059_create_pins_batch_index.down.sql @@ -0,0 +1,3 @@ +BEGIN; +DROP INDEX pins_batch; +COMMIT; diff --git a/db/migrations/postgres/000059_create_pins_batch_index.up.sql b/db/migrations/postgres/000059_create_pins_batch_index.up.sql new file mode 100644 index 0000000000..839784ee16 --- /dev/null +++ b/db/migrations/postgres/000059_create_pins_batch_index.up.sql @@ -0,0 +1,3 @@ +BEGIN; +CREATE INDEX pins_batch ON pins(batch_id); +COMMIT; \ No newline at end of file diff --git a/db/migrations/sqlite/000059_create_pins_batch_index.down.sql b/db/migrations/sqlite/000059_create_pins_batch_index.down.sql new file mode 100644 index 0000000000..f825a38656 --- /dev/null +++ b/db/migrations/sqlite/000059_create_pins_batch_index.down.sql @@ -0,0 +1 @@ +DROP INDEX pins_batch; diff --git a/db/migrations/sqlite/000059_create_pins_batch_index.up.sql b/db/migrations/sqlite/000059_create_pins_batch_index.up.sql new file mode 100644 index 0000000000..30ecb893bf --- /dev/null +++ b/db/migrations/sqlite/000059_create_pins_batch_index.up.sql @@ -0,0 +1 @@ +CREATE INDEX pins_batch ON pins(batch_id); diff --git a/internal/database/sqlcommon/pin_sql.go b/internal/database/sqlcommon/pin_sql.go index cb85028da2..f16857dbf6 100644 --- a/internal/database/sqlcommon/pin_sql.go +++ b/internal/database/sqlcommon/pin_sql.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -141,7 +141,7 @@ func (s *SQLCommon) GetPins(ctx context.Context, filter database.Filter) (messag } -func (s *SQLCommon) SetPinDispatched(ctx context.Context, sequence int64) (err error) { +func (s *SQLCommon) UpdatePins(ctx context.Context, filter database.Filter, update database.Update) (err error) { ctx, tx, autoCommit, err := s.beginOrUseTx(ctx) if err != nil { @@ -149,16 +149,17 @@ func (s *SQLCommon) SetPinDispatched(ctx context.Context, sequence int64) (err e } defer s.rollbackTx(ctx, tx, autoCommit) - _, err = s.updateTx(ctx, tx, sq. - Update("pins"). - Set("dispatched", true). 
- Where(sq.Eq{ - sequenceColumn: sequence, - }), - func() { - s.callbacks.OrderedCollectionEvent(database.CollectionPins, fftypes.ChangeEventTypeUpdated, sequence) - }, - ) + query, err := s.buildUpdate(sq.Update("pins"), update, pinFilterFieldMap) + if err != nil { + return err + } + + query, err = s.filterUpdate(ctx, "", query, filter, pinFilterFieldMap) + if err != nil { + return err + } + + _, err = s.updateTx(ctx, tx, query, nil /* no change events filter based update */) if err != nil { return err } diff --git a/internal/database/sqlcommon/pin_sql_test.go b/internal/database/sqlcommon/pin_sql_test.go index 5ccce6bce4..5930682e82 100644 --- a/internal/database/sqlcommon/pin_sql_test.go +++ b/internal/database/sqlcommon/pin_sql_test.go @@ -47,7 +47,6 @@ func TestPinsE2EWithDB(t *testing.T) { } s.callbacks.On("OrderedCollectionEvent", database.CollectionPins, fftypes.ChangeEventTypeCreated, mock.Anything).Return() - s.callbacks.On("OrderedCollectionEvent", database.CollectionPins, fftypes.ChangeEventTypeUpdated, mock.Anything).Return() s.callbacks.On("OrderedCollectionEvent", database.CollectionPins, fftypes.ChangeEventTypeDeleted, mock.Anything).Return() err := s.UpsertPin(ctx, pin) @@ -67,7 +66,7 @@ func TestPinsE2EWithDB(t *testing.T) { assert.Equal(t, int64(1), *res.TotalCount) // Set it dispatched - err = s.SetPinDispatched(ctx, pin.Sequence) + err = s.UpdatePins(ctx, database.PinQueryFactory.NewFilter(ctx).Eq("sequence", pin.Sequence), database.PinQueryFactory.NewUpdate(ctx).Set("dispatched", true)) assert.NoError(t, err) // Double insert, checking no error and we keep the dispatched flag @@ -166,22 +165,42 @@ func TestGetPinReadMessageFail(t *testing.T) { assert.NoError(t, mock.ExpectationsWereMet()) } -func TestSetPinsDispatchedBeginFail(t *testing.T) { +func TestUpdatePinsBeginFail(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) - err := s.SetPinDispatched(context.Background(), 12345) + ctx := context.Background() + err := s.UpdatePins(ctx, database.PinQueryFactory.NewFilter(ctx).Eq("sequence", 1), database.PinQueryFactory.NewUpdate(ctx).Set("dispatched", true)) assert.Regexp(t, "FF10114", err) } -func TestSetPinsDispatchedUpdateFail(t *testing.T) { +func TestUpdatePinsUpdateFail(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin() mock.ExpectExec("UPDATE .*").WillReturnError(fmt.Errorf("pop")) mock.ExpectRollback() - err := s.SetPinDispatched(context.Background(), 12345) + ctx := context.Background() + err := s.UpdatePins(ctx, database.PinQueryFactory.NewFilter(ctx).Eq("sequence", 1), database.PinQueryFactory.NewUpdate(ctx).Set("dispatched", true)) assert.Regexp(t, "FF10117", err) } +func TestUpdatePinsBadFilter(t *testing.T) { + s, mock := newMockProvider().init() + mock.ExpectBegin() + mock.ExpectRollback() + ctx := context.Background() + err := s.UpdatePins(ctx, database.PinQueryFactory.NewFilter(ctx).Eq("sequence", 1), database.PinQueryFactory.NewUpdate(ctx).Set("bad", true)) + assert.Regexp(t, "FF10148", err) +} + +func TestUpdatePinsBadUpdate(t *testing.T) { + s, mock := newMockProvider().init() + mock.ExpectBegin() + mock.ExpectRollback() + ctx := context.Background() + err := s.UpdatePins(ctx, database.PinQueryFactory.NewFilter(ctx).Eq("bad", 1), database.PinQueryFactory.NewUpdate(ctx).Set("dispatched", true)) + assert.Regexp(t, "FF10148", err) +} + func TestPinDeleteBeginFail(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) diff --git 
a/internal/events/aggregator.go b/internal/events/aggregator.go index a12617b82f..c9d27e3e7d 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -20,7 +20,6 @@ import ( "context" "crypto/sha256" "database/sql/driver" - "encoding/binary" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" @@ -73,7 +72,7 @@ func newAggregator(ctx context.Context, di database.Plugin, sh definitions.Defin namespace: fftypes.SystemNamespace, offsetType: fftypes.OffsetTypeAggregator, offsetName: aggregatorOffsetName, - newEventsHandler: ag.processPinsDBGroup, + newEventsHandler: ag.processPinsEventsHandler, getItems: ag.getPins, queryFactory: database.PinQueryFactory, addCriteria: func(af database.AndFilter) database.AndFilter { @@ -118,8 +117,8 @@ func (ag *aggregator) rewindOffchainBatches() (rewind bool, offset int64) { if len(batchIDs) > 0 { fb := database.PinQueryFactory.NewFilter(ag.ctx) filter := fb.And( - fb.Eq("dispatched", false), fb.In("batch", batchIDs), + fb.Eq("dispatched", false), ).Sort("sequence").Limit(1) // only need the one oldest sequence sequences, _, err := ag.database.GetPins(ag.ctx, filter) if err != nil { @@ -136,63 +135,15 @@ func (ag *aggregator) rewindOffchainBatches() (rewind bool, offset int64) { return rewind, offset } -// batchActions are synchronous actions to be performed while processing system messages, but which must happen after reading the whole batch -type batchActions struct { - // PreFinalize callbacks may perform blocking actions (possibly to an external connector) - // - Will execute after all batch messages have been processed - // - Will execute outside database RunAsGroup - // - If any PreFinalize callback errors out, batch will be aborted and retried - PreFinalize []func(ctx context.Context) error - - // Finalize callbacks may perform final, non-idempotent database operations (such as inserting Events) - // - Will execute after all batch messages have been processed and any PreFinalize callbacks have succeeded - // - Will execute inside database RunAsGroup - // - If any Finalize callback errors out, batch will be aborted and retried (small chance of duplicate execution here) - Finalize []func(ctx context.Context) error -} - -func (ba *batchActions) AddPreFinalize(action func(ctx context.Context) error) { - if action != nil { - ba.PreFinalize = append(ba.PreFinalize, action) - } -} - -func (ba *batchActions) AddFinalize(action func(ctx context.Context) error) { - if action != nil { - ba.Finalize = append(ba.Finalize, action) - } -} - -func (ba *batchActions) RunPreFinalize(ctx context.Context) error { - for _, action := range ba.PreFinalize { - if err := action(ctx); err != nil { - return err - } - } - return nil -} - -func (ba *batchActions) RunFinalize(ctx context.Context) error { - for _, action := range ba.Finalize { - if err := action(ctx); err != nil { - return err - } - } - return nil -} - -func (ag *aggregator) processWithBatchActions(callback func(ctx context.Context, actions *batchActions) error) error { - actions := &batchActions{ - PreFinalize: make([]func(ctx context.Context) error, 0), - Finalize: make([]func(ctx context.Context) error, 0), - } +func (ag *aggregator) processWithBatchState(callback func(ctx context.Context, state *batchState) error) error { + state := newBatchState(ag) err := ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) (err error) { - if err := callback(ctx, actions); err != nil { + if err := callback(ctx, state); err != nil { return err } - if 
len(actions.PreFinalize) == 0 { - return actions.RunFinalize(ctx) + if len(state.PreFinalize) == 0 { + return state.RunFinalize(ctx) } return nil }) @@ -200,26 +151,26 @@ func (ag *aggregator) processWithBatchActions(callback func(ctx context.Context, return err } - if len(actions.PreFinalize) == 0 { + if len(state.PreFinalize) == 0 { return err } - if err := actions.RunPreFinalize(ag.ctx); err != nil { + if err := state.RunPreFinalize(ag.ctx); err != nil { return err } return ag.database.RunAsGroup(ag.ctx, func(ctx context.Context) error { - return actions.RunFinalize(ctx) + return state.RunFinalize(ctx) }) } -func (ag *aggregator) processPinsDBGroup(items []fftypes.LocallySequenced) (repoll bool, err error) { +func (ag *aggregator) processPinsEventsHandler(items []fftypes.LocallySequenced) (repoll bool, err error) { pins := make([]*fftypes.Pin, len(items)) for i, item := range items { pins[i] = item.(*fftypes.Pin) } - return false, ag.processWithBatchActions(func(ctx context.Context, actions *batchActions) error { - return ag.processPins(ctx, pins, actions) + return false, ag.processWithBatchState(func(ctx context.Context, state *batchState) error { + return ag.processPins(ctx, pins, state) }) } @@ -232,7 +183,7 @@ func (ag *aggregator) getPins(ctx context.Context, filter database.Filter) ([]ff return ls, err } -func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, actions *batchActions) (err error) { +func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, state *batchState) (err error) { l := log.L(ctx) // Keep a batch cache for this list of pins @@ -257,8 +208,10 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, acti // Extract the message from the batch - where the index is of a topic within a message var msg *fftypes.Message var i int64 = -1 + var msgBaseIndex int64 for iM := 0; i < pin.Index && iM < len(batch.Payload.Messages); iM++ { msg = batch.Payload.Messages[iM] + msgBaseIndex = i for iT := 0; i < pin.Index && iT < len(msg.Header.Topics); iT++ { i++ } @@ -279,7 +232,8 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, acti dupMsgCheck[*msg.Header.ID] = true // Attempt to process the message (only returns errors for database persistence issues) - if err = ag.processMessage(ctx, batch, pin.Masked, pin.Sequence, msg, actions); err != nil { + err := ag.processMessage(ctx, batch, pin, msgBaseIndex, msg, state) + if err != nil { return err } } @@ -288,23 +242,11 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, acti return err } -func (ag *aggregator) calcHash(topic string, groupID *fftypes.Bytes32, identity string, nonce int64) *fftypes.Bytes32 { - h := sha256.New() - h.Write([]byte(topic)) - h.Write((*groupID)[:]) - h.Write([]byte(identity)) - nonceBytes := make([]byte, 8) - binary.BigEndian.PutUint64(nonceBytes, uint64(nonce)) - h.Write(nonceBytes) - return fftypes.HashResult(h) -} - -func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, masked bool, pinnedSequence int64, msg *fftypes.Message, actions *batchActions) (err error) { - l := log.L(ctx) - +func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, pin *fftypes.Pin, msgBaseIndex int64, msg *fftypes.Message, state *batchState) (err error) { // Check if it's ready to be processed - nextPins := make([]*fftypes.NextPin, len(msg.Pins)) - if masked { + unmaskedContexts := make([]*fftypes.Bytes32, 0, len(msg.Header.Topics)) + nextPins := 
make([]*nextPinState, 0, len(msg.Header.Topics)) + if pin.Masked { // Private messages have one or more masked "pin" hashes that allow us to work // out if it's the next message in the sequence, given the previous messages if msg.Header.Group == nil || len(msg.Pins) == 0 || len(msg.Header.Topics) != len(msg.Pins) { @@ -312,173 +254,55 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, return nil } for i, pinStr := range msg.Pins { - var pin fftypes.Bytes32 - err := pin.UnmarshalText([]byte(pinStr)) + var msgContext fftypes.Bytes32 + err := msgContext.UnmarshalText([]byte(pinStr)) if err != nil { log.L(ctx).Errorf("Message '%s' in batch '%s' has invalid pin at index %d: '%s'", msg.Header.ID, batch.ID, i, pinStr) return nil } - nextPin, err := ag.checkMaskedContextReady(ctx, msg, msg.Header.Topics[i], pinnedSequence, &pin) + nextPin, err := state.CheckMaskedContextReady(ctx, msg, msg.Header.Topics[i], pin.Sequence, &msgContext) if err != nil || nextPin == nil { return err } - nextPins[i] = nextPin + nextPins = append(nextPins, nextPin) } } else { - // We just need to check there's no earlier sequences with the same unmasked context - unmaskedContexts := make([]driver.Value, len(msg.Header.Topics)) for i, topic := range msg.Header.Topics { h := sha256.New() h.Write([]byte(topic)) - unmaskedContexts[i] = fftypes.HashResult(h) - } - fb := database.PinQueryFactory.NewFilter(ctx) - filter := fb.And( - fb.Eq("dispatched", false), - fb.In("hash", unmaskedContexts), - fb.Lt("sequence", pinnedSequence), - ) - earlier, _, err := ag.database.GetPins(ctx, filter) - if err != nil { - return err - } - if len(earlier) > 0 { - l.Debugf("Message %s pinned at sequence %d blocked by earlier context %s at sequence %d", msg.Header.ID, pinnedSequence, earlier[0].Hash, earlier[0].Sequence) - return nil - } - } - - dispatched, err := ag.attemptMessageDispatch(ctx, msg, actions) - if err != nil || !dispatched { - return err - } - - actions.AddFinalize(func(ctx context.Context) error { - // Move the nextPin forwards to the next sequence for this sender, on all - // topics associated with the message - if masked { - for i, nextPin := range nextPins { - nextPin.Nonce++ - nextPin.Hash = ag.calcHash(msg.Header.Topics[i], msg.Header.Group, nextPin.Identity, nextPin.Nonce) - if err = ag.database.UpdateNextPin(ctx, nextPin.Sequence, database.NextPinQueryFactory.NewUpdate(ctx). - Set("nonce", nextPin.Nonce). 
- Set("hash", nextPin.Hash), - ); err != nil { - return err - } + msgContext := fftypes.HashResult(h) + unmaskedContexts = append(unmaskedContexts, msgContext) + ready, err := state.CheckUnmaskedContextReady(ctx, *msgContext, msg, msg.Header.Topics[i], pin.Sequence) + if err != nil || !ready { + return err } } - // Mark the pin dispatched - return ag.database.SetPinDispatched(ctx, pinnedSequence) - }) - - return nil -} - -func (ag *aggregator) checkMaskedContextReady(ctx context.Context, msg *fftypes.Message, topic string, pinnedSequence int64, pin *fftypes.Bytes32) (*fftypes.NextPin, error) { - l := log.L(ctx) - - // For masked pins, we can only process if: - // - it is the next sequence on this context for one of the members of the group - // - there are no undispatched messages on this context earlier in the stream - h := sha256.New() - h.Write([]byte(topic)) - h.Write((*msg.Header.Group)[:]) - contextUnmasked := fftypes.HashResult(h) - filter := database.NextPinQueryFactory.NewFilter(ctx).Eq("context", contextUnmasked) - nextPins, _, err := ag.database.GetNextPins(ctx, filter) - if err != nil { - return nil, err } - l.Debugf("Group=%s Topic='%s' Sequence=%d Pin=%s NextPins=%v", msg.Header.Group, topic, pinnedSequence, pin, nextPins) - if len(nextPins) == 0 { - // If this is the first time we've seen the context, then this message is read as long as it is - // the first (nonce=0) message on the context, for one of the members, and there aren't any earlier - // messages that are nonce=0. - return ag.attemptContextInit(ctx, msg, topic, pinnedSequence, contextUnmasked, pin) - } - - // This message must be the next hash for the author - var nextPin *fftypes.NextPin - for _, np := range nextPins { - if *np.Hash == *pin { - nextPin = np - break - } - } - if nextPin == nil || nextPin.Identity != msg.Header.Author { - l.Warnf("Mismatched nexthash or author group=%s topic=%s context=%s pin=%s nextHash=%+v", msg.Header.Group, topic, contextUnmasked, pin, nextPin) - return nil, nil - } - return nextPin, nil -} - -func (ag *aggregator) attemptContextInit(ctx context.Context, msg *fftypes.Message, topic string, pinnedSequence int64, contextUnmasked, pin *fftypes.Bytes32) (*fftypes.NextPin, error) { - l := log.L(ctx) - - // It might be the system topic/context initializing the group - group, err := ag.definitions.ResolveInitGroup(ctx, msg) - if err != nil || group == nil { - return nil, err + dispatched, err := ag.attemptMessageDispatch(ctx, msg, state) + if err != nil { + return err } - // Find the list of zerohashes for this context, and match this pin to one of them - zeroHashes := make([]driver.Value, len(group.Members)) - var nextPin *fftypes.NextPin - nextPins := make([]*fftypes.NextPin, len(group.Members)) - for i, member := range group.Members { - zeroHash := ag.calcHash(topic, msg.Header.Group, member.Identity, 0) - np := &fftypes.NextPin{ - Context: contextUnmasked, - Identity: member.Identity, - Hash: zeroHash, - Nonce: 0, + // Mark all message pins dispatched true/false + // - dispatched=true: we need to write them dispatched in the DB at the end of the batch, and increment all nextPins + // - dispatched=false: we need to prevent dispatch of any subsequent messages on the same topic in the batch + if dispatched { + for _, np := range nextPins { + np.IncrementNextPin(ctx) } - if *pin == *zeroHash { - if member.Identity != msg.Header.Author { - l.Warnf("Author mismatch for zerohash on context: group=%s topic=%s context=%s pin=%s", msg.Header.Group, topic, contextUnmasked, pin) - return nil, 
nil - } - nextPin = np + state.MarkMessageDispatched(ctx, batch.ID, msg, msgBaseIndex) + } else { + for _, unmaskedContext := range unmaskedContexts { + state.SetContextBlockedBy(ctx, *unmaskedContext, pin.Sequence) } - zeroHashes[i] = zeroHash - nextPins[i] = np - } - l.Debugf("Group=%s topic=%s context=%s zeroHashes=%v", msg.Header.Group, topic, contextUnmasked, zeroHashes) - if nextPin == nil { - l.Warnf("No match for zerohash on context: group=%s topic=%s context=%s pin=%s", msg.Header.Group, topic, contextUnmasked, pin) - return nil, nil - } - - // Check none of the other zerohashes exist before us in the stream - fb := database.PinQueryFactory.NewFilter(ctx) - filter := fb.And( - fb.Eq("dispatched", false), - fb.In("hash", zeroHashes), - fb.Lt("sequence", pinnedSequence), - ) - earlier, _, err := ag.database.GetPins(ctx, filter) - if err != nil { - return nil, err - } - if len(earlier) > 0 { - l.Debugf("Group=%s topic=%s context=%s earlier=%v", msg.Header.Group, topic, contextUnmasked, earlier) - return nil, nil } - // We're good to be the first message on this context. - // Initialize the nextpins on this context - this is safe to do even if we don't actually dispatch the message - for _, np := range nextPins { - if err = ag.database.InsertNextPin(ctx, np); err != nil { - return nil, err - } - } - return nextPin, err + return nil } -func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.Message, actions *batchActions) (bool, error) { +func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.Message, state *batchState) (bool, error) { // If we don't find all the data, then we don't dispatch data, foundAll, err := ag.data.GetMessageData(ctx, msg, true) @@ -517,8 +341,8 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M return false, err } if batchAction != nil { - actions.AddPreFinalize(batchAction.PreFinalize) - actions.AddFinalize(batchAction.Finalize) + state.AddPreFinalize(batchAction.PreFinalize) + state.AddFinalize(batchAction.Finalize) } if msgAction == definitions.ActionWait { return false, nil @@ -535,18 +359,18 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M } } - state := fftypes.MessageStateConfirmed + status := fftypes.MessageStateConfirmed eventType := fftypes.EventTypeMessageConfirmed if !valid { - state = fftypes.MessageStateRejected + status = fftypes.MessageStateRejected eventType = fftypes.EventTypeMessageRejected } - actions.AddFinalize(func(ctx context.Context) error { + state.AddFinalize(func(ctx context.Context) error { // This message is now confirmed setConfirmed := database.MessageQueryFactory.NewUpdate(ctx). Set("confirmed", fftypes.Now()). // the timestamp of the aggregator provides ordering - Set("state", state) // mark if the message was confirmed or rejected + Set("state", status) // mark if the message was confirmed or rejected if err = ag.database.UpdateMessage(ctx, msg.Header.ID, setConfirmed); err != nil { return err } diff --git a/internal/events/aggregator_batch_state.go b/internal/events/aggregator_batch_state.go new file mode 100644 index 0000000000..287c5cd561 --- /dev/null +++ b/internal/events/aggregator_batch_state.go @@ -0,0 +1,400 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "crypto/sha256" + "database/sql/driver" + "encoding/binary" + + "github.com/hyperledger/firefly/internal/definitions" + "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/pkg/database" + "github.com/hyperledger/firefly/pkg/fftypes" +) + +func newBatchState(ag *aggregator) *batchState { + return &batchState{ + database: ag.database, + definitions: ag.definitions, + maskedContexts: make(map[fftypes.Bytes32]*nextPinGroupState), + unmaskedContexts: make(map[fftypes.Bytes32]*contextState), + dispatchedMessages: make([]*dispatchedMessage, 0), + + PreFinalize: make([]func(ctx context.Context) error, 0), + Finalize: make([]func(ctx context.Context) error, 0), + } +} + +// nextPinGroupState manages the state during the batch for an individual masked context. +// We read it from the database the first time a context is touched within a batch, and then +// replace the pins in it as they are spent/replaced for each pin processed. +// At the end we flush the changes to the database. +type nextPinGroupState struct { + groupID *fftypes.Bytes32 + topic string + nextPins []*fftypes.NextPin + new bool + identitiesChanged map[string]bool +} + +type nextPinState struct { + nextPinGroup *nextPinGroupState + nextPin *fftypes.NextPin +} + +// contextState tracks the unmasked (broadcast) pins related to a particular context +// in the batch, as they might block further messages in the batch +type contextState struct { + blockedBy int64 +} + +// dispatchedMessage is a record for a message that was dispatched as part of processing this batch, +// so that all pins associated to the message can be marked dispatched at the end of the batch. +type dispatchedMessage struct { + batchID *fftypes.UUID + msgID *fftypes.UUID + firstPinIndex int64 + lastPinIndex int64 +} + +// batchState is the object that tracks the in-memory state that builds up while processing a batch of pins, +// that needs to be reconciled at the point the batch closes. +// There are three phases: +// 1. Dispatch: Determines if messages are blocked, or can be dispatched. Calls the appropriate dispatch +// actions for that message type. Reads initial `pin` state for contexts from the DB, and then +// updates this in-memory throughout the batch, ready for flushing in the Finalize phase. +// Runs in a database operation group/transaction. +// 2. Pre-finalize: Runs any PreFinalize callbacks registered by the handlers in (1). +// Intended to be used for cross-microservice REST/GRPC etc. calls that have side-effects. +// Runs outside any database operation group/transaction. +// 3. Finalize: Flushes the `pin` state calculated in phase (1), and any Finalize actions registered by handlers +// during phase (1) or (2). +// Runs in a database operation group/transaction, which will be the same as phase (1) if there +// are no pre-finalize handlers registered.
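Reviewer aside: the three phases described above correspond to the control flow of `processWithBatchState` in aggregator.go, earlier in this diff. The following is a minimal, self-contained Go sketch of that ordering only; `batchPhases`, `runAsGroup`, `runAll` and `process` are illustrative stand-ins, not FireFly APIs.

```go
package main

import (
	"context"
	"fmt"
)

// batchPhases is a stand-in for batchState, holding the callbacks queued during dispatch.
type batchPhases struct {
	preFinalize []func(ctx context.Context) error
	finalize    []func(ctx context.Context) error
}

// runAsGroup stands in for database.Plugin.RunAsGroup (a DB operation group/transaction scope).
func runAsGroup(ctx context.Context, fn func(ctx context.Context) error) error {
	return fn(ctx)
}

// runAll runs queued callbacks in order, stopping at the first error.
func runAll(ctx context.Context, actions []func(ctx context.Context) error) error {
	for _, a := range actions {
		if err := a(ctx); err != nil {
			return err
		}
	}
	return nil
}

// process mirrors the ordering of processWithBatchState:
// dispatch runs inside a DB group; pre-finalize runs outside it; finalize runs inside a
// DB group, which is the same one as dispatch when nothing was queued for pre-finalize.
func process(ctx context.Context, bp *batchPhases, dispatch func(ctx context.Context) error) error {
	err := runAsGroup(ctx, func(ctx context.Context) error {
		if err := dispatch(ctx); err != nil {
			return err
		}
		if len(bp.preFinalize) == 0 {
			return runAll(ctx, bp.finalize)
		}
		return nil
	})
	if err != nil || len(bp.preFinalize) == 0 {
		return err
	}
	if err := runAll(ctx, bp.preFinalize); err != nil {
		return err
	}
	return runAsGroup(ctx, func(ctx context.Context) error {
		return runAll(ctx, bp.finalize)
	})
}

func main() {
	bp := &batchPhases{}
	bp.preFinalize = append(bp.preFinalize, func(ctx context.Context) error {
		fmt.Println("phase 2: pre-finalize, outside any DB group")
		return nil
	})
	bp.finalize = append(bp.finalize, func(ctx context.Context) error {
		fmt.Println("phase 3: finalize, inside a DB group")
		return nil
	})
	_ = process(context.Background(), bp, func(ctx context.Context) error {
		fmt.Println("phase 1: dispatch, inside a DB group")
		return nil
	})
}
```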
+type batchState struct { + database database.Plugin + definitions definitions.DefinitionHandlers + maskedContexts map[fftypes.Bytes32]*nextPinGroupState + unmaskedContexts map[fftypes.Bytes32]*contextState + dispatchedMessages []*dispatchedMessage + + // PreFinalize callbacks may perform blocking actions (possibly to an external connector) + // - Will execute after all batch messages have been processed + // - Will execute outside database RunAsGroup + // - If any PreFinalize callback errors out, batch will be aborted and retried + PreFinalize []func(ctx context.Context) error + + // Finalize callbacks may perform final, non-idempotent database operations (such as inserting Events) + // - Will execute after all batch messages have been processed and any PreFinalize callbacks have succeeded + // - Will execute inside database RunAsGroup + // - If any Finalize callback errors out, batch will be aborted and retried (small chance of duplicate execution here) + Finalize []func(ctx context.Context) error +} + +func (bs *batchState) AddPreFinalize(action func(ctx context.Context) error) { + if action != nil { + bs.PreFinalize = append(bs.PreFinalize, action) + } +} + +func (bs *batchState) AddFinalize(action func(ctx context.Context) error) { + if action != nil { + bs.Finalize = append(bs.Finalize, action) + } +} + +func (bs *batchState) RunPreFinalize(ctx context.Context) error { + for _, action := range bs.PreFinalize { + if err := action(ctx); err != nil { + return err + } + } + return nil +} + +func (bs *batchState) RunFinalize(ctx context.Context) error { + for _, action := range bs.Finalize { + if err := action(ctx); err != nil { + return err + } + } + return bs.flushPins(ctx) +} + +func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnmasked fftypes.Bytes32, msg *fftypes.Message, topic string, firstMsgPinSequence int64) (bool, error) { + + ucs, found := bs.unmaskedContexts[contextUnmasked] + if !found { + ucs = &contextState{blockedBy: -1} + bs.unmaskedContexts[contextUnmasked] = ucs + + // We need to check there's no earlier sequences with the same unmasked context + fb := database.PinQueryFactory.NewFilterLimit(ctx, 1) // only need the first one + filter := fb.And( + fb.Eq("hash", contextUnmasked), + fb.Eq("dispatched", false), + fb.Lt("sequence", firstMsgPinSequence), + ) + earlier, _, err := bs.database.GetPins(ctx, filter) + if err != nil { + return false, err + } + if len(earlier) > 0 { + ucs.blockedBy = earlier[0].Sequence + } + + } + + blocked := ucs.blockedBy >= 0 + if blocked { + log.L(ctx).Debugf("Message %s pinned at sequence %d blocked by earlier context %s at sequence %d", msg.Header.ID, firstMsgPinSequence, contextUnmasked, ucs.blockedBy) + } + return !blocked, nil + +} + +func (bs *batchState) CheckMaskedContextReady(ctx context.Context, msg *fftypes.Message, topic string, firstMsgPinSequence int64, pin *fftypes.Bytes32) (*nextPinState, error) { + l := log.L(ctx) + + // For masked pins, we can only process if: + // - it is the next sequence on this context for one of the members of the group + // - there are no undispatched messages on this context earlier in the stream + h := sha256.New() + h.Write([]byte(topic)) + h.Write((*msg.Header.Group)[:]) + contextUnmasked := fftypes.HashResult(h) + npg, err := bs.stateForMaskedContext(ctx, msg.Header.Group, topic, *contextUnmasked) + if err != nil { + return nil, err + } + if npg == nil { + // If this is the first time we've seen the context, then this message is read as long as it is + // the first 
(nonce=0) message on the context, for one of the members, and there aren't any earlier + // messages that are nonce=0. + return bs.attemptContextInit(ctx, msg, topic, firstMsgPinSequence, contextUnmasked, pin) + } + + // This message must be the next hash for the author + l.Debugf("Group=%s Topic='%s' Sequence=%d Pin=%s NextPins=%v", msg.Header.Group, topic, firstMsgPinSequence, pin, npg.nextPins) + var nextPin *fftypes.NextPin + for _, np := range npg.nextPins { + if *np.Hash == *pin { + nextPin = np + break + } + } + if nextPin == nil || nextPin.Identity != msg.Header.Author { + l.Warnf("Mismatched nexthash or author group=%s topic=%s context=%s pin=%s nextHash=%+v", msg.Header.Group, topic, contextUnmasked, pin, nextPin) + return nil, nil + } + return &nextPinState{ + nextPinGroup: npg, + nextPin: nextPin, + }, err +} + +func (bs *batchState) MarkMessageDispatched(ctx context.Context, batchID *fftypes.UUID, msg *fftypes.Message, msgBaseIndex int64) { + bs.dispatchedMessages = append(bs.dispatchedMessages, &dispatchedMessage{ + batchID: batchID, + msgID: msg.Header.ID, + firstPinIndex: msgBaseIndex, + lastPinIndex: msgBaseIndex + int64(len(msg.Header.Topics)), + }) +} + +func (bs *batchState) SetContextBlockedBy(ctx context.Context, unmaskedContext fftypes.Bytes32, blockedBy int64) { + ucs, found := bs.unmaskedContexts[unmaskedContext] + if !found { + ucs = &contextState{blockedBy: blockedBy} + bs.unmaskedContexts[unmaskedContext] = ucs + } else if ucs.blockedBy < 0 { + // Do not update an existing block, as we want the earliest entry to be the block + ucs.blockedBy = blockedBy + } +} + +func (bs *batchState) flushPins(ctx context.Context) error { + + // Update all the next pins + for _, npg := range bs.maskedContexts { + for _, np := range npg.nextPins { + if npg.new { + if err := bs.database.InsertNextPin(ctx, np); err != nil { + return err + } + } else if npg.identitiesChanged[np.Identity] { + update := database.NextPinQueryFactory.NewUpdate(ctx). + Set("nonce", np.Nonce). + Set("hash", np.Hash) + if err := bs.database.UpdateNextPin(ctx, np.Sequence, update); err != nil { + return err + } + } + } + } + + // Update all the pins that have been dispatched + // It's important we don't re-process the message, so we update all pins for a message to dispatched in one go, + // using the index range of pins it owns within the batch it is a part of. + // Note that this might include pins not in the batch we read from the database, as the page size + // cannot be guaranteed to overlap with the set of indexes of a message within a batch. 
+ for _, dm := range bs.dispatchedMessages { + fb := database.PinQueryFactory.NewFilter(ctx) + filter := fb.And( + fb.Eq("batch", dm.batchID), + fb.Gte("index", dm.firstPinIndex), + fb.Lte("index", dm.lastPinIndex), + ) + update := database.PinQueryFactory.NewUpdate(ctx).Set("dispatched", true) + if err := bs.database.UpdatePins(ctx, filter, update); err != nil { + return err + } + } + + return nil +} + +func (nps *nextPinState) IncrementNextPin(ctx context.Context) { + npg := nps.nextPinGroup + np := nps.nextPin + for i, existingPin := range npg.nextPins { + if np.Hash.Equals(existingPin.Hash) { + // We are spending this one, replace it in the list + newNonce := np.Nonce + 1 + newNextPin := &fftypes.NextPin{ + Context: np.Context, + Identity: np.Identity, + Nonce: newNonce, + Hash: npg.calcPinHash(np.Identity, newNonce), + Sequence: np.Sequence, // used for update in Flush + } + npg.nextPins[i] = newNextPin + log.L(ctx).Debugf("Incrementing NextPin=%s - Nonce=%d Topic=%s Group=%s NewNextPin=%s", np.Hash, np.Nonce, npg.topic, npg.groupID.String(), newNextPin.Hash) + // Mark the identity as needing its hash replacing when we come to write back to the DB + npg.identitiesChanged[np.Identity] = true + } + } +} + +func (npg *nextPinGroupState) calcPinHash(identity string, nonce int64) *fftypes.Bytes32 { + h := sha256.New() + h.Write([]byte(npg.topic)) + h.Write((*npg.groupID)[:]) + h.Write([]byte(identity)) + nonceBytes := make([]byte, 8) + binary.BigEndian.PutUint64(nonceBytes, uint64(nonce)) + h.Write(nonceBytes) + return fftypes.HashResult(h) +} + +func (bs *batchState) stateForMaskedContext(ctx context.Context, groupID *fftypes.Bytes32, topic string, contextUnmasked fftypes.Bytes32) (*nextPinGroupState, error) { + + if npg, exists := bs.maskedContexts[contextUnmasked]; exists { + return npg, nil + } + + filter := database.NextPinQueryFactory.NewFilter(ctx).Eq("context", contextUnmasked) + nextPins, _, err := bs.database.GetNextPins(ctx, filter) + if err != nil { + return nil, err + } + + if len(nextPins) == 0 { + return nil, nil + } + + npg := &nextPinGroupState{ + groupID: groupID, + topic: topic, + identitiesChanged: make(map[string]bool), + nextPins: nextPins, + } + bs.maskedContexts[contextUnmasked] = npg + return npg, nil + +} + +func (bs *batchState) attemptContextInit(ctx context.Context, msg *fftypes.Message, topic string, pinnedSequence int64, contextUnmasked, pin *fftypes.Bytes32) (*nextPinState, error) { + l := log.L(ctx) + + // It might be the system topic/context initializing the group + // - This performs the actual database updates in-line, as it is idempotent + group, err := bs.definitions.ResolveInitGroup(ctx, msg) + if err != nil || group == nil { + return nil, err + } + + npg := &nextPinGroupState{ + groupID: msg.Header.Group, + topic: topic, + new: true, + identitiesChanged: make(map[string]bool), + nextPins: make([]*fftypes.NextPin, len(group.Members)), + } + + // Find the list of zerohashes for this context, and match this pin to one of them + zeroHashes := make([]driver.Value, len(group.Members)) + var nextPin *fftypes.NextPin + for i, member := range group.Members { + zeroHash := npg.calcPinHash(member.Identity, 0) + np := &fftypes.NextPin{ + Context: contextUnmasked, + Identity: member.Identity, + Hash: zeroHash, + Nonce: 0, + } + if *pin == *zeroHash { + if member.Identity != msg.Header.Author { + l.Warnf("Author mismatch for zerohash on context: group=%s topic=%s context=%s pin=%s", msg.Header.Group, topic, contextUnmasked, pin) + return nil, nil + } + nextPin = 
np + } + zeroHashes[i] = zeroHash + npg.nextPins[i] = np + } + l.Debugf("Group=%s topic=%s context=%s zeroHashes=%v", msg.Header.Group, topic, contextUnmasked, zeroHashes) + if nextPin == nil { + l.Warnf("No match for zerohash on context: group=%s topic=%s context=%s pin=%s", msg.Header.Group, topic, contextUnmasked, pin) + return nil, nil + } + + // Check none of the other zerohashes exist before us in the stream + fb := database.PinQueryFactory.NewFilter(ctx) + filter := fb.And( + fb.In("hash", zeroHashes), + fb.Eq("dispatched", false), + fb.Lt("sequence", pinnedSequence), + ) + earlier, _, err := bs.database.GetPins(ctx, filter) + if err != nil { + return nil, err + } + if len(earlier) > 0 { + l.Debugf("Group=%s topic=%s context=%s earlier=%v", msg.Header.Group, topic, contextUnmasked, earlier) + return nil, nil + } + + // Initialize the nextpins on this context + bs.maskedContexts[*contextUnmasked] = npg + return &nextPinState{ + nextPin: nextPin, + nextPinGroup: npg, + }, err +} diff --git a/internal/events/aggregator_batch_state_test.go b/internal/events/aggregator_batch_state_test.go new file mode 100644 index 0000000000..e846082b7c --- /dev/null +++ b/internal/events/aggregator_batch_state_test.go @@ -0,0 +1,58 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
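Reviewer aside on the zero-hash matching in `attemptContextInit` above: this is a small standalone sketch of the hash derivations involved. It mirrors `nextPinGroupState.calcPinHash` and the unmasked-context hash computed in `processMessage`; the group ID and member identities are made-up example values, and the helper below is not part of the FireFly codebase.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// calcPinHash reproduces the masked pin derivation: sha256 over the topic,
// the 32-byte group ID, the author identity, and the nonce as 8 big-endian bytes.
func calcPinHash(topic string, groupID [32]byte, identity string, nonce int64) [32]byte {
	h := sha256.New()
	h.Write([]byte(topic))
	h.Write(groupID[:])
	h.Write([]byte(identity))
	nonceBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(nonceBytes, uint64(nonce))
	h.Write(nonceBytes)
	var out [32]byte
	copy(out[:], h.Sum(nil))
	return out
}

func main() {
	group := sha256.Sum256([]byte("example-group")) // stand-in group ID
	members := []string{"org1", "org2"}             // example identities, as in the tests

	// attemptContextInit computes the nonce=0 ("zero hash") pin for every group member,
	// and only accepts the first message on a masked context if its pin matches the
	// zero hash of the message author.
	incomingPin := calcPinHash("topic1", group, "org2", 0)
	for _, member := range members {
		zeroHash := calcPinHash("topic1", group, member, 0)
		fmt.Printf("member=%s zeroHash=%x match=%v\n", member, zeroHash, zeroHash == incomingPin)
	}

	// For broadcast (unmasked) messages the context is just sha256(topic), which is
	// what CheckUnmaskedContextReady uses to look for earlier undispatched pins.
	unmasked := sha256.Sum256([]byte("topic1"))
	fmt.Printf("unmasked context=%x\n", unmasked)
}
```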
+ +package events + +import ( + "fmt" + "testing" + + "github.com/hyperledger/firefly/mocks/databasemocks" + "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestFlushPinsFail(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdi := ag.database.(*databasemocks.Plugin) + mdi.On("UpdatePins", ag.ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + + bs.MarkMessageDispatched(ag.ctx, fftypes.NewUUID(), &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + }, + }, 0) + + err := bs.flushPins(ag.ctx) + assert.Regexp(t, "pop", err) +} + +func TestSetContextBlockedByNoState(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + unmaskedContext := fftypes.NewRandB32() + bs.SetContextBlockedBy(ag.ctx, *unmaskedContext, 10) + + ready, err := bs.CheckUnmaskedContextReady(ag.ctx, *unmaskedContext, &fftypes.Message{}, "topic1", 1) + assert.NoError(t, err) + assert.False(t, ready) +} diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index 461ae1e24d..fd9c0197b5 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -46,7 +46,7 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) // Generate some pin data member1org := "org1" @@ -60,9 +60,10 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { h.Write([]byte(topic)) h.Write((*groupID)[:]) contextUnmasked := fftypes.HashResult(h) - member1NonceZero := ag.calcHash(topic, groupID, member1org, 0) - member2NonceZero := ag.calcHash(topic, groupID, member2org, 0) - member2NonceOne := ag.calcHash(topic, groupID, member2org, 1) + initNPG := &nextPinGroupState{topic: topic, groupID: groupID} + member1NonceZero := initNPG.calcPinHash(member1org, 0) + member2NonceZero := initNPG.calcPinHash(member2org, 0) + member2NonceOne := initNPG.calcPinHash(member2org, 1) mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) @@ -113,7 +114,7 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { mdi.On("InsertNextPin", ag.ctx, mock.MatchedBy(func(np *fftypes.NextPin) bool { assert.Equal(t, *np.Context, *contextUnmasked) np.Sequence = 10012 - return *np.Hash == *member2NonceZero && np.Nonce == 0 + return *np.Hash == *member2NonceOne && np.Nonce == 1 })).Return(nil).Once() // Validate the message is ok mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) @@ -122,21 +123,8 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { mdi.On("InsertEvent", ag.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { return *e.Reference == *msgID && e.Type == fftypes.EventTypeMessageConfirmed })).Return(nil) - // Update member2 to nonce 1 - mdi.On("UpdateNextPin", ag.ctx, mock.MatchedBy(func(seq int64) bool { - return seq == 10012 - }), mock.MatchedBy(func(update database.Update) bool { - ui, _ := update.Finalize() - assert.Equal(t, "nonce", ui.SetOperations[0].Field) - v, _ := ui.SetOperations[0].Value.Value() - assert.Equal(t, int64(1), v.(int64)) - assert.Equal(t, "hash", ui.SetOperations[1].Field) - v, _ = ui.SetOperations[1].Value.Value() - assert.Equal(t, member2NonceOne.String(), v) - return true - })).Return(nil) // Set the pin to dispatched - mdi.On("SetPinDispatched", ag.ctx, int64(10001)).Return(nil) + mdi.On("UpdatePins", 
ag.ctx, mock.Anything, mock.Anything).Return(nil) // Update the message mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.MatchedBy(func(u database.Update) bool { update, err := u.Finalize() @@ -167,10 +155,10 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { Index: 0, Dispatched: false, }, - }, ba) + }, bs) assert.NoError(t, err) - err = ba.RunFinalize(ag.ctx) + err = bs.RunFinalize(ag.ctx) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -194,9 +182,10 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { h.Write([]byte(topic)) h.Write((*groupID)[:]) contextUnmasked := fftypes.HashResult(h) - member1Nonce100 := ag.calcHash(topic, groupID, member1org, 100) - member2Nonce500 := ag.calcHash(topic, groupID, member2org, 500) - member2Nonce501 := ag.calcHash(topic, groupID, member2org, 501) + initNPG := &nextPinGroupState{topic: topic, groupID: groupID} + member1Nonce100 := initNPG.calcPinHash(member1org, 100) + member2Nonce500 := initNPG.calcPinHash(member2org, 500) + member2Nonce501 := initNPG.calcPinHash(member2org, 501) mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) @@ -255,13 +244,13 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { return true })).Return(nil) // Set the pin to dispatched - mdi.On("SetPinDispatched", ag.ctx, int64(10001)).Return(nil) + mdi.On("UpdatePins", ag.ctx, mock.Anything, mock.Anything).Return(nil) // Update the message mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) // Confirm the offset mdi.On("UpdateOffset", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) - _, err := ag.processPinsDBGroup([]fftypes.LocallySequenced{ + _, err := ag.processPinsEventsHandler([]fftypes.LocallySequenced{ &fftypes.Pin{ Sequence: 10001, Masked: true, @@ -281,7 +270,7 @@ func TestAggregationBroadcast(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) // Generate some pin data topic := "some-topic" @@ -327,7 +316,7 @@ func TestAggregationBroadcast(t *testing.T) { return *e.Reference == *msgID && e.Type == fftypes.EventTypeMessageConfirmed })).Return(nil) // Set the pin to dispatched - mdi.On("SetPinDispatched", ag.ctx, int64(10001)).Return(nil) + mdi.On("UpdatePins", ag.ctx, mock.Anything, mock.Anything).Return(nil) // Update the message mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) // Confirm the offset @@ -341,10 +330,10 @@ func TestAggregationBroadcast(t *testing.T) { Index: 0, Dispatched: false, }, - }, ba) + }, bs) assert.NoError(t, err) - err = ba.RunFinalize(ag.ctx) + err = bs.RunFinalize(ag.ctx) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -381,7 +370,7 @@ func TestProcessPinsDBGroupFail(t *testing.T) { } mdi.On("GetBatchByID", ag.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")) - _, err := ag.processPinsDBGroup([]fftypes.LocallySequenced{ + _, err := ag.processPinsEventsHandler([]fftypes.LocallySequenced{ &fftypes.Pin{ Batch: fftypes.NewUUID(), }, @@ -406,7 +395,7 @@ func TestGetPins(t *testing.T) { func TestProcessPinsMissingBatch(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetBatchByID", ag.ctx, mock.Anything).Return(nil, nil) @@ -414,7 +403,7 @@ func TestProcessPinsMissingBatch(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: fftypes.NewUUID()}, - }, ba) + }, bs) assert.NoError(t, err) } @@ -422,7 +411,7 @@ 
func TestProcessPinsMissingBatch(t *testing.T) { func TestProcessPinsMissingNoMsg(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetBatchByID", ag.ctx, mock.Anything).Return(&fftypes.Batch{ @@ -437,7 +426,7 @@ func TestProcessPinsMissingNoMsg(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: fftypes.NewUUID(), Index: 25}, - }, ba) + }, bs) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -446,7 +435,7 @@ func TestProcessPinsMissingNoMsg(t *testing.T) { func TestProcessPinsBadMsgHeader(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetBatchByID", ag.ctx, mock.Anything).Return(&fftypes.Batch{ @@ -464,7 +453,7 @@ func TestProcessPinsBadMsgHeader(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: fftypes.NewUUID(), Index: 0}, - }, ba) + }, bs) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -473,7 +462,7 @@ func TestProcessPinsBadMsgHeader(t *testing.T) { func TestProcessSkipDupMsg(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) batchID := fftypes.NewUUID() mdi := ag.database.(*databasemocks.Plugin) @@ -496,7 +485,7 @@ func TestProcessSkipDupMsg(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: batchID, Index: 0, Hash: fftypes.NewRandB32()}, {Sequence: 12345, Batch: batchID, Index: 1, Hash: fftypes.NewRandB32()}, - }, ba) + }, bs) assert.NoError(t, err) mdi.AssertExpectations(t) @@ -505,7 +494,7 @@ func TestProcessSkipDupMsg(t *testing.T) { func TestProcessMsgFailGetPins(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) batchID := fftypes.NewUUID() mdi := ag.database.(*databasemocks.Plugin) @@ -524,7 +513,7 @@ func TestProcessMsgFailGetPins(t *testing.T) { err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: batchID, Index: 0, Hash: fftypes.NewRandB32()}, - }, ba) + }, bs) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) } @@ -533,7 +522,7 @@ func TestProcessMsgFailMissingGroup(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - err := ag.processMessage(ag.ctx, &fftypes.Batch{}, true, 12345, &fftypes.Message{}, nil) + err := ag.processMessage(ag.ctx, &fftypes.Batch{}, &fftypes.Pin{Masked: true, Sequence: 12345}, 10, &fftypes.Message{}, nil) assert.NoError(t, err) } @@ -542,14 +531,14 @@ func TestProcessMsgFailBadPin(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - err := ag.processMessage(ag.ctx, &fftypes.Batch{}, true, 12345, &fftypes.Message{ + err := ag.processMessage(ag.ctx, &fftypes.Batch{}, &fftypes.Pin{Masked: true, Sequence: 12345}, 10, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: fftypes.NewRandB32(), Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{"!Wrong"}, - }, nil) + }, newBatchState(ag)) assert.NoError(t, err) } @@ -561,14 +550,14 @@ func TestProcessMsgFailGetNextPins(t *testing.T) { mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetNextPins", ag.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) - err := ag.processMessage(ag.ctx, &fftypes.Batch{}, true, 12345, &fftypes.Message{ + err := ag.processMessage(ag.ctx, &fftypes.Batch{}, &fftypes.Pin{Masked: true, Sequence: 12345}, 
10, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: fftypes.NewRandB32(), Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{fftypes.NewRandB32().String()}, - }, nil) + }, newBatchState(ag)) assert.EqualError(t, err, "pop") } @@ -582,13 +571,13 @@ func TestProcessMsgFailDispatch(t *testing.T) { mdm := ag.data.(*datamocks.Manager) mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return(nil, false, fmt.Errorf("pop")) - err := ag.processMessage(ag.ctx, &fftypes.Batch{}, false, 12345, &fftypes.Message{ + err := ag.processMessage(ag.ctx, &fftypes.Batch{}, &fftypes.Pin{Sequence: 12345}, 10, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{fftypes.NewRandB32().String()}, - }, nil) + }, newBatchState(ag)) assert.EqualError(t, err, "pop") } @@ -596,7 +585,7 @@ func TestProcessMsgFailDispatch(t *testing.T) { func TestProcessMsgFailPinUpdate(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) pin := fftypes.NewRandB32() mdi := ag.database.(*databasemocks.Plugin) @@ -610,17 +599,17 @@ func TestProcessMsgFailPinUpdate(t *testing.T) { mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) mdi.On("UpdateNextPin", ag.ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - err := ag.processMessage(ag.ctx, &fftypes.Batch{}, true, 12345, &fftypes.Message{ + err := ag.processMessage(ag.ctx, &fftypes.Batch{}, &fftypes.Pin{Masked: true, Sequence: 12345}, 10, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: fftypes.NewRandB32(), Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{pin.String()}, - }, ba) + }, bs) assert.NoError(t, err) - err = ba.RunFinalize(ag.ctx) + err = bs.RunFinalize(ag.ctx) assert.EqualError(t, err, "pop") } @@ -635,7 +624,8 @@ func TestCheckMaskedContextReadyMismatchedAuthor(t *testing.T) { {Context: fftypes.NewRandB32(), Hash: pin}, }, nil, nil) - _, err := ag.checkMaskedContextReady(ag.ctx, &fftypes.Message{ + bs := newBatchState(ag) + _, err := bs.CheckMaskedContextReady(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: fftypes.NewRandB32(), @@ -656,7 +646,8 @@ func TestAttemptContextInitGetGroupByIDFail(t *testing.T) { msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")) - _, err := ag.attemptContextInit(ag.ctx, &fftypes.Message{ + bs := newBatchState(ag) + _, err := bs.attemptContextInit(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: fftypes.NewRandB32(), @@ -677,7 +668,8 @@ func TestAttemptContextInitGroupNotFound(t *testing.T) { msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(nil, nil) - _, err := ag.attemptContextInit(ag.ctx, &fftypes.Message{ + bs := newBatchState(ag) + _, err := bs.attemptContextInit(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: fftypes.NewRandB32(), @@ -696,7 +688,8 @@ func TestAttemptContextInitAuthorMismatch(t *testing.T) { defer cancel() groupID := fftypes.NewRandB32() - zeroHash := ag.calcHash("topic1", groupID, "author2", 0) + initNPG := &nextPinGroupState{topic: "topic1", groupID: groupID} + zeroHash := initNPG.calcPinHash("author2", 0) msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) 
msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(&fftypes.Group{ GroupIdentity: fftypes.GroupIdentity{ @@ -706,7 +699,8 @@ func TestAttemptContextInitAuthorMismatch(t *testing.T) { }, }, nil) - _, err := ag.attemptContextInit(ag.ctx, &fftypes.Message{ + bs := newBatchState(ag) + _, err := bs.attemptContextInit(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: groupID, @@ -734,7 +728,8 @@ func TestAttemptContextInitNoMatch(t *testing.T) { }, }, nil) - _, err := ag.attemptContextInit(ag.ctx, &fftypes.Message{ + bs := newBatchState(ag) + _, err := bs.attemptContextInit(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: groupID, @@ -753,7 +748,8 @@ func TestAttemptContextInitGetPinsFail(t *testing.T) { defer cancel() groupID := fftypes.NewRandB32() - zeroHash := ag.calcHash("topic1", groupID, "author1", 0) + initNPG := &nextPinGroupState{topic: "topic1", groupID: groupID} + zeroHash := initNPG.calcPinHash("author1", 0) msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) mdi := ag.database.(*databasemocks.Plugin) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(&fftypes.Group{ @@ -765,7 +761,8 @@ func TestAttemptContextInitGetPinsFail(t *testing.T) { }, nil) mdi.On("GetPins", ag.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) - _, err := ag.attemptContextInit(ag.ctx, &fftypes.Message{ + bs := newBatchState(ag) + _, err := bs.attemptContextInit(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: groupID, @@ -784,7 +781,8 @@ func TestAttemptContextInitGetPinsBlocked(t *testing.T) { defer cancel() groupID := fftypes.NewRandB32() - zeroHash := ag.calcHash("topic1", groupID, "author1", 0) + initNPG := &nextPinGroupState{topic: "topic1", groupID: groupID} + zeroHash := initNPG.calcPinHash("author1", 0) mdi := ag.database.(*databasemocks.Plugin) msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(&fftypes.Group{ @@ -798,7 +796,8 @@ func TestAttemptContextInitGetPinsBlocked(t *testing.T) { {Sequence: 12345}, }, nil, nil) - np, err := ag.attemptContextInit(ag.ctx, &fftypes.Message{ + bs := newBatchState(ag) + np, err := bs.attemptContextInit(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: groupID, @@ -818,7 +817,8 @@ func TestAttemptContextInitInsertPinsFail(t *testing.T) { defer cancel() groupID := fftypes.NewRandB32() - zeroHash := ag.calcHash("topic1", groupID, "author1", 0) + initNPG := &nextPinGroupState{topic: "topic1", groupID: groupID} + zeroHash := initNPG.calcPinHash("author1", 0) mdi := ag.database.(*databasemocks.Plugin) msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("ResolveInitGroup", ag.ctx, mock.Anything).Return(&fftypes.Group{ @@ -831,7 +831,8 @@ func TestAttemptContextInitInsertPinsFail(t *testing.T) { mdi.On("GetPins", ag.ctx, mock.Anything).Return([]*fftypes.Pin{}, nil, nil) mdi.On("InsertNextPin", ag.ctx, mock.Anything).Return(fmt.Errorf("pop")) - np, err := ag.attemptContextInit(ag.ctx, &fftypes.Message{ + bs := newBatchState(ag) + np, err := bs.attemptContextInit(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Group: groupID, @@ -841,7 +842,9 @@ func TestAttemptContextInitInsertPinsFail(t *testing.T) { }, }, }, "topic1", 12345, fftypes.NewRandB32(), zeroHash) - assert.Nil(t, np) + assert.NoError(t, err) + assert.NotNil(t, np) + err = bs.RunFinalize(ag.ctx) assert.EqualError(t, err, 
"pop") } @@ -989,7 +992,7 @@ func TestAttemptMessageDispatchTransferMismatch(t *testing.T) { func TestDefinitionBroadcastActionReject(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything).Return(definitions.ActionReject, &definitions.DefinitionBatchActions{}, nil) @@ -1026,11 +1029,191 @@ func TestDefinitionBroadcastActionReject(t *testing.T) { Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, }, - }, ba) + }, bs) + assert.NoError(t, err) + +} + +func TestDispatchBroadcastQueuesLaterDispatch(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, false, nil).Once() + + mdi := ag.database.(*databasemocks.Plugin) + mdi.On("GetPins", ag.ctx, mock.Anything).Return([]*fftypes.Pin{}, nil, nil) + + msg1 := &fftypes.Message{ + Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypeDefinition, + ID: fftypes.NewUUID(), + Namespace: "any", + Topics: fftypes.FFStringArray{"topic1"}, + }, + } + msg2 := &fftypes.Message{ + Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypeDefinition, + ID: fftypes.NewUUID(), + Namespace: "any", + Topics: fftypes.FFStringArray{"topic1"}, + }, + } + + batch := &fftypes.Batch{ + ID: fftypes.NewUUID(), + Payload: fftypes.BatchPayload{ + Messages: []*fftypes.Message{msg1, msg2}, + }, + } + + // First message should dispatch + err := ag.processMessage(ag.ctx, batch, &fftypes.Pin{Sequence: 12345}, 0, msg1, bs) + assert.NoError(t, err) + + // Second message should not (mocks have Once limit on GetMessageData to confirm) + err = ag.processMessage(ag.ctx, batch, &fftypes.Pin{Sequence: 12346}, 0, msg1, bs) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestDispatchPrivateQueuesLaterDispatch(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, false, nil).Once() + + groupID := fftypes.NewRandB32() + initNPG := &nextPinGroupState{topic: "topic1", groupID: groupID} + member1NonceOne := initNPG.calcPinHash("org1", 1) + member1NonceTwo := initNPG.calcPinHash("org1", 2) + h := sha256.New() + h.Write([]byte("topic1")) + context := fftypes.HashResult(h) + + mdi := ag.database.(*databasemocks.Plugin) + mdi.On("GetNextPins", ag.ctx, mock.Anything).Return([]*fftypes.NextPin{ + {Context: context, Nonce: 1 /* match member1NonceOne */, Identity: "org1", Hash: member1NonceOne}, + }, nil, nil) + + msg1 := &fftypes.Message{ + Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypePrivate, + ID: fftypes.NewUUID(), + Namespace: "any", + Topics: fftypes.FFStringArray{"topic1"}, + Group: groupID, + Identity: fftypes.Identity{ + Author: "org1", + }, + }, + Pins: fftypes.FFStringArray{member1NonceOne.String()}, + } + msg2 := &fftypes.Message{ + Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypePrivate, + ID: fftypes.NewUUID(), + Namespace: "any", + Topics: fftypes.FFStringArray{"topic1"}, + Group: groupID, + Identity: fftypes.Identity{ + Author: "org1", + }, + }, + Pins: fftypes.FFStringArray{member1NonceTwo.String()}, + } + + batch := &fftypes.Batch{ + ID: fftypes.NewUUID(), + Payload: fftypes.BatchPayload{ + 
Messages: []*fftypes.Message{msg1, msg2}, + }, + } + + // First message should dispatch + err := ag.processMessage(ag.ctx, batch, &fftypes.Pin{Masked: true, Sequence: 12345}, 0, msg1, bs) + assert.NoError(t, err) + + // Second message should not (mocks have Once limit on GetMessageData to confirm) + err = ag.processMessage(ag.ctx, batch, &fftypes.Pin{Masked: true, Sequence: 12346}, 0, msg2, bs) assert.NoError(t, err) + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) } +func TestDispatchPrivateNextPinIncremented(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil).Twice() + + groupID := fftypes.NewRandB32() + initNPG := &nextPinGroupState{topic: "topic1", groupID: groupID} + member1NonceOne := initNPG.calcPinHash("org1", 1) + member1NonceTwo := initNPG.calcPinHash("org1", 2) + h := sha256.New() + h.Write([]byte("topic1")) + context := fftypes.HashResult(h) + + mdi := ag.database.(*databasemocks.Plugin) + mdi.On("GetNextPins", ag.ctx, mock.Anything).Return([]*fftypes.NextPin{ + {Context: context, Nonce: 1 /* match member1NonceOne */, Identity: "org1", Hash: member1NonceOne}, + }, nil, nil) + + msg1 := &fftypes.Message{ + Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypePrivate, + ID: fftypes.NewUUID(), + Namespace: "any", + Topics: fftypes.FFStringArray{"topic1"}, + Group: groupID, + Identity: fftypes.Identity{ + Author: "org1", + }, + }, + Pins: fftypes.FFStringArray{member1NonceOne.String()}, + } + msg2 := &fftypes.Message{ + Header: fftypes.MessageHeader{ + Type: fftypes.MessageTypePrivate, + ID: fftypes.NewUUID(), + Namespace: "any", + Topics: fftypes.FFStringArray{"topic1"}, + Group: groupID, + Identity: fftypes.Identity{ + Author: "org1", + }, + }, + Pins: fftypes.FFStringArray{member1NonceTwo.String()}, + } + + batch := &fftypes.Batch{ + ID: fftypes.NewUUID(), + Payload: fftypes.BatchPayload{ + Messages: []*fftypes.Message{msg1, msg2}, + }, + } + + // First message should dispatch + err := ag.processMessage(ag.ctx, batch, &fftypes.Pin{Masked: true, Sequence: 12345}, 0, msg1, bs) + assert.NoError(t, err) + + // Second message should dispatch too (Twice on GetMessageData) + err = ag.processMessage(ag.ctx, batch, &fftypes.Pin{Masked: true, Sequence: 12346}, 0, msg2, bs) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} func TestDefinitionBroadcastActionRetry(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() @@ -1082,7 +1265,7 @@ func TestDefinitionBroadcastActionWait(t *testing.T) { func TestAttemptMessageDispatchEventFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) @@ -1093,10 +1276,10 @@ func TestAttemptMessageDispatchEventFail(t *testing.T) { _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, - }, ba) + }, bs) assert.NoError(t, err) - err = ba.RunFinalize(ag.ctx) + err = bs.RunFinalize(ag.ctx) assert.EqualError(t, err, "pop") } @@ -1104,7 +1287,7 @@ func TestAttemptMessageDispatchEventFail(t *testing.T) { func TestAttemptMessageDispatchGroupInit(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) 
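The test changes above all follow from the same refactor: batchActions has become batchState, created via newBatchState(ag), and work such as InsertNextPin is now queued on that state rather than executed inline, which is why attemptContextInit and attemptMessageDispatch return no error and the injected "pop" failures only surface from bs.RunFinalize. Below is a minimal sketch of that callback-queueing pattern, assuming only the method names exercised by these tests; the package name, field names, and comments are illustrative and not the FireFly implementation.

package example

import "context"

// batchState queues work discovered while processing a batch, so it can be
// run later in two phases: pre-finalize and finalize.
type batchState struct {
	preFinalize []func(ctx context.Context) error
	finalize    []func(ctx context.Context) error
}

func (bs *batchState) AddPreFinalize(fn func(ctx context.Context) error) {
	bs.preFinalize = append(bs.preFinalize, fn)
}

func (bs *batchState) AddFinalize(fn func(ctx context.Context) error) {
	bs.finalize = append(bs.finalize, fn)
}

// RunPreFinalize and RunFinalize execute the queued callbacks in order and
// stop at the first error - which is why a mocked database failure ("pop")
// is only observed when the test calls RunFinalize.
func (bs *batchState) RunPreFinalize(ctx context.Context) error {
	for _, fn := range bs.preFinalize {
		if err := fn(ctx); err != nil {
			return err
		}
	}
	return nil
}

func (bs *batchState) RunFinalize(ctx context.Context) error {
	for _, fn := range bs.finalize {
		if err := fn(ctx); err != nil {
			return err
		}
	}
	return nil
}
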
@@ -1118,7 +1301,7 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) { ID: fftypes.NewUUID(), Type: fftypes.MessageTypeGroupInit, }, - }, ba) + }, bs) assert.NoError(t, err) } @@ -1126,7 +1309,7 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) { func TestAttemptMessageUpdateMessageFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() - ba := &batchActions{} + bs := newBatchState(ag) mdi := ag.database.(*databasemocks.Plugin) mdm := ag.data.(*datamocks.Manager) @@ -1136,10 +1319,10 @@ func TestAttemptMessageUpdateMessageFail(t *testing.T) { _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, - }, ba) + }, bs) assert.NoError(t, err) - err = ba.RunFinalize(ag.ctx) + err = bs.RunFinalize(ag.ctx) assert.EqualError(t, err, "pop") } @@ -1343,46 +1526,44 @@ func TestBatchActions(t *testing.T) { prefinalizeCalled := false finalizeCalled := false - ba := &batchActions{ - PreFinalize: make([]func(ctx context.Context) error, 0), - Finalize: make([]func(ctx context.Context) error, 0), - } + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) - ba.AddPreFinalize(func(ctx context.Context) error { + bs.AddPreFinalize(func(ctx context.Context) error { prefinalizeCalled = true return nil }) - ba.AddFinalize(func(ctx context.Context) error { + bs.AddFinalize(func(ctx context.Context) error { finalizeCalled = true return nil }) - err := ba.RunPreFinalize(context.Background()) + err := bs.RunPreFinalize(context.Background()) assert.NoError(t, err) assert.True(t, prefinalizeCalled) - err = ba.RunFinalize(context.Background()) + err = bs.RunFinalize(context.Background()) assert.NoError(t, err) assert.True(t, finalizeCalled) } func TestBatchActionsError(t *testing.T) { - ba := &batchActions{ - PreFinalize: make([]func(ctx context.Context) error, 0), - Finalize: make([]func(ctx context.Context) error, 0), - } + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) - ba.AddPreFinalize(func(ctx context.Context) error { + bs.AddPreFinalize(func(ctx context.Context) error { return fmt.Errorf("pop") }) - ba.AddFinalize(func(ctx context.Context) error { + bs.AddFinalize(func(ctx context.Context) error { return fmt.Errorf("pop") }) - err := ba.RunPreFinalize(context.Background()) + err := bs.RunPreFinalize(context.Background()) assert.EqualError(t, err, "pop") - err = ba.RunFinalize(context.Background()) + err = bs.RunFinalize(context.Background()) assert.EqualError(t, err, "pop") } @@ -1396,7 +1577,7 @@ func TestProcessWithBatchActionsPreFinalizeError(t *testing.T) { rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} } - err := ag.processWithBatchActions(func(ctx context.Context, actions *batchActions) error { + err := ag.processWithBatchState(func(ctx context.Context, actions *batchState) error { actions.AddPreFinalize(func(ctx context.Context) error { return fmt.Errorf("pop") }) return nil }) @@ -1413,7 +1594,7 @@ func TestProcessWithBatchActionsSuccess(t *testing.T) { rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} } - err := ag.processWithBatchActions(func(ctx context.Context, actions *batchActions) error { + err := ag.processWithBatchState(func(ctx context.Context, actions *batchState) error { actions.AddPreFinalize(func(ctx context.Context) error { return nil }) actions.AddFinalize(func(ctx context.Context) error { return nil }) return nil diff --git 
a/internal/events/webhooks/webhooks.go b/internal/events/webhooks/webhooks.go index d952207186..3375266aa0 100644 --- a/internal/events/webhooks/webhooks.go +++ b/internal/events/webhooks/webhooks.go @@ -386,6 +386,7 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *fftypes.Subscript CID: event.Message.Header.ID, Group: event.Message.Header.Group, Type: event.Message.Header.Type, + Topics: event.Message.Header.Topics, Tag: sub.Options.TransportOptions().GetString("replytag"), TxType: txType, }, diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index 0a4e0a3a41..4b180b74b2 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -2241,20 +2241,6 @@ func (_m *Plugin) RunAsGroup(ctx context.Context, fn func(context.Context) error return r0 } -// SetPinDispatched provides a mock function with given fields: ctx, sequence -func (_m *Plugin) SetPinDispatched(ctx context.Context, sequence int64) error { - ret := _m.Called(ctx, sequence) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { - r0 = rf(ctx, sequence) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // UpdateBatch provides a mock function with given fields: ctx, id, update func (_m *Plugin) UpdateBatch(ctx context.Context, id *fftypes.UUID, update database.Update) error { ret := _m.Called(ctx, id, update) @@ -2423,6 +2409,20 @@ func (_m *Plugin) UpdateOrganization(ctx context.Context, id *fftypes.UUID, upda return r0 } +// UpdatePins provides a mock function with given fields: ctx, filter, update +func (_m *Plugin) UpdatePins(ctx context.Context, filter database.Filter, update database.Update) error { + ret := _m.Called(ctx, filter, update) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, database.Filter, database.Update) error); ok { + r0 = rf(ctx, filter, update) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // UpdateSubscription provides a mock function with given fields: ctx, ns, name, update func (_m *Plugin) UpdateSubscription(ctx context.Context, ns string, name string, update database.Update) error { ret := _m.Called(ctx, ns, name, update) diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 46f7408404..307eaa0154 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -186,8 +186,8 @@ type iPinCollection interface { // GetPins - Get pins GetPins(ctx context.Context, filter Filter) (offset []*fftypes.Pin, res *FilterResult, err error) - // SetPinDispatched - Set the dispatched flag to true on the specified pins - SetPinDispatched(ctx context.Context, sequence int64) (err error) + // UpdatePins - Updates pins + UpdatePins(ctx context.Context, filter Filter, update Update) (err error) // DeletePin - Delete a pin DeletePin(ctx context.Context, sequence int64) (err error) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 5f4280aeb1..4fc7c6922f 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -80,9 +80,9 @@ func pollForUp(t *testing.T, client *resty.Client) { assert.Equal(t, 200, resp.StatusCode()) } -func validateReceivedMessages(ts *testState, client *resty.Client, msgType fftypes.MessageType, txtype fftypes.TransactionType, count, idx int) *fftypes.Data { +func validateReceivedMessages(ts *testState, client *resty.Client, topic string, msgType fftypes.MessageType, txtype fftypes.TransactionType, count int) (data []*fftypes.Data) { var group *fftypes.Bytes32 - messages := GetMessages(ts.t, client, ts.startTime, msgType, 200) + messages := 
GetMessages(ts.t, client, ts.startTime, msgType, topic, 200) for i, message := range messages { ts.t.Logf("Message %d: %+v", i, *message) if group != nil { @@ -91,36 +91,44 @@ func validateReceivedMessages(ts *testState, client *resty.Client, msgType fftyp group = message.Header.Group } assert.Equal(ts.t, count, len(messages)) - assert.Equal(ts.t, txtype, (messages)[idx].Header.TxType) - assert.Equal(ts.t, "default", (messages)[idx].Header.Namespace) - assert.Equal(ts.t, fftypes.FFStringArray{"default"}, (messages)[idx].Header.Topics) - - data := GetData(ts.t, client, ts.startTime, 200) - var msgData *fftypes.Data - for i, d := range data { - ts.t.Logf("Data %d: %+v", i, *d) - if *d.ID == *messages[idx].Data[0].ID { - msgData = d + + var returnData []*fftypes.Data + for idx := 0; idx < len(messages); idx++ { + assert.Equal(ts.t, txtype, (messages)[idx].Header.TxType) + assert.Equal(ts.t, fftypes.FFStringArray{topic}, (messages)[idx].Header.Topics) + assert.Equal(ts.t, topic, (messages)[idx].Header.Topics[0]) + + data := GetDataForMessage(ts.t, client, ts.startTime, (messages)[idx].Header.ID) + var msgData *fftypes.Data + for i, d := range data { + ts.t.Logf("Data %d: %+v", i, *d) + if *d.ID == *messages[idx].Data[0].ID { + msgData = d + } + } + assert.NotNil(ts.t, msgData, "Found data with ID '%s'", messages[idx].Data[0].ID) + if group == nil { + assert.Equal(ts.t, 1, len(data)) } - } - assert.NotNil(ts.t, msgData, "Found data with ID '%s'", messages[idx].Data[0].ID) - if group == nil { - assert.Equal(ts.t, 1, len(data)) - } - assert.Equal(ts.t, "default", msgData.Namespace) - expectedHash, err := msgData.CalcHash(context.Background()) - assert.NoError(ts.t, err) - assert.Equal(ts.t, *expectedHash, *msgData.Hash) + returnData = append(returnData, msgData) + + assert.Equal(ts.t, "default", msgData.Namespace) + expectedHash, err := msgData.CalcHash(context.Background()) + assert.NoError(ts.t, err) + assert.Equal(ts.t, *expectedHash, *msgData.Hash) + + if msgData.Blob != nil { + blob := GetBlob(ts.t, client, msgData, 200) + assert.NotNil(ts.t, blob) + var hash fftypes.Bytes32 = sha256.Sum256(blob) + assert.Equal(ts.t, *msgData.Blob.Hash, hash) + } - if msgData.Blob != nil { - blob := GetBlob(ts.t, client, msgData, 200) - assert.NotNil(ts.t, blob) - var hash fftypes.Bytes32 = sha256.Sum256(blob) - assert.Equal(ts.t, *msgData.Blob.Hash, hash) } - return msgData + // Flip data (returned in most recent order) into delivery order + return returnData } func validateAccountBalances(t *testing.T, client *resty.Client, poolID *fftypes.UUID, tokenIndex string, balances map[string]int64) { @@ -131,6 +139,10 @@ func validateAccountBalances(t *testing.T, client *resty.Client, poolID *fftypes } } +func pickTopic(i int, options []string) string { + return options[i%len(options)] +} + func readStackFile(t *testing.T) *Stack { stackFile := os.Getenv("STACK_FILE") if stackFile == "" { diff --git a/test/e2e/onchain_offchain_test.go b/test/e2e/onchain_offchain_test.go index 0466f9c9d1..acbccdbfd5 100644 --- a/test/e2e/onchain_offchain_test.go +++ b/test/e2e/onchain_offchain_test.go @@ -25,13 +25,13 @@ import ( "math/big" "strings" + image2ascii "github.com/qeesung/image2ascii/convert" + "github.com/go-resty/resty/v2" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - - image2ascii "github.com/qeesung/image2ascii/convert" ) type OnChainOffChainTestSuite struct { @@ -49,25 +49,41 @@ func (suite 
*OnChainOffChainTestSuite) TestE2EBroadcast() { received1, changes1 := wsReader(suite.testState.ws1) received2, changes2 := wsReader(suite.testState.ws2) - var resp *resty.Response - value := fftypes.JSONAnyPtr(`"Hello"`) - data := fftypes.DataRefOrValue{ - Value: value, - } + // Broadcast some messages, that should get batched, across two topics + totalMessages := 10 + topics := []string{"topicA", "topicB"} + expectedData := make(map[string][]*fftypes.DataRefOrValue) + for i := 0; i < 10; i++ { + value := fftypes.JSONAnyPtr(fmt.Sprintf(`"Hello number %d"`, i)) + data := &fftypes.DataRefOrValue{ + Value: value, + } + topic := pickTopic(i, topics) - resp, err := BroadcastMessage(suite.testState.client1, &data, false) - require.NoError(suite.T(), err) - assert.Equal(suite.T(), 202, resp.StatusCode()) + expectedData[topic] = append(expectedData[topic], data) - waitForMessageConfirmed(suite.T(), received1, fftypes.MessageTypeBroadcast) - <-changes1 // also expect database change events - val1 := validateReceivedMessages(suite.testState, suite.testState.client1, fftypes.MessageTypeBroadcast, fftypes.TransactionTypeBatchPin, 1, 0) - assert.Equal(suite.T(), data.Value, val1.Value) + resp, err := BroadcastMessage(suite.testState.client1, topic, data, false) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), 202, resp.StatusCode()) + } - waitForMessageConfirmed(suite.T(), received2, fftypes.MessageTypeBroadcast) - <-changes2 // also expect database change events - val2 := validateReceivedMessages(suite.testState, suite.testState.client2, fftypes.MessageTypeBroadcast, fftypes.TransactionTypeBatchPin, 1, 0) - assert.Equal(suite.T(), data.Value, val2.Value) + for i := 0; i < totalMessages; i++ { + // Wait for all the message-confirmed events, from both participants + waitForMessageConfirmed(suite.T(), received1, fftypes.MessageTypeBroadcast) + waitForMessageConfirmed(suite.T(), received2, fftypes.MessageTypeBroadcast) + <-changes1 // also expect database change events + <-changes2 // also expect database change events + } + + for topic, dataArray := range expectedData { + receiver1data := validateReceivedMessages(suite.testState, suite.testState.client1, topic, fftypes.MessageTypeBroadcast, fftypes.TransactionTypeBatchPin, len(dataArray)) + receiver2data := validateReceivedMessages(suite.testState, suite.testState.client2, topic, fftypes.MessageTypeBroadcast, fftypes.TransactionTypeBatchPin, len(dataArray)) + // Messages should be returned in exactly reverse send order (newest first) + for i := (len(dataArray) - 1); i >= 0; i-- { + assert.Equal(suite.T(), dataArray[i].Value, receiver1data[i].Value) + assert.Equal(suite.T(), dataArray[i].Value, receiver2data[i].Value) + } + } } @@ -90,7 +106,7 @@ func (suite *OnChainOffChainTestSuite) TestStrongDatatypesBroadcast() { } // Should be rejected as datatype not known - resp, err := BroadcastMessage(suite.testState.client1, &data, true) + resp, err := BroadcastMessage(suite.testState.client1, "topic1", &data, true) require.NoError(suite.T(), err) assert.Equal(suite.T(), 400, resp.StatusCode()) assert.Contains(suite.T(), resp.String(), "FF10195") // datatype not found @@ -102,7 +118,7 @@ func (suite *OnChainOffChainTestSuite) TestStrongDatatypesBroadcast() { } dt = CreateDatatype(suite.T(), suite.testState.client1, dt, true) - resp, err = BroadcastMessage(suite.testState.client1, &data, true) + resp, err = BroadcastMessage(suite.testState.client1, "topic1", &data, true) require.NoError(suite.T(), err) assert.Equal(suite.T(), 400, resp.StatusCode()) 
assert.Contains(suite.T(), resp.String(), "FF10198") // does not conform @@ -112,7 +128,7 @@ func (suite *OnChainOffChainTestSuite) TestStrongDatatypesBroadcast() { "name": "mywidget" }`) - resp, err = BroadcastMessage(suite.testState.client1, &data, true) + resp, err = BroadcastMessage(suite.testState.client1, "topic1", &data, true) require.NoError(suite.T(), err) assert.Equal(suite.T(), 200, resp.StatusCode()) @@ -141,7 +157,7 @@ func (suite *OnChainOffChainTestSuite) TestStrongDatatypesPrivate() { } // Should be rejected as datatype not known - resp, err := PrivateMessage(suite.T(), suite.testState.client1, &data, []string{ + resp, err := PrivateMessage(suite.T(), suite.testState.client1, "topic1", &data, []string{ suite.testState.org1.Name, suite.testState.org2.Name, }, "", fftypes.TransactionTypeBatchPin, true) @@ -156,7 +172,7 @@ func (suite *OnChainOffChainTestSuite) TestStrongDatatypesPrivate() { } dt = CreateDatatype(suite.T(), suite.testState.client1, dt, true) - resp, err = PrivateMessage(suite.T(), suite.testState.client1, &data, []string{ + resp, err = PrivateMessage(suite.T(), suite.testState.client1, "topic1", &data, []string{ suite.testState.org1.Name, suite.testState.org2.Name, }, "", fftypes.TransactionTypeBatchPin, false) @@ -169,7 +185,7 @@ func (suite *OnChainOffChainTestSuite) TestStrongDatatypesPrivate() { "name": "mywidget" }`) - resp, err = PrivateMessage(suite.T(), suite.testState.client1, &data, []string{ + resp, err = PrivateMessage(suite.T(), suite.testState.client1, "topic1", &data, []string{ suite.testState.org1.Name, suite.testState.org2.Name, }, "", fftypes.TransactionTypeBatchPin, true) @@ -183,31 +199,49 @@ func (suite *OnChainOffChainTestSuite) TestStrongDatatypesPrivate() { } func (suite *OnChainOffChainTestSuite) TestE2EPrivate() { + defer suite.testState.done() received1, _ := wsReader(suite.testState.ws1) received2, _ := wsReader(suite.testState.ws2) - var resp *resty.Response - value := fftypes.JSONAnyPtr(`"Hello"`) - data := fftypes.DataRefOrValue{ - Value: value, + // Send 10 messages, that should get batched, across two topics + totalMessages := 10 + topics := []string{"topicA", "topicB"} + expectedData := make(map[string][]*fftypes.DataRefOrValue) + for i := 0; i < 10; i++ { + value := fftypes.JSONAnyPtr(fmt.Sprintf(`"Hello number %d"`, i)) + data := &fftypes.DataRefOrValue{ + Value: value, + } + topic := pickTopic(i, topics) + + expectedData[topic] = append(expectedData[topic], data) + + resp, err := PrivateMessage(suite.T(), suite.testState.client1, topic, data, []string{ + suite.testState.org1.Name, + suite.testState.org2.Name, + }, "", fftypes.TransactionTypeBatchPin, false) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), 202, resp.StatusCode()) } - resp, err := PrivateMessage(suite.T(), suite.testState.client1, &data, []string{ - suite.testState.org1.Name, - suite.testState.org2.Name, - }, "", fftypes.TransactionTypeBatchPin, false) - require.NoError(suite.T(), err) - assert.Equal(suite.T(), 202, resp.StatusCode()) + for i := 0; i < totalMessages; i++ { + // Wait for all the message-confirmed events, from both participants + waitForMessageConfirmed(suite.T(), received1, fftypes.MessageTypePrivate) + waitForMessageConfirmed(suite.T(), received2, fftypes.MessageTypePrivate) + } - <-received1 - val1 := validateReceivedMessages(suite.testState, suite.testState.client1, fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, 1, 0) - assert.Equal(suite.T(), data.Value, val1.Value) + for topic, dataArray := range expectedData { + 
receiver1data := validateReceivedMessages(suite.testState, suite.testState.client1, topic, fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, len(dataArray)) + receiver2data := validateReceivedMessages(suite.testState, suite.testState.client2, topic, fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, len(dataArray)) + // Messages should be returned in exactly reverse send order (newest first) + for i := (len(dataArray) - 1); i >= 0; i-- { + assert.Equal(suite.T(), dataArray[i].Value, receiver1data[i].Value) + assert.Equal(suite.T(), dataArray[i].Value, receiver2data[i].Value) + } + } - <-received2 - val2 := validateReceivedMessages(suite.testState, suite.testState.client2, fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, 1, 0) - assert.Equal(suite.T(), data.Value, val2.Value) } func (suite *OnChainOffChainTestSuite) TestE2EBroadcastBlob() { @@ -218,21 +252,21 @@ func (suite *OnChainOffChainTestSuite) TestE2EBroadcastBlob() { var resp *resty.Response - data, resp, err := BroadcastBlobMessage(suite.T(), suite.testState.client1) + data, resp, err := BroadcastBlobMessage(suite.T(), suite.testState.client1, "topic1") require.NoError(suite.T(), err) assert.Equal(suite.T(), 202, resp.StatusCode()) waitForMessageConfirmed(suite.T(), received1, fftypes.MessageTypeBroadcast) - val1 := validateReceivedMessages(suite.testState, suite.testState.client1, fftypes.MessageTypeBroadcast, fftypes.TransactionTypeBatchPin, 1, 0) - assert.Regexp(suite.T(), "myfile.txt", val1.Value.String()) - assert.Equal(suite.T(), "myfile.txt", val1.Blob.Name) - assert.Equal(suite.T(), data.Blob.Size, val1.Blob.Size) + val1 := validateReceivedMessages(suite.testState, suite.testState.client1, "topic1", fftypes.MessageTypeBroadcast, fftypes.TransactionTypeBatchPin, 1) + assert.Regexp(suite.T(), "myfile.txt", val1[0].Value.String()) + assert.Equal(suite.T(), "myfile.txt", val1[0].Blob.Name) + assert.Equal(suite.T(), data.Blob.Size, val1[0].Blob.Size) waitForMessageConfirmed(suite.T(), received2, fftypes.MessageTypeBroadcast) - val2 := validateReceivedMessages(suite.testState, suite.testState.client2, fftypes.MessageTypeBroadcast, fftypes.TransactionTypeBatchPin, 1, 0) - assert.Regexp(suite.T(), "myfile.txt", val2.Value.String()) - assert.Equal(suite.T(), "myfile.txt", val2.Blob.Name) - assert.Equal(suite.T(), data.Blob.Size, val2.Blob.Size) + val2 := validateReceivedMessages(suite.testState, suite.testState.client2, "topic1", fftypes.MessageTypeBroadcast, fftypes.TransactionTypeBatchPin, 1) + assert.Regexp(suite.T(), "myfile.txt", val2[0].Value.String()) + assert.Equal(suite.T(), "myfile.txt", val2[0].Blob.Name) + assert.Equal(suite.T(), data.Blob.Size, val2[0].Blob.Size) } @@ -244,7 +278,7 @@ func (suite *OnChainOffChainTestSuite) TestE2EPrivateBlobDatatypeTagged() { var resp *resty.Response - data, resp, err := PrivateBlobMessageDatatypeTagged(suite.T(), suite.testState.client1, []string{ + data, resp, err := PrivateBlobMessageDatatypeTagged(suite.T(), suite.testState.client1, "topic1", []string{ suite.testState.org1.Name, suite.testState.org2.Name, }) @@ -254,16 +288,16 @@ func (suite *OnChainOffChainTestSuite) TestE2EPrivateBlobDatatypeTagged() { assert.NotNil(suite.T(), data.Blob.Hash) waitForMessageConfirmed(suite.T(), received1, fftypes.MessageTypePrivate) - res1 := validateReceivedMessages(suite.testState, suite.testState.client1, fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, 1, 0) - assert.Equal(suite.T(), data.Blob.Hash.String(), res1.Blob.Hash.String()) - 
assert.Empty(suite.T(), res1.Blob.Name) - assert.Equal(suite.T(), data.Blob.Size, res1.Blob.Size) + res1 := validateReceivedMessages(suite.testState, suite.testState.client1, "topic1", fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, 1) + assert.Equal(suite.T(), data.Blob.Hash.String(), res1[0].Blob.Hash.String()) + assert.Empty(suite.T(), res1[0].Blob.Name) + assert.Equal(suite.T(), data.Blob.Size, res1[0].Blob.Size) waitForMessageConfirmed(suite.T(), received2, fftypes.MessageTypePrivate) - res2 := validateReceivedMessages(suite.testState, suite.testState.client2, fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, 1, 0) - assert.Equal(suite.T(), data.Blob.Hash.String(), res2.Blob.Hash.String()) - assert.Empty(suite.T(), res2.Blob.Name) - assert.Equal(suite.T(), data.Blob.Size, res2.Blob.Size) + res2 := validateReceivedMessages(suite.testState, suite.testState.client2, "topic1", fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, 1) + assert.Equal(suite.T(), data.Blob.Hash.String(), res2[0].Blob.Hash.String()) + assert.Empty(suite.T(), res2[0].Blob.Name) + assert.Equal(suite.T(), data.Blob.Size, res2[0].Blob.Size) } func (suite *OnChainOffChainTestSuite) TestE2EWebhookExchange() { @@ -296,7 +330,7 @@ func (suite *OnChainOffChainTestSuite) TestE2EWebhookExchange() { } var resp *resty.Response - resp, err := PrivateMessage(suite.T(), suite.testState.client1, &data, []string{ + resp, err := PrivateMessage(suite.T(), suite.testState.client1, "topic1", &data, []string{ suite.testState.org1.Name, suite.testState.org2.Name, }, "myrequest", fftypes.TransactionTypeBatchPin, false) @@ -305,18 +339,21 @@ func (suite *OnChainOffChainTestSuite) TestE2EWebhookExchange() { waitForMessageConfirmed(suite.T(), received1, fftypes.MessageTypePrivate) // request 1 waitForMessageConfirmed(suite.T(), received2, fftypes.MessageTypePrivate) // request 2 + waitForMessageConfirmed(suite.T(), received1, fftypes.MessageTypePrivate) // response 1 + waitForMessageConfirmed(suite.T(), received2, fftypes.MessageTypePrivate) // response 2 + + // When we query the confirmed messages for each receiver, we will see the requests and responses. 
+ // We just check the responses (index 1) - waitForMessageConfirmed(suite.T(), received1, fftypes.MessageTypePrivate) // reply 1 - val1 := validateReceivedMessages(suite.testState, suite.testState.client1, fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, 2, 0) - assert.Equal(suite.T(), float64(200), val1.Value.JSONObject()["status"]) - decoded1, err := base64.StdEncoding.DecodeString(val1.Value.JSONObject().GetString("body")) + receiver1vals := validateReceivedMessages(suite.testState, suite.testState.client1, "topic1", fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, 2) + assert.Equal(suite.T(), float64(200), receiver1vals[1].Value.JSONObject()["status"]) + decoded1, err := base64.StdEncoding.DecodeString(receiver1vals[1].Value.JSONObject().GetString("body")) assert.NoError(suite.T(), err) assert.Regexp(suite.T(), "Example YAML", string(decoded1)) - waitForMessageConfirmed(suite.T(), received2, fftypes.MessageTypePrivate) // reply 2 - val2 := validateReceivedMessages(suite.testState, suite.testState.client1, fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, 2, 0) - assert.Equal(suite.T(), float64(200), val2.Value.JSONObject()["status"]) - decoded2, err := base64.StdEncoding.DecodeString(val2.Value.JSONObject().GetString("body")) + receiver2vals := validateReceivedMessages(suite.testState, suite.testState.client2, "topic1", fftypes.MessageTypePrivate, fftypes.TransactionTypeBatchPin, 2) + assert.Equal(suite.T(), float64(200), receiver2vals[1].Value.JSONObject()["status"]) + decoded2, err := base64.StdEncoding.DecodeString(receiver2vals[1].Value.JSONObject().GetString("body")) assert.NoError(suite.T(), err) assert.Regexp(suite.T(), "Example YAML", string(decoded2)) } diff --git a/test/e2e/restclient.go b/test/e2e/restclient.go index 334c335071..08b00e09cc 100644 --- a/test/e2e/restclient.go +++ b/test/e2e/restclient.go @@ -86,11 +86,13 @@ func GetNamespaces(client *resty.Client) (*resty.Response, error) { Get(urlGetNamespaces) } -func GetMessages(t *testing.T, client *resty.Client, startTime time.Time, msgType fftypes.MessageType, expectedStatus int) (msgs []*fftypes.Message) { +func GetMessages(t *testing.T, client *resty.Client, startTime time.Time, msgType fftypes.MessageType, topic string, expectedStatus int) (msgs []*fftypes.Message) { path := urlGetMessages resp, err := client.R(). SetQueryParam("type", string(msgType)). SetQueryParam("created", fmt.Sprintf(">%d", startTime.UnixNano())). + SetQueryParam("topics", topic). + SetQueryParam("sort", "confirmed"). SetResult(&msgs). Get(path) require.NoError(t, err) @@ -109,20 +111,10 @@ func GetData(t *testing.T, client *resty.Client, startTime time.Time, expectedSt return data } -func GetDataForMessage(t *testing.T, client *resty.Client, startTime time.Time, messageHash *fftypes.Bytes32) (data []*fftypes.Data) { - var msgs []*fftypes.Message +func GetDataForMessage(t *testing.T, client *resty.Client, startTime time.Time, msgID *fftypes.UUID) (data []*fftypes.Data) { path := urlGetMessages + path += "/" + msgID.String() + "/data" resp, err := client.R(). - SetQueryParam("hash", messageHash.String()). - SetQueryParam("created", fmt.Sprintf(">%d", startTime.UnixNano())). - SetResult(&msgs). - Get(path) - require.NoError(t, err) - require.Equal(t, 200, resp.StatusCode(), "GET %s [%d]: %s", path, resp.StatusCode(), resp.String()) - require.Equal(t, 1, len(msgs)) - - path += "/" + msgs[0].Header.ID.String() + "/data" - resp, err = client.R(). SetQueryParam("created", fmt.Sprintf(">%d", startTime.UnixNano())). 
SetResult(&data). Get(path) @@ -189,9 +181,14 @@ func DeleteSubscription(t *testing.T, client *resty.Client, id *fftypes.UUID) { require.Equal(t, 204, resp.StatusCode(), "DELETE %s [%d]: %s", path, resp.StatusCode(), resp.String()) } -func BroadcastMessage(client *resty.Client, data *fftypes.DataRefOrValue, confirm bool) (*resty.Response, error) { +func BroadcastMessage(client *resty.Client, topic string, data *fftypes.DataRefOrValue, confirm bool) (*resty.Response, error) { return client.R(). SetBody(fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Topics: fftypes.FFStringArray{topic}, + }, + }, InlineData: fftypes.InlineData{data}, }). SetQueryParam("confirm", strconv.FormatBool(confirm)). @@ -240,10 +237,15 @@ func CreateBlob(t *testing.T, client *resty.Client, dt *fftypes.DatatypeRef) *ff return &data } -func BroadcastBlobMessage(t *testing.T, client *resty.Client) (*fftypes.Data, *resty.Response, error) { +func BroadcastBlobMessage(t *testing.T, client *resty.Client, topic string) (*fftypes.Data, *resty.Response, error) { data := CreateBlob(t, client, nil) res, err := client.R(). SetBody(fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Topics: fftypes.FFStringArray{topic}, + }, + }, InlineData: fftypes.InlineData{ {DataRef: fftypes.DataRef{ID: data.ID}}, }, @@ -252,7 +254,7 @@ func BroadcastBlobMessage(t *testing.T, client *resty.Client) (*fftypes.Data, *r return data, res, err } -func PrivateBlobMessageDatatypeTagged(t *testing.T, client *resty.Client, orgNames []string) (*fftypes.Data, *resty.Response, error) { +func PrivateBlobMessageDatatypeTagged(t *testing.T, client *resty.Client, topic string, orgNames []string) (*fftypes.Data, *resty.Response, error) { data := CreateBlob(t, client, &fftypes.DatatypeRef{Name: "myblob"}) members := make([]fftypes.MemberInput, len(orgNames)) for i, oName := range orgNames { @@ -263,6 +265,11 @@ func PrivateBlobMessageDatatypeTagged(t *testing.T, client *resty.Client, orgNam } res, err := client.R(). 
SetBody(fftypes.MessageInOut{ + Message: fftypes.Message{ + Header: fftypes.MessageHeader{ + Topics: fftypes.FFStringArray{topic}, + }, + }, InlineData: fftypes.InlineData{ {DataRef: fftypes.DataRef{ID: data.ID}}, }, @@ -275,7 +282,7 @@ func PrivateBlobMessageDatatypeTagged(t *testing.T, client *resty.Client, orgNam return data, res, err } -func PrivateMessage(t *testing.T, client *resty.Client, data *fftypes.DataRefOrValue, orgNames []string, tag string, txType fftypes.TransactionType, confirm bool) (*resty.Response, error) { +func PrivateMessage(t *testing.T, client *resty.Client, topic string, data *fftypes.DataRefOrValue, orgNames []string, tag string, txType fftypes.TransactionType, confirm bool) (*resty.Response, error) { members := make([]fftypes.MemberInput, len(orgNames)) for i, oName := range orgNames { // We let FireFly resolve the friendly name of the org to the identity @@ -288,6 +295,7 @@ func PrivateMessage(t *testing.T, client *resty.Client, data *fftypes.DataRefOrV Header: fftypes.MessageHeader{ Tag: tag, TxType: txType, + Topics: fftypes.FFStringArray{topic}, }, }, InlineData: fftypes.InlineData{data}, diff --git a/test/e2e/tokens_test.go b/test/e2e/tokens_test.go index e929a03b07..504693cdab 100644 --- a/test/e2e/tokens_test.go +++ b/test/e2e/tokens_test.go @@ -117,7 +117,7 @@ func (suite *TokensTestSuite) TestE2EFungibleTokensAsync() { assert.Equal(suite.T(), "erc1155", transfers[0].Connector) assert.Equal(suite.T(), fftypes.TokenTransferTypeTransfer, transfers[0].Type) assert.Equal(suite.T(), int64(1), transfers[0].Amount.Int().Int64()) - data := GetDataForMessage(suite.T(), suite.testState.client1, suite.testState.startTime, transfers[0].MessageHash) + data := GetDataForMessage(suite.T(), suite.testState.client1, suite.testState.startTime, transfers[0].Message) assert.Equal(suite.T(), 1, len(data)) assert.Equal(suite.T(), `"payment for data"`, data[0].Value.String()) validateAccountBalances(suite.T(), suite.testState.client1, poolID, "", map[string]int64{ @@ -254,7 +254,7 @@ func (suite *TokensTestSuite) TestE2ENonFungibleTokensSync() { assert.Equal(suite.T(), fftypes.TokenTransferTypeTransfer, transferOut.Type) assert.Equal(suite.T(), "1", transferOut.TokenIndex) assert.Equal(suite.T(), int64(1), transferOut.Amount.Int().Int64()) - data := GetDataForMessage(suite.T(), suite.testState.client1, suite.testState.startTime, transferOut.MessageHash) + data := GetDataForMessage(suite.T(), suite.testState.client1, suite.testState.startTime, transferOut.Message) assert.Equal(suite.T(), 1, len(data)) assert.Equal(suite.T(), `"ownership change"`, data[0].Value.String()) validateAccountBalances(suite.T(), suite.testState.client1, poolID, "1", map[string]int64{