Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
BEGIN;

DROP INDEX events_topic;

ALTER TABLE events DROP COLUMN topic;
ALTER TABLE contractlisteners DROP COLUMN topic;

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
BEGIN;

ALTER TABLE events ADD COLUMN topic VARCHAR(64);
ALTER TABLE contractlisteners ADD COLUMN topic VARCHAR(64);

UPDATE events SET topic = '';
UPDATE contractlisteners SET topic = '';

ALTER TABLE events ALTER COLUMN topic SET NOT NULL;
ALTER TABLE contractlisteners ALTER COLUMN topic SET NOT NULL;

CREATE INDEX events_topic ON events(topic);

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DROP INDEX events_topic;

ALTER TABLE events DROP COLUMN topic;
ALTER TABLE contractlisteners DROP COLUMN topic;

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ALTER TABLE events ADD COLUMN topic VARCHAR(64);
ALTER TABLE contractlisteners ADD COLUMN topic VARCHAR(64);

UPDATE events SET topic = '';
UPDATE contractlisteners SET topic = '';

CREATE INDEX events_topic ON events(topic);
60 changes: 37 additions & 23 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2282,6 +2282,8 @@ paths:
type: object
protocolId:
type: string
topic:
type: string
type: object
description: Success
default:
Expand Down Expand Up @@ -2349,6 +2351,8 @@ paths:
type: object
protocolId:
type: string
topic:
type: string
type: object
responses:
"200":
Expand Down Expand Up @@ -2395,6 +2399,8 @@ paths:
type: object
protocolId:
type: string
topic:
type: string
type: object
description: Success
default:
Expand Down Expand Up @@ -2496,6 +2502,8 @@ paths:
type: object
protocolId:
type: string
topic:
type: string
type: object
description: Success
default:
Expand Down Expand Up @@ -3581,6 +3589,11 @@ paths:
name: sequence
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: topic
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: tx
Expand Down Expand Up @@ -3640,6 +3653,8 @@ paths:
sequence:
format: int64
type: integer
topic:
type: string
tx: {}
type:
enum:
Expand Down Expand Up @@ -3703,6 +3718,8 @@ paths:
sequence:
format: int64
type: integer
topic:
type: string
tx: {}
type:
enum:
Expand Down Expand Up @@ -4816,6 +4833,11 @@ paths:
name: sequence
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: topic
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: tx
Expand Down Expand Up @@ -4875,6 +4897,8 @@ paths:
sequence:
format: int64
type: integer
topic:
type: string
tx: {}
type:
enum:
Expand Down Expand Up @@ -5835,17 +5859,7 @@ paths:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: filter.group
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: filter.tag
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: filter.topics
name: filters
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
Expand Down Expand Up @@ -5939,11 +5953,11 @@ paths:
type: string
tag:
type: string
topics:
type: string
type: object
tag:
type: string
topic:
type: string
topics:
type: string
transaction:
Expand Down Expand Up @@ -6021,11 +6035,11 @@ paths:
type: string
tag:
type: string
topics:
type: string
type: object
tag:
type: string
topic:
type: string
topics:
type: string
transaction:
Expand Down Expand Up @@ -6172,11 +6186,11 @@ paths:
type: string
tag:
type: string
topics:
type: string
type: object
tag:
type: string
topic:
type: string
topics:
type: string
transaction:
Expand Down Expand Up @@ -6254,11 +6268,11 @@ paths:
type: string
tag:
type: string
topics:
type: string
type: object
tag:
type: string
topic:
type: string
topics:
type: string
transaction:
Expand Down Expand Up @@ -6405,11 +6419,11 @@ paths:
type: string
tag:
type: string
topics:
type: string
type: object
tag:
type: string
topic:
type: string
topics:
type: string
transaction:
Expand Down Expand Up @@ -6525,11 +6539,11 @@ paths:
type: string
tag:
type: string
topics:
type: string
type: object
tag:
type: string
topic:
type: string
topics:
type: string
transaction:
Expand Down
11 changes: 7 additions & 4 deletions internal/batch/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,10 +595,13 @@ func (bp *batchProcessor) markPayloadDispatched(state *DispatchState) error {
if bp.conf.txType == fftypes.TransactionTypeUnpinned {
for _, msg := range state.Payload.Messages {
// Emit a confirmation event locally immediately
event := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, state.Persisted.Namespace, msg.Header.ID, state.Persisted.TX.ID)
event.Correlator = msg.Header.CID
if err := bp.database.InsertEvent(ctx, event); err != nil {
return err
for _, topic := range msg.Header.Topics {
// One event per topic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing inherently wrong with this, but it's one of those things that will need to be clearly release-noted. For any apps that use multiple topics, they'll suddenly get more events with copies of the same data - so it just reinforces the need for those apps to have idempotent data processing.

event := fftypes.NewEvent(fftypes.EventTypeMessageConfirmed, state.Persisted.Namespace, msg.Header.ID, state.Persisted.TX.ID, topic)
event.Correlator = msg.Header.CID
if err := bp.database.InsertEvent(ctx, event); err != nil {
return err
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/batch/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func TestMarkMessageDispatchedUnpinnedOK(t *testing.T) {
for i := 0; i < 5; i++ {
msgid := fftypes.NewUUID()
bp.newWork <- &batchWork{
msg: &fftypes.Message{Header: fftypes.MessageHeader{ID: msgid}, Sequence: int64(1000 + i)},
msg: &fftypes.Message{Header: fftypes.MessageHeader{ID: msgid, Topics: fftypes.FFStringArray{"topic1"}}, Sequence: int64(1000 + i)},
}
}
}()
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestDispatchWithPublicBlobUpdates(t *testing.T) {
// Dispatch the work
go func() {
bp.newWork <- &batchWork{
msg: &fftypes.Message{Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, Sequence: int64(1000)},
msg: &fftypes.Message{Header: fftypes.MessageHeader{ID: fftypes.NewUUID(), Topics: fftypes.FFStringArray{"topic1"}}, Sequence: int64(1000)},
data: fftypes.DataArray{
{ID: dataID, Blob: &fftypes.BlobRef{
Public: "public/ref",
Expand Down
8 changes: 7 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ var (
EventDispatcherRetryMaxDelay = rootKey("event.dispatcher.retry.maxDelay")
// EventDBEventsBufferSize the size of the buffer of change events
EventDBEventsBufferSize = rootKey("event.dbevents.bufferSize")
// EventListenerTopicCacheSize cache size for blockchain listeners addresses
EventListenerTopicCacheSize = rootKey("event.listenerToipc.cache.size")
// EventListenerTopicCacheTTL cache time-to-live for private group addresses
EventListenerTopicCacheTTL = rootKey("event.listenerToipc.cache.ttl")
// GroupCacheSize cache size for private group addresses
GroupCacheSize = rootKey("group.cache.size")
// GroupCacheTTL cache time-to-live for private group addresses
Expand Down Expand Up @@ -340,6 +344,8 @@ func Reset() {
viper.SetDefault(string(EventDispatcherPollTimeout), "30s")
viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"})
viper.SetDefault(string(EventTransportsDefault), "websockets")
viper.SetDefault(string(EventListenerTopicCacheSize), "100Kb")
viper.SetDefault(string(EventListenerTopicCacheTTL), "5m")
viper.SetDefault(string(GroupCacheSize), "1Mb")
viper.SetDefault(string(GroupCacheTTL), "1h")
viper.SetDefault(string(AdminEnabled), false)
Expand All @@ -354,7 +360,7 @@ func Reset() {
viper.SetDefault(string(MessageCacheSize), "50Mb")
viper.SetDefault(string(MessageCacheTTL), "5m")
viper.SetDefault(string(MessageWriterBatchMaxInserts), 200)
viper.SetDefault(string(MessageWriterBatchTimeout), "50ms")
viper.SetDefault(string(MessageWriterBatchTimeout), "10ms")
viper.SetDefault(string(MessageWriterCount), 5)
viper.SetDefault(string(NamespacesDefault), "default")
viper.SetDefault(string(NamespacesPredefined), fftypes.JSONObjectArray{{"name": "default", "description": "Default predefined namespace"}})
Expand Down
10 changes: 7 additions & 3 deletions internal/database/sqlcommon/contractlisteners_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ var (
"name",
"protocol_id",
"location",
"created",
"topic",
"options",
"created",
}
contractListenerFilterFieldMap = map[string]string{
"interface": "interface_id",
Expand Down Expand Up @@ -78,6 +79,7 @@ func (s *SQLCommon) UpsertContractListener(ctx context.Context, sub *fftypes.Con
Set("namespace", sub.Namespace).
Set("name", sub.Name).
Set("location", sub.Location).
Set("topic", sub.Topic).
Set("options", sub.Options).
Where(sq.Eq{"protocol_id": sub.ProtocolID}),
func() {
Expand All @@ -99,8 +101,9 @@ func (s *SQLCommon) UpsertContractListener(ctx context.Context, sub *fftypes.Con
sub.Name,
sub.ProtocolID,
sub.Location,
sub.Created,
sub.Topic,
sub.Options,
sub.Created,
),
func() {
s.callbacks.UUIDCollectionNSEvent(database.CollectionContractListeners, fftypes.ChangeEventTypeCreated, sub.Namespace, sub.ID)
Expand All @@ -125,8 +128,9 @@ func (s *SQLCommon) contractListenerResult(ctx context.Context, row *sql.Rows) (
&sub.Name,
&sub.ProtocolID,
&sub.Location,
&sub.Created,
&sub.Topic,
&sub.Options,
&sub.Created,
)
if err != nil {
return nil, i18n.WrapError(ctx, err, i18n.MsgDBReadErr, "contractlisteners")
Expand Down
6 changes: 5 additions & 1 deletion internal/database/sqlcommon/contractlisteners_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func TestContractListenerE2EWithDB(t *testing.T) {
Name: "sub1",
ProtocolID: "sb-123",
Location: fftypes.JSONAnyPtrBytes(locationJson),
Topic: "topic1",
Options: &fftypes.ContractListenerOptions{
FirstEvent: "0",
},
}

s.callbacks.On("UUIDCollectionNSEvent", database.CollectionContractListeners, fftypes.ChangeEventTypeCreated, "ns", sub.ID).Return()
Expand Down Expand Up @@ -227,7 +231,7 @@ func TestContractListenerDeleteFail(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin()
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows(contractListenerColumns).AddRow(
fftypes.NewUUID(), nil, []byte("{}"), "ns1", "sub1", "123", "{}", fftypes.Now(), nil),
fftypes.NewUUID(), nil, []byte("{}"), "ns1", "sub1", "123", "{}", "topic1", nil, fftypes.Now()),
)
mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop"))
err := s.DeleteContractListenerByID(context.Background(), fftypes.NewUUID())
Expand Down
5 changes: 4 additions & 1 deletion internal/database/sqlcommon/event_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
"ref",
"cid",
"tx_id",
"topic",
"created",
}
eventFilterFieldMap = map[string]string{
Expand Down Expand Up @@ -76,13 +77,14 @@ func (s *SQLCommon) setEventInsertValues(query sq.InsertBuilder, event *fftypes.
event.Reference,
event.Correlator,
event.Transaction,
event.Topic,
event.Created,
)
}

func (s *SQLCommon) eventInserted(ctx context.Context, event *fftypes.Event) {
s.callbacks.OrderedUUIDCollectionNSEvent(database.CollectionEvents, fftypes.ChangeEventTypeCreated, event.Namespace, event.ID, event.Sequence)
log.L(ctx).Infof("Emitted %s event %s ref=%s (sequence=%d)", event.Type, event.ID, event.Reference, event.Sequence)
log.L(ctx).Infof("Emitted %s event %s for %s:%s (correlator=%v,topic=%s)", event.Type, event.ID, event.Namespace, event.Reference, event.Correlator, event.Topic)
}

func (s *SQLCommon) insertEventsPreCommit(ctx context.Context, tx *txWrapper, events []*fftypes.Event) (err error) {
Expand Down Expand Up @@ -134,6 +136,7 @@ func (s *SQLCommon) eventResult(ctx context.Context, row *sql.Rows) (*fftypes.Ev
&event.Reference,
&event.Correlator,
&event.Transaction,
&event.Topic,
&event.Created,
// Must be added to the list of columns in all selects
&event.Sequence,
Expand Down
1 change: 1 addition & 0 deletions internal/database/sqlcommon/event_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestEventE2EWithDB(t *testing.T) {
Type: fftypes.EventTypeMessageConfirmed,
Reference: fftypes.NewUUID(),
Correlator: fftypes.NewUUID(),
Topic: "topic1",
Created: fftypes.Now(),
}

Expand Down
Loading