diff --git a/internal/database/postgres/postgres.go b/internal/database/postgres/postgres.go index 0277d13012..9af8c78e14 100644 --- a/internal/database/postgres/postgres.go +++ b/internal/database/postgres/postgres.go @@ -42,8 +42,8 @@ func (psql *Postgres) Init(ctx context.Context, config config.Section) error { return psql.SQLCommon.Init(ctx, psql, config, capabilities) } -func (psql *Postgres) SetHandler(handler database.Callbacks) { - psql.SQLCommon.SetHandler(handler) +func (psql *Postgres) SetHandler(namespace string, handler database.Callbacks) { + psql.SQLCommon.SetHandler(namespace, handler) } func (psql *Postgres) Name() string { diff --git a/internal/database/postgres/postgres_test.go b/internal/database/postgres/postgres_test.go index dea0124367..60a4f7de79 100644 --- a/internal/database/postgres/postgres_test.go +++ b/internal/database/postgres/postgres_test.go @@ -29,7 +29,7 @@ import ( func TestPostgresProvider(t *testing.T) { psql := &Postgres{} - psql.SetHandler(&databasemocks.Callbacks{}) + psql.SetHandler("ns", &databasemocks.Callbacks{}) config := config.RootSection("unittest") psql.InitConfig(config) config.Set(sqlcommon.SQLConfDatasourceURL, "!bad connection") diff --git a/internal/database/sqlcommon/contractapis_sql_test.go b/internal/database/sqlcommon/contractapis_sql_test.go index 154fb681a4..31ee8e6587 100644 --- a/internal/database/sqlcommon/contractapis_sql_test.go +++ b/internal/database/sqlcommon/contractapis_sql_test.go @@ -126,7 +126,7 @@ func TestContractAPIDBFailUpdate(t *testing.T) { func TestUpsertContractAPIIDMismatch(t *testing.T) { s, db := newMockProvider().init() callbacks := &databasemocks.Callbacks{} - s.SetHandler(callbacks) + s.SetHandler("ns1", callbacks) apiID := fftypes.NewUUID() api := &core.ContractAPI{ ID: apiID, diff --git a/internal/database/sqlcommon/provider_mock_test.go b/internal/database/sqlcommon/provider_mock_test.go index 91a6a27519..2331c64157 100644 --- a/internal/database/sqlcommon/provider_mock_test.go +++ b/internal/database/sqlcommon/provider_mock_test.go @@ -64,7 +64,7 @@ func newMockProvider() *mockProvider { // init is a convenience to init for tests that aren't testing init itself func (mp *mockProvider) init() (*mockProvider, sqlmock.Sqlmock) { _ = mp.Init(context.Background(), mp, mp.config, mp.capabilities) - mp.SetHandler(mp.callbacks) + mp.SetHandler(database.GlobalHandler, mp.callbacks) return mp, mp.mdb } diff --git a/internal/database/sqlcommon/provider_sqlitego_test.go b/internal/database/sqlcommon/provider_sqlitego_test.go index 1b6a643601..d57e7f02ad 100644 --- a/internal/database/sqlcommon/provider_sqlitego_test.go +++ b/internal/database/sqlcommon/provider_sqlitego_test.go @@ -65,7 +65,7 @@ func newSQLiteTestProvider(t *testing.T) (*sqliteGoTestProvider, func()) { err = tp.Init(context.Background(), tp, tp.config, tp.capabilities) assert.NoError(tp.t, err) - tp.SetHandler(tp.callbacks) + tp.SetHandler(database.GlobalHandler, tp.callbacks) return tp, func() { tp.Close() diff --git a/internal/database/sqlcommon/sqlcommon.go b/internal/database/sqlcommon/sqlcommon.go index 311748277f..6547968873 100644 --- a/internal/database/sqlcommon/sqlcommon.go +++ b/internal/database/sqlcommon/sqlcommon.go @@ -45,35 +45,47 @@ type SQLCommon struct { } type callbacks struct { - handlers []database.Callbacks + handlers map[string]database.Callbacks } func (cb *callbacks) OrderedUUIDCollectionNSEvent(resType database.OrderedUUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID, sequence int64) { - for _, cb := range cb.handlers { + if cb, ok := cb.handlers[ns]; ok { + cb.OrderedUUIDCollectionNSEvent(resType, eventType, ns, id, sequence) + } + if cb, ok := cb.handlers[database.GlobalHandler]; ok { cb.OrderedUUIDCollectionNSEvent(resType, eventType, ns, id, sequence) } } func (cb *callbacks) OrderedCollectionNSEvent(resType database.OrderedCollectionNS, eventType core.ChangeEventType, ns string, sequence int64) { - for _, cb := range cb.handlers { + if cb, ok := cb.handlers[ns]; ok { + cb.OrderedCollectionNSEvent(resType, eventType, ns, sequence) + } + if cb, ok := cb.handlers[database.GlobalHandler]; ok { cb.OrderedCollectionNSEvent(resType, eventType, ns, sequence) } } func (cb *callbacks) UUIDCollectionNSEvent(resType database.UUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID) { - for _, cb := range cb.handlers { + if cb, ok := cb.handlers[ns]; ok { + cb.UUIDCollectionNSEvent(resType, eventType, ns, id) + } + if cb, ok := cb.handlers[database.GlobalHandler]; ok { cb.UUIDCollectionNSEvent(resType, eventType, ns, id) } } func (cb *callbacks) UUIDCollectionEvent(resType database.UUIDCollection, eventType core.ChangeEventType, id *fftypes.UUID) { - for _, cb := range cb.handlers { + if cb, ok := cb.handlers[database.GlobalHandler]; ok { cb.UUIDCollectionEvent(resType, eventType, id) } } func (cb *callbacks) HashCollectionNSEvent(resType database.HashCollectionNS, eventType core.ChangeEventType, ns string, hash *fftypes.Bytes32) { - for _, cb := range cb.handlers { + if cb, ok := cb.handlers[ns]; ok { + cb.HashCollectionNSEvent(resType, eventType, ns, hash) + } + if cb, ok := cb.handlers[database.GlobalHandler]; ok { cb.HashCollectionNSEvent(resType, eventType, ns, hash) } } @@ -130,8 +142,11 @@ func (s *SQLCommon) Init(ctx context.Context, provider Provider, config config.S return nil } -func (s *SQLCommon) SetHandler(handler database.Callbacks) { - s.callbacks.handlers = append(s.callbacks.handlers, handler) +func (s *SQLCommon) SetHandler(namespace string, handler database.Callbacks) { + if s.callbacks.handlers == nil { + s.callbacks.handlers = make(map[string]database.Callbacks) + } + s.callbacks.handlers[namespace] = handler } func (s *SQLCommon) Capabilities() *database.Capabilities { return s.capabilities } diff --git a/internal/database/sqlcommon/sqlcommon_test.go b/internal/database/sqlcommon/sqlcommon_test.go index 27f3b0c54e..641e9a61b9 100644 --- a/internal/database/sqlcommon/sqlcommon_test.go +++ b/internal/database/sqlcommon/sqlcommon_test.go @@ -26,7 +26,9 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/golang-migrate/migrate/v4" "github.com/hyperledger/firefly-common/pkg/config" + "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly/mocks/databasemocks" + "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/database" "github.com/stretchr/testify/assert" ) @@ -361,3 +363,26 @@ func TestInsertTxRowsIncompleteReturn(t *testing.T) { err = s.insertTxRows(ctx, "table1", tx, sb, nil, []int64{1, 2}, false) assert.Regexp(t, "FF10116", err) } + +func TestNamespaceCallbacks(t *testing.T) { + tcb := &databasemocks.Callbacks{} + cb := callbacks{ + handlers: map[string]database.Callbacks{ + "ns1": tcb, + }, + } + id := fftypes.NewUUID() + hash := fftypes.NewRandB32() + + tcb.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, core.ChangeEventTypeCreated, "ns1", id, int64(1)).Return() + tcb.On("OrderedCollectionNSEvent", database.CollectionPins, core.ChangeEventTypeCreated, "ns1", int64(1)).Return() + tcb.On("UUIDCollectionNSEvent", database.CollectionOperations, core.ChangeEventTypeCreated, "ns1", id).Return() + tcb.On("UUIDCollectionEvent", database.CollectionNamespaces, core.ChangeEventTypeCreated, id).Return() + tcb.On("HashCollectionNSEvent", database.CollectionGroups, core.ChangeEventTypeUpdated, "ns1", hash).Return() + + cb.OrderedUUIDCollectionNSEvent(database.CollectionMessages, core.ChangeEventTypeCreated, "ns1", id, 1) + cb.OrderedCollectionNSEvent(database.CollectionPins, core.ChangeEventTypeCreated, "ns1", 1) + cb.UUIDCollectionNSEvent(database.CollectionOperations, core.ChangeEventTypeCreated, "ns1", id) + cb.UUIDCollectionEvent(database.CollectionNamespaces, core.ChangeEventTypeCreated, id) + cb.HashCollectionNSEvent(database.CollectionGroups, core.ChangeEventTypeUpdated, "ns1", hash) +} diff --git a/internal/database/sqlcommon/tokenpool_sql_test.go b/internal/database/sqlcommon/tokenpool_sql_test.go index 140f52a6fc..b5f3849d37 100644 --- a/internal/database/sqlcommon/tokenpool_sql_test.go +++ b/internal/database/sqlcommon/tokenpool_sql_test.go @@ -176,7 +176,7 @@ func TestUpsertTokenPoolFailCommit(t *testing.T) { func TestUpsertTokenPoolUpdateIDMismatch(t *testing.T) { s, db := newMockProvider().init() callbacks := &databasemocks.Callbacks{} - s.SetHandler(callbacks) + s.SetHandler("ns1", callbacks) poolID := fftypes.NewUUID() pool := &core.TokenPool{ ID: poolID, diff --git a/internal/database/sqlite3/sqlite3.go b/internal/database/sqlite3/sqlite3.go index a8402cb90e..ecbbaa46aa 100644 --- a/internal/database/sqlite3/sqlite3.go +++ b/internal/database/sqlite3/sqlite3.go @@ -58,8 +58,8 @@ func (sqlite *SQLite3) Init(ctx context.Context, config config.Section) error { return sqlite.SQLCommon.Init(ctx, sqlite, config, capabilities) } -func (sqlite *SQLite3) SetHandler(handler database.Callbacks) { - sqlite.SQLCommon.SetHandler(handler) +func (sqlite *SQLite3) SetHandler(namespace string, handler database.Callbacks) { + sqlite.SQLCommon.SetHandler(namespace, handler) } func (sqlite *SQLite3) Name() string { diff --git a/internal/database/sqlite3/sqlite3_test.go b/internal/database/sqlite3/sqlite3_test.go index cac1659dab..aa43df616d 100644 --- a/internal/database/sqlite3/sqlite3_test.go +++ b/internal/database/sqlite3/sqlite3_test.go @@ -32,7 +32,7 @@ import ( func TestSQLite3GoProvider(t *testing.T) { sqlite := &SQLite3{} - sqlite.SetHandler(&databasemocks.Callbacks{}) + sqlite.SetHandler("ns", &databasemocks.Callbacks{}) config := config.RootSection("unittest") sqlite.InitConfig(config) config.Set(sqlcommon.SQLConfDatasourceURL, "!wrong://") diff --git a/internal/dataexchange/ffdx/dxevent.go b/internal/dataexchange/ffdx/dxevent.go index 8020e3c733..e23a1ab65b 100644 --- a/internal/dataexchange/ffdx/dxevent.go +++ b/internal/dataexchange/ffdx/dxevent.go @@ -17,7 +17,8 @@ package ffdx import ( - "strings" + "encoding/json" + "fmt" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/i18n" @@ -92,6 +93,7 @@ func (e *dxEvent) TransferResult() *dataexchange.TransferResult { } func (h *FFDX) dispatchEvent(msg *wsEvent) { + var namespace string var err error e := &dxEvent{ffdx: h, id: msg.EventID, requestID: msg.RequestID} switch msg.Type { @@ -129,10 +131,21 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) { }, } case messageReceived: - e.dxType = dataexchange.DXEventTypeMessageReceived - e.messageReceived = &dataexchange.MessageReceived{ - PeerID: msg.Sender, - Data: []byte(msg.Message), + // De-serialize the transport wrapper + var wrapper *core.TransportWrapper + err = json.Unmarshal([]byte(msg.Message), &wrapper) + switch { + case err != nil: + err = fmt.Errorf("invalid transmission from peer '%s': %s", msg.Sender, err) + case wrapper.Batch == nil: + err = fmt.Errorf("invalid transmission from peer '%s': nil batch", msg.Sender) + default: + namespace = wrapper.Batch.Namespace + e.dxType = dataexchange.DXEventTypeMessageReceived + e.messageReceived = &dataexchange.MessageReceived{ + PeerID: msg.Sender, + Transport: wrapper, + } } case blobFailed: e.dxType = dataexchange.DXEventTypeTransferResult @@ -161,14 +174,10 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) { var hash *fftypes.Bytes32 hash, err = fftypes.ParseBytes32(h.ctx, msg.Hash) if err == nil { - var ns string - pathParts := strings.Split(msg.Path, "/") - if len(pathParts) >= 2 { - ns = pathParts[len(pathParts)-2] - } + _, namespace, _ = splitBlobPath(msg.Path) e.dxType = dataexchange.DXEventTypePrivateBlobReceived e.privateBlobReceived = &dataexchange.PrivateBlobReceived{ - Namespace: ns, + Namespace: namespace, PeerID: msg.Sender, Hash: *hash, Size: msg.Size, @@ -188,11 +197,15 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) { default: err = i18n.NewError(h.ctx, coremsgs.MsgUnexpectedDXMessageType, msg.Type) } + // If we couldn't dispatch the event we received, we still ack it if err != nil { log.L(h.ctx).Warnf("Failed to dispatch DX event: %s", err) e.Ack() } else { - h.callbacks.DXEvent(e) + if namespace == "" && msg.RequestID != "" { + namespace, _, _ = core.ParseNamespacedOpID(h.ctx, msg.RequestID) + } + h.callbacks.DXEvent(h.ctx, namespace, e) } } diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index e4359d4e04..23c6abfd22 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -23,6 +23,7 @@ import ( "io" "net/http" "strconv" + "strings" "sync" "github.com/go-resty/resty/v2" @@ -50,15 +51,36 @@ type FFDX struct { } type callbacks struct { - handlers []dataexchange.Callbacks + handlers map[string]dataexchange.Callbacks } -func (cb *callbacks) DXEvent(event dataexchange.DXEvent) { - for _, cb := range cb.handlers { - cb.DXEvent(event) +func (cb *callbacks) DXEvent(ctx context.Context, namespace string, event dataexchange.DXEvent) { + if handler, ok := cb.handlers[namespace]; ok { + handler.DXEvent(event) + } else { + log.L(ctx).Errorf("unknown namespace on event '%s'", event.EventID()) + event.Ack() } } +func splitLast(s string, sep string) (string, string) { + split := strings.LastIndex(s, sep) + if split == -1 { + return "", s + } + return s[:split], s[split+1:] +} + +func splitBlobPath(path string) (prefix, namespace, id string) { + path, id = splitLast(path, "/") + path, namespace = splitLast(path, "/") + return path, namespace, id +} + +func joinBlobPath(namespace, id string) string { + return fmt.Sprintf("%s/%s", namespace, id) +} + const ( dxHTTPHeaderHash = "dx-hash" dxHTTPHeaderSize = "dx-size" @@ -121,7 +143,7 @@ func (h *FFDX) Name() string { func (h *FFDX) Init(ctx context.Context, config config.Section) (err error) { h.ctx = log.WithLogField(ctx, "dx", "https") h.ackChannel = make(chan *ack) - + h.callbacks.handlers = make(map[string]dataexchange.Callbacks) h.needsInit = config.GetBool(DataExchangeInitEnabled) if config.GetString(ffresty.HTTPConfigURL) == "" { @@ -148,8 +170,8 @@ func (h *FFDX) SetNodes(nodes []fftypes.JSONObject) { h.nodes = nodes } -func (h *FFDX) SetHandler(handler dataexchange.Callbacks) { - h.callbacks.handlers = append(h.callbacks.handlers, handler) +func (h *FFDX) SetHandler(namespace string, handler dataexchange.Callbacks) { + h.callbacks.handlers[namespace] = handler } func (h *FFDX) Start() error { @@ -227,7 +249,7 @@ func (h *FFDX) AddPeer(ctx context.Context, peer fftypes.JSONObject) (err error) } func (h *FFDX) UploadBlob(ctx context.Context, ns string, id fftypes.UUID, content io.Reader) (payloadRef string, hash *fftypes.Bytes32, size int64, err error) { - payloadRef = fmt.Sprintf("%s/%s", ns, &id) + payloadRef = joinBlobPath(ns, id.String()) var upload uploadBlob res, err := h.client.R().SetContext(ctx). SetFileReader("file", id.String(), content). diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index de41c97901..d511bd51b0 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -73,6 +73,38 @@ func newTestFFDX(t *testing.T, manifestEnabled bool) (h *FFDX, toServer, fromSer } } +func TestSplitBlobPath(t *testing.T) { + prefix, namespace, id := splitBlobPath("") + assert.Equal(t, "", prefix) + assert.Equal(t, "", namespace) + assert.Equal(t, "", id) + + prefix, namespace, id = splitBlobPath("123") + assert.Equal(t, "", prefix) + assert.Equal(t, "", namespace) + assert.Equal(t, "123", id) + + prefix, namespace, id = splitBlobPath("ns1/123") + assert.Equal(t, "", prefix) + assert.Equal(t, "ns1", namespace) + assert.Equal(t, "123", id) + + prefix, namespace, id = splitBlobPath("/ns1/123") + assert.Equal(t, "", prefix) + assert.Equal(t, "ns1", namespace) + assert.Equal(t, "123", id) + + prefix, namespace, id = splitBlobPath("/root/test/ns1/123") + assert.Equal(t, "/root/test", prefix) + assert.Equal(t, "ns1", namespace) + assert.Equal(t, "123", id) +} + +func TestJoinBlobPath(t *testing.T) { + path := joinBlobPath("ns1", "123") + assert.Equal(t, "ns1/123", path) +} + func TestInitBadURL(t *testing.T) { coreconfig.Reset() h := &FFDX{} @@ -429,11 +461,14 @@ func TestTransferBlobError(t *testing.T) { assert.Regexp(t, "FF10229", err) } -func TestEvents(t *testing.T) { +func TestBadEvents(t *testing.T) { h, toServer, fromServer, _, done := newTestFFDX(t, false) defer done() + mcb := &dataexchangemocks.Callbacks{} + h.SetHandler("ns1", mcb) + err := h.Start() assert.NoError(t, err) @@ -442,84 +477,118 @@ func TestEvents(t *testing.T) { msg := <-toServer assert.Equal(t, `{"action":"ack","id":"0"}`, string(msg)) + namespacedID := fmt.Sprintf("ns2:%s", fftypes.NewUUID()) + fromServer <- `{"id":"1","type":"message-failed","requestID":"` + namespacedID + `"}` + msg = <-toServer + assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg)) + + fromServer <- `{"id":"2","type":"message-received","sender":"peer1","message":"bad!"}` + msg = <-toServer + assert.Equal(t, `{"action":"ack","id":"2"}`, string(msg)) + + fromServer <- `{"id":"3","type":"message-received","sender":"peer1","message":"{}"}` + msg = <-toServer + assert.Equal(t, `{"action":"ack","id":"3"}`, string(msg)) + + mcb.AssertExpectations(t) +} + +func TestMessageEvents(t *testing.T) { + + h, toServer, fromServer, _, done := newTestFFDX(t, false) + defer done() + mcb := &dataexchangemocks.Callbacks{} - h.SetHandler(mcb) + h.SetHandler("ns1", mcb) + + err := h.Start() + assert.NoError(t, err) + namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.EventID() == "1" && - ev.NamespacedID() == "tx12345" && + ev.NamespacedID() == namespacedID1 && ev.Type() == dataexchange.DXEventTypeTransferResult && - ev.TransferResult().TrackingID == "tx12345" && + ev.TransferResult().TrackingID == namespacedID1 && ev.TransferResult().Status == core.OpStatusFailed && ev.TransferResult().Error == "pop" })).Run(acker()).Return(nil) - fromServer <- `{"id":"1","type":"message-failed","requestID":"tx12345","error":"pop"}` - msg = <-toServer + fromServer <- `{"id":"1","type":"message-failed","requestID":"` + namespacedID1 + `","error":"pop"}` + msg := <-toServer assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg)) + namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.EventID() == "2" && ev.Type() == dataexchange.DXEventTypeTransferResult && - ev.TransferResult().TrackingID == "tx12345" && + ev.TransferResult().TrackingID == namespacedID2 && ev.TransferResult().Status == core.OpStatusSucceeded })).Run(acker()).Return(nil) - fromServer <- `{"id":"2","type":"message-delivered","requestID":"tx12345"}` + fromServer <- `{"id":"2","type":"message-delivered","requestID":"` + namespacedID2 + `"}` msg = <-toServer assert.Equal(t, `{"action":"ack","id":"2"}`, string(msg)) + namespacedID3 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.EventID() == "3" && ev.Type() == dataexchange.DXEventTypeTransferResult && - ev.TransferResult().TrackingID == "tx12345" && + ev.TransferResult().TrackingID == namespacedID3 && ev.TransferResult().Status == core.OpStatusSucceeded && ev.TransferResult().Manifest == `{"manifest":true}` && ev.TransferResult().Info.String() == `{"signatures":"and stuff"}` })).Run(acker()).Return(nil) - fromServer <- `{"id":"3","type":"message-acknowledged","requestID":"tx12345","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}` + fromServer <- `{"id":"3","type":"message-acknowledged","requestID":"` + namespacedID3 + `","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}` msg = <-toServer assert.Equal(t, `{"action":"ack","id":"3"}`, string(msg)) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.EventID() == "4" && ev.Type() == dataexchange.DXEventTypeMessageReceived && - ev.MessageReceived().PeerID == "peer1" && - string(ev.MessageReceived().Data) == "message1" + ev.MessageReceived().PeerID == "peer1" })).Run(manifestAcker(`{"manifest":true}`)).Return(nil) - fromServer <- `{"id":"4","type":"message-received","sender":"peer1","message":"message1"}` + fromServer <- `{"id":"4","type":"message-received","sender":"peer1","message":"{\"batch\":{\"namespace\":\"ns1\"}}"}` msg = <-toServer assert.Equal(t, `{"action":"ack","id":"4","manifest":"{\"manifest\":true}"}`, string(msg)) + mcb.AssertExpectations(t) +} + +func TestBlobEvents(t *testing.T) { + + h, toServer, fromServer, _, done := newTestFFDX(t, false) + defer done() + + mcb := &dataexchangemocks.Callbacks{} + h.SetHandler("ns1", mcb) + + err := h.Start() + assert.NoError(t, err) + + namespacedID5 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.EventID() == "5" && ev.Type() == dataexchange.DXEventTypeTransferResult && - ev.TransferResult().TrackingID == "tx12345" && + ev.TransferResult().TrackingID == namespacedID5 && ev.TransferResult().Status == core.OpStatusFailed && ev.TransferResult().Error == "pop" })).Run(acker()).Return(nil) - fromServer <- `{"id":"5","type":"blob-failed","requestID":"tx12345","error":"pop"}` - msg = <-toServer + fromServer <- `{"id":"5","type":"blob-failed","requestID":"` + namespacedID5 + `","error":"pop"}` + msg := <-toServer assert.Equal(t, `{"action":"ack","id":"5"}`, string(msg)) + namespacedID6 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.EventID() == "6" && ev.Type() == dataexchange.DXEventTypeTransferResult && - ev.TransferResult().TrackingID == "tx12345" && + ev.TransferResult().TrackingID == namespacedID6 && ev.TransferResult().Status == core.OpStatusSucceeded && ev.TransferResult().Info.String() == `{"some":"details"}` })).Run(acker()).Return(nil) - fromServer <- `{"id":"6","type":"blob-delivered","requestID":"tx12345","info":{"some":"details"}}` + fromServer <- `{"id":"6","type":"blob-delivered","requestID":"` + namespacedID6 + `","info":{"some":"details"}}` msg = <-toServer assert.Equal(t, `{"action":"ack","id":"6"}`, string(msg)) - fromServer <- `{"id":"7","type":"blob-received","sender":"peer1","path":"ns1/! not a UUID"}` - msg = <-toServer - assert.Equal(t, `{"action":"ack","id":"7"}`, string(msg)) - u := fftypes.NewUUID() - fromServer <- fmt.Sprintf(`{"id":"8","type":"blob-received","sender":"peer1","path":"ns1/%s","hash":"!wrong","size":-1}`, u.String()) - msg = <-toServer - assert.Equal(t, `{"action":"ack","id":"8"}`, string(msg)) - hash := fftypes.NewRandB32() mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.EventID() == "9" && @@ -530,14 +599,15 @@ func TestEvents(t *testing.T) { msg = <-toServer assert.Equal(t, `{"action":"ack","id":"9"}`, string(msg)) + namespacedID10 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.EventID() == "10" && ev.Type() == dataexchange.DXEventTypeTransferResult && - ev.TransferResult().TrackingID == "tx12345" && + ev.TransferResult().TrackingID == namespacedID10 && ev.TransferResult().Status == core.OpStatusSucceeded && ev.TransferResult().Info.String() == `{"signatures":"and stuff"}` })).Run(acker()).Return(nil) - fromServer <- `{"id":"10","type":"blob-acknowledged","requestID":"tx12345","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}` + fromServer <- `{"id":"10","type":"blob-acknowledged","requestID":"` + namespacedID10 + `","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}` msg = <-toServer assert.Equal(t, `{"action":"ack","id":"10"}`, string(msg)) @@ -558,23 +628,25 @@ func TestEventsWithManifest(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"0"}`, string(msg)) mcb := &dataexchangemocks.Callbacks{} - h.SetHandler(mcb) + h.SetHandler("ns1", mcb) + namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.EventID() == "1" && ev.Type() == dataexchange.DXEventTypeTransferResult && ev.TransferResult().Status == core.OpStatusPending })).Run(acker()).Return(nil) - fromServer <- `{"id":"1","type":"message-delivered","requestID":"tx12345"}` + fromServer <- `{"id":"1","type":"message-delivered","requestID":"` + namespacedID1 + `"}` msg = <-toServer assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg)) + namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.EventID() == "2" && ev.Type() == dataexchange.DXEventTypeTransferResult && ev.TransferResult().Status == core.OpStatusPending })).Run(acker()).Return(nil) - fromServer <- `{"id":"2","type":"blob-delivered","requestID":"tx12345"}` + fromServer <- `{"id":"2","type":"blob-delivered","requestID":"` + namespacedID2 + `"}` msg = <-toServer assert.Equal(t, `{"action":"ack","id":"2"}`, string(msg)) @@ -586,7 +658,7 @@ func TestEventLoopReceiveClosed(t *testing.T) { wsm := &wsmocks.WSClient{} h := &FFDX{ ctx: context.Background(), - callbacks: callbacks{handlers: []dataexchange.Callbacks{dxc}}, + callbacks: callbacks{handlers: map[string]dataexchange.Callbacks{"ns1": dxc}}, wsconn: wsm, } r := make(chan []byte) @@ -602,7 +674,7 @@ func TestEventLoopSendClosed(t *testing.T) { ctx, cancelCtx := context.WithCancel(context.Background()) h := &FFDX{ ctx: ctx, - callbacks: callbacks{handlers: []dataexchange.Callbacks{dxc}}, + callbacks: callbacks{handlers: map[string]dataexchange.Callbacks{"ns1": dxc}}, wsconn: wsm, ackChannel: make(chan *ack, 1), } @@ -623,7 +695,7 @@ func TestEventLoopClosedContext(t *testing.T) { cancel() h := &FFDX{ ctx: ctx, - callbacks: callbacks{handlers: []dataexchange.Callbacks{dxc}}, + callbacks: callbacks{handlers: map[string]dataexchange.Callbacks{"ns1": dxc}}, wsconn: wsm, } r := make(chan []byte, 1) diff --git a/internal/events/batch_pin_complete.go b/internal/events/batch_pin_complete.go index 404ff9d8ef..34de7a9eca 100644 --- a/internal/events/batch_pin_complete.go +++ b/internal/events/batch_pin_complete.go @@ -41,7 +41,7 @@ func (em *eventManager) BatchPinComplete(bi blockchain.Plugin, batchPin *blockch return nil // move on } if batchPin.Namespace != em.namespace { - log.L(em.ctx).Debugf("Ignoring BatchPin from different namespace '%s'", batchPin.Namespace) + log.L(em.ctx).Debugf("Ignoring batch pin from different namespace '%s'", batchPin.Namespace) return nil // move on } diff --git a/internal/events/dx_callbacks.go b/internal/events/dx_callbacks.go index c1668fc9e6..4c8ad85496 100644 --- a/internal/events/dx_callbacks.go +++ b/internal/events/dx_callbacks.go @@ -19,7 +19,6 @@ package events import ( "context" "database/sql/driver" - "encoding/json" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/log" @@ -184,27 +183,15 @@ func (em *eventManager) messageReceived(dx dataexchange.Plugin, event dataexchan l := log.L(em.ctx) mr := event.MessageReceived() + l.Infof("Private batch received from %s peer '%s'", dx.Name(), mr.PeerID) - // De-serialize the transport wrapper - var wrapper *core.TransportWrapper - err := json.Unmarshal(mr.Data, &wrapper) - if err != nil { - l.Errorf("Invalid transmission from %s peer '%s': %s", dx.Name(), mr.PeerID, err) - event.AckWithManifest("") - return - } - if wrapper.Batch == nil { - l.Errorf("Invalid transmission: nil batch") + if mr.Transport.Batch.Namespace != em.namespace { + log.L(em.ctx).Debugf("Ignoring batch from different namespace '%s'", mr.Transport.Batch.Namespace) event.AckWithManifest("") return } - if wrapper.Batch.Namespace != em.namespace { - log.L(em.ctx).Debugf("Ignoring batch from different namespace '%s'", wrapper.Batch.Namespace) - return - } - l.Infof("Private batch received from %s peer '%s' (len=%d)", dx.Name(), mr.PeerID, len(mr.Data)) - manifestString, err := em.privateBatchReceived(mr.PeerID, wrapper.Batch, wrapper.Group) + manifestString, err := em.privateBatchReceived(mr.PeerID, mr.Transport.Batch, mr.Transport.Group) if err != nil { l.Warnf("Exited while persisting batch: %s", err) // We do NOT ack here as we broke out of the retry @@ -224,6 +211,7 @@ func (em *eventManager) privateBlobReceived(dx dataexchange.Plugin, event dataex } if br.Namespace != em.namespace { log.L(em.ctx).Debugf("Ignoring blob from different namespace '%s'", br.Namespace) + event.Ack() // Still confirm the event return } diff --git a/internal/events/dx_callbacks_test.go b/internal/events/dx_callbacks_test.go index e808b1af51..e797236c1e 100644 --- a/internal/events/dx_callbacks_test.go +++ b/internal/events/dx_callbacks_test.go @@ -18,7 +18,6 @@ package events import ( "context" - "encoding/json" "fmt" "strings" "testing" @@ -36,15 +35,15 @@ import ( "github.com/stretchr/testify/mock" ) -func sampleBatchTransfer(t *testing.T, txType core.TransactionType) (*core.Batch, []byte) { +func sampleBatchTransfer(t *testing.T, txType core.TransactionType) (*core.Batch, *core.TransportWrapper) { data := &core.Data{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"test"`)} batch := sampleBatch(t, core.BatchTypePrivate, txType, core.DataArray{data}) - b, _ := json.Marshal(&core.TransportWrapper{ + b := &core.TransportWrapper{ Batch: batch, Group: &core.Group{ Hash: fftypes.NewRandB32(), }, - }) + } return batch, b } @@ -82,18 +81,18 @@ func newTestNode(name string, owner *core.Identity) *core.Identity { return identity } -func newMessageReceivedNoAck(peerID string, data []byte) *dataexchangemocks.DXEvent { +func newMessageReceivedNoAck(peerID string, transport *core.TransportWrapper) *dataexchangemocks.DXEvent { mde := &dataexchangemocks.DXEvent{} mde.On("MessageReceived").Return(&dataexchange.MessageReceived{ - PeerID: peerID, - Data: data, + PeerID: peerID, + Transport: transport, }) mde.On("Type").Return(dataexchange.DXEventTypeMessageReceived).Maybe() return mde } -func newMessageReceived(peerID string, data []byte, expectedManifest string) *dataexchangemocks.DXEvent { - mde := newMessageReceivedNoAck(peerID, data) +func newMessageReceived(peerID string, transport *core.TransportWrapper, expectedManifest string) *dataexchangemocks.DXEvent { + mde := newMessageReceivedNoAck(peerID, transport) mde.On("AckWithManifest", expectedManifest).Return() return mde } @@ -183,12 +182,12 @@ func TestMessageReceiveOkBadBatchIgnored(t *testing.T) { data := &core.Data{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"test"`)} batch := sampleBatch(t, core.BatchTypePrivate, core.TransactionTypeBatchPin, core.DataArray{data}) batch.Payload.TX.Type = core.TransactionTypeTokenPool - b, _ := json.Marshal(&core.TransportWrapper{ + b := &core.TransportWrapper{ Batch: batch, Group: &core.Group{ Hash: fftypes.NewRandB32(), }, - }) + } org1 := newTestOrg("org1") node1 := newTestNode("node1", org1) @@ -238,65 +237,6 @@ func TestMessageReceivePersistBatchError(t *testing.T) { mim.AssertExpectations(t) } -func TestMessageReceivedBadData(t *testing.T) { - em, cancel := newTestEventManager(t) - defer cancel() - - mdx := &dataexchangemocks.Plugin{} - mdx.On("Name").Return("utdx") - - mde := newMessageReceived("peer1", []byte(`!{}`), "") - em.messageReceived(mdx, mde) - - mde.AssertExpectations(t) - -} - -func TestMessageReceivedUnknownType(t *testing.T) { - em, cancel := newTestEventManager(t) - defer cancel() - - mdx := &dataexchangemocks.Plugin{} - mdx.On("Name").Return("utdx") - - mde := newMessageReceived("peer1", []byte(`{ - "type": "unknown" - }`), "") - em.messageReceived(mdx, mde) - - mde.AssertExpectations(t) -} - -func TestMessageReceivedNilBatch(t *testing.T) { - em, cancel := newTestEventManager(t) - defer cancel() - - mdx := &dataexchangemocks.Plugin{} - mdx.On("Name").Return("utdx") - - mde := newMessageReceived("peer1", []byte(`{ - "type": "batch" - }`), "") - em.messageReceived(mdx, mde) - - mde.AssertExpectations(t) -} - -func TestMessageReceivedNilMessage(t *testing.T) { - em, cancel := newTestEventManager(t) - defer cancel() - - mdx := &dataexchangemocks.Plugin{} - mdx.On("Name").Return("utdx") - - mde := newMessageReceived("peer1", []byte(`{ - "type": "message" - }`), "") - em.messageReceived(mdx, mde) - - mde.AssertExpectations(t) -} - func TestMessageReceivedWrongNS(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() @@ -307,29 +247,13 @@ func TestMessageReceivedWrongNS(t *testing.T) { mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") - mde := newMessageReceivedNoAck("peer1", b) + mde := newMessageReceived("peer1", b, "") em.messageReceived(mdx, mde) mde.AssertExpectations(t) } -func TestMessageReceivedNilGroup(t *testing.T) { - em, cancel := newTestEventManager(t) - defer cancel() - - mdx := &dataexchangemocks.Plugin{} - mdx.On("Name").Return("utdx") - - mde := newMessageReceived("peer1", []byte(`{ - "type": "message", - "message": {} - }`), "") - em.messageReceived(mdx, mde) - - mde.AssertExpectations(t) -} - func TestMessageReceiveNodeLookupError(t *testing.T) { em, cancel := newTestEventManager(t) cancel() // to stop retry @@ -339,9 +263,9 @@ func TestMessageReceiveNodeLookupError(t *testing.T) { Namespace: "ns1", }, } - b, _ := json.Marshal(&core.TransportWrapper{ + b := &core.TransportWrapper{ Batch: batch, - }) + } mim := em.identity.(*identitymanagermocks.Manager) mim.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeNode}, &core.VerifierRef{ @@ -523,8 +447,7 @@ func TestPrivateBlobReceivedWrongNS(t *testing.T) { mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") - // no ack as we are simulating termination mid retry - mde := newPrivateBlobReceivedNoAck("peer1", hash, 12345, "ns1/path1") + mde := newPrivateBlobReceived("peer1", hash, 12345, "ns1/path1") em.privateBlobReceived(mdx, mde) mde.AssertExpectations(t) diff --git a/internal/identity/tbd/tbd.go b/internal/identity/tbd/tbd.go index 1fb5aac022..716b6eb19b 100644 --- a/internal/identity/tbd/tbd.go +++ b/internal/identity/tbd/tbd.go @@ -37,7 +37,7 @@ func (tbd *TBD) Init(ctx context.Context, config config.Section) (err error) { return nil } -func (tbd *TBD) SetHandler(handler identity.Callbacks) { +func (tbd *TBD) SetHandler(namespace string, handler identity.Callbacks) { } func (tbd *TBD) Start() error { diff --git a/internal/namespace/manager.go b/internal/namespace/manager.go index 5081d157eb..e213f54edc 100644 --- a/internal/namespace/manager.go +++ b/internal/namespace/manager.go @@ -552,7 +552,7 @@ func (nm *namespaceManager) initPlugins(ctx context.Context) (err error) { if err = entry.plugin.Init(ctx, entry.config); err != nil { return err } - entry.plugin.SetHandler(nm) + entry.plugin.SetHandler(database.GlobalHandler, nm) } for _, entry := range nm.plugins.blockchain { if err = entry.plugin.Init(ctx, entry.config, nm.metrics); err != nil { diff --git a/internal/namespace/manager_test.go b/internal/namespace/manager_test.go index 60976473ed..0fbb292961 100644 --- a/internal/namespace/manager_test.go +++ b/internal/namespace/manager_test.go @@ -43,6 +43,7 @@ import ( "github.com/hyperledger/firefly/mocks/spieventsmocks" "github.com/hyperledger/firefly/mocks/tokenmocks" "github.com/hyperledger/firefly/pkg/core" + "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/tokens" "github.com/spf13/viper" "github.com/stretchr/testify/assert" @@ -132,7 +133,7 @@ func TestInit(t *testing.T) { nm.utOrchestrator = mo nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("SetHandler", mock.Anything).Return() + nm.mdi.On("SetHandler", database.GlobalHandler, mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -170,7 +171,7 @@ func TestInitBlockchainFail(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("SetHandler", mock.Anything).Return() + nm.mdi.On("SetHandler", database.GlobalHandler, mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(fmt.Errorf("pop")) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -185,7 +186,7 @@ func TestInitDataExchangeFail(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("SetHandler", mock.Anything).Return() + nm.mdi.On("SetHandler", database.GlobalHandler, mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) @@ -201,7 +202,7 @@ func TestInitSharedStorageFail(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("SetHandler", mock.Anything).Return() + nm.mdi.On("SetHandler", database.GlobalHandler, mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) @@ -218,7 +219,7 @@ func TestInitTokensFail(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("SetHandler", mock.Anything).Return() + nm.mdi.On("SetHandler", database.GlobalHandler, mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -236,7 +237,7 @@ func TestInitEventsFail(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("SetHandler", mock.Anything).Return() + nm.mdi.On("SetHandler", database.GlobalHandler, mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -253,12 +254,14 @@ func TestInitOrchestratorFail(t *testing.T) { defer nm.cleanup(t) nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("SetHandler", mock.Anything).Return() + nm.mdi.On("SetHandler", database.GlobalHandler, mock.Anything).Return() + nm.mdi.On("SetHandler", "default", mock.Anything).Return() nm.mdi.On("GetIdentities", mock.Anything, "default", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mbi.On("SetHandler", mock.Anything).Return() nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) + nm.mps.On("SetHandler", "default", mock.Anything).Return() nm.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) nm.mev.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -277,7 +280,7 @@ func TestInitVersion1(t *testing.T) { nm.utOrchestrator = mo nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("SetHandler", mock.Anything).Return() + nm.mdi.On("SetHandler", database.GlobalHandler, mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -306,7 +309,7 @@ func TestInitVersion1Fail(t *testing.T) { nm.utOrchestrator = mo nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("SetHandler", mock.Anything).Return() + nm.mdi.On("SetHandler", database.GlobalHandler, mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -701,7 +704,7 @@ func TestInitBadNamespace(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("SetHandler", mock.Anything).Return() + nm.mdi.On("SetHandler", database.GlobalHandler, mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index c535abd5b0..b9b7a6d02b 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -353,8 +353,9 @@ func (or *orchestrator) Operations() operations.Manager { } func (or *orchestrator) initPlugins(ctx context.Context) (err error) { - or.plugins.Database.Plugin.SetHandler(or) + or.plugins.Database.Plugin.SetHandler(or.namespace, or) or.plugins.Blockchain.Plugin.SetHandler(&or.bc) + or.plugins.SharedStorage.Plugin.SetHandler(or.namespace, &or.bc) fb := database.IdentityQueryFactory.NewFilter(ctx) nodes, _, err := or.database().GetIdentities(ctx, or.namespace, fb.And( @@ -368,9 +369,7 @@ func (or *orchestrator) initPlugins(ctx context.Context) (err error) { nodeInfo[i] = node.Profile } or.plugins.DataExchange.Plugin.SetNodes(nodeInfo) - or.plugins.DataExchange.Plugin.SetHandler(&or.bc) - - or.plugins.SharedStorage.Plugin.SetHandler(&or.bc) + or.plugins.DataExchange.Plugin.SetHandler(or.namespace, &or.bc) for _, token := range or.plugins.Tokens { if err := token.Plugin.SetHandler(or.namespace, &or.bc); err != nil { diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index f1dd359192..8edbf1ecbb 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -185,12 +185,12 @@ func TestNewOrchestrator(t *testing.T) { func TestInitOK(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) - or.mdi.On("SetHandler", mock.Anything).Return() + or.mdi.On("SetHandler", "ns", mock.Anything).Return() or.mbi.On("SetHandler", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return([]*core.Identity{{}}, nil, nil) - or.mdx.On("SetHandler", mock.Anything).Return() + or.mdx.On("SetHandler", "ns", mock.Anything).Return() or.mdx.On("SetNodes", mock.Anything).Return() - or.mps.On("SetHandler", mock.Anything).Return() + or.mps.On("SetHandler", "ns", mock.Anything).Return() or.mti.On("SetHandler", "ns", mock.Anything).Return(nil) err := or.Init(or.ctx, or.cancelCtx) assert.NoError(t, err) @@ -209,12 +209,12 @@ func TestInitOK(t *testing.T) { func TestInitTokenListenerFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) - or.mdi.On("SetHandler", mock.Anything).Return() + or.mdi.On("SetHandler", "ns", mock.Anything).Return() or.mbi.On("SetHandler", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return([]*core.Identity{{}}, nil, nil) - or.mdx.On("SetHandler", mock.Anything).Return() + or.mdx.On("SetHandler", "ns", mock.Anything).Return() or.mdx.On("SetNodes", mock.Anything).Return() - or.mps.On("SetHandler", mock.Anything).Return() + or.mps.On("SetHandler", "ns", mock.Anything).Return() or.mti.On("SetHandler", "ns", mock.Anything).Return(fmt.Errorf("pop")) err := or.Init(or.ctx, or.cancelCtx) assert.EqualError(t, err, "pop") @@ -223,8 +223,9 @@ func TestInitTokenListenerFail(t *testing.T) { func TestInitDataexchangeNodesFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) - or.mdi.On("SetHandler", mock.Anything).Return() + or.mdi.On("SetHandler", "ns", mock.Anything).Return() or.mbi.On("SetHandler", mock.Anything).Return() + or.mps.On("SetHandler", "ns", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) ctx := context.Background() err := or.initPlugins(ctx) diff --git a/internal/sharedstorage/ipfs/ipfs.go b/internal/sharedstorage/ipfs/ipfs.go index a7a994a9eb..c3ac2c2261 100644 --- a/internal/sharedstorage/ipfs/ipfs.go +++ b/internal/sharedstorage/ipfs/ipfs.go @@ -67,7 +67,7 @@ func (i *IPFS) Init(ctx context.Context, config config.Section) error { return nil } -func (i *IPFS) SetHandler(handler sharedstorage.Callbacks) { +func (i *IPFS) SetHandler(namespace string, handler sharedstorage.Callbacks) { } func (i *IPFS) Capabilities() *sharedstorage.Capabilities { diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index a068061cfb..ec423492ee 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -54,8 +54,7 @@ func (cb *callbacks) TokenOpUpdate(plugin tokens.Plugin, nsOpID string, txState func (cb *callbacks) TokenPoolCreated(namespace string, plugin tokens.Plugin, pool *tokens.TokenPool) error { if namespace == "" { - // Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter - // TODO: deprecate this path + // Older token subscriptions don't populate namespace, so deliver the event to every listener for _, cb := range cb.handlers { if err := cb.TokenPoolCreated(plugin, pool); err != nil { return err @@ -71,8 +70,7 @@ func (cb *callbacks) TokenPoolCreated(namespace string, plugin tokens.Plugin, po func (cb *callbacks) TokensTransferred(namespace string, plugin tokens.Plugin, transfer *tokens.TokenTransfer) error { if namespace == "" { - // Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter - // TODO: deprecate this path + // Older token subscriptions don't populate namespace, so deliver the event to every listener for _, cb := range cb.handlers { if err := cb.TokensTransferred(plugin, transfer); err != nil { return err @@ -88,8 +86,7 @@ func (cb *callbacks) TokensTransferred(namespace string, plugin tokens.Plugin, t func (cb *callbacks) TokensApproved(namespace string, plugin tokens.Plugin, approval *tokens.TokenApproval) error { if namespace == "" { - // Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter - // TODO: deprecate this path + // Older token subscriptions don't populate namespace, so deliver the event to every listener for _, cb := range cb.handlers { if err := cb.TokensApproved(plugin, approval); err != nil { return err diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index a70229605b..de00461496 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -2410,9 +2410,9 @@ func (_m *Plugin) RunAsGroup(ctx context.Context, fn func(context.Context) error return r0 } -// SetHandler provides a mock function with given fields: handler -func (_m *Plugin) SetHandler(handler database.Callbacks) { - _m.Called(handler) +// SetHandler provides a mock function with given fields: namespace, handler +func (_m *Plugin) SetHandler(namespace string, handler database.Callbacks) { + _m.Called(namespace, handler) } // UpdateBatch provides a mock function with given fields: ctx, id, update diff --git a/mocks/dataexchangemocks/plugin.go b/mocks/dataexchangemocks/plugin.go index 9b1c406c3c..db294a0694 100644 --- a/mocks/dataexchangemocks/plugin.go +++ b/mocks/dataexchangemocks/plugin.go @@ -174,9 +174,9 @@ func (_m *Plugin) SendMessage(ctx context.Context, nsOpID string, peerID string, return r0 } -// SetHandler provides a mock function with given fields: handler -func (_m *Plugin) SetHandler(handler dataexchange.Callbacks) { - _m.Called(handler) +// SetHandler provides a mock function with given fields: namespace, handler +func (_m *Plugin) SetHandler(namespace string, handler dataexchange.Callbacks) { + _m.Called(namespace, handler) } // SetNodes provides a mock function with given fields: nodes diff --git a/mocks/identitymocks/plugin.go b/mocks/identitymocks/plugin.go index 1cf2899ed8..7917b124e0 100644 --- a/mocks/identitymocks/plugin.go +++ b/mocks/identitymocks/plugin.go @@ -66,9 +66,9 @@ func (_m *Plugin) Name() string { return r0 } -// SetHandler provides a mock function with given fields: handler -func (_m *Plugin) SetHandler(handler identity.Callbacks) { - _m.Called(handler) +// SetHandler provides a mock function with given fields: namespace, handler +func (_m *Plugin) SetHandler(namespace string, handler identity.Callbacks) { + _m.Called(namespace, handler) } // Start provides a mock function with given fields: diff --git a/mocks/sharedstoragemocks/plugin.go b/mocks/sharedstoragemocks/plugin.go index e538977667..c57521921a 100644 --- a/mocks/sharedstoragemocks/plugin.go +++ b/mocks/sharedstoragemocks/plugin.go @@ -91,9 +91,9 @@ func (_m *Plugin) Name() string { return r0 } -// SetHandler provides a mock function with given fields: handler -func (_m *Plugin) SetHandler(handler sharedstorage.Callbacks) { - _m.Called(handler) +// SetHandler provides a mock function with given fields: namespace, handler +func (_m *Plugin) SetHandler(namespace string, handler sharedstorage.Callbacks) { + _m.Called(namespace, handler) } // UploadData provides a mock function with given fields: ctx, data diff --git a/pkg/core/operation.go b/pkg/core/operation.go index 4d145f491c..af939b76ab 100644 --- a/pkg/core/operation.go +++ b/pkg/core/operation.go @@ -129,7 +129,7 @@ func (po *PreparedOperation) NamespacedIDString() string { func ParseNamespacedOpID(ctx context.Context, nsIDStr string) (string, *fftypes.UUID, error) { nsIDSplit := strings.Split(nsIDStr, ":") if len(nsIDSplit) != 2 { - return "", nil, i18n.NewError(context.Background(), coremsgs.MsgInvalidNamespaceUUID, nsIDStr) + return "", nil, i18n.NewError(ctx, coremsgs.MsgInvalidNamespaceUUID, nsIDStr) } ns := nsIDSplit[0] uuidStr := nsIDSplit[1] diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 4da1fc2a1a..88e3cae889 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -43,6 +43,11 @@ const ( UpsertOptimizationExisting ) +const ( + // Pseudo-namespace to register a global callback handler, which will receive all namespaced and non-namespaced events + GlobalHandler = "ff:global" +) + // Plugin is the interface implemented by each plugin type Plugin interface { PersistenceInterface // Split out to aid pluggability the next level down (SQL provider etc.) @@ -54,7 +59,8 @@ type Plugin interface { Init(ctx context.Context, config config.Section) error // SetHandler registers a handler to receive callbacks - SetHandler(handler Callbacks) + // If namespace is set, plugin will attempt to deliver only events for that namespace + SetHandler(namespace string, handler Callbacks) // Capabilities returns capabilities - not called until after Init Capabilities() *Capabilities diff --git a/pkg/dataexchange/plugin.go b/pkg/dataexchange/plugin.go index 523a2d7b60..285ff13af2 100644 --- a/pkg/dataexchange/plugin.go +++ b/pkg/dataexchange/plugin.go @@ -67,7 +67,8 @@ type Plugin interface { SetNodes(nodes []fftypes.JSONObject) // SetHandler registers a handler to receive callbacks - SetHandler(handler Callbacks) + // If namespace is set, plugin will attempt to deliver only events for that namespace + SetHandler(namespace string, handler Callbacks) // Data exchange interface must not deliver any events until start is called Start() error @@ -125,8 +126,8 @@ const ( ) type MessageReceived struct { - PeerID string - Data []byte + PeerID string + Transport *core.TransportWrapper } type PrivateBlobReceived struct { diff --git a/pkg/events/plugin.go b/pkg/events/plugin.go index abd2a99a79..8697f4d47d 100644 --- a/pkg/events/plugin.go +++ b/pkg/events/plugin.go @@ -36,6 +36,7 @@ type Plugin interface { Init(ctx context.Context, config config.Section) error // SetHandler registers a handler to receive callbacks + // If namespace is set, plugin will attempt to deliver only events for that namespace SetHandler(namespace string, handler Callbacks) error // Capabilities returns capabilities - not called until after Init diff --git a/pkg/identity/plugin.go b/pkg/identity/plugin.go index cfa43a0a5f..7fa46d9445 100644 --- a/pkg/identity/plugin.go +++ b/pkg/identity/plugin.go @@ -34,7 +34,8 @@ type Plugin interface { Init(ctx context.Context, config config.Section) error // SetHandler registers a handler to receive callbacks - SetHandler(handler Callbacks) + // If namespace is set, plugin will attempt to deliver only events for that namespace + SetHandler(namespace string, handler Callbacks) // Blockchain interface must not deliver any events until start is called Start() error diff --git a/pkg/sharedstorage/plugin.go b/pkg/sharedstorage/plugin.go index ff8fcbe493..49ab0ef44d 100644 --- a/pkg/sharedstorage/plugin.go +++ b/pkg/sharedstorage/plugin.go @@ -35,7 +35,8 @@ type Plugin interface { Init(ctx context.Context, config config.Section) error // SetHandler registers a handler to receive callbacks - SetHandler(handler Callbacks) + // If namespace is set, plugin will attempt to deliver only events for that namespace + SetHandler(namespace string, handler Callbacks) // Capabilities returns capabilities - not called until after Init Capabilities() *Capabilities diff --git a/pkg/tokens/plugin.go b/pkg/tokens/plugin.go index 309266a727..d9cf249b57 100644 --- a/pkg/tokens/plugin.go +++ b/pkg/tokens/plugin.go @@ -36,6 +36,7 @@ type Plugin interface { Init(ctx context.Context, name string, config config.Section) error // SetHandler registers a handler to receive callbacks + // If namespace is set, plugin will attempt to deliver only events for that namespace SetHandler(namespace string, handler Callbacks) error // Blockchain interface must not deliver any events until start is called