From 0dcae82b1ff6879c0f782f076087a83d08a8a87b Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Wed, 15 Jun 2022 10:49:03 -0400 Subject: [PATCH 1/3] Move event plugin init to namespace manager Signed-off-by: Andrew Richardson --- internal/events/event_manager.go | 9 +-- internal/events/event_manager_test.go | 40 +++--------- internal/events/subscription_manager.go | 45 +++---------- internal/events/subscription_manager_test.go | 27 +------- internal/events/system/events.go | 53 +++++++++------- internal/events/system/events_test.go | 17 +++-- internal/events/webhooks/webhooks.go | 63 +++++++++++-------- internal/events/webhooks/webhooks_test.go | 46 ++++++++++---- internal/events/websockets/websockets.go | 34 ++++++---- internal/events/websockets/websockets_test.go | 17 +++-- internal/namespace/manager.go | 52 +++++++++++++++ internal/orchestrator/orchestrator.go | 4 +- mocks/eventsmocks/plugin.go | 24 +++++-- mocks/orchestratormocks/orchestrator.go | 18 ------ pkg/events/plugin.go | 6 +- 15 files changed, 250 insertions(+), 205 deletions(-) diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 33c76440ed..63af11c7a2 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -46,6 +46,7 @@ import ( "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/dataexchange" + "github.com/hyperledger/firefly/pkg/events" "github.com/hyperledger/firefly/pkg/sharedstorage" "github.com/hyperledger/firefly/pkg/tokens" "github.com/karlseguin/ccache" @@ -112,7 +113,7 @@ type eventManager struct { chainListenerCacheTTL time.Duration } -func NewEventManager(ctx context.Context, ns string, ni sysmessaging.LocalNodeInfo, si sharedstorage.Plugin, di database.Plugin, bi blockchain.Plugin, im identity.Manager, dh definitions.DefinitionHandler, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager, sd shareddownload.Manager, mm metrics.Manager, txHelper txcommon.Helper) (EventManager, error) { +func NewEventManager(ctx context.Context, ns string, ni sysmessaging.LocalNodeInfo, si sharedstorage.Plugin, di database.Plugin, bi blockchain.Plugin, im identity.Manager, dh definitions.DefinitionHandler, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager, sd shareddownload.Manager, mm metrics.Manager, txHelper txcommon.Helper, transports map[string]events.Plugin) (EventManager, error) { if ni == nil || si == nil || di == nil || bi == nil || im == nil || dh == nil || dm == nil || bm == nil || pm == nil || am == nil { return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "EventManager") } @@ -150,7 +151,7 @@ func NewEventManager(ctx context.Context, ns string, ni sysmessaging.LocalNodeIn em.blobReceiver = newBlobReceiver(ctx, em.aggregator) var err error - if em.subManager, err = newSubscriptionManager(ctx, di, dm, newEventNotifier, bm, pm, txHelper); err != nil { + if em.subManager, err = newSubscriptionManager(ctx, ns, di, dm, newEventNotifier, bm, pm, txHelper, transports); err != nil { return nil, err } @@ -252,9 +253,9 @@ func (em *eventManager) GetPlugins() []*core.NodeStatusPlugin { eventsArray := make([]*core.NodeStatusPlugin, 0) plugins := em.subManager.transports - for _, plugin := range plugins { + for name := range plugins { eventsArray = append(eventsArray, &core.NodeStatusPlugin{ - PluginType: plugin.Name(), + PluginType: name, }) } diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index 96f5db2a0d..046c94a8dd 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -42,6 +42,7 @@ import ( "github.com/hyperledger/firefly/mocks/txcommonmocks" "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/database" + "github.com/hyperledger/firefly/pkg/events" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -80,6 +81,8 @@ func newTestEventManagerCommon(t *testing.T, metrics, dbconcurrency bool) (*even mni := &sysmessagingmocks.LocalNodeInfo{} mdd := &shareddownloadmocks.Manager{} mmi := &metricsmocks.Manager{} + mev := &eventsmocks.Plugin{} + events := map[string]events.Plugin{"websockets": mev} txHelper := txcommon.NewTransactionHelper(mdi, mdm) mmi.On("IsMetricsEnabled").Return(metrics) if metrics { @@ -89,7 +92,9 @@ func newTestEventManagerCommon(t *testing.T, metrics, dbconcurrency bool) (*even met.On("Name").Return("ut").Maybe() mbi.On("VerifierType").Return(core.VerifierTypeEthAddress).Maybe() mdi.On("Capabilities").Return(&database.Capabilities{Concurrency: dbconcurrency}).Maybe() - emi, err := NewEventManager(ctx, "ns1", mni, mpi, mdi, mbi, mim, msh, mdm, mbm, mpm, mam, mdd, mmi, txHelper) + mev.On("RegisterListener", "ns1", mock.Anything).Return(nil).Maybe() + mev.On("ValidateOptions", mock.Anything).Return(nil).Maybe() + emi, err := NewEventManager(ctx, "ns1", mni, mpi, mdi, mbi, mim, msh, mdm, mbm, mpm, mam, mdd, mmi, txHelper, events) em := emi.(*eventManager) em.txHelper = &txcommonmocks.Helper{} mockRunAsGroupPassthrough(mdi) @@ -124,33 +129,11 @@ func TestStartStop(t *testing.T) { } func TestStartStopBadDependencies(t *testing.T) { - _, err := NewEventManager(context.Background(), "", nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + _, err := NewEventManager(context.Background(), "", nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) } -func TestStartStopBadTransports(t *testing.T) { - config.Set(coreconfig.EventTransportsEnabled, []string{"wrongun"}) - defer coreconfig.Reset() - mdi := &databasemocks.Plugin{} - mbi := &blockchainmocks.Plugin{} - mim := &identitymanagermocks.Manager{} - mpi := &sharedstoragemocks.Plugin{} - mdm := &datamocks.Manager{} - msh := &definitionsmocks.DefinitionHandler{} - mbm := &broadcastmocks.Manager{} - mpm := &privatemessagingmocks.Manager{} - mni := &sysmessagingmocks.LocalNodeInfo{} - mam := &assetmocks.Manager{} - msd := &shareddownloadmocks.Manager{} - mm := &metricsmocks.Manager{} - txHelper := txcommon.NewTransactionHelper(mdi, mdm) - mdi.On("Capabilities").Return(&database.Capabilities{Concurrency: false}).Maybe() - mbi.On("VerifierType").Return(core.VerifierTypeEthAddress) - _, err := NewEventManager(context.Background(), "ns1", mni, mpi, mdi, mbi, mim, msh, mdm, mbm, mpm, mam, msd, mm, txHelper) - assert.Regexp(t, "FF10172", err) -} - func TestEmitSubscriptionEventsNoops(t *testing.T) { em, cancel := newTestEventManager(t) mdi := em.database.(*databasemocks.Plugin) @@ -398,7 +381,8 @@ func TestAddInternalListener(t *testing.T) { conf := config.RootSection("ut.events") ie.InitConfig(conf) - ie.Init(em.ctx, conf, cbs) + ie.Init(em.ctx, conf) + ie.RegisterListener("ns1", cbs) em.internalEvents = ie defer cancel() err := em.AddSystemEventListener("ns1", func(event *core.EventDelivery) error { return nil }) @@ -414,12 +398,6 @@ func TestGetPlugins(t *testing.T) { { PluginType: "websockets", }, - { - PluginType: "webhooks", - }, - { - PluginType: "system", - }, } assert.ElementsMatch(t, em.GetPlugins(), expectedPlugins) diff --git a/internal/events/subscription_manager.go b/internal/events/subscription_manager.go index 07860ed9cd..78d04df362 100644 --- a/internal/events/subscription_manager.go +++ b/internal/events/subscription_manager.go @@ -30,8 +30,6 @@ import ( "github.com/hyperledger/firefly/internal/coreconfig" "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/internal/data" - "github.com/hyperledger/firefly/internal/events/eifactory" - "github.com/hyperledger/firefly/internal/events/system" "github.com/hyperledger/firefly/internal/privatemessaging" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/core" @@ -75,6 +73,7 @@ type connection struct { type subscriptionManager struct { ctx context.Context + namespace string database database.Plugin data data.Manager txHelper txcommon.Helper @@ -92,13 +91,14 @@ type subscriptionManager struct { retry retry.Retry } -func newSubscriptionManager(ctx context.Context, di database.Plugin, dm data.Manager, en *eventNotifier, bm broadcast.Manager, pm privatemessaging.Manager, txHelper txcommon.Helper) (*subscriptionManager, error) { +func newSubscriptionManager(ctx context.Context, ns string, di database.Plugin, dm data.Manager, en *eventNotifier, bm broadcast.Manager, pm privatemessaging.Manager, txHelper txcommon.Helper, transports map[string]events.Plugin) (*subscriptionManager, error) { ctx, cancelCtx := context.WithCancel(ctx) sm := &subscriptionManager{ ctx: ctx, + namespace: ns, database: di, data: dm, - transports: make(map[string]events.Plugin), + transports: transports, connections: make(map[string]*connection), durableSubs: make(map[fftypes.UUID]*subscription), newOrUpdatedSubscriptions: make(chan *fftypes.UUID), @@ -116,42 +116,13 @@ func newSubscriptionManager(ctx context.Context, di database.Plugin, dm data.Man }, } - err := sm.loadTransports() - if err == nil { - err = sm.initTransports() - } - return sm, err -} - -func (sm *subscriptionManager) loadTransports() error { - var err error - enabledTransports := config.GetStringSlice(coreconfig.EventTransportsEnabled) - uniqueTransports := make(map[string]bool) - for _, transport := range enabledTransports { - uniqueTransports[transport] = true - } - // Cannot disable the internal listener - uniqueTransports[system.SystemEventsTransport] = true - for transport := range uniqueTransports { - sm.transports[transport], err = eifactory.GetPlugin(sm.ctx, transport) - if err != nil { - return err - } - } - return nil -} - -func (sm *subscriptionManager) initTransports() error { - var err error for _, ei := range sm.transports { - config := config.RootSection("events").SubSection(ei.Name()) - ei.InitConfig(config) - err = ei.Init(sm.ctx, config, &boundCallbacks{sm: sm, ei: ei}) - if err != nil { - return err + if err := ei.RegisterListener(sm.namespace, &boundCallbacks{sm: sm, ei: ei}); err != nil { + return nil, err } } - return nil + + return sm, nil } func (sm *subscriptionManager) start() error { diff --git a/internal/events/subscription_manager_test.go b/internal/events/subscription_manager_test.go index 8cffc58b84..f45a71d134 100644 --- a/internal/events/subscription_manager_test.go +++ b/internal/events/subscription_manager_test.go @@ -53,7 +53,7 @@ func newTestSubManager(t *testing.T, mei *eventsmocks.Plugin) (*subscriptionMana mei.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) mdi.On("GetEvents", mock.Anything, mock.Anything, mock.Anything).Return([]*core.Event{}, nil, nil).Maybe() mdi.On("GetOffset", mock.Anything, mock.Anything, mock.Anything).Return(&core.Offset{RowID: 3333333, Current: 0}, nil).Maybe() - sm, err := newSubscriptionManager(ctx, mdi, mdm, newEventNotifier(ctx, "ut"), mbm, mpm, txHelper) + sm, err := newSubscriptionManager(ctx, "ns1", mdi, mdm, newEventNotifier(ctx, "ut"), mbm, mpm, txHelper, nil) assert.NoError(t, err) sm.transports = map[string]events.Plugin{ "ut": mei, @@ -167,31 +167,6 @@ func TestRegisterEphemeralSubscriptionsFail(t *testing.T) { } -func TestSubManagerBadPlugin(t *testing.T) { - mdi := &databasemocks.Plugin{} - mdm := &datamocks.Manager{} - mbm := &broadcastmocks.Manager{} - mpm := &privatemessagingmocks.Manager{} - txHelper := txcommon.NewTransactionHelper(mdi, mdm) - coreconfig.Reset() - config.Set(coreconfig.EventTransportsEnabled, []string{"!unknown!"}) - _, err := newSubscriptionManager(context.Background(), mdi, mdm, newEventNotifier(context.Background(), "ut"), mbm, mpm, txHelper) - assert.Regexp(t, "FF10172", err) -} - -func TestSubManagerTransportInitError(t *testing.T) { - mei := &eventsmocks.Plugin{} - mei.On("Name").Return("ut") - mei.On("InitConfig", mock.Anything).Return() - mei.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - - sm, cancel := newTestSubManager(t, mei) - defer cancel() - - err := sm.initTransports() - assert.EqualError(t, err, "pop") -} - func TestStartSubRestoreFail(t *testing.T) { mei := &eventsmocks.Plugin{} sm, cancel := newTestSubManager(t, mei) diff --git a/internal/events/system/events.go b/internal/events/system/events.go index 9aa863a3bf..d0827004e3 100644 --- a/internal/events/system/events.go +++ b/internal/events/system/events.go @@ -35,7 +35,7 @@ const ( type Events struct { ctx context.Context capabilities *events.Capabilities - callbacks events.Callbacks + callbacks map[string]events.Callbacks mux sync.Mutex listeners map[string][]EventListener connID string @@ -46,17 +46,22 @@ type EventListener func(event *core.EventDelivery) error func (se *Events) Name() string { return SystemEventsTransport } -func (se *Events) Init(ctx context.Context, config config.Section, callbacks events.Callbacks) (err error) { +func (se *Events) Init(ctx context.Context, config config.Section) (err error) { *se = Events{ ctx: ctx, capabilities: &events.Capabilities{}, - callbacks: callbacks, + callbacks: make(map[string]events.Callbacks), listeners: make(map[string][]EventListener), readAhead: uint16(config.GetInt(SystemEventsConfReadAhead)), connID: fftypes.ShortID(), } + return nil +} + +func (se *Events) RegisterListener(namespace string, listener events.Callbacks) error { + se.callbacks[namespace] = listener // We have a single logical connection, that matches all subscriptions - return callbacks.RegisterConnection(se.connID, func(sr core.SubscriptionRef) bool { return true }) + return listener.RegisterConnection(se.connID, func(sr core.SubscriptionRef) bool { return true }) } func (se *Events) Capabilities() *events.Capabilities { @@ -70,19 +75,21 @@ func (se *Events) ValidateOptions(options *core.SubscriptionOptions) error { func (se *Events) AddListener(ns string, el EventListener) error { no := false newest := core.SubOptsFirstEventNewest - err := se.callbacks.EphemeralSubscription(se.connID, ns, &core.SubscriptionFilter{ /* all events */ }, &core.SubscriptionOptions{ - SubscriptionCoreOptions: core.SubscriptionCoreOptions{ - WithData: &no, - ReadAhead: &se.readAhead, - FirstEvent: &newest, - }, - }) - if err != nil { - return err + if cb, ok := se.callbacks[ns]; ok { + err := cb.EphemeralSubscription(se.connID, ns, &core.SubscriptionFilter{ /* all events */ }, &core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + WithData: &no, + ReadAhead: &se.readAhead, + FirstEvent: &newest, + }, + }) + if err != nil { + return err + } + se.mux.Lock() + se.listeners[ns] = append(se.listeners[ns], el) + se.mux.Unlock() } - se.mux.Lock() - se.listeners[ns] = append(se.listeners[ns], el) - se.mux.Unlock() return nil } @@ -98,11 +105,13 @@ func (se *Events) DeliveryRequest(connID string, sub *core.Subscription, event * } } } - se.callbacks.DeliveryResponse(connID, &core.EventDeliveryResponse{ - ID: event.ID, - Rejected: false, - Subscription: event.Subscription, - Reply: nil, - }) + if cb, ok := se.callbacks[sub.Namespace]; ok { + cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ + ID: event.ID, + Rejected: false, + Subscription: event.Subscription, + Reply: nil, + }) + } return nil } diff --git a/internal/events/system/events_test.go b/internal/events/system/events_test.go index 40c5c66705..7e9b526927 100644 --- a/internal/events/system/events_test.go +++ b/internal/events/system/events_test.go @@ -42,7 +42,8 @@ func newTestEvents(t *testing.T) (se *Events, cancel func()) { ctx, cancelCtx := context.WithCancel(context.Background()) config := config.RootSection("ut.events") se.InitConfig(config) - se.Init(ctx, config, cbs) + se.Init(ctx, config) + se.RegisterListener("ns1", cbs) assert.Equal(t, "system", se.Name()) assert.NotNil(t, se.Capabilities()) assert.Nil(t, se.ValidateOptions(&core.SubscriptionOptions{})) @@ -54,7 +55,13 @@ func TestDeliveryRequestOk(t *testing.T) { se, cancel := newTestEvents(t) defer cancel() - cbs := se.callbacks.(*eventsmocks.Callbacks) + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, + } + + cbs := se.callbacks["ns1"].(*eventsmocks.Callbacks) cbs.On("EphemeralSubscription", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) cbs.On("DeliveryResponse", se.connID, mock.Anything).Return(nil) @@ -65,7 +72,7 @@ func TestDeliveryRequestOk(t *testing.T) { }) assert.NoError(t, err) - err = se.DeliveryRequest(se.connID, &core.Subscription{}, &core.EventDelivery{ + err = se.DeliveryRequest(se.connID, sub, &core.EventDelivery{ EnrichedEvent: core.EnrichedEvent{ Event: core.Event{ Namespace: "ns1", @@ -93,7 +100,7 @@ func TestDeliveryRequestFail(t *testing.T) { se, cancel := newTestEvents(t) defer cancel() - cbs := se.callbacks.(*eventsmocks.Callbacks) + cbs := se.callbacks["ns1"].(*eventsmocks.Callbacks) cbs.On("EphemeralSubscription", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) err := se.AddListener("ns1", func(event *core.EventDelivery) error { @@ -117,7 +124,7 @@ func TestAddListenerFail(t *testing.T) { se, cancel := newTestEvents(t) defer cancel() - cbs := se.callbacks.(*eventsmocks.Callbacks) + cbs := se.callbacks["ns1"].(*eventsmocks.Callbacks) cbs.On("EphemeralSubscription", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) err := se.AddListener("ns1", func(event *core.EventDelivery) error { return nil }) diff --git a/internal/events/webhooks/webhooks.go b/internal/events/webhooks/webhooks.go index 127f091c3f..e51128e53c 100644 --- a/internal/events/webhooks/webhooks.go +++ b/internal/events/webhooks/webhooks.go @@ -40,7 +40,7 @@ import ( type WebHooks struct { ctx context.Context capabilities *events.Capabilities - callbacks events.Callbacks + callbacks map[string]events.Callbacks client *resty.Client connID string } @@ -62,16 +62,21 @@ type whResponse struct { func (wh *WebHooks) Name() string { return "webhooks" } -func (wh *WebHooks) Init(ctx context.Context, config config.Section, callbacks events.Callbacks) (err error) { +func (wh *WebHooks) Init(ctx context.Context, config config.Section) (err error) { *wh = WebHooks{ ctx: ctx, capabilities: &events.Capabilities{}, - callbacks: callbacks, + callbacks: make(map[string]events.Callbacks), client: ffresty.New(ctx, config), connID: fftypes.ShortID(), } + return nil +} + +func (wh *WebHooks) RegisterListener(namespace string, listener events.Callbacks) error { + wh.callbacks[namespace] = listener // We have a single logical connection, that matches all subscriptions - return callbacks.RegisterConnection(wh.connID, func(sr core.SubscriptionRef) bool { return true }) + return listener.RegisterConnection(wh.connID, func(sr core.SubscriptionRef) bool { return true }) } func (wh *WebHooks) Capabilities() *events.Capabilities { @@ -286,26 +291,28 @@ func (wh *WebHooks) doDelivery(connID string, reply bool, sub *core.Subscription if req != nil && req.replyTx != "" { txType = fftypes.FFEnum(strings.ToLower(req.replyTx)) } - wh.callbacks.DeliveryResponse(connID, &core.EventDeliveryResponse{ - ID: event.ID, - Rejected: false, - Subscription: event.Subscription, - Reply: &core.MessageInOut{ - Message: core.Message{ - Header: core.MessageHeader{ - CID: event.Message.Header.ID, - Group: event.Message.Header.Group, - Type: event.Message.Header.Type, - Topics: event.Message.Header.Topics, - Tag: sub.Options.TransportOptions().GetString("replytag"), - TxType: txType, + if cb, ok := wh.callbacks[sub.Namespace]; ok { + cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ + ID: event.ID, + Rejected: false, + Subscription: event.Subscription, + Reply: &core.MessageInOut{ + Message: core.Message{ + Header: core.MessageHeader{ + CID: event.Message.Header.ID, + Group: event.Message.Header.Group, + Type: event.Message.Header.Type, + Topics: event.Message.Header.Topics, + Tag: sub.Options.TransportOptions().GetString("replytag"), + TxType: txType, + }, + }, + InlineData: core.InlineData{ + {Value: fftypes.JSONAnyPtrBytes(b)}, }, }, - InlineData: core.InlineData{ - {Value: fftypes.JSONAnyPtrBytes(b)}, - }, - }, - }) + }) + } } return nil } @@ -322,11 +329,13 @@ func (wh *WebHooks) DeliveryRequest(connID string, sub *core.Subscription, event // avoid loops - and there's no way for us to detect here if a user has configured correctly // to avoid a loop. log.L(wh.ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID) - wh.callbacks.DeliveryResponse(connID, &core.EventDeliveryResponse{ - ID: event.ID, - Rejected: false, - Subscription: event.Subscription, - }) + if cb, ok := wh.callbacks[sub.Namespace]; ok { + cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ + ID: event.ID, + Rejected: false, + Subscription: event.Subscription, + }) + } return nil } diff --git a/internal/events/webhooks/webhooks_test.go b/internal/events/webhooks/webhooks_test.go index 1193eeee9e..1d27d3fc29 100644 --- a/internal/events/webhooks/webhooks_test.go +++ b/internal/events/webhooks/webhooks_test.go @@ -47,7 +47,8 @@ func newTestWebHooks(t *testing.T) (wh *WebHooks, cancel func()) { ctx, cancelCtx := context.WithCancel(context.Background()) svrConfig := config.RootSection("ut.webhooks") wh.InitConfig(svrConfig) - wh.Init(ctx, svrConfig, cbs) + wh.Init(ctx, svrConfig) + wh.RegisterListener("ns1", cbs) assert.Equal(t, "webhooks", wh.Name()) assert.NotNil(t, wh.Capabilities()) return wh, cancelCtx @@ -146,6 +147,9 @@ func TestRequestWithBodyReplyEndToEnd(t *testing.T) { msgID := fftypes.NewUUID() groupHash := fftypes.NewRandB32() sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, Options: core.SubscriptionOptions{ SubscriptionCoreOptions: core.SubscriptionCoreOptions{ WithData: &yes, @@ -207,7 +211,7 @@ func TestRequestWithBodyReplyEndToEnd(t *testing.T) { }`), } - mcb := wh.callbacks.(*eventsmocks.Callbacks) + mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks) mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { assert.Equal(t, *msgID, *response.Reply.Message.Header.CID) assert.Equal(t, *groupHash, *response.Reply.Message.Header.Group) @@ -253,6 +257,9 @@ func TestRequestWithEmptyStringBodyReplyEndToEnd(t *testing.T) { msgID := fftypes.NewUUID() groupHash := fftypes.NewRandB32() sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, Options: core.SubscriptionOptions{ SubscriptionCoreOptions: core.SubscriptionCoreOptions{ WithData: &yes, @@ -314,7 +321,7 @@ func TestRequestWithEmptyStringBodyReplyEndToEnd(t *testing.T) { }`), } - mcb := wh.callbacks.(*eventsmocks.Callbacks) + mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks) mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { assert.Equal(t, *msgID, *response.Reply.Message.Header.CID) assert.Equal(t, *groupHash, *response.Reply.Message.Header.Group) @@ -408,6 +415,9 @@ func TestRequestReplyEmptyData(t *testing.T) { yes := true sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, Options: core.SubscriptionOptions{ SubscriptionCoreOptions: core.SubscriptionCoreOptions{ WithData: &yes, @@ -434,7 +444,7 @@ func TestRequestReplyEmptyData(t *testing.T) { }, } - mcb := wh.callbacks.(*eventsmocks.Callbacks) + mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks) mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { assert.Equal(t, *msgID, *response.Reply.Message.Header.CID) assert.Nil(t, response.Reply.Message.Header.Group) @@ -483,7 +493,7 @@ func TestRequestReplyBadJSON(t *testing.T) { }, } - mcb := wh.callbacks.(*eventsmocks.Callbacks) + mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks) mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { assert.Equal(t, float64(502), response.Reply.InlineData[0].Value.JSONObject()["status"]) assert.Regexp(t, "FF10257", response.Reply.InlineData[0].Value.JSONObject().GetObject("body")["error"]) @@ -517,6 +527,9 @@ func TestRequestReplyDataArrayBadStatusB64(t *testing.T) { yes := true sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, Options: core.SubscriptionOptions{ SubscriptionCoreOptions: core.SubscriptionCoreOptions{ WithData: &yes, @@ -543,7 +556,7 @@ func TestRequestReplyDataArrayBadStatusB64(t *testing.T) { }, } - mcb := wh.callbacks.(*eventsmocks.Callbacks) + mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks) mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { assert.Equal(t, *msgID, *response.Reply.Message.Header.CID) assert.Nil(t, response.Reply.Message.Header.Group) @@ -572,7 +585,11 @@ func TestRequestReplyDataArrayError(t *testing.T) { server := httptest.NewServer(r) server.Close() - sub := &core.Subscription{} + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, + } to := sub.Options.TransportOptions() to["url"] = fmt.Sprintf("http://%s/myapi", server.Listener.Addr()) to["reply"] = true @@ -593,7 +610,7 @@ func TestRequestReplyDataArrayError(t *testing.T) { }, } - mcb := wh.callbacks.(*eventsmocks.Callbacks) + mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks) mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { assert.Equal(t, *msgID, *response.Reply.Message.Header.CID) assert.Nil(t, response.Reply.Message.Header.Group) @@ -621,7 +638,11 @@ func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) { server := httptest.NewServer(r) server.Close() - sub := &core.Subscription{} + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, + } sub.Options.TransportOptions()["reply"] = true sub.Options.TransportOptions()["fastack"] = true event := &core.EventDelivery{ @@ -642,7 +663,7 @@ func TestRequestReplyBuildRequestFailFastAsk(t *testing.T) { } waiter := make(chan struct{}) - mcb := wh.callbacks.(*eventsmocks.Callbacks) + mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks) dr := mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { assert.Equal(t, *msgID, *response.Reply.Message.Header.CID) assert.Nil(t, response.Reply.Message.Header.Group) @@ -671,6 +692,9 @@ func TestDeliveryRequestNilMessage(t *testing.T) { yes := true sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, Options: core.SubscriptionOptions{ SubscriptionCoreOptions: core.SubscriptionCoreOptions{ WithData: &yes, @@ -724,7 +748,7 @@ func TestDeliveryRequestReplyToReply(t *testing.T) { }, } - mcb := wh.callbacks.(*eventsmocks.Callbacks) + mcb := wh.callbacks["ns1"].(*eventsmocks.Callbacks) mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(response *core.EventDeliveryResponse) bool { return !response.Rejected // should be accepted as a no-op so we can move on to other events })) diff --git a/internal/events/websockets/websockets.go b/internal/events/websockets/websockets.go index 08fb570898..5299564de6 100644 --- a/internal/events/websockets/websockets.go +++ b/internal/events/websockets/websockets.go @@ -33,7 +33,7 @@ import ( type WebSockets struct { ctx context.Context capabilities *events.Capabilities - callbacks events.Callbacks + callbacks map[string]events.Callbacks connections map[string]*websocketConnection connMux sync.Mutex upgrader websocket.Upgrader @@ -41,12 +41,12 @@ type WebSockets struct { func (ws *WebSockets) Name() string { return "websockets" } -func (ws *WebSockets) Init(ctx context.Context, config config.Section, callbacks events.Callbacks) error { +func (ws *WebSockets) Init(ctx context.Context, config config.Section) error { *ws = WebSockets{ ctx: ctx, connections: make(map[string]*websocketConnection), capabilities: &events.Capabilities{}, - callbacks: callbacks, + callbacks: make(map[string]events.Callbacks), upgrader: websocket.Upgrader{ ReadBufferSize: int(config.GetByteSize(ReadBufferSize)), WriteBufferSize: int(config.GetByteSize(WriteBufferSize)), @@ -59,6 +59,11 @@ func (ws *WebSockets) Init(ctx context.Context, config config.Section, callbacks return nil } +func (ws *WebSockets) RegisterListener(namespace string, listener events.Callbacks) error { + ws.callbacks[namespace] = listener + return nil +} + func (ws *WebSockets) Capabilities() *events.Capabilities { return ws.capabilities } @@ -99,20 +104,25 @@ func (ws *WebSockets) ServeHTTP(res http.ResponseWriter, req *http.Request) { } func (ws *WebSockets) ack(connID string, inflight *core.EventDeliveryResponse) { - ws.callbacks.DeliveryResponse(connID, inflight) + if cb, ok := ws.callbacks[inflight.Subscription.Namespace]; ok { + cb.DeliveryResponse(connID, inflight) + } } func (ws *WebSockets) start(wc *websocketConnection, start *core.WSStart) error { if start.Namespace == "" || (!start.Ephemeral && start.Name == "") { return i18n.NewError(ws.ctx, coremsgs.MsgWSInvalidStartAction) } - if start.Ephemeral { - return ws.callbacks.EphemeralSubscription(wc.connID, start.Namespace, &start.Filter, &start.Options) + if cb, ok := ws.callbacks[start.Namespace]; ok { + if start.Ephemeral { + return cb.EphemeralSubscription(wc.connID, start.Namespace, &start.Filter, &start.Options) + } + // We can have multiple subscriptions on a single connection + return cb.RegisterConnection(wc.connID, func(sr core.SubscriptionRef) bool { + return wc.durableSubMatcher(sr) + }) } - // We can have multiple subscriptions on a single - return ws.callbacks.RegisterConnection(wc.connID, func(sr core.SubscriptionRef) bool { - return wc.durableSubMatcher(sr) - }) + return nil } func (ws *WebSockets) connClosed(connID string) { @@ -120,7 +130,9 @@ func (ws *WebSockets) connClosed(connID string) { delete(ws.connections, connID) ws.connMux.Unlock() // Drop lock before calling back - ws.callbacks.ConnectionClosed(connID) + for _, cb := range ws.callbacks { + cb.ConnectionClosed(connID) + } } func (ws *WebSockets) WaitClosed() { diff --git a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go index ffd7c54d36..ff7c9e2a10 100644 --- a/internal/events/websockets/websockets_test.go +++ b/internal/events/websockets/websockets_test.go @@ -46,7 +46,8 @@ func newTestWebsockets(t *testing.T, cbs *eventsmocks.Callbacks, queryParams ... ctx, cancelCtx := context.WithCancel(context.Background()) svrConfig := config.RootSection("ut.websockets") ws.InitConfig(svrConfig) - ws.Init(ctx, svrConfig, cbs) + ws.Init(ctx, svrConfig) + ws.RegisterListener("ns1", cbs) assert.Equal(t, "websockets", ws.Name()) assert.NotNil(t, ws.Capabilities()) cbs.On("ConnectionClosed", mock.Anything).Return(nil).Maybe() @@ -181,7 +182,10 @@ func TestStartReceiveAckEphemeral(t *testing.T) { EnrichedEvent: core.EnrichedEvent{ Event: core.Event{ID: fftypes.NewUUID()}, }, - Subscription: core.SubscriptionRef{ID: fftypes.NewUUID()}, + Subscription: core.SubscriptionRef{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + }, }, nil) b := <-wsc.Receive() @@ -304,7 +308,10 @@ func TestAutoStartReceiveAckEphemeral(t *testing.T) { EnrichedEvent: core.EnrichedEvent{ Event: core.Event{ID: fftypes.NewUUID()}, }, - Subscription: core.SubscriptionRef{ID: fftypes.NewUUID()}, + Subscription: core.SubscriptionRef{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + }, }, nil) b := <-wsc.Receive() @@ -396,7 +403,7 @@ func TestHandleAckMultipleStartedNoSubSingleMatch(t *testing.T) { ctx: context.Background(), ws: &WebSockets{ ctx: context.Background(), - callbacks: cbs, + callbacks: map[string]events.Callbacks{"ns1": cbs}, }, started: []*websocketStartedSub{{ephemeral: false, name: "name1", namespace: "ns1"}}, sendMessages: make(chan interface{}, 1), @@ -496,7 +503,7 @@ func TestDispatchAutoAck(t *testing.T) { connID: fftypes.NewUUID().String(), ws: &WebSockets{ ctx: context.Background(), - callbacks: cbs, + callbacks: map[string]events.Callbacks{"ns1": cbs}, connections: make(map[string]*websocketConnection), }, started: []*websocketStartedSub{{ephemeral: false, name: "name1", namespace: "ns1"}}, diff --git a/internal/namespace/manager.go b/internal/namespace/manager.go index cd1a6db3df..5381217be7 100644 --- a/internal/namespace/manager.go +++ b/internal/namespace/manager.go @@ -28,6 +28,8 @@ import ( "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/internal/database/difactory" "github.com/hyperledger/firefly/internal/dataexchange/dxfactory" + "github.com/hyperledger/firefly/internal/events/eifactory" + "github.com/hyperledger/firefly/internal/events/system" "github.com/hyperledger/firefly/internal/identity/iifactory" "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/internal/orchestrator" @@ -38,6 +40,7 @@ import ( "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/dataexchange" + "github.com/hyperledger/firefly/pkg/events" "github.com/hyperledger/firefly/pkg/identity" "github.com/hyperledger/firefly/pkg/sharedstorage" "github.com/hyperledger/firefly/pkg/tokens" @@ -90,6 +93,7 @@ type namespaceManager struct { sharedstorage map[string]sharedStoragePlugin dataexchange map[string]dataExchangePlugin tokens map[string]tokensPlugin + events map[string]eventsPlugin } metricsEnabled bool metrics metrics.Manager @@ -127,6 +131,11 @@ type identityPlugin struct { plugin identity.Plugin } +type eventsPlugin struct { + config config.Section + plugin events.Plugin +} + func NewNamespaceManager(withDefaults bool) Manager { nm := &namespaceManager{ namespaces: make(map[string]*namespace), @@ -274,6 +283,13 @@ func (nm *namespaceManager) loadPlugins(ctx context.Context) (err error) { } } + if nm.plugins.events == nil { + nm.plugins.events, err = nm.getEventPlugins(ctx) + if err != nil { + return err + } + } + return nil } @@ -558,6 +574,11 @@ func (nm *namespaceManager) initPlugins(ctx context.Context) (err error) { return err } } + for _, entry := range nm.plugins.events { + if err = entry.plugin.Init(nm.ctx, entry.config); err != nil { + return err + } + } return nil } @@ -661,6 +682,11 @@ func (nm *namespaceManager) loadNamespace(ctx context.Context, name string, inde return nil, err } + p.Events = make(map[string]events.Plugin, len(nm.plugins.events)) + for name, entry := range nm.plugins.events { + p.Events[name] = entry.plugin + } + return &namespace{ description: conf.GetString(coreconfig.NamespaceDescription), config: config, @@ -831,3 +857,29 @@ func (nm *namespaceManager) ResolveOperationByNamespacedID(ctx context.Context, } return or.Operations().ResolveOperationByID(ctx, ns, u, op) } + +func (nm *namespaceManager) getEventPlugins(ctx context.Context) (plugins map[string]eventsPlugin, err error) { + plugins = make(map[string]eventsPlugin) + enabledTransports := config.GetStringSlice(coreconfig.EventTransportsEnabled) + uniqueTransports := make(map[string]bool) + for _, transport := range enabledTransports { + uniqueTransports[transport] = true + } + // Cannot disable the internal listener + uniqueTransports[system.SystemEventsTransport] = true + for transport := range uniqueTransports { + plugin, err := eifactory.GetPlugin(ctx, transport) + if err != nil { + return nil, err + } + + name := plugin.Name() + section := config.RootSection("events").SubSection(name) + plugin.InitConfig(section) + plugins[name] = eventsPlugin{ + config: section, + plugin: plugin, + } + } + return plugins, err +} diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index f1c6367dfd..83759cf12d 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -43,6 +43,7 @@ import ( "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/dataexchange" + eventsplugin "github.com/hyperledger/firefly/pkg/events" idplugin "github.com/hyperledger/firefly/pkg/identity" "github.com/hyperledger/firefly/pkg/sharedstorage" "github.com/hyperledger/firefly/pkg/tokens" @@ -154,6 +155,7 @@ type Plugins struct { DataExchange DataExchangePlugin Database DatabasePlugin Tokens []TokensPlugin + Events map[string]eventsplugin.Plugin } type Config struct { @@ -462,7 +464,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } if or.events == nil { - or.events, err = events.NewEventManager(ctx, or.namespace, or, or.sharedstorage(), or.database(), or.blockchain(), or.identity, or.definitions, or.data, or.broadcast, or.messaging, or.assets, or.sharedDownload, or.metrics, or.txHelper) + or.events, err = events.NewEventManager(ctx, or.namespace, or, or.sharedstorage(), or.database(), or.blockchain(), or.identity, or.definitions, or.data, or.broadcast, or.messaging, or.assets, or.sharedDownload, or.metrics, or.txHelper, or.plugins.Events) if err != nil { return err } diff --git a/mocks/eventsmocks/plugin.go b/mocks/eventsmocks/plugin.go index f387b3fb96..6f3baf4cf3 100644 --- a/mocks/eventsmocks/plugin.go +++ b/mocks/eventsmocks/plugin.go @@ -49,13 +49,13 @@ func (_m *Plugin) DeliveryRequest(connID string, sub *core.Subscription, event * return r0 } -// Init provides a mock function with given fields: ctx, _a1, callbacks -func (_m *Plugin) Init(ctx context.Context, _a1 config.Section, callbacks events.Callbacks) error { - ret := _m.Called(ctx, _a1, callbacks) +// Init provides a mock function with given fields: ctx, _a1 +func (_m *Plugin) Init(ctx context.Context, _a1 config.Section) error { + ret := _m.Called(ctx, _a1) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, config.Section, events.Callbacks) error); ok { - r0 = rf(ctx, _a1, callbacks) + if rf, ok := ret.Get(0).(func(context.Context, config.Section) error); ok { + r0 = rf(ctx, _a1) } else { r0 = ret.Error(0) } @@ -82,6 +82,20 @@ func (_m *Plugin) Name() string { return r0 } +// RegisterListener provides a mock function with given fields: namespace, listener +func (_m *Plugin) RegisterListener(namespace string, listener events.Callbacks) error { + ret := _m.Called(namespace, listener) + + var r0 error + if rf, ok := ret.Get(0).(func(string, events.Callbacks) error); ok { + r0 = rf(namespace, listener) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // ValidateOptions provides a mock function with given fields: options func (_m *Plugin) ValidateOptions(options *core.SubscriptionOptions) error { ret := _m.Called(options) diff --git a/mocks/orchestratormocks/orchestrator.go b/mocks/orchestratormocks/orchestrator.go index 8084bb5247..9a5b495766 100644 --- a/mocks/orchestratormocks/orchestrator.go +++ b/mocks/orchestratormocks/orchestrator.go @@ -20,8 +20,6 @@ import ( events "github.com/hyperledger/firefly/internal/events" - metrics "github.com/hyperledger/firefly/internal/metrics" - mock "github.com/stretchr/testify/mock" networkmap "github.com/hyperledger/firefly/internal/networkmap" @@ -1196,22 +1194,6 @@ func (_m *Orchestrator) Init(ctx context.Context, cancelCtx context.CancelFunc) return r0 } -// Metrics provides a mock function with given fields: -func (_m *Orchestrator) Metrics() metrics.Manager { - ret := _m.Called() - - var r0 metrics.Manager - if rf, ok := ret.Get(0).(func() metrics.Manager); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(metrics.Manager) - } - } - - return r0 -} - // NetworkMap provides a mock function with given fields: func (_m *Orchestrator) NetworkMap() networkmap.Manager { ret := _m.Called() diff --git a/pkg/events/plugin.go b/pkg/events/plugin.go index 83fa3b9539..4942d5ff08 100644 --- a/pkg/events/plugin.go +++ b/pkg/events/plugin.go @@ -33,8 +33,10 @@ type Plugin interface { InitConfig(config config.Section) // Init initializes the plugin, with configuration - // Returns the supported featureset of the interface - Init(ctx context.Context, config config.Section, callbacks Callbacks) error + Init(ctx context.Context, config config.Section) error + + // RegisterListener registers a listener to receive callbacks + RegisterListener(namespace string, listener Callbacks) error // Capabilities returns capabilities - not called until after Init Capabilities() *Capabilities From 2e31f80c3ad17ec36b007e3065b499b627bb06c3 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Mon, 20 Jun 2022 16:10:19 -0400 Subject: [PATCH 2/3] Address test coverage gaps Signed-off-by: Andrew Richardson --- internal/events/eifactory/factory.go | 7 --- internal/events/subscription_manager_test.go | 36 ++++++++++++++ internal/events/webhooks/webhooks_test.go | 16 +++++- internal/events/websockets/websockets.go | 2 +- internal/events/websockets/websockets_test.go | 13 +++++ internal/namespace/manager_test.go | 49 +++++++++++++++++++ 6 files changed, 114 insertions(+), 9 deletions(-) diff --git a/internal/events/eifactory/factory.go b/internal/events/eifactory/factory.go index 01f46ecdf2..635765b472 100644 --- a/internal/events/eifactory/factory.go +++ b/internal/events/eifactory/factory.go @@ -19,7 +19,6 @@ package eifactory import ( "context" - "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/internal/events/system" @@ -42,12 +41,6 @@ func init() { } } -func InitConfig(config config.Section) { - for _, plugin := range plugins { - plugin.InitConfig(config.SubSection(plugin.Name())) - } -} - func GetPlugin(ctx context.Context, pluginType string) (events.Plugin, error) { plugin, ok := pluginsByName[pluginType] if !ok { diff --git a/internal/events/subscription_manager_test.go b/internal/events/subscription_manager_test.go index 54dbc09f3f..e0213f2537 100644 --- a/internal/events/subscription_manager_test.go +++ b/internal/events/subscription_manager_test.go @@ -116,6 +116,42 @@ func TestRegisterDurableSubscriptions(t *testing.T) { assert.Nil(t, sm.connections["conn2"]) } +func TestReloadDurableSubscription(t *testing.T) { + + sub1 := fftypes.NewUUID() + + mei := &eventsmocks.Plugin{} + sm, cancel := newTestSubManager(t, mei) + defer cancel() + + sm.connections["conn1"] = &connection{ + ei: mei, + id: "conn1", + transport: "ut", + dispatchers: make(map[fftypes.UUID]*eventDispatcher), + matcher: func(sr core.SubscriptionRef) bool { + return sr.Namespace == "ns1" && sr.Name == "sub1" + }, + } + + mdi := sm.database.(*databasemocks.Plugin) + mdi.On("GetSubscriptions", mock.Anything, mock.Anything).Return([]*core.Subscription{ + {SubscriptionRef: core.SubscriptionRef{ + ID: sub1, + Namespace: "ns1", + Name: "sub1", + }, Transport: "ut"}, + }, nil, nil) + mei.On("ValidateOptions", mock.Anything).Return(nil) + err := sm.start() + assert.NoError(t, err) + + // Close with active conns + sm.close() + assert.Nil(t, sm.connections["conn1"]) + assert.Nil(t, sm.connections["conn2"]) +} + func TestRegisterEphemeralSubscriptions(t *testing.T) { mei := &eventsmocks.Plugin{} sm, cancel := newTestSubManager(t, mei) diff --git a/internal/events/webhooks/webhooks_test.go b/internal/events/webhooks/webhooks_test.go index 1d27d3fc29..a8adf732b6 100644 --- a/internal/events/webhooks/webhooks_test.go +++ b/internal/events/webhooks/webhooks_test.go @@ -455,6 +455,8 @@ func TestRequestReplyEmptyData(t *testing.T) { err := wh.DeliveryRequest(mock.Anything, sub, event, core.DataArray{}) assert.NoError(t, err) assert.True(t, called) + + mcb.AssertExpectations(t) } func TestRequestReplyBadJSON(t *testing.T) { @@ -471,7 +473,11 @@ func TestRequestReplyBadJSON(t *testing.T) { server := httptest.NewServer(r) defer server.Close() - sub := &core.Subscription{} + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, + } to := sub.Options.TransportOptions() to["url"] = fmt.Sprintf("http://%s/myapi", server.Listener.Addr()) to["reply"] = true @@ -502,7 +508,10 @@ func TestRequestReplyBadJSON(t *testing.T) { err := wh.DeliveryRequest(mock.Anything, sub, event, core.DataArray{}) assert.NoError(t, err) + + mcb.AssertExpectations(t) } + func TestRequestReplyDataArrayBadStatusB64(t *testing.T) { wh, cancel := newTestWebHooks(t) defer cancel() @@ -723,6 +732,9 @@ func TestDeliveryRequestReplyToReply(t *testing.T) { yes := true sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ + Namespace: "ns1", + }, Options: core.SubscriptionOptions{ SubscriptionCoreOptions: core.SubscriptionCoreOptions{ WithData: &yes, @@ -755,4 +767,6 @@ func TestDeliveryRequestReplyToReply(t *testing.T) { err := wh.DeliveryRequest(mock.Anything, sub, event, nil) assert.NoError(t, err) + + mcb.AssertExpectations(t) } diff --git a/internal/events/websockets/websockets.go b/internal/events/websockets/websockets.go index 5299564de6..cb555edc47 100644 --- a/internal/events/websockets/websockets.go +++ b/internal/events/websockets/websockets.go @@ -122,7 +122,7 @@ func (ws *WebSockets) start(wc *websocketConnection, start *core.WSStart) error return wc.durableSubMatcher(sr) }) } - return nil + return i18n.NewError(ws.ctx, coremsgs.MsgNamespaceNotExist) } func (ws *WebSockets) connClosed(connID string) { diff --git a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go index ff7c9e2a10..9229c36e49 100644 --- a/internal/events/websockets/websockets_test.go +++ b/internal/events/websockets/websockets_test.go @@ -339,6 +339,19 @@ func TestAutoStartBadOptions(t *testing.T) { cbs.AssertExpectations(t) } +func TestAutoStartBadNamespace(t *testing.T) { + cbs := &eventsmocks.Callbacks{} + _, wsc, cancel := newTestWebsockets(t, cbs, "ephemeral", "namespace=ns2") + defer cancel() + + b := <-wsc.Receive() + var res core.WSError + err := json.Unmarshal(b, &res) + assert.NoError(t, err) + assert.Regexp(t, "FF10187", res.Error) + cbs.AssertExpectations(t) +} + func TestHandleAckWithAutoAck(t *testing.T) { eventUUID := fftypes.NewUUID() wsc := &websocketConnection{ diff --git a/internal/namespace/manager_test.go b/internal/namespace/manager_test.go index 4d882ef663..7e92abd77a 100644 --- a/internal/namespace/manager_test.go +++ b/internal/namespace/manager_test.go @@ -34,6 +34,7 @@ import ( "github.com/hyperledger/firefly/mocks/blockchainmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/dataexchangemocks" + "github.com/hyperledger/firefly/mocks/eventsmocks" "github.com/hyperledger/firefly/mocks/identitymocks" "github.com/hyperledger/firefly/mocks/metricsmocks" "github.com/hyperledger/firefly/mocks/operationmocks" @@ -57,6 +58,7 @@ type testNamespaceManager struct { mdx *dataexchangemocks.Plugin mps *sharedstoragemocks.Plugin mti *tokenmocks.Plugin + mev *eventsmocks.Plugin } func (nm *testNamespaceManager) cleanup(t *testing.T) { @@ -83,6 +85,7 @@ func newTestNamespaceManager(resetConfig bool) *testNamespaceManager { mdx: &dataexchangemocks.Plugin{}, mps: &sharedstoragemocks.Plugin{}, mti: &tokenmocks.Plugin{}, + mev: &eventsmocks.Plugin{}, namespaceManager: namespaceManager{ ctx: context.Background(), namespaces: make(map[string]*namespace), @@ -107,6 +110,9 @@ func newTestNamespaceManager(resetConfig bool) *testNamespaceManager { nm.plugins.tokens = map[string]tokensPlugin{ "erc721": {plugin: nm.mti}, } + nm.plugins.events = map[string]eventsPlugin{ + "websockets": {plugin: nm.mev}, + } nm.namespaceManager.metrics = nm.mmi nm.namespaceManager.adminEvents = nm.mae return nm @@ -131,6 +137,7 @@ func TestInit(t *testing.T) { nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + nm.mev.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mbi.On("NetworkVersion").Return(2) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -222,6 +229,25 @@ func TestInitTokensFail(t *testing.T) { assert.EqualError(t, err, "pop") } +func TestInitEventsFail(t *testing.T) { + nm := newTestNamespaceManager(true) + defer nm.cleanup(t) + + nm.utOrchestrator = &orchestratormocks.Orchestrator{} + + nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) + nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) + nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) + nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) + nm.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + nm.mev.On("Init", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + + ctx, cancelCtx := context.WithCancel(context.Background()) + err := nm.Init(ctx, cancelCtx) + assert.EqualError(t, err, "pop") +} + func TestInitOrchestratorFail(t *testing.T) { nm := newTestNamespaceManager(true) defer nm.cleanup(t) @@ -234,6 +260,7 @@ func TestInitOrchestratorFail(t *testing.T) { nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + nm.mev.On("Init", mock.Anything, mock.Anything).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) err := nm.Init(ctx, cancelCtx) @@ -255,6 +282,7 @@ func TestInitVersion1(t *testing.T) { nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + nm.mev.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mbi.On("NetworkVersion").Return(1) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -283,6 +311,7 @@ func TestInitVersion1Fail(t *testing.T) { nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + nm.mev.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mbi.On("NetworkVersion").Return(1) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -646,6 +675,25 @@ func TestTokensPluginDuplicate(t *testing.T) { assert.Regexp(t, "FF10395", err) } +func TestEventsPluginDefaults(t *testing.T) { + nm := newTestNamespaceManager(true) + defer nm.cleanup(t) + nm.plugins.events = nil + plugins, err := nm.getEventPlugins(context.Background()) + assert.Equal(t, 3, len(plugins)) + assert.NoError(t, err) +} + +func TestEventsPluginBadType(t *testing.T) { + nm := newTestNamespaceManager(true) + defer nm.cleanup(t) + nm.plugins.events = nil + config.Set(coreconfig.EventTransportsEnabled, []string{"!unknown!"}) + ctx, cancelCtx := context.WithCancel(context.Background()) + err := nm.Init(ctx, cancelCtx) + assert.Error(t, err) +} + func TestInitBadNamespace(t *testing.T) { nm := newTestNamespaceManager(true) defer nm.cleanup(t) @@ -658,6 +706,7 @@ func TestInitBadNamespace(t *testing.T) { nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + nm.mev.On("Init", mock.Anything, mock.Anything).Return(nil) viper.SetConfigType("yaml") err := viper.ReadConfig(strings.NewReader(` From e6c6d4edc0f77ecfc1bb4b1a99ee836b99ba9f23 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Mon, 20 Jun 2022 16:22:47 -0400 Subject: [PATCH 3/3] Replace some "listener" terminology with "handler" Signed-off-by: Andrew Richardson --- internal/blockchain/ethereum/ethereum.go | 14 +++---- internal/blockchain/ethereum/ethereum_test.go | 18 ++++----- internal/blockchain/fabric/fabric.go | 14 +++---- internal/blockchain/fabric/fabric_test.go | 38 +++++++++---------- internal/database/postgres/postgres.go | 4 +- internal/database/postgres/postgres_test.go | 2 +- .../sqlcommon/contractapis_sql_test.go | 2 +- .../database/sqlcommon/provider_mock_test.go | 2 +- .../sqlcommon/provider_sqlitego_test.go | 2 +- internal/database/sqlcommon/sqlcommon.go | 16 ++++---- .../database/sqlcommon/tokenpool_sql_test.go | 2 +- internal/database/sqlite3/sqlite3.go | 4 +- internal/database/sqlite3/sqlite3_test.go | 2 +- internal/dataexchange/ffdx/ffdx.go | 8 ++-- internal/dataexchange/ffdx/ffdx_test.go | 10 ++--- internal/events/event_manager_test.go | 6 +-- internal/events/subscription_manager.go | 2 +- internal/events/system/events.go | 6 +-- internal/events/system/events_test.go | 2 +- internal/events/webhooks/webhooks.go | 6 +-- internal/events/webhooks/webhooks_test.go | 2 +- internal/events/websockets/websockets.go | 4 +- internal/events/websockets/websockets_test.go | 2 +- internal/identity/tbd/tbd.go | 2 +- internal/namespace/manager.go | 2 +- internal/namespace/manager_test.go | 22 +++++------ internal/orchestrator/orchestrator.go | 10 ++--- internal/orchestrator/orchestrator_test.go | 24 ++++++------ internal/sharedstorage/ipfs/ipfs.go | 2 +- internal/tokens/fftokens/fftokens.go | 22 +++++------ internal/tokens/fftokens/fftokens_test.go | 14 +++---- mocks/blockchainmocks/plugin.go | 6 +-- mocks/databasemocks/plugin.go | 10 ++--- mocks/dataexchangemocks/plugin.go | 10 ++--- mocks/eventsmocks/plugin.go | 8 ++-- mocks/identitymocks/plugin.go | 6 +-- mocks/sharedstoragemocks/plugin.go | 6 +-- mocks/tokenmocks/plugin.go | 8 ++-- pkg/blockchain/plugin.go | 4 +- pkg/database/plugin.go | 4 +- pkg/dataexchange/plugin.go | 4 +- pkg/events/plugin.go | 4 +- pkg/identity/plugin.go | 4 +- pkg/sharedstorage/plugin.go | 4 +- pkg/tokens/plugin.go | 4 +- 45 files changed, 174 insertions(+), 174 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 74389618a7..4474b9dba5 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -81,17 +81,17 @@ type Ethereum struct { } type callbacks struct { - listeners []blockchain.Callbacks + handlers []blockchain.Callbacks } func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) } } func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { if err := cb.BatchPinComplete(batch, signingKey); err != nil { return err } @@ -100,7 +100,7 @@ func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *co } func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil { return err } @@ -109,7 +109,7 @@ func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Ev } func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { if err := cb.BlockchainEvent(event); err != nil { return err } @@ -247,8 +247,8 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr return nil } -func (e *Ethereum) RegisterListener(listener blockchain.Callbacks) { - e.callbacks.listeners = append(e.callbacks.listeners, listener) +func (e *Ethereum) SetHandler(handler blockchain.Callbacks) { + e.callbacks.handlers = append(e.callbacks.handlers, handler) } func (e *Ethereum) Start() (err error) { diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index df6e2d754e..7622918bff 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -1096,7 +1096,7 @@ func TestHandleMessageBatchPinOK(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" e.fireflyContract.networkVersion = 1 @@ -1219,7 +1219,7 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" e.fireflyContract.networkVersion = 1 @@ -1281,7 +1281,7 @@ func TestHandleMessageBatchPinExit(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" e.fireflyContract.networkVersion = 1 @@ -1506,7 +1506,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { e := &Ethereum{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, wsconn: wsm, } @@ -1576,7 +1576,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) { }`) em := &blockchainmocks.Callbacks{} - e.RegisterListener(em) + e.SetHandler(em) txsu := em.On("BlockchainOpUpdate", e, "ns1:"+operationID.String(), @@ -1821,7 +1821,7 @@ func TestHandleMessageContractEvent(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -1883,7 +1883,7 @@ func TestHandleMessageContractEventError(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -3075,7 +3075,7 @@ func TestHandleNetworkAction(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -3121,7 +3121,7 @@ func TestHandleNetworkActionFail(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index efea17ce18..353e408124 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -72,17 +72,17 @@ type Fabric struct { } type callbacks struct { - listeners []blockchain.Callbacks + handlers []blockchain.Callbacks } func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) } } func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { if err := cb.BatchPinComplete(batch, signingKey); err != nil { return err } @@ -91,7 +91,7 @@ func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *co } func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil { return err } @@ -100,7 +100,7 @@ func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Ev } func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { if err := cb.BlockchainEvent(event); err != nil { return err } @@ -322,8 +322,8 @@ func (f *Fabric) TerminateContract(ctx context.Context, contracts *core.FireFlyC return f.ConfigureContract(ctx, contracts) } -func (f *Fabric) RegisterListener(listener blockchain.Callbacks) { - f.callbacks.listeners = append(f.callbacks.listeners, listener) +func (f *Fabric) SetHandler(handler blockchain.Callbacks) { + f.callbacks.handlers = append(f.callbacks.handlers, handler) } func (f *Fabric) Start() (err error) { diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index e8ea554b67..8808cad7fd 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -943,7 +943,7 @@ func TestHandleMessageBatchPinOK(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" @@ -990,7 +990,7 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" @@ -1037,7 +1037,7 @@ func TestHandleMessageBatchPinExit(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" @@ -1070,7 +1070,7 @@ func TestHandleMessageBatchPinEmpty(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" @@ -1096,7 +1096,7 @@ func TestHandleMessageUnknownEventName(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" @@ -1111,7 +1111,7 @@ func TestHandleMessageUnknownEventName(t *testing.T) { func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" data := []byte(`[{ @@ -1133,7 +1133,7 @@ func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { func TestHandleMessageBatchPinBadPin(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" data := []byte(`[{ @@ -1155,7 +1155,7 @@ func TestHandleMessageBatchPinBadPin(t *testing.T) { func TestHandleMessageBatchPinBadPayloadEncoding(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" data := []byte(`[{ @@ -1177,7 +1177,7 @@ func TestHandleMessageBatchPinBadPayloadEncoding(t *testing.T) { func TestHandleMessageBatchPinBadPayloadUUIDs(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" data := []byte(`[{ @@ -1199,7 +1199,7 @@ func TestHandleMessageBatchPinBadPayloadUUIDs(t *testing.T) { func TestHandleMessageBatchBadJSON(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } err := e.handleMessageBatch(context.Background(), []interface{}{10, 20}) assert.NoError(t, err) @@ -1270,7 +1270,7 @@ func TestEventLoopUnexpectedMessage(t *testing.T) { "requestPayload": "{\"from\":\"0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635\",\"gas\":0,\"gasPrice\":0,\"headers\":{\"id\":\"6fb94fff-81d3-4094-567d-e031b1871694\",\"type\":\"SendTransaction\"},\"method\":{\"inputs\":[{\"internalType\":\"bytes32\",\"name\":\"txnId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"batchId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"payloadRef\",\"type\":\"bytes32\"}],\"name\":\"broadcastBatch\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},\"params\":[\"12345\",\"!\",\"!\"],\"to\":\"0xd3266a857285fb75eb7df37353b4a15c8bb828f5\",\"value\":0}" }`) em := &blockchainmocks.Callbacks{} - e.RegisterListener(em) + e.SetHandler(em) txsu := em.On("BlockchainOpUpdate", e, "ns1:"+operationID.String(), @@ -1296,7 +1296,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, wsconn: wsm, } @@ -1336,7 +1336,7 @@ func TestHandleReceiptNoRequestID(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, wsconn: wsm, } @@ -1353,7 +1353,7 @@ func TestHandleReceiptFailedTx(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, wsconn: wsm, } @@ -1545,7 +1545,7 @@ func TestHandleMessageContractEvent(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -1603,7 +1603,7 @@ func TestHandleMessageContractEventBadPayload(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -1631,7 +1631,7 @@ func TestHandleMessageContractEventError(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -2030,7 +2030,7 @@ func TestHandleNetworkAction(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" @@ -2068,7 +2068,7 @@ func TestHandleNetworkActionFail(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + callbacks: callbacks{handlers: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" diff --git a/internal/database/postgres/postgres.go b/internal/database/postgres/postgres.go index a047ccd859..0277d13012 100644 --- a/internal/database/postgres/postgres.go +++ b/internal/database/postgres/postgres.go @@ -42,8 +42,8 @@ func (psql *Postgres) Init(ctx context.Context, config config.Section) error { return psql.SQLCommon.Init(ctx, psql, config, capabilities) } -func (psql *Postgres) RegisterListener(listener database.Callbacks) { - psql.SQLCommon.RegisterListener(listener) +func (psql *Postgres) SetHandler(handler database.Callbacks) { + psql.SQLCommon.SetHandler(handler) } func (psql *Postgres) Name() string { diff --git a/internal/database/postgres/postgres_test.go b/internal/database/postgres/postgres_test.go index ffc78365fb..dea0124367 100644 --- a/internal/database/postgres/postgres_test.go +++ b/internal/database/postgres/postgres_test.go @@ -29,7 +29,7 @@ import ( func TestPostgresProvider(t *testing.T) { psql := &Postgres{} - psql.RegisterListener(&databasemocks.Callbacks{}) + psql.SetHandler(&databasemocks.Callbacks{}) config := config.RootSection("unittest") psql.InitConfig(config) config.Set(sqlcommon.SQLConfDatasourceURL, "!bad connection") diff --git a/internal/database/sqlcommon/contractapis_sql_test.go b/internal/database/sqlcommon/contractapis_sql_test.go index 9a8868697f..154fb681a4 100644 --- a/internal/database/sqlcommon/contractapis_sql_test.go +++ b/internal/database/sqlcommon/contractapis_sql_test.go @@ -126,7 +126,7 @@ func TestContractAPIDBFailUpdate(t *testing.T) { func TestUpsertContractAPIIDMismatch(t *testing.T) { s, db := newMockProvider().init() callbacks := &databasemocks.Callbacks{} - s.RegisterListener(callbacks) + s.SetHandler(callbacks) apiID := fftypes.NewUUID() api := &core.ContractAPI{ ID: apiID, diff --git a/internal/database/sqlcommon/provider_mock_test.go b/internal/database/sqlcommon/provider_mock_test.go index 9671c1a05b..91a6a27519 100644 --- a/internal/database/sqlcommon/provider_mock_test.go +++ b/internal/database/sqlcommon/provider_mock_test.go @@ -64,7 +64,7 @@ func newMockProvider() *mockProvider { // init is a convenience to init for tests that aren't testing init itself func (mp *mockProvider) init() (*mockProvider, sqlmock.Sqlmock) { _ = mp.Init(context.Background(), mp, mp.config, mp.capabilities) - mp.RegisterListener(mp.callbacks) + mp.SetHandler(mp.callbacks) return mp, mp.mdb } diff --git a/internal/database/sqlcommon/provider_sqlitego_test.go b/internal/database/sqlcommon/provider_sqlitego_test.go index be0ea34939..1b6a643601 100644 --- a/internal/database/sqlcommon/provider_sqlitego_test.go +++ b/internal/database/sqlcommon/provider_sqlitego_test.go @@ -65,7 +65,7 @@ func newSQLiteTestProvider(t *testing.T) (*sqliteGoTestProvider, func()) { err = tp.Init(context.Background(), tp, tp.config, tp.capabilities) assert.NoError(tp.t, err) - tp.RegisterListener(tp.callbacks) + tp.SetHandler(tp.callbacks) return tp, func() { tp.Close() diff --git a/internal/database/sqlcommon/sqlcommon.go b/internal/database/sqlcommon/sqlcommon.go index 770be69730..311748277f 100644 --- a/internal/database/sqlcommon/sqlcommon.go +++ b/internal/database/sqlcommon/sqlcommon.go @@ -45,35 +45,35 @@ type SQLCommon struct { } type callbacks struct { - listeners []database.Callbacks + handlers []database.Callbacks } func (cb *callbacks) OrderedUUIDCollectionNSEvent(resType database.OrderedUUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID, sequence int64) { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { cb.OrderedUUIDCollectionNSEvent(resType, eventType, ns, id, sequence) } } func (cb *callbacks) OrderedCollectionNSEvent(resType database.OrderedCollectionNS, eventType core.ChangeEventType, ns string, sequence int64) { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { cb.OrderedCollectionNSEvent(resType, eventType, ns, sequence) } } func (cb *callbacks) UUIDCollectionNSEvent(resType database.UUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID) { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { cb.UUIDCollectionNSEvent(resType, eventType, ns, id) } } func (cb *callbacks) UUIDCollectionEvent(resType database.UUIDCollection, eventType core.ChangeEventType, id *fftypes.UUID) { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { cb.UUIDCollectionEvent(resType, eventType, id) } } func (cb *callbacks) HashCollectionNSEvent(resType database.HashCollectionNS, eventType core.ChangeEventType, ns string, hash *fftypes.Bytes32) { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { cb.HashCollectionNSEvent(resType, eventType, ns, hash) } } @@ -130,8 +130,8 @@ func (s *SQLCommon) Init(ctx context.Context, provider Provider, config config.S return nil } -func (s *SQLCommon) RegisterListener(listener database.Callbacks) { - s.callbacks.listeners = append(s.callbacks.listeners, listener) +func (s *SQLCommon) SetHandler(handler database.Callbacks) { + s.callbacks.handlers = append(s.callbacks.handlers, handler) } func (s *SQLCommon) Capabilities() *database.Capabilities { return s.capabilities } diff --git a/internal/database/sqlcommon/tokenpool_sql_test.go b/internal/database/sqlcommon/tokenpool_sql_test.go index db12d4bdf1..140f52a6fc 100644 --- a/internal/database/sqlcommon/tokenpool_sql_test.go +++ b/internal/database/sqlcommon/tokenpool_sql_test.go @@ -176,7 +176,7 @@ func TestUpsertTokenPoolFailCommit(t *testing.T) { func TestUpsertTokenPoolUpdateIDMismatch(t *testing.T) { s, db := newMockProvider().init() callbacks := &databasemocks.Callbacks{} - s.RegisterListener(callbacks) + s.SetHandler(callbacks) poolID := fftypes.NewUUID() pool := &core.TokenPool{ ID: poolID, diff --git a/internal/database/sqlite3/sqlite3.go b/internal/database/sqlite3/sqlite3.go index d0eafb3d42..a8402cb90e 100644 --- a/internal/database/sqlite3/sqlite3.go +++ b/internal/database/sqlite3/sqlite3.go @@ -58,8 +58,8 @@ func (sqlite *SQLite3) Init(ctx context.Context, config config.Section) error { return sqlite.SQLCommon.Init(ctx, sqlite, config, capabilities) } -func (sqlite *SQLite3) RegisterListener(listener database.Callbacks) { - sqlite.SQLCommon.RegisterListener(listener) +func (sqlite *SQLite3) SetHandler(handler database.Callbacks) { + sqlite.SQLCommon.SetHandler(handler) } func (sqlite *SQLite3) Name() string { diff --git a/internal/database/sqlite3/sqlite3_test.go b/internal/database/sqlite3/sqlite3_test.go index 78e5aa0a05..cac1659dab 100644 --- a/internal/database/sqlite3/sqlite3_test.go +++ b/internal/database/sqlite3/sqlite3_test.go @@ -32,7 +32,7 @@ import ( func TestSQLite3GoProvider(t *testing.T) { sqlite := &SQLite3{} - sqlite.RegisterListener(&databasemocks.Callbacks{}) + sqlite.SetHandler(&databasemocks.Callbacks{}) config := config.RootSection("unittest") sqlite.InitConfig(config) config.Set(sqlcommon.SQLConfDatasourceURL, "!wrong://") diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index eed9b14b79..e4359d4e04 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -50,11 +50,11 @@ type FFDX struct { } type callbacks struct { - listeners []dataexchange.Callbacks + handlers []dataexchange.Callbacks } func (cb *callbacks) DXEvent(event dataexchange.DXEvent) { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { cb.DXEvent(event) } } @@ -148,8 +148,8 @@ func (h *FFDX) SetNodes(nodes []fftypes.JSONObject) { h.nodes = nodes } -func (h *FFDX) RegisterListener(listener dataexchange.Callbacks) { - h.callbacks.listeners = append(h.callbacks.listeners, listener) +func (h *FFDX) SetHandler(handler dataexchange.Callbacks) { + h.callbacks.handlers = append(h.callbacks.handlers, handler) } func (h *FFDX) Start() error { diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index 25a37476dd..23e7889070 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -443,7 +443,7 @@ func TestEvents(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"0"}`, string(msg)) mcb := &dataexchangemocks.Callbacks{} - h.RegisterListener(mcb) + h.SetHandler(mcb) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.NamespacedID() == "1" && @@ -557,7 +557,7 @@ func TestEventsWithManifest(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"0"}`, string(msg)) mcb := &dataexchangemocks.Callbacks{} - h.RegisterListener(mcb) + h.SetHandler(mcb) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.NamespacedID() == "1" && @@ -585,7 +585,7 @@ func TestEventLoopReceiveClosed(t *testing.T) { wsm := &wsmocks.WSClient{} h := &FFDX{ ctx: context.Background(), - callbacks: callbacks{listeners: []dataexchange.Callbacks{dxc}}, + callbacks: callbacks{handlers: []dataexchange.Callbacks{dxc}}, wsconn: wsm, } r := make(chan []byte) @@ -601,7 +601,7 @@ func TestEventLoopSendClosed(t *testing.T) { ctx, cancelCtx := context.WithCancel(context.Background()) h := &FFDX{ ctx: ctx, - callbacks: callbacks{listeners: []dataexchange.Callbacks{dxc}}, + callbacks: callbacks{handlers: []dataexchange.Callbacks{dxc}}, wsconn: wsm, ackChannel: make(chan *ack, 1), } @@ -622,7 +622,7 @@ func TestEventLoopClosedContext(t *testing.T) { cancel() h := &FFDX{ ctx: ctx, - callbacks: callbacks{listeners: []dataexchange.Callbacks{dxc}}, + callbacks: callbacks{handlers: []dataexchange.Callbacks{dxc}}, wsconn: wsm, } r := make(chan []byte, 1) diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index f9dc645236..94b26d92ac 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -92,7 +92,7 @@ func newTestEventManagerCommon(t *testing.T, metrics, dbconcurrency bool) (*even met.On("Name").Return("ut").Maybe() mbi.On("VerifierType").Return(core.VerifierTypeEthAddress).Maybe() mdi.On("Capabilities").Return(&database.Capabilities{Concurrency: dbconcurrency}).Maybe() - mev.On("RegisterListener", "ns1", mock.Anything).Return(nil).Maybe() + mev.On("SetHandler", "ns1", mock.Anything).Return(nil).Maybe() mev.On("ValidateOptions", mock.Anything).Return(nil).Maybe() emi, err := NewEventManager(ctx, "ns1", mni, mpi, mdi, mbi, mim, msh, mdm, mbm, mpm, mam, mdd, mmi, txHelper, events) em := emi.(*eventManager) @@ -154,7 +154,7 @@ func TestStartStopEventListenerFail(t *testing.T) { txHelper := txcommon.NewTransactionHelper("ns1", mdi, mdm) mdi.On("Capabilities").Return(&database.Capabilities{Concurrency: false}) mbi.On("VerifierType").Return(core.VerifierTypeEthAddress) - mev.On("RegisterListener", "ns1", mock.Anything).Return(fmt.Errorf("pop")) + mev.On("SetHandler", "ns1", mock.Anything).Return(fmt.Errorf("pop")) _, err := NewEventManager(context.Background(), "ns1", mni, mpi, mdi, mbi, mim, msh, mdm, mbm, mpm, mam, msd, mm, txHelper, events) assert.EqualError(t, err, "pop") } @@ -407,7 +407,7 @@ func TestAddInternalListener(t *testing.T) { conf := config.RootSection("ut.events") ie.InitConfig(conf) ie.Init(em.ctx, conf) - ie.RegisterListener("ns1", cbs) + ie.SetHandler("ns1", cbs) em.internalEvents = ie defer cancel() err := em.AddSystemEventListener("ns1", func(event *core.EventDelivery) error { return nil }) diff --git a/internal/events/subscription_manager.go b/internal/events/subscription_manager.go index 78d04df362..15b6f462d5 100644 --- a/internal/events/subscription_manager.go +++ b/internal/events/subscription_manager.go @@ -117,7 +117,7 @@ func newSubscriptionManager(ctx context.Context, ns string, di database.Plugin, } for _, ei := range sm.transports { - if err := ei.RegisterListener(sm.namespace, &boundCallbacks{sm: sm, ei: ei}); err != nil { + if err := ei.SetHandler(sm.namespace, &boundCallbacks{sm: sm, ei: ei}); err != nil { return nil, err } } diff --git a/internal/events/system/events.go b/internal/events/system/events.go index d0827004e3..50f241d8d5 100644 --- a/internal/events/system/events.go +++ b/internal/events/system/events.go @@ -58,10 +58,10 @@ func (se *Events) Init(ctx context.Context, config config.Section) (err error) { return nil } -func (se *Events) RegisterListener(namespace string, listener events.Callbacks) error { - se.callbacks[namespace] = listener +func (se *Events) SetHandler(namespace string, handler events.Callbacks) error { + se.callbacks[namespace] = handler // We have a single logical connection, that matches all subscriptions - return listener.RegisterConnection(se.connID, func(sr core.SubscriptionRef) bool { return true }) + return handler.RegisterConnection(se.connID, func(sr core.SubscriptionRef) bool { return true }) } func (se *Events) Capabilities() *events.Capabilities { diff --git a/internal/events/system/events_test.go b/internal/events/system/events_test.go index 7e9b526927..65aab44b0b 100644 --- a/internal/events/system/events_test.go +++ b/internal/events/system/events_test.go @@ -43,7 +43,7 @@ func newTestEvents(t *testing.T) (se *Events, cancel func()) { config := config.RootSection("ut.events") se.InitConfig(config) se.Init(ctx, config) - se.RegisterListener("ns1", cbs) + se.SetHandler("ns1", cbs) assert.Equal(t, "system", se.Name()) assert.NotNil(t, se.Capabilities()) assert.Nil(t, se.ValidateOptions(&core.SubscriptionOptions{})) diff --git a/internal/events/webhooks/webhooks.go b/internal/events/webhooks/webhooks.go index e51128e53c..0cb284af62 100644 --- a/internal/events/webhooks/webhooks.go +++ b/internal/events/webhooks/webhooks.go @@ -73,10 +73,10 @@ func (wh *WebHooks) Init(ctx context.Context, config config.Section) (err error) return nil } -func (wh *WebHooks) RegisterListener(namespace string, listener events.Callbacks) error { - wh.callbacks[namespace] = listener +func (wh *WebHooks) SetHandler(namespace string, handler events.Callbacks) error { + wh.callbacks[namespace] = handler // We have a single logical connection, that matches all subscriptions - return listener.RegisterConnection(wh.connID, func(sr core.SubscriptionRef) bool { return true }) + return handler.RegisterConnection(wh.connID, func(sr core.SubscriptionRef) bool { return true }) } func (wh *WebHooks) Capabilities() *events.Capabilities { diff --git a/internal/events/webhooks/webhooks_test.go b/internal/events/webhooks/webhooks_test.go index a8adf732b6..2a70c43837 100644 --- a/internal/events/webhooks/webhooks_test.go +++ b/internal/events/webhooks/webhooks_test.go @@ -48,7 +48,7 @@ func newTestWebHooks(t *testing.T) (wh *WebHooks, cancel func()) { svrConfig := config.RootSection("ut.webhooks") wh.InitConfig(svrConfig) wh.Init(ctx, svrConfig) - wh.RegisterListener("ns1", cbs) + wh.SetHandler("ns1", cbs) assert.Equal(t, "webhooks", wh.Name()) assert.NotNil(t, wh.Capabilities()) return wh, cancelCtx diff --git a/internal/events/websockets/websockets.go b/internal/events/websockets/websockets.go index cb555edc47..8b3d0ecc47 100644 --- a/internal/events/websockets/websockets.go +++ b/internal/events/websockets/websockets.go @@ -59,8 +59,8 @@ func (ws *WebSockets) Init(ctx context.Context, config config.Section) error { return nil } -func (ws *WebSockets) RegisterListener(namespace string, listener events.Callbacks) error { - ws.callbacks[namespace] = listener +func (ws *WebSockets) SetHandler(namespace string, handler events.Callbacks) error { + ws.callbacks[namespace] = handler return nil } diff --git a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go index 9229c36e49..5800357d6b 100644 --- a/internal/events/websockets/websockets_test.go +++ b/internal/events/websockets/websockets_test.go @@ -47,7 +47,7 @@ func newTestWebsockets(t *testing.T, cbs *eventsmocks.Callbacks, queryParams ... svrConfig := config.RootSection("ut.websockets") ws.InitConfig(svrConfig) ws.Init(ctx, svrConfig) - ws.RegisterListener("ns1", cbs) + ws.SetHandler("ns1", cbs) assert.Equal(t, "websockets", ws.Name()) assert.NotNil(t, ws.Capabilities()) cbs.On("ConnectionClosed", mock.Anything).Return(nil).Maybe() diff --git a/internal/identity/tbd/tbd.go b/internal/identity/tbd/tbd.go index 197d9269c6..1fb5aac022 100644 --- a/internal/identity/tbd/tbd.go +++ b/internal/identity/tbd/tbd.go @@ -37,7 +37,7 @@ func (tbd *TBD) Init(ctx context.Context, config config.Section) (err error) { return nil } -func (tbd *TBD) RegisterListener(listener identity.Callbacks) { +func (tbd *TBD) SetHandler(handler identity.Callbacks) { } func (tbd *TBD) Start() error { diff --git a/internal/namespace/manager.go b/internal/namespace/manager.go index 171a9be95f..5081d157eb 100644 --- a/internal/namespace/manager.go +++ b/internal/namespace/manager.go @@ -552,7 +552,7 @@ func (nm *namespaceManager) initPlugins(ctx context.Context) (err error) { if err = entry.plugin.Init(ctx, entry.config); err != nil { return err } - entry.plugin.RegisterListener(nm) + entry.plugin.SetHandler(nm) } for _, entry := range nm.plugins.blockchain { if err = entry.plugin.Init(ctx, entry.config, nm.metrics); err != nil { diff --git a/internal/namespace/manager_test.go b/internal/namespace/manager_test.go index 7e92abd77a..60976473ed 100644 --- a/internal/namespace/manager_test.go +++ b/internal/namespace/manager_test.go @@ -132,7 +132,7 @@ func TestInit(t *testing.T) { nm.utOrchestrator = mo nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mdi.On("SetHandler", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -170,7 +170,7 @@ func TestInitBlockchainFail(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mdi.On("SetHandler", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(fmt.Errorf("pop")) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -185,7 +185,7 @@ func TestInitDataExchangeFail(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mdi.On("SetHandler", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) @@ -201,7 +201,7 @@ func TestInitSharedStorageFail(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mdi.On("SetHandler", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) @@ -218,7 +218,7 @@ func TestInitTokensFail(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mdi.On("SetHandler", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -236,7 +236,7 @@ func TestInitEventsFail(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mdi.On("SetHandler", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -253,10 +253,10 @@ func TestInitOrchestratorFail(t *testing.T) { defer nm.cleanup(t) nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mdi.On("SetHandler", mock.Anything).Return() nm.mdi.On("GetIdentities", mock.Anything, "default", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) - nm.mbi.On("RegisterListener", mock.Anything).Return() + nm.mbi.On("SetHandler", mock.Anything).Return() nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -277,7 +277,7 @@ func TestInitVersion1(t *testing.T) { nm.utOrchestrator = mo nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mdi.On("SetHandler", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -306,7 +306,7 @@ func TestInitVersion1Fail(t *testing.T) { nm.utOrchestrator = mo nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mdi.On("SetHandler", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -701,7 +701,7 @@ func TestInitBadNamespace(t *testing.T) { nm.utOrchestrator = &orchestratormocks.Orchestrator{} nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) - nm.mdi.On("RegisterListener", mock.Anything).Return() + nm.mdi.On("SetHandler", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 988b23bedc..c535abd5b0 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -353,8 +353,8 @@ func (or *orchestrator) Operations() operations.Manager { } func (or *orchestrator) initPlugins(ctx context.Context) (err error) { - or.plugins.Database.Plugin.RegisterListener(or) - or.plugins.Blockchain.Plugin.RegisterListener(&or.bc) + or.plugins.Database.Plugin.SetHandler(or) + or.plugins.Blockchain.Plugin.SetHandler(&or.bc) fb := database.IdentityQueryFactory.NewFilter(ctx) nodes, _, err := or.database().GetIdentities(ctx, or.namespace, fb.And( @@ -368,12 +368,12 @@ func (or *orchestrator) initPlugins(ctx context.Context) (err error) { nodeInfo[i] = node.Profile } or.plugins.DataExchange.Plugin.SetNodes(nodeInfo) - or.plugins.DataExchange.Plugin.RegisterListener(&or.bc) + or.plugins.DataExchange.Plugin.SetHandler(&or.bc) - or.plugins.SharedStorage.Plugin.RegisterListener(&or.bc) + or.plugins.SharedStorage.Plugin.SetHandler(&or.bc) for _, token := range or.plugins.Tokens { - if err := token.Plugin.RegisterListener(or.namespace, &or.bc); err != nil { + if err := token.Plugin.SetHandler(or.namespace, &or.bc); err != nil { return err } } diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 9f1c75193c..f1dd359192 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -185,13 +185,13 @@ func TestNewOrchestrator(t *testing.T) { func TestInitOK(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) - or.mdi.On("RegisterListener", mock.Anything).Return() - or.mbi.On("RegisterListener", mock.Anything).Return() + or.mdi.On("SetHandler", mock.Anything).Return() + or.mbi.On("SetHandler", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return([]*core.Identity{{}}, nil, nil) - or.mdx.On("RegisterListener", mock.Anything).Return() + or.mdx.On("SetHandler", mock.Anything).Return() or.mdx.On("SetNodes", mock.Anything).Return() - or.mps.On("RegisterListener", mock.Anything).Return() - or.mti.On("RegisterListener", "ns", mock.Anything).Return(nil) + or.mps.On("SetHandler", mock.Anything).Return() + or.mti.On("SetHandler", "ns", mock.Anything).Return(nil) err := or.Init(or.ctx, or.cancelCtx) assert.NoError(t, err) @@ -209,13 +209,13 @@ func TestInitOK(t *testing.T) { func TestInitTokenListenerFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) - or.mdi.On("RegisterListener", mock.Anything).Return() - or.mbi.On("RegisterListener", mock.Anything).Return() + or.mdi.On("SetHandler", mock.Anything).Return() + or.mbi.On("SetHandler", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return([]*core.Identity{{}}, nil, nil) - or.mdx.On("RegisterListener", mock.Anything).Return() + or.mdx.On("SetHandler", mock.Anything).Return() or.mdx.On("SetNodes", mock.Anything).Return() - or.mps.On("RegisterListener", mock.Anything).Return() - or.mti.On("RegisterListener", "ns", mock.Anything).Return(fmt.Errorf("pop")) + or.mps.On("SetHandler", mock.Anything).Return() + or.mti.On("SetHandler", "ns", mock.Anything).Return(fmt.Errorf("pop")) err := or.Init(or.ctx, or.cancelCtx) assert.EqualError(t, err, "pop") } @@ -223,8 +223,8 @@ func TestInitTokenListenerFail(t *testing.T) { func TestInitDataexchangeNodesFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) - or.mdi.On("RegisterListener", mock.Anything).Return() - or.mbi.On("RegisterListener", mock.Anything).Return() + or.mdi.On("SetHandler", mock.Anything).Return() + or.mbi.On("SetHandler", mock.Anything).Return() or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) ctx := context.Background() err := or.initPlugins(ctx) diff --git a/internal/sharedstorage/ipfs/ipfs.go b/internal/sharedstorage/ipfs/ipfs.go index a7d878ca48..a7a994a9eb 100644 --- a/internal/sharedstorage/ipfs/ipfs.go +++ b/internal/sharedstorage/ipfs/ipfs.go @@ -67,7 +67,7 @@ func (i *IPFS) Init(ctx context.Context, config config.Section) error { return nil } -func (i *IPFS) RegisterListener(listener sharedstorage.Callbacks) { +func (i *IPFS) SetHandler(handler sharedstorage.Callbacks) { } func (i *IPFS) Capabilities() *sharedstorage.Capabilities { diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 96a31f108c..a068061cfb 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -43,11 +43,11 @@ type FFTokens struct { } type callbacks struct { - listeners map[string]tokens.Callbacks + handlers map[string]tokens.Callbacks } func (cb *callbacks) TokenOpUpdate(plugin tokens.Plugin, nsOpID string, txState core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { cb.TokenOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) } } @@ -56,13 +56,13 @@ func (cb *callbacks) TokenPoolCreated(namespace string, plugin tokens.Plugin, po if namespace == "" { // Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter // TODO: deprecate this path - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { if err := cb.TokenPoolCreated(plugin, pool); err != nil { return err } } } else { - if listener, ok := cb.listeners[namespace]; ok { + if listener, ok := cb.handlers[namespace]; ok { return listener.TokenPoolCreated(plugin, pool) } } @@ -73,13 +73,13 @@ func (cb *callbacks) TokensTransferred(namespace string, plugin tokens.Plugin, t if namespace == "" { // Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter // TODO: deprecate this path - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { if err := cb.TokensTransferred(plugin, transfer); err != nil { return err } } } else { - if listener, ok := cb.listeners[namespace]; ok { + if listener, ok := cb.handlers[namespace]; ok { return listener.TokensTransferred(plugin, transfer) } } @@ -90,13 +90,13 @@ func (cb *callbacks) TokensApproved(namespace string, plugin tokens.Plugin, appr if namespace == "" { // Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter // TODO: deprecate this path - for _, cb := range cb.listeners { + for _, cb := range cb.handlers { if err := cb.TokensApproved(plugin, approval); err != nil { return err } } } else { - if listener, ok := cb.listeners[namespace]; ok { + if listener, ok := cb.handlers[namespace]; ok { return listener.TokensApproved(plugin, approval) } } @@ -203,7 +203,7 @@ func (ft *FFTokens) Init(ctx context.Context, name string, config config.Section ft.ctx = log.WithLogField(ctx, "proto", "fftokens") ft.configuredName = name ft.capabilities = &tokens.Capabilities{} - ft.callbacks.listeners = make(map[string]tokens.Callbacks) + ft.callbacks.handlers = make(map[string]tokens.Callbacks) if config.GetString(ffresty.HTTPConfigURL) == "" { return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "tokens.fftokens") @@ -225,8 +225,8 @@ func (ft *FFTokens) Init(ctx context.Context, name string, config config.Section return nil } -func (ft *FFTokens) RegisterListener(namespace string, listener tokens.Callbacks) error { - ft.callbacks.listeners[namespace] = listener +func (ft *FFTokens) SetHandler(namespace string, handler tokens.Callbacks) error { + ft.callbacks.handlers[namespace] = handler res, err := ft.client.R().SetContext(ft.ctx). SetBody(&tokenInit{ diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index 452b5c17a5..4081672df4 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -262,7 +262,7 @@ func TestCreateTokenPoolSynchronous(t *testing.T) { }) mcb := &tokenmocks.Callbacks{} - err := h.RegisterListener("ns1", mcb) + err := h.SetHandler("ns1", mcb) assert.NoError(t, err) mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool { @@ -424,7 +424,7 @@ func TestActivateTokenPoolSynchronous(t *testing.T) { }) mcb := &tokenmocks.Callbacks{} - h.RegisterListener("ns1", mcb) + h.SetHandler("ns1", mcb) mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool { return p.PoolLocator == "F1" && p.Type == core.TokenTypeFungible && p.TX.ID == nil && p.Event.ProtocolID == "" })).Return(nil) @@ -474,7 +474,7 @@ func TestActivateTokenPoolSynchronousBadResponse(t *testing.T) { }) mcb := &tokenmocks.Callbacks{} - h.RegisterListener("ns1", mcb) + h.SetHandler("ns1", mcb) mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool { return p.PoolLocator == "F1" && p.Type == core.TokenTypeFungible && p.TX.ID == nil })).Return(nil) @@ -810,7 +810,7 @@ func TestReceiptEvents(t *testing.T) { assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} - h.RegisterListener("ns1", mcb) + h.SetHandler("ns1", mcb) opID := fftypes.NewUUID() // receipt: bad ID - passed through @@ -854,7 +854,7 @@ func TestPoolEvents(t *testing.T) { assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} - h.RegisterListener("ns1", mcb) + h.SetHandler("ns1", mcb) txID := fftypes.NewUUID() // token-pool: missing data @@ -950,7 +950,7 @@ func TestTransferEvents(t *testing.T) { assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} - h.RegisterListener("ns1", mcb) + h.SetHandler("ns1", mcb) txID := fftypes.NewUUID() // token-mint: missing data @@ -1169,7 +1169,7 @@ func TestApprovalEvents(t *testing.T) { assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} - h.RegisterListener("ns1", mcb) + h.SetHandler("ns1", mcb) txID := fftypes.NewUUID() // token-approval: success diff --git a/mocks/blockchainmocks/plugin.go b/mocks/blockchainmocks/plugin.go index 11eeebac0f..c2e599dd46 100644 --- a/mocks/blockchainmocks/plugin.go +++ b/mocks/blockchainmocks/plugin.go @@ -268,9 +268,9 @@ func (_m *Plugin) QueryContract(ctx context.Context, location *fftypes.JSONAny, return r0, r1 } -// RegisterListener provides a mock function with given fields: listener -func (_m *Plugin) RegisterListener(listener blockchain.Callbacks) { - _m.Called(listener) +// SetHandler provides a mock function with given fields: handler +func (_m *Plugin) SetHandler(handler blockchain.Callbacks) { + _m.Called(handler) } // Start provides a mock function with given fields: diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index aa4c6bde95..a70229605b 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -2368,11 +2368,6 @@ func (_m *Plugin) Name() string { return r0 } -// RegisterListener provides a mock function with given fields: listener -func (_m *Plugin) RegisterListener(listener database.Callbacks) { - _m.Called(listener) -} - // ReplaceMessage provides a mock function with given fields: ctx, message func (_m *Plugin) ReplaceMessage(ctx context.Context, message *core.Message) error { ret := _m.Called(ctx, message) @@ -2415,6 +2410,11 @@ func (_m *Plugin) RunAsGroup(ctx context.Context, fn func(context.Context) error return r0 } +// SetHandler provides a mock function with given fields: handler +func (_m *Plugin) SetHandler(handler database.Callbacks) { + _m.Called(handler) +} + // UpdateBatch provides a mock function with given fields: ctx, id, update func (_m *Plugin) UpdateBatch(ctx context.Context, id *fftypes.UUID, update database.Update) error { ret := _m.Called(ctx, id, update) diff --git a/mocks/dataexchangemocks/plugin.go b/mocks/dataexchangemocks/plugin.go index 7f8268aa01..9b1c406c3c 100644 --- a/mocks/dataexchangemocks/plugin.go +++ b/mocks/dataexchangemocks/plugin.go @@ -160,11 +160,6 @@ func (_m *Plugin) Name() string { return r0 } -// RegisterListener provides a mock function with given fields: listener -func (_m *Plugin) RegisterListener(listener dataexchange.Callbacks) { - _m.Called(listener) -} - // SendMessage provides a mock function with given fields: ctx, nsOpID, peerID, data func (_m *Plugin) SendMessage(ctx context.Context, nsOpID string, peerID string, data []byte) error { ret := _m.Called(ctx, nsOpID, peerID, data) @@ -179,6 +174,11 @@ func (_m *Plugin) SendMessage(ctx context.Context, nsOpID string, peerID string, return r0 } +// SetHandler provides a mock function with given fields: handler +func (_m *Plugin) SetHandler(handler dataexchange.Callbacks) { + _m.Called(handler) +} + // SetNodes provides a mock function with given fields: nodes func (_m *Plugin) SetNodes(nodes []fftypes.JSONObject) { _m.Called(nodes) diff --git a/mocks/eventsmocks/plugin.go b/mocks/eventsmocks/plugin.go index 6f3baf4cf3..792f069f71 100644 --- a/mocks/eventsmocks/plugin.go +++ b/mocks/eventsmocks/plugin.go @@ -82,13 +82,13 @@ func (_m *Plugin) Name() string { return r0 } -// RegisterListener provides a mock function with given fields: namespace, listener -func (_m *Plugin) RegisterListener(namespace string, listener events.Callbacks) error { - ret := _m.Called(namespace, listener) +// SetHandler provides a mock function with given fields: namespace, handler +func (_m *Plugin) SetHandler(namespace string, handler events.Callbacks) error { + ret := _m.Called(namespace, handler) var r0 error if rf, ok := ret.Get(0).(func(string, events.Callbacks) error); ok { - r0 = rf(namespace, listener) + r0 = rf(namespace, handler) } else { r0 = ret.Error(0) } diff --git a/mocks/identitymocks/plugin.go b/mocks/identitymocks/plugin.go index 8cb0c62fe5..1cf2899ed8 100644 --- a/mocks/identitymocks/plugin.go +++ b/mocks/identitymocks/plugin.go @@ -66,9 +66,9 @@ func (_m *Plugin) Name() string { return r0 } -// RegisterListener provides a mock function with given fields: listener -func (_m *Plugin) RegisterListener(listener identity.Callbacks) { - _m.Called(listener) +// SetHandler provides a mock function with given fields: handler +func (_m *Plugin) SetHandler(handler identity.Callbacks) { + _m.Called(handler) } // Start provides a mock function with given fields: diff --git a/mocks/sharedstoragemocks/plugin.go b/mocks/sharedstoragemocks/plugin.go index 2e1f13529e..e538977667 100644 --- a/mocks/sharedstoragemocks/plugin.go +++ b/mocks/sharedstoragemocks/plugin.go @@ -91,9 +91,9 @@ func (_m *Plugin) Name() string { return r0 } -// RegisterListener provides a mock function with given fields: listener -func (_m *Plugin) RegisterListener(listener sharedstorage.Callbacks) { - _m.Called(listener) +// SetHandler provides a mock function with given fields: handler +func (_m *Plugin) SetHandler(handler sharedstorage.Callbacks) { + _m.Called(handler) } // UploadData provides a mock function with given fields: ctx, data diff --git a/mocks/tokenmocks/plugin.go b/mocks/tokenmocks/plugin.go index a1a3d3548e..a0049ea1a6 100644 --- a/mocks/tokenmocks/plugin.go +++ b/mocks/tokenmocks/plugin.go @@ -138,13 +138,13 @@ func (_m *Plugin) Name() string { return r0 } -// RegisterListener provides a mock function with given fields: namespace, listener -func (_m *Plugin) RegisterListener(namespace string, listener tokens.Callbacks) error { - ret := _m.Called(namespace, listener) +// SetHandler provides a mock function with given fields: namespace, handler +func (_m *Plugin) SetHandler(namespace string, handler tokens.Callbacks) error { + ret := _m.Called(namespace, handler) var r0 error if rf, ok := ret.Get(0).(func(string, tokens.Callbacks) error); ok { - r0 = rf(namespace, listener) + r0 = rf(namespace, handler) } else { r0 = ret.Error(0) } diff --git a/pkg/blockchain/plugin.go b/pkg/blockchain/plugin.go index 4ae497d9bb..84ce9caafc 100644 --- a/pkg/blockchain/plugin.go +++ b/pkg/blockchain/plugin.go @@ -35,8 +35,8 @@ type Plugin interface { // Init initializes the plugin, with configuration Init(ctx context.Context, config config.Section, metrics metrics.Manager) error - // RegisterListener registers a listener to receive callbacks - RegisterListener(listener Callbacks) + // SetHandler registers a handler to receive callbacks + SetHandler(handler Callbacks) // ConfigureContract initializes the subscription to the FireFly contract // - Checks the provided contract info against the plugin's configuration, and updates it as needed diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 61ed48fc43..4da1fc2a1a 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -53,8 +53,8 @@ type Plugin interface { // Init initializes the plugin, with configuration Init(ctx context.Context, config config.Section) error - // RegisterListener registers a listener to receive callbacks - RegisterListener(listener Callbacks) + // SetHandler registers a handler to receive callbacks + SetHandler(handler Callbacks) // Capabilities returns capabilities - not called until after Init Capabilities() *Capabilities diff --git a/pkg/dataexchange/plugin.go b/pkg/dataexchange/plugin.go index 3a6db6e4ff..7a57467526 100644 --- a/pkg/dataexchange/plugin.go +++ b/pkg/dataexchange/plugin.go @@ -66,8 +66,8 @@ type Plugin interface { // SetNodes initializes the known nodes from the database SetNodes(nodes []fftypes.JSONObject) - // RegisterListener registers a listener to receive callbacks - RegisterListener(listener Callbacks) + // SetHandler registers a handler to receive callbacks + SetHandler(handler Callbacks) // Data exchange interface must not deliver any events until start is called Start() error diff --git a/pkg/events/plugin.go b/pkg/events/plugin.go index 4942d5ff08..abd2a99a79 100644 --- a/pkg/events/plugin.go +++ b/pkg/events/plugin.go @@ -35,8 +35,8 @@ type Plugin interface { // Init initializes the plugin, with configuration Init(ctx context.Context, config config.Section) error - // RegisterListener registers a listener to receive callbacks - RegisterListener(namespace string, listener Callbacks) error + // SetHandler registers a handler to receive callbacks + SetHandler(namespace string, handler Callbacks) error // Capabilities returns capabilities - not called until after Init Capabilities() *Capabilities diff --git a/pkg/identity/plugin.go b/pkg/identity/plugin.go index 59c9f5248d..cfa43a0a5f 100644 --- a/pkg/identity/plugin.go +++ b/pkg/identity/plugin.go @@ -33,8 +33,8 @@ type Plugin interface { // Init initializes the plugin, with configuration Init(ctx context.Context, config config.Section) error - // RegisterListener registers a listener to receive callbacks - RegisterListener(listener Callbacks) + // SetHandler registers a handler to receive callbacks + SetHandler(handler Callbacks) // Blockchain interface must not deliver any events until start is called Start() error diff --git a/pkg/sharedstorage/plugin.go b/pkg/sharedstorage/plugin.go index 4c65855125..ff8fcbe493 100644 --- a/pkg/sharedstorage/plugin.go +++ b/pkg/sharedstorage/plugin.go @@ -34,8 +34,8 @@ type Plugin interface { // Init initializes the plugin, with configuration Init(ctx context.Context, config config.Section) error - // RegisterListener registers a listener to receive callbacks - RegisterListener(listener Callbacks) + // SetHandler registers a handler to receive callbacks + SetHandler(handler Callbacks) // Capabilities returns capabilities - not called until after Init Capabilities() *Capabilities diff --git a/pkg/tokens/plugin.go b/pkg/tokens/plugin.go index 8be3b561e9..309266a727 100644 --- a/pkg/tokens/plugin.go +++ b/pkg/tokens/plugin.go @@ -35,8 +35,8 @@ type Plugin interface { // Init initializes the plugin, with configuration Init(ctx context.Context, name string, config config.Section) error - // RegisterListener registers a listener to receive callbacks - RegisterListener(namespace string, listener Callbacks) error + // SetHandler registers a handler to receive callbacks + SetHandler(namespace string, handler Callbacks) error // Blockchain interface must not deliver any events until start is called Start() error