diff --git a/db/migrations/postgres/000074_add_event_and_listener_topic.down.sql b/db/migrations/postgres/000074_add_event_and_listener_topic.down.sql new file mode 100644 index 0000000000..e90ee446ae --- /dev/null +++ b/db/migrations/postgres/000074_add_event_and_listener_topic.down.sql @@ -0,0 +1,8 @@ +BEGIN; + +DROP INDEX events_topic; + +ALTER TABLE events DROP COLUMN topic; +ALTER TABLE contractlisteners DROP COLUMN topic; + +COMMIT; diff --git a/db/migrations/postgres/000074_add_event_and_listener_topic.up.sql b/db/migrations/postgres/000074_add_event_and_listener_topic.up.sql new file mode 100644 index 0000000000..a18c8d9c7d --- /dev/null +++ b/db/migrations/postgres/000074_add_event_and_listener_topic.up.sql @@ -0,0 +1,14 @@ +BEGIN; + +ALTER TABLE events ADD COLUMN topic VARCHAR(64); +ALTER TABLE contractlisteners ADD COLUMN topic VARCHAR(64); + +UPDATE events SET topic = ''; +UPDATE contractlisteners SET topic = ''; + +ALTER TABLE events ALTER COLUMN topic SET NOT NULL; +ALTER TABLE contractlisteners ALTER COLUMN topic SET NOT NULL; + +CREATE INDEX events_topic ON events(topic); + +COMMIT; diff --git a/db/migrations/sqlite/000074_add_event_and_listener_topic.down.sql b/db/migrations/sqlite/000074_add_event_and_listener_topic.down.sql new file mode 100644 index 0000000000..1cfee88d47 --- /dev/null +++ b/db/migrations/sqlite/000074_add_event_and_listener_topic.down.sql @@ -0,0 +1,5 @@ +DROP INDEX events_topic; + +ALTER TABLE events DROP COLUMN topic; +ALTER TABLE contractlisteners DROP COLUMN topic; + diff --git a/db/migrations/sqlite/000074_add_event_and_listener_topic.up.sql b/db/migrations/sqlite/000074_add_event_and_listener_topic.up.sql new file mode 100644 index 0000000000..6dfaaff364 --- /dev/null +++ b/db/migrations/sqlite/000074_add_event_and_listener_topic.up.sql @@ -0,0 +1,7 @@ +ALTER TABLE events ADD COLUMN topic VARCHAR(64); +ALTER TABLE contractlisteners ADD COLUMN topic VARCHAR(64); + +UPDATE events SET topic = ''; +UPDATE contractlisteners SET topic = ''; + +CREATE INDEX events_topic ON events(topic); diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index cdb705564c..e0e3a30b1e 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -2282,6 +2282,8 @@ paths: type: object protocolId: type: string + topic: + type: string type: object description: Success default: @@ -2349,6 +2351,8 @@ paths: type: object protocolId: type: string + topic: + type: string type: object responses: "200": @@ -2395,6 +2399,8 @@ paths: type: object protocolId: type: string + topic: + type: string type: object description: Success default: @@ -2496,6 +2502,8 @@ paths: type: object protocolId: type: string + topic: + type: string type: object description: Success default: @@ -3581,6 +3589,11 @@ paths: name: sequence schema: type: string + - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' + in: query + name: topic + schema: + type: string - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' in: query name: tx @@ -3640,6 +3653,8 @@ paths: sequence: format: int64 type: integer + topic: + type: string tx: {} type: enum: @@ -3703,6 +3718,8 @@ paths: sequence: format: int64 type: integer + topic: + type: string tx: {} type: enum: @@ -4816,6 +4833,11 @@ paths: name: sequence schema: type: string + - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' + in: query + name: topic + schema: + type: string - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' in: query name: tx @@ -4875,6 +4897,8 @@ paths: sequence: format: int64 type: integer + topic: + type: string tx: {} type: enum: @@ -5835,17 +5859,7 @@ paths: type: string - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' in: query - name: filter.group - schema: - type: string - - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' - in: query - name: filter.tag - schema: - type: string - - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' - in: query - name: filter.topics + name: filters schema: type: string - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' @@ -5939,11 +5953,11 @@ paths: type: string tag: type: string - topics: - type: string type: object tag: type: string + topic: + type: string topics: type: string transaction: @@ -6021,11 +6035,11 @@ paths: type: string tag: type: string - topics: - type: string type: object tag: type: string + topic: + type: string topics: type: string transaction: @@ -6172,11 +6186,11 @@ paths: type: string tag: type: string - topics: - type: string type: object tag: type: string + topic: + type: string topics: type: string transaction: @@ -6254,11 +6268,11 @@ paths: type: string tag: type: string - topics: - type: string type: object tag: type: string + topic: + type: string topics: type: string transaction: @@ -6405,11 +6419,11 @@ paths: type: string tag: type: string - topics: - type: string type: object tag: type: string + topic: + type: string topics: type: string transaction: @@ -6525,11 +6539,11 @@ paths: type: string tag: type: string - topics: - type: string type: object tag: type: string + topic: + type: string topics: type: string transaction: diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index a1adc9c1ba..1731a5ca95 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -595,10 +595,13 @@ func (bp *batchProcessor) markPayloadDispatched(state *DispatchState) error { if bp.conf.txType == fftypes.TransactionTypeUnpinned { for _, msg := range state.Payload.Messages { // Emit a confirmation event locally immediately - event := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, state.Persisted.Namespace, msg.Header.ID, state.Persisted.TX.ID) - event.Correlator = msg.Header.CID - if err := bp.database.InsertEvent(ctx, event); err != nil { - return err + for _, topic := range msg.Header.Topics { + // One event per topic + event := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, state.Persisted.Namespace, msg.Header.ID, state.Persisted.TX.ID, topic) + event.Correlator = msg.Header.CID + if err := bp.database.InsertEvent(ctx, event); err != nil { + return err + } } } } diff --git a/internal/batch/batch_processor_test.go b/internal/batch/batch_processor_test.go index 6ae24999c1..0c8133f7a4 100644 --- a/internal/batch/batch_processor_test.go +++ b/internal/batch/batch_processor_test.go @@ -338,7 +338,7 @@ func TestMarkMessageDispatchedUnpinnedOK(t *testing.T) { for i := 0; i < 5; i++ { msgid := fftypes.NewUUID() bp.newWork <- &batchWork{ - msg: &fftypes.Message{Header: fftypes.MessageHeader{ID: msgid}, Sequence: int64(1000 + i)}, + msg: &fftypes.Message{Header: fftypes.MessageHeader{ID: msgid, Topics: fftypes.FFStringArray{"topic1"}}, Sequence: int64(1000 + i)}, } } }() @@ -460,7 +460,7 @@ func TestDispatchWithPublicBlobUpdates(t *testing.T) { // Dispatch the work go func() { bp.newWork <- &batchWork{ - msg: &fftypes.Message{Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, Sequence: int64(1000)}, + msg: &fftypes.Message{Header: fftypes.MessageHeader{ID: fftypes.NewUUID(), Topics: fftypes.FFStringArray{"topic1"}}, Sequence: int64(1000)}, data: fftypes.DataArray{ {ID: dataID, Blob: &fftypes.BlobRef{ Public: "public/ref", diff --git a/internal/config/config.go b/internal/config/config.go index 2996f63885..92bf3649ed 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -150,6 +150,10 @@ var ( EventDispatcherRetryMaxDelay = rootKey("event.dispatcher.retry.maxDelay") // EventDBEventsBufferSize the size of the buffer of change events EventDBEventsBufferSize = rootKey("event.dbevents.bufferSize") + // EventListenerTopicCacheSize cache size for blockchain listeners addresses + EventListenerTopicCacheSize = rootKey("event.listenerToipc.cache.size") + // EventListenerTopicCacheTTL cache time-to-live for private group addresses + EventListenerTopicCacheTTL = rootKey("event.listenerToipc.cache.ttl") // GroupCacheSize cache size for private group addresses GroupCacheSize = rootKey("group.cache.size") // GroupCacheTTL cache time-to-live for private group addresses @@ -340,6 +344,8 @@ func Reset() { viper.SetDefault(string(EventDispatcherPollTimeout), "30s") viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"}) viper.SetDefault(string(EventTransportsDefault), "websockets") + viper.SetDefault(string(EventListenerTopicCacheSize), "100Kb") + viper.SetDefault(string(EventListenerTopicCacheTTL), "5m") viper.SetDefault(string(GroupCacheSize), "1Mb") viper.SetDefault(string(GroupCacheTTL), "1h") viper.SetDefault(string(AdminEnabled), false) @@ -354,7 +360,7 @@ func Reset() { viper.SetDefault(string(MessageCacheSize), "50Mb") viper.SetDefault(string(MessageCacheTTL), "5m") viper.SetDefault(string(MessageWriterBatchMaxInserts), 200) - viper.SetDefault(string(MessageWriterBatchTimeout), "50ms") + viper.SetDefault(string(MessageWriterBatchTimeout), "10ms") viper.SetDefault(string(MessageWriterCount), 5) viper.SetDefault(string(NamespacesDefault), "default") viper.SetDefault(string(NamespacesPredefined), fftypes.JSONObjectArray{{"name": "default", "description": "Default predefined namespace"}}) diff --git a/internal/database/sqlcommon/contractlisteners_sql.go b/internal/database/sqlcommon/contractlisteners_sql.go index dc0c783677..40bc980719 100644 --- a/internal/database/sqlcommon/contractlisteners_sql.go +++ b/internal/database/sqlcommon/contractlisteners_sql.go @@ -37,8 +37,9 @@ var ( "name", "protocol_id", "location", - "created", + "topic", "options", + "created", } contractListenerFilterFieldMap = map[string]string{ "interface": "interface_id", @@ -78,6 +79,7 @@ func (s *SQLCommon) UpsertContractListener(ctx context.Context, sub *fftypes.Con Set("namespace", sub.Namespace). Set("name", sub.Name). Set("location", sub.Location). + Set("topic", sub.Topic). Set("options", sub.Options). Where(sq.Eq{"protocol_id": sub.ProtocolID}), func() { @@ -99,8 +101,9 @@ func (s *SQLCommon) UpsertContractListener(ctx context.Context, sub *fftypes.Con sub.Name, sub.ProtocolID, sub.Location, - sub.Created, + sub.Topic, sub.Options, + sub.Created, ), func() { s.callbacks.UUIDCollectionNSEvent(database.CollectionContractListeners, fftypes.ChangeEventTypeCreated, sub.Namespace, sub.ID) @@ -125,8 +128,9 @@ func (s *SQLCommon) contractListenerResult(ctx context.Context, row *sql.Rows) ( &sub.Name, &sub.ProtocolID, &sub.Location, - &sub.Created, + &sub.Topic, &sub.Options, + &sub.Created, ) if err != nil { return nil, i18n.WrapError(ctx, err, i18n.MsgDBReadErr, "contractlisteners") diff --git a/internal/database/sqlcommon/contractlisteners_sql_test.go b/internal/database/sqlcommon/contractlisteners_sql_test.go index a022ce6def..94a34dc95f 100644 --- a/internal/database/sqlcommon/contractlisteners_sql_test.go +++ b/internal/database/sqlcommon/contractlisteners_sql_test.go @@ -50,6 +50,10 @@ func TestContractListenerE2EWithDB(t *testing.T) { Name: "sub1", ProtocolID: "sb-123", Location: fftypes.JSONAnyPtrBytes(locationJson), + Topic: "topic1", + Options: &fftypes.ContractListenerOptions{ + FirstEvent: "0", + }, } s.callbacks.On("UUIDCollectionNSEvent", database.CollectionContractListeners, fftypes.ChangeEventTypeCreated, "ns", sub.ID).Return() @@ -227,7 +231,7 @@ func TestContractListenerDeleteFail(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin() mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows(contractListenerColumns).AddRow( - fftypes.NewUUID(), nil, []byte("{}"), "ns1", "sub1", "123", "{}", fftypes.Now(), nil), + fftypes.NewUUID(), nil, []byte("{}"), "ns1", "sub1", "123", "{}", "topic1", nil, fftypes.Now()), ) mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop")) err := s.DeleteContractListenerByID(context.Background(), fftypes.NewUUID()) diff --git a/internal/database/sqlcommon/event_sql.go b/internal/database/sqlcommon/event_sql.go index e6d01bd6a8..33850dcfce 100644 --- a/internal/database/sqlcommon/event_sql.go +++ b/internal/database/sqlcommon/event_sql.go @@ -35,6 +35,7 @@ var ( "ref", "cid", "tx_id", + "topic", "created", } eventFilterFieldMap = map[string]string{ @@ -76,13 +77,14 @@ func (s *SQLCommon) setEventInsertValues(query sq.InsertBuilder, event *fftypes. event.Reference, event.Correlator, event.Transaction, + event.Topic, event.Created, ) } func (s *SQLCommon) eventInserted(ctx context.Context, event *fftypes.Event) { s.callbacks.OrderedUUIDCollectionNSEvent(database.CollectionEvents, fftypes.ChangeEventTypeCreated, event.Namespace, event.ID, event.Sequence) - log.L(ctx).Infof("Emitted %s event %s ref=%s (sequence=%d)", event.Type, event.ID, event.Reference, event.Sequence) + log.L(ctx).Infof("Emitted %s event %s for %s:%s (correlator=%v,topic=%s)", event.Type, event.ID, event.Namespace, event.Reference, event.Correlator, event.Topic) } func (s *SQLCommon) insertEventsPreCommit(ctx context.Context, tx *txWrapper, events []*fftypes.Event) (err error) { @@ -134,6 +136,7 @@ func (s *SQLCommon) eventResult(ctx context.Context, row *sql.Rows) (*fftypes.Ev &event.Reference, &event.Correlator, &event.Transaction, + &event.Topic, &event.Created, // Must be added to the list of columns in all selects &event.Sequence, diff --git a/internal/database/sqlcommon/event_sql_test.go b/internal/database/sqlcommon/event_sql_test.go index 1824feb960..d4777628a0 100644 --- a/internal/database/sqlcommon/event_sql_test.go +++ b/internal/database/sqlcommon/event_sql_test.go @@ -44,6 +44,7 @@ func TestEventE2EWithDB(t *testing.T) { Type: fftypes.EventTypeMessageConfirmed, Reference: fftypes.NewUUID(), Correlator: fftypes.NewUUID(), + Topic: "topic1", Created: fftypes.Now(), } diff --git a/internal/database/sqlcommon/subscription_sql_test.go b/internal/database/sqlcommon/subscription_sql_test.go index ed160b9d6e..d50b9892da 100644 --- a/internal/database/sqlcommon/subscription_sql_test.go +++ b/internal/database/sqlcommon/subscription_sql_test.go @@ -78,10 +78,10 @@ func TestSubscriptionsE2EWithDB(t *testing.T) { Transport: "websockets", Filter: fftypes.SubscriptionFilter{ Events: string(fftypes.EventTypeMessageConfirmed), + Topic: "topics.*", Message: fftypes.MessageFilter{ - Topics: "topics.*", - Tag: "tag.*", - Group: "group.*", + Tag: "tag.*", + Group: "group.*", }, }, Options: subOpts, diff --git a/internal/definitions/definition_handler_contracts.go b/internal/definitions/definition_handler_contracts.go index 0031b17628..a3a5a5adab 100644 --- a/internal/definitions/definition_handler_contracts.go +++ b/internal/definitions/definition_handler_contracts.go @@ -90,7 +90,7 @@ func (dh *definitionHandlers) handleFFIBroadcast(ctx context.Context, state Defi l.Infof("Contract interface created id=%s author=%s", broadcast.ID, msg.Header.Author) state.AddFinalize(func(ctx context.Context) error { - event := fftypes.NewEvent(fftypes.EventTypeContractInterfaceConfirmed, broadcast.Namespace, broadcast.ID, tx) + event := fftypes.NewEvent(fftypes.EventTypeContractInterfaceConfirmed, broadcast.Namespace, broadcast.ID, tx, broadcast.Topic()) return dh.database.InsertEvent(ctx, event) }) return HandlerResult{Action: ActionConfirm}, nil @@ -121,7 +121,7 @@ func (dh *definitionHandlers) handleContractAPIBroadcast(ctx context.Context, st l.Infof("Contract API created id=%s author=%s", broadcast.ID, msg.Header.Author) state.AddFinalize(func(ctx context.Context) error { - event := fftypes.NewEvent(fftypes.EventTypeContractAPIConfirmed, broadcast.Namespace, broadcast.ID, tx) + event := fftypes.NewEvent(fftypes.EventTypeContractAPIConfirmed, broadcast.Namespace, broadcast.ID, tx, fftypes.SystemTopicDefinitions) return dh.database.InsertEvent(ctx, event) }) return HandlerResult{Action: ActionConfirm}, nil diff --git a/internal/definitions/definition_handler_datatype.go b/internal/definitions/definition_handler_datatype.go index 5dde2a0e67..7f51bf7df3 100644 --- a/internal/definitions/definition_handler_datatype.go +++ b/internal/definitions/definition_handler_datatype.go @@ -56,7 +56,7 @@ func (dh *definitionHandlers) handleDatatypeBroadcast(ctx context.Context, state } state.AddFinalize(func(ctx context.Context) error { - event := fftypes.NewEvent(fftypes.EventTypeDatatypeConfirmed, dt.Namespace, dt.ID, tx) + event := fftypes.NewEvent(fftypes.EventTypeDatatypeConfirmed, dt.Namespace, dt.ID, tx, fftypes.SystemTopicDefinitions) return dh.database.InsertEvent(ctx, event) }) return HandlerResult{Action: ActionConfirm}, nil diff --git a/internal/definitions/definition_handler_identity_claim.go b/internal/definitions/definition_handler_identity_claim.go index c0403e2493..35164157c1 100644 --- a/internal/definitions/definition_handler_identity_claim.go +++ b/internal/definitions/definition_handler_identity_claim.go @@ -211,7 +211,7 @@ func (dh *definitionHandlers) handleIdentityClaim(ctx context.Context, state Def } state.AddFinalize(func(ctx context.Context) error { - event := fftypes.NewEvent(fftypes.EventTypeIdentityConfirmed, identity.Namespace, identity.ID, nil) + event := fftypes.NewEvent(fftypes.EventTypeIdentityConfirmed, identity.Namespace, identity.ID, nil, fftypes.SystemTopicDefinitions) return dh.database.InsertEvent(ctx, event) }) return HandlerResult{Action: ActionConfirm}, nil diff --git a/internal/definitions/definition_handler_identity_update.go b/internal/definitions/definition_handler_identity_update.go index 1bc97e87dd..6bd4368584 100644 --- a/internal/definitions/definition_handler_identity_update.go +++ b/internal/definitions/definition_handler_identity_update.go @@ -63,7 +63,7 @@ func (dh *definitionHandlers) handleIdentityUpdateBroadcast(ctx context.Context, } state.AddFinalize(func(ctx context.Context) error { - event := fftypes.NewEvent(fftypes.EventTypeIdentityUpdated, identity.Namespace, identity.ID, nil) + event := fftypes.NewEvent(fftypes.EventTypeIdentityUpdated, identity.Namespace, identity.ID, nil, fftypes.SystemTopicDefinitions) return dh.database.InsertEvent(ctx, event) }) return HandlerResult{Action: ActionConfirm}, err diff --git a/internal/definitions/definition_handler_namespace.go b/internal/definitions/definition_handler_namespace.go index 738ee6776a..faff9609bb 100644 --- a/internal/definitions/definition_handler_namespace.go +++ b/internal/definitions/definition_handler_namespace.go @@ -56,7 +56,7 @@ func (dh *definitionHandlers) handleNamespaceBroadcast(ctx context.Context, stat } state.AddFinalize(func(ctx context.Context) error { - event := fftypes.NewEvent(fftypes.EventTypeNamespaceConfirmed, ns.Name, ns.ID, tx) + event := fftypes.NewEvent(fftypes.EventTypeNamespaceConfirmed, ns.Name, ns.ID, tx, fftypes.SystemTopicDefinitions) return dh.database.InsertEvent(ctx, event) }) return HandlerResult{Action: ActionConfirm}, nil diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 5354fa69d5..3083e026c5 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -398,9 +398,10 @@ func (ag *aggregator) processMessage(ctx context.Context, manifest *fftypes.Batc } dispatched := false + var newState fftypes.MessageState if dataAvailable { l.Debugf("Attempt dispatch msg=%s broadcastContexts=%v privatePins=%v", msg.Header.ID, unmaskedContexts, msg.Pins) - dispatched, err = ag.attemptMessageDispatch(ctx, msg, data, manifest.TX.ID, state, pin) + newState, dispatched, err = ag.attemptMessageDispatch(ctx, msg, data, manifest.TX.ID, state, pin) if err != nil { return err } @@ -413,7 +414,7 @@ func (ag *aggregator) processMessage(ctx context.Context, manifest *fftypes.Batc for _, np := range nextPins { np.IncrementNextPin(ctx) } - state.MarkMessageDispatched(ctx, manifest.ID, msg, msgBaseIndex) + state.MarkMessageDispatched(ctx, manifest.ID, msg, msgBaseIndex, newState) } else { for _, unmaskedContext := range unmaskedContexts { state.SetContextBlockedBy(ctx, *unmaskedContext, pin.Sequence) @@ -423,16 +424,16 @@ func (ag *aggregator) processMessage(ctx context.Context, manifest *fftypes.Batc return nil } -func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.Message, data fftypes.DataArray, tx *fftypes.UUID, state *batchState, pin *fftypes.Pin) (valid bool, err error) { +func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.Message, data fftypes.DataArray, tx *fftypes.UUID, state *batchState, pin *fftypes.Pin) (newState fftypes.MessageState, valid bool, err error) { // Check the pin signer is valid for the message if valid, err := ag.checkOnchainConsistency(ctx, msg, pin); err != nil || !valid { - return false, err + return "", false, err } // Verify we have all the blobs for the data if resolved, err := ag.resolveBlobs(ctx, data); err != nil || !resolved { - return false, err + return "", false, err } // For transfers, verify the transfer has come through @@ -443,10 +444,10 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M ) if transfers, _, err := ag.database.GetTokenTransfers(ctx, filter); err != nil || len(transfers) == 0 { log.L(ctx).Debugf("Transfer for message %s not yet available", msg.Header.ID) - return false, err + return "", false, err } else if !msg.Hash.Equals(transfers[0].MessageHash) { log.L(ctx).Errorf("Message hash %s does not match hash recorded in transfer: %s", msg.Hash, transfers[0].MessageHash) - return false, nil + return "", false, nil } } @@ -460,10 +461,10 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M handlerResult, err := ag.definitions.HandleDefinitionBroadcast(ctx, state, msg, data, tx) log.L(ctx).Infof("Result of definition broadcast '%s' [%s]: %s", msg.Header.Tag, msg.Header.ID, handlerResult.Action) if handlerResult.Action == definitions.ActionRetry { - return false, err + return "", false, err } if handlerResult.Action == definitions.ActionWait { - return false, nil + return "", false, nil } customCorrelator = handlerResult.CustomCorrelator valid = handlerResult.Action == definitions.ActionConfirm @@ -474,46 +475,39 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M case len(msg.Data) > 0: valid, err = ag.data.ValidateAll(ctx, data) if err != nil { - return false, err + return "", false, err } } - status := fftypes.MessageStateConfirmed + newState = fftypes.MessageStateConfirmed eventType := fftypes.EventTypeMessageConfirmed if valid { state.pendingConfirms[*msg.Header.ID] = msg } else { - status = fftypes.MessageStateRejected + newState = fftypes.MessageStateRejected eventType = fftypes.EventTypeMessageRejected } state.AddFinalize(func(ctx context.Context) error { - // This message is now confirmed - setConfirmed := database.MessageQueryFactory.NewUpdate(ctx). - Set("confirmed", fftypes.Now()). // the timestamp of the aggregator provides ordering - Set("state", status) // mark if the message was confirmed or rejected - if err = ag.database.UpdateMessage(ctx, msg.Header.ID, setConfirmed); err != nil { - return err - } - - // Generate the appropriate event - event := fftypes.NewEvent(eventType, msg.Header.Namespace, msg.Header.ID, tx) - event.Correlator = msg.Header.CID - if customCorrelator != nil { - // Definition handlers can set a custom event correlator (such as a token pool ID) - event.Correlator = customCorrelator - } - if err = ag.database.InsertEvent(ctx, event); err != nil { - return err + // Generate the appropriate event - one per topic (events cover a single topic) + for _, topic := range msg.Header.Topics { + event := fftypes.NewEvent(eventType, msg.Header.Namespace, msg.Header.ID, tx, topic) + event.Correlator = msg.Header.CID + if customCorrelator != nil { + // Definition handlers can set a custom event correlator (such as a token pool ID) + event.Correlator = customCorrelator + } + if err = ag.database.InsertEvent(ctx, event); err != nil { + return err + } } - log.L(ctx).Infof("Emitting %s %s for message %s:%s (correlator=%v)", eventType, event.ID, msg.Header.Namespace, msg.Header.ID, event.Correlator) return nil }) if ag.metrics.IsMetricsEnabled() { ag.metrics.MessageConfirmed(msg, eventType) } - return true, nil + return newState, true, nil } // resolveBlobs ensures that the blobs for all the attachments in the data array, have been received into the diff --git a/internal/events/aggregator_batch_state.go b/internal/events/aggregator_batch_state.go index 164b5497c8..ce6626f3f4 100644 --- a/internal/events/aggregator_batch_state.go +++ b/internal/events/aggregator_batch_state.go @@ -74,6 +74,7 @@ type dispatchedMessage struct { firstPinIndex int64 topicCount int msgPins fftypes.FFStringArray + newState fftypes.MessageState } // batchState is the object that tracks the in-memory state that builds up while processing a batch of pins, @@ -222,13 +223,14 @@ func (bs *batchState) CheckMaskedContextReady(ctx context.Context, msg *fftypes. }, err } -func (bs *batchState) MarkMessageDispatched(ctx context.Context, batchID *fftypes.UUID, msg *fftypes.Message, msgBaseIndex int64) { +func (bs *batchState) MarkMessageDispatched(ctx context.Context, batchID *fftypes.UUID, msg *fftypes.Message, msgBaseIndex int64, newState fftypes.MessageState) { bs.dispatchedMessages = append(bs.dispatchedMessages, &dispatchedMessage{ batchID: batchID, msgID: msg.Header.ID, firstPinIndex: msgBaseIndex, topicCount: len(msg.Header.Topics), msgPins: msg.Pins, + newState: newState, }) } @@ -270,6 +272,7 @@ func (bs *batchState) flushPins(ctx context.Context) error { // Note that this might include pins not in the batch we read from the database, as the page size // cannot be guaranteed to overlap with the set of indexes of a message within a batch. pinsDispatched := make(map[fftypes.UUID][]driver.Value) + msgStateUpdates := make(map[fftypes.MessageState][]driver.Value) for _, dm := range bs.dispatchedMessages { batchDispatched := pinsDispatched[*dm.batchID] l.Debugf("Marking message dispatched batch=%s msg=%s firstIndex=%d topics=%d pins=%s", dm.batchID, dm.msgID, dm.firstPinIndex, dm.topicCount, dm.msgPins) @@ -279,7 +282,9 @@ func (bs *batchState) flushPins(ctx context.Context) error { if len(batchDispatched) > 0 { pinsDispatched[*dm.batchID] = batchDispatched } + msgStateUpdates[dm.newState] = append(msgStateUpdates[dm.newState], dm.msgID) } + // Build one uber update for DB efficiency if len(pinsDispatched) > 0 { fb := database.PinQueryFactory.NewFilter(ctx) @@ -296,6 +301,19 @@ func (bs *batchState) flushPins(ctx context.Context) error { } } + // Also do the same for each type of state update, to mark messages dispatched with a new state + confirmTime := fftypes.Now() // All messages get the same confirmed timestamp the Events (not Messages directly) should be used for confirm sequence + for msgState, msgIDs := range msgStateUpdates { + fb := database.MessageQueryFactory.NewFilter(ctx) + filter := fb.In("id", msgIDs) + setConfirmed := database.MessageQueryFactory.NewUpdate(ctx). + Set("confirmed", confirmTime). + Set("state", msgState) + if err := bs.database.UpdateMessages(ctx, filter, setConfirmed); err != nil { + return err + } + } + return nil } diff --git a/internal/events/aggregator_batch_state_test.go b/internal/events/aggregator_batch_state_test.go index 5295020bc4..52404889e7 100644 --- a/internal/events/aggregator_batch_state_test.go +++ b/internal/events/aggregator_batch_state_test.go @@ -26,7 +26,7 @@ import ( "github.com/stretchr/testify/mock" ) -func TestFlushPinsFail(t *testing.T) { +func TestFlushPinsFailUpdatePins(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() bs := newBatchState(ag) @@ -40,7 +40,28 @@ func TestFlushPinsFail(t *testing.T) { Topics: fftypes.FFStringArray{"topic1"}, }, Pins: fftypes.FFStringArray{"pin1"}, - }, 0) + }, 0, fftypes.MessageStateConfirmed) + + err := bs.flushPins(ag.ctx) + assert.Regexp(t, "pop", err) +} + +func TestFlushPinsFailUpdateMessages(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdi := ag.database.(*databasemocks.Plugin) + mdi.On("UpdatePins", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("UpdateMessages", ag.ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + + bs.MarkMessageDispatched(ag.ctx, fftypes.NewUUID(), &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Topics: fftypes.FFStringArray{"topic1"}, + }, + Pins: fftypes.FFStringArray{"pin1"}, + }, 0, fftypes.MessageStateConfirmed) err := bs.flushPins(ag.ctx) assert.Regexp(t, "pop", err) diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index a3f0a89b6c..b33d88a39a 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -216,7 +216,7 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { // Set the pin to dispatched mdi.On("UpdatePins", ag.ctx, mock.Anything, mock.Anything).Return(nil) // Update the message - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.MatchedBy(func(u database.Update) bool { + mdi.On("UpdateMessages", ag.ctx, mock.Anything, mock.MatchedBy(func(u database.Update) bool { update, err := u.Finalize() assert.NoError(t, err) assert.Len(t, update.SetOperations, 2) @@ -353,7 +353,7 @@ func TestAggregationMaskedNextSequenceMatch(t *testing.T) { // Set the pin to dispatched mdi.On("UpdatePins", ag.ctx, mock.Anything, mock.Anything).Return(nil) // Update the message - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("UpdateMessages", ag.ctx, mock.Anything, mock.Anything).Return(nil) _, err := ag.processPinsEventsHandler([]fftypes.LocallySequenced{ &fftypes.Pin{ @@ -439,7 +439,7 @@ func TestAggregationBroadcast(t *testing.T) { // Set the pin to dispatched mdi.On("UpdatePins", ag.ctx, mock.Anything, mock.Anything).Return(nil) // Update the message - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("UpdateMessages", ag.ctx, mock.Anything, mock.Anything).Return(nil) err := ag.processPins(ag.ctx, []*fftypes.Pin{ { @@ -533,7 +533,7 @@ func TestAggregationMigratedBroadcast(t *testing.T) { // Set the pin to dispatched mdi.On("UpdatePins", ag.ctx, mock.Anything, mock.Anything).Return(nil) // Update the message - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("UpdateMessages", ag.ctx, mock.Anything, mock.Anything).Return(nil) err = ag.processPins(ag.ctx, []*fftypes.Pin{ { @@ -1082,7 +1082,7 @@ func TestProcessMsgFailPinUpdate(t *testing.T) { mdm.On("GetMessageWithDataCached", ag.ctx, mock.Anything, data.CRORequirePins).Return(msg, nil, true, nil) mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(false, nil) mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil) - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("UpdateMessages", ag.ctx, mock.Anything, mock.Anything).Return(nil) mdi.On("UpdateNextPin", ag.ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) err := ag.processMessage(ag.ctx, &fftypes.BatchManifest{ @@ -1114,8 +1114,10 @@ func TestCheckMaskedContextReadyMismatchedAuthor(t *testing.T) { bs := newBatchState(ag) _, err := bs.CheckMaskedContextReady(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ - ID: fftypes.NewUUID(), - Group: fftypes.NewRandB32(), + ID: fftypes.NewUUID(), + Group: fftypes.NewRandB32(), + Tag: fftypes.SystemTagDefineDatatype, + Topics: fftypes.FFStringArray{"topic1"}, SignerRef: fftypes.SignerRef{ Author: "author1", Key: "0x12345", @@ -1348,7 +1350,7 @@ func TestAttemptMessageDispatchFailValidateData(t *testing.T) { mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return(fftypes.DataArray{}, true, nil) mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(false, fmt.Errorf("pop")) - _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + _, _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID(), SignerRef: fftypes.SignerRef{Key: "0x12345", Author: org1.DID}}, Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, @@ -1375,7 +1377,7 @@ func TestAttemptMessageDispatchMissingBlobs(t *testing.T) { mdm.On("CopyBlobPStoDX", ag.ctx, mock.Anything).Return(nil, nil) - dispatched, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + _, dispatched, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ID: fftypes.NewUUID(), SignerRef: fftypes.SignerRef{Key: "0x12345", Author: org1.DID}}, }, fftypes.DataArray{ {ID: fftypes.NewUUID(), Hash: fftypes.NewRandB32(), Blob: &fftypes.BlobRef{ @@ -1410,7 +1412,7 @@ func TestAttemptMessageDispatchMissingTransfers(t *testing.T) { }, } msg.Hash = msg.Header.Hash() - dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, fftypes.DataArray{}, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) + _, dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, fftypes.DataArray{}, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) assert.NoError(t, err) assert.False(t, dispatched) @@ -1437,7 +1439,7 @@ func TestAttemptMessageDispatchGetTransfersFail(t *testing.T) { }, } msg.Hash = msg.Header.Hash() - dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, fftypes.DataArray{}, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) + _, dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, fftypes.DataArray{}, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) assert.EqualError(t, err, "pop") assert.False(t, dispatched) @@ -1470,7 +1472,7 @@ func TestAttemptMessageDispatchTransferMismatch(t *testing.T) { mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetTokenTransfers", ag.ctx, mock.Anything).Return(transfers, nil, nil) - dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, fftypes.DataArray{}, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) + _, dispatched, err := ag.attemptMessageDispatch(ag.ctx, msg, fftypes.DataArray{}, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) assert.NoError(t, err) assert.False(t, dispatched) @@ -1497,7 +1499,7 @@ func TestDefinitionBroadcastActionRejectCustomCorrelator(t *testing.T) { mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return(fftypes.DataArray{}, true, nil) mdi := ag.database.(*databasemocks.Plugin) - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.MatchedBy(func(u database.Update) bool { + mdi.On("UpdateMessages", ag.ctx, mock.Anything, mock.MatchedBy(func(u database.Update) bool { update, err := u.Finalize() assert.NoError(t, err) assert.Len(t, update.SetOperations, 2) @@ -1518,12 +1520,14 @@ func TestDefinitionBroadcastActionRejectCustomCorrelator(t *testing.T) { return event.Correlator.Equals(customCorrelator) })).Return(nil) - _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + _, _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ Type: fftypes.MessageTypeDefinition, ID: fftypes.NewUUID(), Namespace: "any", SignerRef: fftypes.SignerRef{Key: "0x12345", Author: org1.DID}, + Tag: fftypes.SystemTagDefineDatatype, + Topics: fftypes.FFStringArray{"topic1"}, }, Data: fftypes.DataRefs{ {ID: fftypes.NewUUID()}, @@ -1548,7 +1552,7 @@ func TestDefinitionBroadcastInvalidSigner(t *testing.T) { mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return(fftypes.DataArray{}, true, nil) mdi := ag.database.(*databasemocks.Plugin) - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.MatchedBy(func(u database.Update) bool { + mdi.On("UpdateMessages", ag.ctx, mock.Anything, mock.MatchedBy(func(u database.Update) bool { update, err := u.Finalize() assert.NoError(t, err) assert.Len(t, update.SetOperations, 2) @@ -1567,7 +1571,7 @@ func TestDefinitionBroadcastInvalidSigner(t *testing.T) { })).Return(nil) mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil) - _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + _, _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ Type: fftypes.MessageTypeDefinition, ID: fftypes.NewUUID(), @@ -1710,7 +1714,7 @@ func TestDefinitionBroadcastActionRetry(t *testing.T) { mdm := ag.data.(*datamocks.Manager) mdm.On("GetMessageWithDataCached", ag.ctx, mock.Anything).Return(msg1, fftypes.DataArray{}, true, nil) - _, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) + _, _, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) assert.EqualError(t, err, "pop") } @@ -1724,7 +1728,7 @@ func TestDefinitionBroadcastRejectSignerLookupFail(t *testing.T) { mim := ag.identity.(*identitymanagermocks.Manager) mim.On("FindIdentityForVerifier", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) - valid, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) + _, valid, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) assert.Regexp(t, "pop", err) assert.False(t, valid) @@ -1740,7 +1744,7 @@ func TestDefinitionBroadcastRejectSignerLookupWrongOrg(t *testing.T) { mim := ag.identity.(*identitymanagermocks.Manager) mim.On("FindIdentityForVerifier", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(newTestOrg("org2"), nil) - valid, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) + _, valid, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) assert.NoError(t, err) assert.False(t, valid) @@ -1754,7 +1758,7 @@ func TestDefinitionBroadcastRejectBadSigner(t *testing.T) { msg1, _, org1, _ := newTestManifest(fftypes.MessageTypeDefinition, nil) msg1.Header.SignerRef = fftypes.SignerRef{Key: "0x23456", Author: org1.DID} - valid, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) + _, valid, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) assert.NoError(t, err) assert.False(t, valid) @@ -1773,7 +1777,7 @@ func TestDefinitionBroadcastRejectUnregisteredSignerIdentityClaim(t *testing.T) msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(definitions.HandlerResult{Action: definitions.ActionWait}, nil) - valid, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) + _, valid, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) assert.NoError(t, err) assert.False(t, valid) @@ -1790,7 +1794,7 @@ func TestDefinitionBroadcastRootUnregisteredOk(t *testing.T) { mim := ag.identity.(*identitymanagermocks.Manager) mim.On("FindIdentityForVerifier", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) - valid, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) + _, valid, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) assert.NoError(t, err) assert.False(t, valid) @@ -1809,7 +1813,7 @@ func TestDefinitionBroadcastActionWait(t *testing.T) { msh := ag.definitions.(*definitionsmocks.DefinitionHandlers) msh.On("HandleDefinitionBroadcast", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(definitions.HandlerResult{Action: definitions.ActionWait}, nil) - _, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) + _, _, err := ag.attemptMessageDispatch(ag.ctx, msg1, nil, nil, &batchState{}, &fftypes.Pin{Signer: "0x12345"}) assert.NoError(t, err) mim.AssertExpectations(t) @@ -1829,10 +1833,9 @@ func TestAttemptMessageDispatchEventFail(t *testing.T) { mim.On("FindIdentityForVerifier", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(org1, nil) mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(true, nil) - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(fmt.Errorf("pop")) - _, err := ag.attemptMessageDispatch(ag.ctx, msg1, fftypes.DataArray{ + _, _, err := ag.attemptMessageDispatch(ag.ctx, msg1, fftypes.DataArray{ &fftypes.Data{ID: msg1.Data[0].ID}, }, nil, bs, &fftypes.Pin{Signer: "0x12345"}) assert.NoError(t, err) @@ -1859,10 +1862,9 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) { mim.On("FindIdentityForVerifier", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(org1, nil) mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return(fftypes.DataArray{}, true, nil) mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(true, nil) - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil) - _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + _, _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), Type: fftypes.MessageTypeGroupInit, @@ -1873,37 +1875,12 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) { } -func TestAttemptMessageUpdateMessageFail(t *testing.T) { - ag, cancel := newTestAggregator() - defer cancel() - bs := newBatchState(ag) - org1 := newTestOrg("org1") - - mdi := ag.database.(*databasemocks.Plugin) - mdm := ag.data.(*datamocks.Manager) - mim := ag.identity.(*identitymanagermocks.Manager) - - mim.On("FindIdentityForVerifier", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(org1, nil) - mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return(fftypes.DataArray{}, true, nil) - mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(true, nil) - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - - _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ - Header: fftypes.MessageHeader{ID: fftypes.NewUUID(), SignerRef: fftypes.SignerRef{Key: "0x12345", Author: org1.DID}}, - }, nil, nil, bs, &fftypes.Pin{Signer: "0x12345"}) - assert.NoError(t, err) - - err = bs.RunFinalize(ag.ctx) - assert.EqualError(t, err, "pop") - -} - func TestRewindOffchainBatchesNoBatches(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() mdi := ag.database.(*databasemocks.Plugin) - mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + mdi.On("UpdateMessages", ag.ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) rewind, offset := ag.rewindOffchainBatches() assert.False(t, rewind) diff --git a/internal/events/batch_pin_complete.go b/internal/events/batch_pin_complete.go index 19e8144e4d..cb021f3e45 100644 --- a/internal/events/batch_pin_complete.go +++ b/internal/events/batch_pin_complete.go @@ -118,7 +118,7 @@ func (em *eventManager) handleBroadcastPinComplete(batchPin *blockchain.BatchPin var batch *fftypes.Batch err := json.NewDecoder(body).Decode(&batch) if err != nil { - log.L(em.ctx).Errorf("Failed to parse payload referred in batch ID '%s' from transaction '%s'", batchPin.BatchID, batchPin.Event.ProtocolID) + log.L(em.ctx).Errorf("Failed to parse payload referred in batch ID '%s' from transaction '%s': %s", batchPin.BatchID, batchPin.Event.ProtocolID, err) return nil // log and swallow unprocessable data } body.Close() diff --git a/internal/events/blockchain_event.go b/internal/events/blockchain_event.go index b5eca28b63..153aeba2b8 100644 --- a/internal/events/blockchain_event.go +++ b/internal/events/blockchain_event.go @@ -18,6 +18,7 @@ package events import ( "context" + "fmt" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/pkg/blockchain" @@ -42,11 +43,58 @@ func buildBlockchainEvent(ns string, subID *fftypes.UUID, event *blockchain.Even return ev } +func (em *eventManager) getChainListenerByProtocolIDCached(ctx context.Context, protocolID string) (*fftypes.ContractListener, error) { + return em.getChainListenerCached(fmt.Sprintf("pid:%s", protocolID), func() (*fftypes.ContractListener, error) { + return em.database.GetContractListenerByProtocolID(ctx, protocolID) + }) +} + +func (em *eventManager) getChainListenerByIDCached(ctx context.Context, id *fftypes.UUID) (*fftypes.ContractListener, error) { + return em.getChainListenerCached(fmt.Sprintf("id:%s", id), func() (*fftypes.ContractListener, error) { + return em.database.GetContractListenerByID(ctx, id) + }) +} + +func (em *eventManager) getChainListenerCached(cacheKey string, getter func() (*fftypes.ContractListener, error)) (*fftypes.ContractListener, error) { + cached := em.chainListenerCache.Get(cacheKey) + if cached != nil { + cached.Extend(em.chainListenerCacheTTL) + return cached.Value().(*fftypes.ContractListener), nil + } + listener, err := getter() + if listener == nil || err != nil { + return nil, err + } + em.chainListenerCache.Set(cacheKey, listener, em.chainListenerCacheTTL) + return listener, err +} + +func (em *eventManager) getTopicForChainListener(ctx context.Context, listenerID *fftypes.UUID) (string, error) { + if listenerID == nil { + return fftypes.SystemBatchPinTopic, nil + } + listener, err := em.getChainListenerByIDCached(ctx, listenerID) + if err != nil { + return "", err + } + var topic string + if listener != nil && listener.Topic != "" { + topic = listener.Topic + } else { + topic = listenerID.String() + } + return topic, nil +} + func (em *eventManager) persistBlockchainEvent(ctx context.Context, chainEvent *fftypes.BlockchainEvent) error { if err := em.database.InsertBlockchainEvent(ctx, chainEvent); err != nil { return err } - ffEvent := fftypes.NewEvent(fftypes.EventTypeBlockchainEventReceived, chainEvent.Namespace, chainEvent.ID, chainEvent.TX.ID) + topic, err := em.getTopicForChainListener(ctx, chainEvent.Listener) + if err != nil { + return err + } + ffEvent := fftypes.NewEvent(fftypes.EventTypeBlockchainEventReceived, chainEvent.Namespace, chainEvent.ID, chainEvent.TX.ID, topic) if err := em.database.InsertEvent(ctx, ffEvent); err != nil { return err } @@ -56,8 +104,7 @@ func (em *eventManager) persistBlockchainEvent(ctx context.Context, chainEvent * func (em *eventManager) BlockchainEvent(event *blockchain.EventWithSubscription) error { return em.retry.Do(em.ctx, "persist contract event", func(attempt int) (bool, error) { err := em.database.RunAsGroup(em.ctx, func(ctx context.Context) error { - // TODO: should cache this lookup for efficiency - sub, err := em.database.GetContractListenerByProtocolID(ctx, event.Subscription) + sub, err := em.getChainListenerByProtocolIDCached(ctx, event.Subscription) if err != nil { return err } diff --git a/internal/events/blockchain_event_test.go b/internal/events/blockchain_event_test.go index f92a5b760b..d1a07ec561 100644 --- a/internal/events/blockchain_event_test.go +++ b/internal/events/blockchain_event_test.go @@ -47,20 +47,22 @@ func TestContractEventWithRetries(t *testing.T) { sub := &fftypes.ContractListener{ Namespace: "ns", ID: fftypes.NewUUID(), + Topic: "topic1", } var eventID *fftypes.UUID mdi := em.database.(*databasemocks.Plugin) mdi.On("GetContractListenerByProtocolID", mock.Anything, "sb-1").Return(nil, fmt.Errorf("pop")).Once() - mdi.On("GetContractListenerByProtocolID", mock.Anything, "sb-1").Return(sub, nil).Times(3) + mdi.On("GetContractListenerByProtocolID", mock.Anything, "sb-1").Return(sub, nil).Times(1) // cached mdi.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() mdi.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *fftypes.BlockchainEvent) bool { eventID = e.ID return *e.Listener == *sub.ID && e.Name == "Changed" && e.Namespace == "ns" })).Return(nil).Times(2) + mdi.On("GetContractListenerByID", mock.Anything, sub.ID).Return(sub, nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *fftypes.Event) bool { - return e.Type == fftypes.EventTypeBlockchainEventReceived && e.Reference != nil && e.Reference == eventID + return e.Type == fftypes.EventTypeBlockchainEventReceived && e.Reference != nil && e.Reference == eventID && e.Topic == "topic1" })).Return(nil).Once() err := em.BlockchainEvent(ev) @@ -95,3 +97,48 @@ func TestContractEventUnknownSubscription(t *testing.T) { mdi.AssertExpectations(t) } + +func TestPersistBlockchainEventChainListenerLoopkupFail(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + + ev := &fftypes.BlockchainEvent{ + Name: "Changed", + Output: fftypes.JSONObject{ + "value": "1", + }, + Info: fftypes.JSONObject{ + "blockNumber": "10", + }, + Listener: fftypes.NewUUID(), + } + + mdi := em.database.(*databasemocks.Plugin) + mdi.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil) + mdi.On("GetContractListenerByID", mock.Anything, ev.Listener).Return(nil, fmt.Errorf("pop")) + + err := em.persistBlockchainEvent(em.ctx, ev) + assert.Regexp(t, "pop", err) + + mdi.AssertExpectations(t) +} + +func TestGetTopicForChainListenerFallback(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + + sub := &fftypes.ContractListener{ + Namespace: "ns", + ID: fftypes.NewUUID(), + Topic: "", + } + + mdi := em.database.(*databasemocks.Plugin) + mdi.On("GetContractListenerByID", mock.Anything, mock.Anything).Return(sub, nil) + + topic, err := em.getTopicForChainListener(em.ctx, sub.ID) + assert.NoError(t, err) + assert.Equal(t, sub.ID.String(), topic) + + mdi.AssertExpectations(t) +} diff --git a/internal/events/dx_callbacks.go b/internal/events/dx_callbacks.go index 72122c541c..f7f016826f 100644 --- a/internal/events/dx_callbacks.go +++ b/internal/events/dx_callbacks.go @@ -172,10 +172,13 @@ func (em *eventManager) markUnpinnedMessagesConfirmed(ctx context.Context, batch } for _, msg := range batch.Payload.Messages { - event := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, batch.Namespace, msg.Header.ID, batch.Payload.TX.ID) - event.Correlator = msg.Header.CID - if err := em.database.InsertEvent(ctx, event); err != nil { - return err + for _, topic := range msg.Header.Topics { + // One event per topic + event := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, batch.Namespace, msg.Header.ID, batch.Payload.TX.ID, topic) + event.Correlator = msg.Header.CID + if err := em.database.InsertEvent(ctx, event); err != nil { + return err + } } } diff --git a/internal/events/event_dispatcher.go b/internal/events/event_dispatcher.go index b1a587578a..624fb4a793 100644 --- a/internal/events/event_dispatcher.go +++ b/internal/events/event_dispatcher.go @@ -188,16 +188,15 @@ func (ed *eventDispatcher) filterEvents(candidates []*fftypes.EventDelivery) []* tx := event.Transaction be := event.BlockchainEvent tag := "" + topic := event.Topic group := "" author := "" txType := "" beName := "" beListener := "" - var topics []string if msg != nil { tag = msg.Header.Tag - topics = msg.Header.Topics author = msg.Header.Author if msg.Header.Group != nil { group = msg.Header.Group.String() @@ -213,6 +212,16 @@ func (ed *eventDispatcher) filterEvents(candidates []*fftypes.EventDelivery) []* beListener = be.Listener.String() } + if filter.topicFilter != nil { + topicsMatch := false + if filter.topicFilter.MatchString(topic) { + topicsMatch = true + } + if !topicsMatch { + continue + } + } + if filter.messageFilter != nil { if filter.messageFilter.tagFilter != nil && !filter.messageFilter.tagFilter.MatchString(tag) { continue @@ -220,18 +229,6 @@ func (ed *eventDispatcher) filterEvents(candidates []*fftypes.EventDelivery) []* if filter.messageFilter.authorFilter != nil && !filter.messageFilter.authorFilter.MatchString(author) { continue } - if filter.messageFilter.topicsFilter != nil { - topicsMatch := false - for _, topic := range topics { - if filter.messageFilter.topicsFilter.MatchString(topic) { - topicsMatch = true - break - } - } - if !topicsMatch { - continue - } - } if filter.messageFilter.groupFilter != nil && !filter.messageFilter.groupFilter.MatchString(group) { continue } diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go index 159fd9260a..7905830cca 100644 --- a/internal/events/event_dispatcher_test.go +++ b/internal/events/event_dispatcher_test.go @@ -469,8 +469,9 @@ func TestFilterEventsMatch(t *testing.T) { { EnrichedEvent: fftypes.EnrichedEvent{ Event: fftypes.Event{ - ID: id1, - Type: fftypes.EventTypeMessageConfirmed, + ID: id1, + Type: fftypes.EventTypeMessageConfirmed, + Topic: "topic1", }, Message: &fftypes.Message{ Header: fftypes.MessageHeader{ @@ -488,8 +489,9 @@ func TestFilterEventsMatch(t *testing.T) { { EnrichedEvent: fftypes.EnrichedEvent{ Event: fftypes.Event{ - ID: id2, - Type: fftypes.EventTypeMessageConfirmed, + ID: id2, + Type: fftypes.EventTypeMessageConfirmed, + Topic: "topic1", }, Message: &fftypes.Message{ Header: fftypes.MessageHeader{ @@ -507,8 +509,9 @@ func TestFilterEventsMatch(t *testing.T) { { EnrichedEvent: fftypes.EnrichedEvent{ Event: fftypes.Event{ - ID: id3, - Type: fftypes.EventTypeMessageRejected, + ID: id3, + Type: fftypes.EventTypeMessageRejected, + Topic: "topic2", }, Message: &fftypes.Message{ Header: fftypes.MessageHeader{ @@ -559,7 +562,7 @@ func TestFilterEventsMatch(t *testing.T) { }) ed.subscription.eventMatcher = regexp.MustCompile(fmt.Sprintf("^%s$", fftypes.EventTypeMessageConfirmed)) - ed.subscription.messageFilter.topicsFilter = regexp.MustCompile(".*") + ed.subscription.topicFilter = regexp.MustCompile(".*") ed.subscription.messageFilter.tagFilter = regexp.MustCompile(".*") ed.subscription.messageFilter.groupFilter = regexp.MustCompile(".*") matched := ed.filterEvents(events) @@ -569,7 +572,7 @@ func TestFilterEventsMatch(t *testing.T) { // id three has the wrong event type ed.subscription.eventMatcher = nil - ed.subscription.messageFilter.topicsFilter = nil + ed.subscription.topicFilter = nil ed.subscription.messageFilter.tagFilter = nil ed.subscription.messageFilter.groupFilter = nil matched = ed.filterEvents(events) @@ -580,19 +583,19 @@ func TestFilterEventsMatch(t *testing.T) { assert.Equal(t, *id4, *matched[3].ID) assert.Equal(t, *id5, *matched[4].ID) - ed.subscription.messageFilter.topicsFilter = regexp.MustCompile("topic1") + ed.subscription.topicFilter = regexp.MustCompile("topic1") matched = ed.filterEvents(events) assert.Equal(t, 2, len(matched)) assert.Equal(t, *id1, *matched[0].ID) assert.Equal(t, *id2, *matched[1].ID) - ed.subscription.messageFilter.topicsFilter = nil + ed.subscription.topicFilter = nil ed.subscription.messageFilter.tagFilter = regexp.MustCompile("tag2") matched = ed.filterEvents(events) assert.Equal(t, 1, len(matched)) assert.Equal(t, *id2, *matched[0].ID) - ed.subscription.messageFilter.topicsFilter = nil + ed.subscription.topicFilter = nil ed.subscription.messageFilter.authorFilter = nil ed.subscription.messageFilter.groupFilter = regexp.MustCompile(gid1.String()) matched = ed.filterEvents(events) @@ -604,7 +607,7 @@ func TestFilterEventsMatch(t *testing.T) { assert.Equal(t, 0, len(matched)) ed.subscription.messageFilter.groupFilter = nil - ed.subscription.messageFilter.topicsFilter = nil + ed.subscription.topicFilter = nil ed.subscription.messageFilter.tagFilter = nil ed.subscription.messageFilter.authorFilter = regexp.MustCompile("org2") matched = ed.filterEvents(events) diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index de46c3b4fe..8d53a0b49a 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "strconv" + "time" "github.com/hyperledger/firefly/internal/assets" "github.com/hyperledger/firefly/internal/broadcast" @@ -43,6 +44,7 @@ import ( "github.com/hyperledger/firefly/pkg/fftypes" "github.com/hyperledger/firefly/pkg/sharedstorage" "github.com/hyperledger/firefly/pkg/tokens" + "github.com/karlseguin/ccache" ) type EventManager interface { @@ -77,26 +79,28 @@ type EventManager interface { } type eventManager struct { - ctx context.Context - ni sysmessaging.LocalNodeInfo - sharedstorage sharedstorage.Plugin - database database.Plugin - txHelper txcommon.Helper - identity identity.Manager - definitions definitions.DefinitionHandlers - data data.Manager - subManager *subscriptionManager - retry retry.Retry - aggregator *aggregator - broadcast broadcast.Manager - messaging privatemessaging.Manager - assets assets.Manager - newEventNotifier *eventNotifier - newPinNotifier *eventNotifier - opCorrelationRetries int - defaultTransport string - internalEvents *system.Events - metrics metrics.Manager + ctx context.Context + ni sysmessaging.LocalNodeInfo + sharedstorage sharedstorage.Plugin + database database.Plugin + txHelper txcommon.Helper + identity identity.Manager + definitions definitions.DefinitionHandlers + data data.Manager + subManager *subscriptionManager + retry retry.Retry + aggregator *aggregator + broadcast broadcast.Manager + messaging privatemessaging.Manager + assets assets.Manager + newEventNotifier *eventNotifier + newPinNotifier *eventNotifier + opCorrelationRetries int + defaultTransport string + internalEvents *system.Events + metrics metrics.Manager + chainListenerCache *ccache.Cache + chainListenerCacheTTL time.Duration } func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, si sharedstorage.Plugin, di database.Plugin, bi blockchain.Plugin, im identity.Manager, dh definitions.DefinitionHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager, mm metrics.Manager, txHelper txcommon.Helper) (EventManager, error) { @@ -122,12 +126,14 @@ func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, si shar MaximumDelay: config.GetDuration(config.EventAggregatorRetryMaxDelay), Factor: config.GetFloat64(config.EventAggregatorRetryFactor), }, - defaultTransport: config.GetString(config.EventTransportsDefault), - opCorrelationRetries: config.GetInt(config.EventAggregatorOpCorrelationRetries), - newEventNotifier: newEventNotifier, - newPinNotifier: newPinNotifier, - aggregator: newAggregator(ctx, di, bi, dh, im, dm, newPinNotifier, mm), - metrics: mm, + defaultTransport: config.GetString(config.EventTransportsDefault), + opCorrelationRetries: config.GetInt(config.EventAggregatorOpCorrelationRetries), + newEventNotifier: newEventNotifier, + newPinNotifier: newPinNotifier, + aggregator: newAggregator(ctx, di, bi, dh, im, dm, newPinNotifier, mm), + metrics: mm, + chainListenerCache: ccache.New(ccache.Configure().MaxSize(config.GetByteSize(config.EventListenerTopicCacheSize))), + chainListenerCacheTTL: config.GetDuration(config.EventListenerTopicCacheTTL), } ie, _ := eifactory.GetPlugin(ctx, system.SystemEventsTransport) em.internalEvents = ie.(*system.Events) diff --git a/internal/events/event_poller_test.go b/internal/events/event_poller_test.go index 7d0bb91cb6..e5362ef46c 100644 --- a/internal/events/event_poller_test.go +++ b/internal/events/event_poller_test.go @@ -207,7 +207,7 @@ func TestReadPageSingleCommitEvent(t *testing.T) { return false, nil }, nil) cancel() - ev1 := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil) + ev1 := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") mdi.On("GetEvents", mock.Anything, mock.Anything).Return([]*fftypes.Event{ev1}, nil, nil).Once() mdi.On("GetEvents", mock.Anything, mock.Anything).Return([]*fftypes.Event{}, nil, nil) ep.eventLoop() @@ -227,7 +227,7 @@ func TestReadPageRewind(t *testing.T) { return true, 12345 }) cancel() - ev1 := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil) + ev1 := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") mdi.On("GetEvents", mock.Anything, mock.MatchedBy(func(filter database.Filter) bool { f, err := filter.Finalize() assert.NoError(t, err) @@ -248,7 +248,7 @@ func TestReadPageProcessEventsRetryExit(t *testing.T) { mdi := &databasemocks.Plugin{} ep, cancel := newTestEventPoller(t, mdi, func(events []fftypes.LocallySequenced) (bool, error) { return false, fmt.Errorf("pop") }, nil) cancel() - ev1 := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil) + ev1 := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") mdi.On("GetEvents", mock.Anything, mock.Anything).Return([]*fftypes.Event{ev1}, nil, nil).Once() ep.eventLoop() @@ -262,7 +262,7 @@ func TestProcessEventsFail(t *testing.T) { }, nil) defer cancel() _, err := ep.conf.newEventsHandler([]fftypes.LocallySequenced{ - fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil), + fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, ""), }) assert.EqualError(t, err, "pop") mdi.AssertExpectations(t) diff --git a/internal/events/operation_update.go b/internal/events/operation_update.go index 0fb7af2fab..834bacc93b 100644 --- a/internal/events/operation_update.go +++ b/internal/events/operation_update.go @@ -37,8 +37,12 @@ func (em *eventManager) operationUpdateCtx(ctx context.Context, operationID *fft // Special handling for OpTypeTokenTransfer, which writes an event when it fails if op.Type == fftypes.OpTypeTokenTransfer && txState == fftypes.OpStatusFailed { - event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID, op.Transaction) tokenTransfer, err := txcommon.RetrieveTokenTransferInputs(ctx, op) + topic := "" + if tokenTransfer != nil { + topic = tokenTransfer.Pool.String() + } + event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID, op.Transaction, topic) if err != nil || tokenTransfer.LocalID == nil || tokenTransfer.Type == "" { log.L(em.ctx).Warnf("Could not parse token transfer: %s", err) } else { @@ -54,8 +58,12 @@ func (em *eventManager) operationUpdateCtx(ctx context.Context, operationID *fft // Special handling for OpTypeTokenApproval, which writes an event when it fails if op.Type == fftypes.OpTypeTokenApproval && txState == fftypes.OpStatusFailed { - event := fftypes.NewEvent(fftypes.EventTypeApprovalOpFailed, op.Namespace, op.ID, op.Transaction) tokenApproval, err := txcommon.RetrieveTokenApprovalInputs(ctx, op) + topic := "" + if tokenApproval != nil { + topic = tokenApproval.Pool.String() + } + event := fftypes.NewEvent(fftypes.EventTypeApprovalOpFailed, op.Namespace, op.ID, op.Transaction, topic) if err != nil || tokenApproval.LocalID == nil { log.L(em.ctx).Warnf("Could not parse token approval: %s", err) } else { diff --git a/internal/events/subscription_manager.go b/internal/events/subscription_manager.go index b80a4931dc..9c28dc4add 100644 --- a/internal/events/subscription_manager.go +++ b/internal/events/subscription_manager.go @@ -43,12 +43,12 @@ type subscription struct { messageFilter *messageFilter blockchainFilter *blockchainFilter transactionFilter *transactionFilter + topicFilter *regexp.Regexp } type messageFilter struct { groupFilter *regexp.Regexp tagFilter *regexp.Regexp - topicsFilter *regexp.Regexp authorFilter *regexp.Regexp } @@ -326,20 +326,20 @@ func (sm *subscriptionManager) parseSubscriptionDef(ctx context.Context, subDef } } - var topicsFilter *regexp.Regexp + var topicFilter *regexp.Regexp if filter.DeprecatedTopics != "" { - log.L(ctx).Warnf("Your subscription filter uses the deprecated 'topics' key - please change to 'message.topics' instead") + log.L(ctx).Warnf("Your subscription filter uses the deprecated 'topics' key - please change to 'topic' instead") // set to topics filter, will be overwritten by non-deprecated key if both are present - topicsFilter, err = regexp.Compile(filter.DeprecatedTopics) + topicFilter, err = regexp.Compile(filter.DeprecatedTopics) if err != nil { return nil, i18n.WrapError(ctx, err, i18n.MsgRegexpCompileFailed, "filter.topics", filter.DeprecatedTopics) } } - if filter.Message.Topics != "" { - topicsFilter, err = regexp.Compile(filter.Message.Topics) + if filter.Topic != "" { + topicFilter, err = regexp.Compile(filter.Topic) if err != nil { - return nil, i18n.WrapError(ctx, err, i18n.MsgRegexpCompileFailed, "filter.message.topics", filter.Message.Topics) + return nil, i18n.WrapError(ctx, err, i18n.MsgRegexpCompileFailed, "filter.topic", filter.Topic) } } @@ -364,10 +364,10 @@ func (sm *subscriptionManager) parseSubscriptionDef(ctx context.Context, subDef dispatcherElection: make(chan bool, 1), definition: subDef, eventMatcher: eventFilter, + topicFilter: topicFilter, messageFilter: &messageFilter{ tagFilter: tagFilter, groupFilter: groupFilter, - topicsFilter: topicsFilter, authorFilter: authorFilter, }, } diff --git a/internal/events/subscription_manager_test.go b/internal/events/subscription_manager_test.go index 0fd6d6331c..3147732bb4 100644 --- a/internal/events/subscription_manager_test.go +++ b/internal/events/subscription_manager_test.go @@ -155,7 +155,7 @@ func TestRegisterEphemeralSubscriptionsFail(t *testing.T) { err = be.EphemeralSubscription("conn1", "ns1", &fftypes.SubscriptionFilter{ Message: fftypes.MessageFilter{ - Topics: "[[[[[ !wrong", + Author: "[[[[[ !wrong", }, }, &fftypes.SubscriptionOptions{}) assert.Regexp(t, "FF10171", err) @@ -226,9 +226,9 @@ func TestStartSubRestoreOkSubsOK(t *testing.T) { ID: fftypes.NewUUID(), }, Filter: fftypes.SubscriptionFilter{ + Topic: ".*", Events: ".*", Message: fftypes.MessageFilter{ - Topics: ".*", Tag: ".*", Group: ".*", Author: ".*", @@ -290,31 +290,13 @@ func TestCreateSubscriptionBadTopicFilter(t *testing.T) { mei.On("ValidateOptions", mock.Anything).Return(nil) _, err := sm.parseSubscriptionDef(sm.ctx, &fftypes.Subscription{ Filter: fftypes.SubscriptionFilter{ - Message: fftypes.MessageFilter{ - Topics: "[[[[! badness", - }, + Topic: "[[[[! badness", }, Transport: "ut", }) assert.Regexp(t, "FF10171.*topic", err) } -func TestCreateSubscriptionBadContextFilter(t *testing.T) { - mei := &eventsmocks.PluginAll{} - sm, cancel := newTestSubManager(t, mei) - defer cancel() - mei.On("ValidateOptions", mock.Anything).Return(nil) - _, err := sm.parseSubscriptionDef(sm.ctx, &fftypes.Subscription{ - Filter: fftypes.SubscriptionFilter{ - Message: fftypes.MessageFilter{ - Tag: "[[[[! badness", - }, - }, - Transport: "ut", - }) - assert.Regexp(t, "FF10171.*tag", err) -} - func TestCreateSubscriptionBadGroupFilter(t *testing.T) { mei := &eventsmocks.PluginAll{} sm, cancel := newTestSubManager(t, mei) @@ -407,6 +389,22 @@ func TestCreateSubscriptionBadDeprecatedTagFilter(t *testing.T) { assert.Regexp(t, "FF10171.*tag", err) } +func TestCreateSubscriptionBadMessageTagFilter(t *testing.T) { + mei := &eventsmocks.PluginAll{} + sm, cancel := newTestSubManager(t, mei) + defer cancel() + mei.On("ValidateOptions", mock.Anything).Return(nil) + _, err := sm.parseSubscriptionDef(sm.ctx, &fftypes.Subscription{ + Filter: fftypes.SubscriptionFilter{ + Message: fftypes.MessageFilter{ + Tag: "[[[[! badness", + }, + }, + Transport: "ut", + }) + assert.Regexp(t, "FF10171.*message.tag", err) +} + func TestCreateSubscriptionBadDeprecatedAuthorFilter(t *testing.T) { mei := &eventsmocks.PluginAll{} sm, cancel := newTestSubManager(t, mei) @@ -506,6 +504,7 @@ func TestCreateSubscriptionWithDeprecatedFilters(t *testing.T) { mei.On("ValidateOptions", mock.Anything).Return(nil) _, err := sm.parseSubscriptionDef(sm.ctx, &fftypes.Subscription{ Filter: fftypes.SubscriptionFilter{ + Topic: "flop", DeprecatedTopics: "test", DeprecatedTag: "flap", DeprecatedAuthor: "flip", diff --git a/internal/events/token_pool_created.go b/internal/events/token_pool_created.go index b1eb76d542..ac46f9b811 100644 --- a/internal/events/token_pool_created.go +++ b/internal/events/token_pool_created.go @@ -68,7 +68,7 @@ func (em *eventManager) confirmPool(ctx context.Context, pool *fftypes.TokenPool return err } log.L(ctx).Infof("Token pool confirmed, id=%s", pool.ID) - event := fftypes.NewEvent(fftypes.EventTypePoolConfirmed, pool.Namespace, pool.ID, pool.TX.ID) + event := fftypes.NewEvent(fftypes.EventTypePoolConfirmed, pool.Namespace, pool.ID, pool.TX.ID, pool.ID.String()) return em.database.InsertEvent(ctx, event) } diff --git a/internal/events/tokens_approved.go b/internal/events/tokens_approved.go index a6bccb029d..fb9960e11c 100644 --- a/internal/events/tokens_approved.go +++ b/internal/events/tokens_approved.go @@ -104,7 +104,7 @@ func (em *eventManager) TokensApproved(ti tokens.Plugin, approval *tokens.TokenA return err } - event := fftypes.NewEvent(fftypes.EventTypeApprovalConfirmed, approval.Namespace, approval.LocalID, approval.TX.ID) + event := fftypes.NewEvent(fftypes.EventTypeApprovalConfirmed, approval.Namespace, approval.LocalID, approval.TX.ID, approval.Pool.String()) return em.database.InsertEvent(ctx, event) }) return err != nil, err // retry indefinitely (until context closes) diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index a7444e2a1c..89e9999f7c 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -145,7 +145,7 @@ func (em *eventManager) TokensTransferred(ti tokens.Plugin, transfer *tokens.Tok } } - event := fftypes.NewEvent(fftypes.EventTypeTransferConfirmed, transfer.Namespace, transfer.LocalID, transfer.TX.ID) + event := fftypes.NewEvent(fftypes.EventTypeTransferConfirmed, transfer.Namespace, transfer.LocalID, transfer.TX.ID, transfer.Pool.String()) return em.database.InsertEvent(ctx, event) }) return err != nil, err // retry indefinitely (until context closes) diff --git a/internal/txcommon/txcommon.go b/internal/txcommon/txcommon.go index 592b3f6874..bf593364fb 100644 --- a/internal/txcommon/txcommon.go +++ b/internal/txcommon/txcommon.go @@ -88,7 +88,7 @@ func (t *transactionHelper) SubmitNewTransaction(ctx context.Context, ns string, return nil, err } - if err := t.database.InsertEvent(ctx, fftypes.NewEvent(fftypes.EventTypeTransactionSubmitted, tx.Namespace, tx.ID, tx.ID)); err != nil { + if err := t.database.InsertEvent(ctx, fftypes.NewEvent(fftypes.EventTypeTransactionSubmitted, tx.Namespace, tx.ID, tx.ID, tx.Type.String())); err != nil { return nil, err } diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 6c64c79814..9bbcb109da 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -784,16 +784,14 @@ var OperationQueryFactory = &queryFields{ // SubscriptionQueryFactory filter fields for data subscriptions var SubscriptionQueryFactory = &queryFields{ - "id": &UUIDField{}, - "namespace": &StringField{}, - "name": &StringField{}, - "transport": &StringField{}, - "events": &StringField{}, - "filter.topics": &StringField{}, - "filter.tag": &StringField{}, - "filter.group": &StringField{}, - "options": &StringField{}, - "created": &TimeField{}, + "id": &UUIDField{}, + "namespace": &StringField{}, + "name": &StringField{}, + "transport": &StringField{}, + "events": &StringField{}, + "filters": &JSONField{}, + "options": &StringField{}, + "created": &TimeField{}, } // EventQueryFactory filter fields for data events @@ -804,6 +802,7 @@ var EventQueryFactory = &queryFields{ "reference": &UUIDField{}, "correlator": &UUIDField{}, "tx": &UUIDField{}, + "topic": &StringField{}, "sequence": &Int64Field{}, "created": &TimeField{}, } diff --git a/pkg/fftypes/constants.go b/pkg/fftypes/constants.go index b15d29f6c5..e1c75789bd 100644 --- a/pkg/fftypes/constants.go +++ b/pkg/fftypes/constants.go @@ -22,6 +22,13 @@ const ( SystemNamespace = "ff_system" ) +const ( + // SystemTopicDefinitions is the FireFly event topic for events that are confirmations of definition of pre-defined datatypes + SystemTopicDefinitions = "ff_definition" + // SystemBatchPinTopic is the FireFly event topic for events from the FireFly batch pin listener + SystemBatchPinTopic = "ff_batch_pin" +) + const ( // SystemTagDefineDatatype is the tag for messages that broadcast data definitions diff --git a/pkg/fftypes/contract_listener.go b/pkg/fftypes/contract_listener.go index b8f577abf4..a2b23e0cbf 100644 --- a/pkg/fftypes/contract_listener.go +++ b/pkg/fftypes/contract_listener.go @@ -33,6 +33,7 @@ type ContractListener struct { Location *JSONAny `json:"location,omitempty"` Created *FFTime `json:"created,omitempty"` Event *FFISerializedEvent `json:"event,omitempty"` + Topic string `json:"topic,omitempty"` Options *ContractListenerOptions `json:"options,omitempty"` } diff --git a/pkg/fftypes/contracts.go b/pkg/fftypes/contracts.go index 0dcc1c9e7f..cad8b7d4b0 100644 --- a/pkg/fftypes/contracts.go +++ b/pkg/fftypes/contracts.go @@ -74,7 +74,7 @@ func (c *ContractAPI) Validate(ctx context.Context, existing bool) (err error) { } func (c *ContractAPI) Topic() string { - return namespaceTopic(c.Namespace) + return typeNamespaceNameTopicHash("contractapi", c.Namespace, c.Name) } func (c *ContractAPI) SetBroadcastMessage(msgID *UUID) { diff --git a/pkg/fftypes/contracts_test.go b/pkg/fftypes/contracts_test.go index 156cd75984..6b7742b1d9 100644 --- a/pkg/fftypes/contracts_test.go +++ b/pkg/fftypes/contracts_test.go @@ -52,7 +52,7 @@ func TestContractAPITopic(t *testing.T) { api := &ContractAPI{ Namespace: "ns1", } - assert.Equal(t, "ff_ns_ns1", api.Topic()) + assert.Equal(t, "4cccc66c1f0eebcf578f1e63b73a2047d4eb4c84c0a00c69b0e00c7490403d20", api.Topic()) } func TestContractAPISetBroadCastMessage(t *testing.T) { diff --git a/pkg/fftypes/datatype.go b/pkg/fftypes/datatype.go index 541a3c7173..d44bca7393 100644 --- a/pkg/fftypes/datatype.go +++ b/pkg/fftypes/datatype.go @@ -75,7 +75,7 @@ func (dt *Datatype) Validate(ctx context.Context, existing bool) (err error) { } func (dt *Datatype) Topic() string { - return namespaceTopic(dt.Namespace) + return typeNamespaceNameTopicHash("datatype", dt.Namespace, dt.Name) } func (dt *Datatype) SetBroadcastMessage(msgID *UUID) { diff --git a/pkg/fftypes/datatype_test.go b/pkg/fftypes/datatype_test.go index f325fcc8fd..db162b2760 100644 --- a/pkg/fftypes/datatype_test.go +++ b/pkg/fftypes/datatype_test.go @@ -75,7 +75,7 @@ func TestDatatypeValidation(t *testing.T) { assert.Regexp(t, "FF10201", dt.Validate(context.Background(), true)) var def Definition = dt - assert.Equal(t, "ff_ns_ok", def.Topic()) + assert.Equal(t, "8e23c0a7fa9ec15c68a662e0e502933facb3d249409efa2b4f89d479b9f990cb", def.Topic()) def.SetBroadcastMessage(NewUUID()) assert.NotNil(t, dt.Message) } diff --git a/pkg/fftypes/event.go b/pkg/fftypes/event.go index 3eb3a6915f..f6b8d63980 100644 --- a/pkg/fftypes/event.go +++ b/pkg/fftypes/event.go @@ -62,6 +62,7 @@ type Event struct { Reference *UUID `json:"reference"` Correlator *UUID `json:"correlator,omitempty"` Transaction *UUID `json:"tx,omitempty"` + Topic string `json:"topic,omitempty"` Created *FFTime `json:"created"` } @@ -90,13 +91,14 @@ type EventDeliveryResponse struct { Reply *MessageInOut `json:"reply,omitempty"` } -func NewEvent(t EventType, ns string, ref *UUID, tx *UUID) *Event { +func NewEvent(t EventType, ns string, ref *UUID, tx *UUID, topic string) *Event { return &Event{ ID: NewUUID(), Type: t, Namespace: ns, Reference: ref, Transaction: tx, + Topic: topic, Created: Now(), } } diff --git a/pkg/fftypes/event_test.go b/pkg/fftypes/event_test.go index 5bcbe5bde3..96c0db39f1 100644 --- a/pkg/fftypes/event_test.go +++ b/pkg/fftypes/event_test.go @@ -26,11 +26,12 @@ func TestNewEvent(t *testing.T) { ref := NewUUID() tx := NewUUID() - e := NewEvent(EventTypeMessageConfirmed, "ns1", ref, tx) + e := NewEvent(EventTypeMessageConfirmed, "ns1", ref, tx, "topic1") assert.Equal(t, EventTypeMessageConfirmed, e.Type) assert.Equal(t, "ns1", e.Namespace) assert.Equal(t, *ref, *e.Reference) assert.Equal(t, *tx, *e.Transaction) + assert.Equal(t, "topic1", e.Topic) e.Sequence = 12345 var ls LocallySequenced = e diff --git a/pkg/fftypes/ffi.go b/pkg/fftypes/ffi.go index 81357d2a50..4f8aacffea 100644 --- a/pkg/fftypes/ffi.go +++ b/pkg/fftypes/ffi.go @@ -102,7 +102,7 @@ func (f *FFI) Validate(ctx context.Context, existing bool) (err error) { } func (f *FFI) Topic() string { - return namespaceTopic(f.Namespace) + return typeNamespaceNameTopicHash("ffi", f.Namespace, f.Name) } func (f *FFI) SetBroadcastMessage(msgID *UUID) { diff --git a/pkg/fftypes/ffi_test.go b/pkg/fftypes/ffi_test.go index 27a8bd91f1..2d9f2de16f 100644 --- a/pkg/fftypes/ffi_test.go +++ b/pkg/fftypes/ffi_test.go @@ -138,7 +138,7 @@ func TestFFITopic(t *testing.T) { ffi := &FFI{ Namespace: "ns1", } - assert.Equal(t, "ff_ns_ns1", ffi.Topic()) + assert.Equal(t, "01a982a7251400a7ec64fccce6febee3942a56e37967fa2ba26d7d6f43523c82", ffi.Topic()) } func TestFFISetBroadCastMessage(t *testing.T) { diff --git a/pkg/fftypes/namespace.go b/pkg/fftypes/namespace.go index 75505c62fe..a4086697fd 100644 --- a/pkg/fftypes/namespace.go +++ b/pkg/fftypes/namespace.go @@ -18,7 +18,7 @@ package fftypes import ( "context" - "fmt" + "crypto/sha256" "github.com/hyperledger/firefly/internal/i18n" ) @@ -61,12 +61,18 @@ func (ns *Namespace) Validate(ctx context.Context, existing bool) (err error) { return nil } -func namespaceTopic(ns string) string { - return fmt.Sprintf("ff_ns_%s", ns) +func typeNamespaceNameTopicHash(objType string, ns string, name string) string { + // Topic generation function for ordering anything with a type, namespace and name. + // Means all messages racing for this name will be consistently ordered by all parties. + h := sha256.New() + h.Write([]byte(objType)) + h.Write([]byte(ns)) + h.Write([]byte(name)) + return HashResult(h).String() } func (ns *Namespace) Topic() string { - return namespaceTopic(ns.Name) + return typeNamespaceNameTopicHash("namespace", ns.Name, "") } func (ns *Namespace) SetBroadcastMessage(msgID *UUID) { diff --git a/pkg/fftypes/namespace_test.go b/pkg/fftypes/namespace_test.go index f8ab69fdc6..eb6815fad4 100644 --- a/pkg/fftypes/namespace_test.go +++ b/pkg/fftypes/namespace_test.go @@ -45,7 +45,7 @@ func TestNamespaceValidation(t *testing.T) { assert.Regexp(t, "FF10203", ns.Validate(context.Background(), true)) var nsDef Definition = ns - assert.Equal(t, "ff_ns_ok", nsDef.Topic()) + assert.Equal(t, "358de1708c312f6b9eb4c44e0d9811c6f69bf389871d38dd7501992b2c00b557", nsDef.Topic()) nsDef.SetBroadcastMessage(NewUUID()) assert.NotNil(t, ns.Message) diff --git a/pkg/fftypes/subscription.go b/pkg/fftypes/subscription.go index a56d046096..72d0ae7d32 100644 --- a/pkg/fftypes/subscription.go +++ b/pkg/fftypes/subscription.go @@ -31,6 +31,7 @@ type SubscriptionFilter struct { Message MessageFilter `json:"message,omitempty"` Transaction TransactionFilter `json:"transaction,omitempty"` BlockchainEvent BlockchainEventFilter `json:"blockchainevent,omitempty"` + Topic string `json:"topic,omitempty"` DeprecatedTopics string `json:"topics,omitempty"` DeprecatedTag string `json:"tag,omitempty"` DeprecatedGroup string `json:"group,omitempty"` @@ -41,7 +42,6 @@ func NewSubscriptionFilterFromQuery(query url.Values) SubscriptionFilter { return SubscriptionFilter{ Events: query.Get("filter.events"), Message: MessageFilter{ - Topics: query.Get("filter.message.topics"), Group: query.Get("filter.message.group"), Tag: query.Get("filter.message.tag"), Author: query.Get("filter.message.author"), @@ -53,15 +53,15 @@ func NewSubscriptionFilterFromQuery(query url.Values) SubscriptionFilter { Transaction: TransactionFilter{ Type: query.Get("filter.transaction.type"), }, - DeprecatedTopics: query.Get("filter.topics"), + Topic: query.Get("filter.topic"), DeprecatedTag: query.Get("filter.tag"), + DeprecatedTopics: query.Get("filter.topics"), DeprecatedGroup: query.Get("filter.group"), DeprecatedAuthor: query.Get("filter.author"), } } type MessageFilter struct { - Topics string `json:"topics,omitempty"` Tag string `json:"tag,omitempty"` Group string `json:"group,omitempty"` Author string `json:"author,omitempty"` diff --git a/pkg/fftypes/subscription_test.go b/pkg/fftypes/subscription_test.go index 8bf09148e9..604e4e3403 100644 --- a/pkg/fftypes/subscription_test.go +++ b/pkg/fftypes/subscription_test.go @@ -108,11 +108,12 @@ func TestSubscriptionUnMarshalFail(t *testing.T) { } func TestNewSubscriptionFilterFromQuery(t *testing.T) { - query, _ := url.ParseQuery("filter.events=message_confirmed&filter.message.topics=topic1&filter.blockchain.name=flapflip&filter.transaction.type=test&filter.group=deprecated") + query, _ := url.ParseQuery("filter.events=message_confirmed&filter.topic=topic1&filter.message.author=did:firefly:org/author1&filter.blockchain.name=flapflip&filter.transaction.type=test&filter.group=deprecated") expectedFilter := SubscriptionFilter{ Events: "message_confirmed", + Topic: "topic1", Message: MessageFilter{ - Topics: "topic1", + Author: "did:firefly:org/author1", }, BlockchainEvent: BlockchainEventFilter{ Name: "flapflip", diff --git a/pkg/fftypes/tokenpool.go b/pkg/fftypes/tokenpool.go index 48fb9d1cef..954f8d0a6c 100644 --- a/pkg/fftypes/tokenpool.go +++ b/pkg/fftypes/tokenpool.go @@ -74,7 +74,7 @@ func (t *TokenPool) Validate(ctx context.Context) (err error) { } func (t *TokenPoolAnnouncement) Topic() string { - return namespaceTopic(t.Pool.Namespace) + return typeNamespaceNameTopicHash("tokenpool", t.Pool.Namespace, t.Pool.Name) } func (t *TokenPoolAnnouncement) SetBroadcastMessage(msgID *UUID) { diff --git a/pkg/fftypes/tokenpool_test.go b/pkg/fftypes/tokenpool_test.go index d700327265..fb37aaf1c1 100644 --- a/pkg/fftypes/tokenpool_test.go +++ b/pkg/fftypes/tokenpool_test.go @@ -52,7 +52,7 @@ func TestTokenPoolDefinition(t *testing.T) { Name: "ok", } var def Definition = &TokenPoolAnnouncement{Pool: pool} - assert.Equal(t, "ff_ns_ok", def.Topic()) + assert.Equal(t, "73008386c5579b7015385528eb892f7773e13a20015c692f6b90b26e413fe8a4", def.Topic()) id := NewUUID() def.SetBroadcastMessage(id) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index c3b731daf6..8425c73c26 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -85,13 +85,21 @@ func pollForUp(t *testing.T, client *resty.Client) { func validateReceivedMessages(ts *testState, client *resty.Client, topic string, msgType fftypes.MessageType, txtype fftypes.TransactionType, count int) (data fftypes.DataArray) { var group *fftypes.Bytes32 - messages := GetMessages(ts.t, client, ts.startTime, msgType, topic, 200) - for i, message := range messages { - ts.t.Logf("Message %d: %+v", i, *message) - if group != nil { - assert.Equal(ts.t, group.String(), message.Header.Group.String(), "All messages must be same group") + var messages []*fftypes.Message + events := GetMessageEvents(ts.t, client, ts.startTime, topic, 200) + for i, event := range events { + if event.Message != nil { + message := event.Message + ts.t.Logf("Message %d: %+v", i, *message) + if message.Header.Type != msgType { + continue + } + if group != nil { + assert.Equal(ts.t, group.String(), message.Header.Group.String(), "All messages must be same group") + } + group = message.Header.Group + messages = append(messages, message) } - group = message.Header.Group } assert.Equal(ts.t, count, len(messages)) diff --git a/test/e2e/restclient_test.go b/test/e2e/restclient_test.go index 1a7e03420b..c5210f5a31 100644 --- a/test/e2e/restclient_test.go +++ b/test/e2e/restclient_test.go @@ -90,6 +90,20 @@ func GetNamespaces(client *resty.Client) (*resty.Response, error) { Get(urlGetNamespaces) } +func GetMessageEvents(t *testing.T, client *resty.Client, startTime time.Time, topic string, expectedStatus int) (events []*fftypes.EnrichedEvent) { + path := urlGetEvents + resp, err := client.R(). + SetQueryParam("created", fmt.Sprintf(">%d", startTime.UnixNano())). + SetQueryParam("topic", topic). + SetQueryParam("sort", "sequence"). + SetQueryParam("fetchreferences", "true"). + SetResult(&events). + Get(path) + require.NoError(t, err) + require.Equal(t, expectedStatus, resp.StatusCode(), "GET %s [%d]: %s (count=%d)", path, resp.StatusCode(), resp.String(), len(events)) + return events +} + func GetMessages(t *testing.T, client *resty.Client, startTime time.Time, msgType fftypes.MessageType, topic string, expectedStatus int) (msgs []*fftypes.Message) { path := urlGetMessages resp, err := client.R().