diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index f1c6367dfd..f0d5e8aeca 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -374,7 +374,9 @@ func (or *orchestrator) initPlugins(ctx context.Context) (err error) { or.plugins.SharedStorage.Plugin.RegisterListener(&or.bc) for _, token := range or.plugins.Tokens { - token.Plugin.RegisterListener(&or.bc) + if err := token.Plugin.RegisterListener(or.namespace, &or.bc); err != nil { + return err + } } return nil diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 9d6411fabb..676759e30d 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -191,7 +191,7 @@ func TestInitOK(t *testing.T) { or.mdx.On("RegisterListener", mock.Anything).Return() or.mdx.On("SetNodes", mock.Anything).Return() or.mps.On("RegisterListener", mock.Anything).Return() - or.mti.On("RegisterListener", mock.Anything).Return() + or.mti.On("RegisterListener", "ns", mock.Anything).Return(nil) err := or.Init(or.ctx, or.cancelCtx) assert.NoError(t, err) @@ -206,6 +206,20 @@ func TestInitOK(t *testing.T) { assert.Equal(t, or.mnm, or.NetworkMap()) } +func TestInitTokenListenerFail(t *testing.T) { + or := newTestOrchestrator() + defer or.cleanup(t) + or.mdi.On("RegisterListener", mock.Anything).Return() + or.mbi.On("RegisterListener", mock.Anything).Return() + or.mdi.On("GetIdentities", mock.Anything, mock.Anything).Return([]*core.Identity{{}}, nil, nil) + or.mdx.On("RegisterListener", mock.Anything).Return() + or.mdx.On("SetNodes", mock.Anything).Return() + or.mps.On("RegisterListener", mock.Anything).Return() + or.mti.On("RegisterListener", "ns", mock.Anything).Return(fmt.Errorf("pop")) + err := or.Init(or.ctx, or.cancelCtx) + assert.EqualError(t, err, "pop") +} + func TestInitDataexchangeNodesFail(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 352f2f56f6..96a31f108c 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -43,7 +43,7 @@ type FFTokens struct { } type callbacks struct { - listeners []tokens.Callbacks + listeners map[string]tokens.Callbacks } func (cb *callbacks) TokenOpUpdate(plugin tokens.Plugin, nsOpID string, txState core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { @@ -52,28 +52,52 @@ func (cb *callbacks) TokenOpUpdate(plugin tokens.Plugin, nsOpID string, txState } } -func (cb *callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *tokens.TokenPool) error { - for _, cb := range cb.listeners { - if err := cb.TokenPoolCreated(plugin, pool); err != nil { - return err +func (cb *callbacks) TokenPoolCreated(namespace string, plugin tokens.Plugin, pool *tokens.TokenPool) error { + if namespace == "" { + // Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter + // TODO: deprecate this path + for _, cb := range cb.listeners { + if err := cb.TokenPoolCreated(plugin, pool); err != nil { + return err + } + } + } else { + if listener, ok := cb.listeners[namespace]; ok { + return listener.TokenPoolCreated(plugin, pool) } } return nil } -func (cb *callbacks) TokensTransferred(plugin tokens.Plugin, transfer *tokens.TokenTransfer) error { - for _, cb := range cb.listeners { - if err := cb.TokensTransferred(plugin, transfer); err != nil { - return err +func (cb *callbacks) TokensTransferred(namespace string, plugin tokens.Plugin, transfer *tokens.TokenTransfer) error { + if namespace == "" { + // Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter + // TODO: deprecate this path + for _, cb := range cb.listeners { + if err := cb.TokensTransferred(plugin, transfer); err != nil { + return err + } + } + } else { + if listener, ok := cb.listeners[namespace]; ok { + return listener.TokensTransferred(plugin, transfer) } } return nil } -func (cb *callbacks) TokensApproved(plugin tokens.Plugin, approval *tokens.TokenApproval) error { - for _, cb := range cb.listeners { - if err := cb.TokensApproved(plugin, approval); err != nil { - return err +func (cb *callbacks) TokensApproved(namespace string, plugin tokens.Plugin, approval *tokens.TokenApproval) error { + if namespace == "" { + // Older token subscriptions don't populate namespace, so deliver the event to every listener and let them filter + // TODO: deprecate this path + for _, cb := range cb.listeners { + if err := cb.TokensApproved(plugin, approval); err != nil { + return err + } + } + } else { + if listener, ok := cb.listeners[namespace]; ok { + return listener.TokensApproved(plugin, approval) } } return nil @@ -89,6 +113,7 @@ type msgType string const ( messageReceipt msgType = "receipt" + messageBatch msgType = "batch" messageTokenPool msgType = "token-pool" messageTokenMint msgType = "token-mint" messageTokenBurn msgType = "token-burn" @@ -103,6 +128,10 @@ type tokenData struct { MessageHash *fftypes.Bytes32 `json:"messageHash,omitempty"` } +type tokenInit struct { + Namespace string `json:"namespace"` +} + type createPool struct { Type core.TokenType `json:"type"` RequestID string `json:"requestId"` @@ -114,6 +143,7 @@ type createPool struct { } type activatePool struct { + Namespace string `json:"namespace"` PoolLocator string `json:"poolLocator"` Config fftypes.JSONObject `json:"config"` RequestID string `json:"requestId,omitempty"` @@ -172,16 +202,15 @@ func (ft *FFTokens) Name() string { func (ft *FFTokens) Init(ctx context.Context, name string, config config.Section) (err error) { ft.ctx = log.WithLogField(ctx, "proto", "fftokens") ft.configuredName = name + ft.capabilities = &tokens.Capabilities{} + ft.callbacks.listeners = make(map[string]tokens.Callbacks) if config.GetString(ffresty.HTTPConfigURL) == "" { return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "tokens.fftokens") } - ft.client = ffresty.New(ft.ctx, config) - ft.capabilities = &tokens.Capabilities{} wsConfig := wsclient.GenerateConfig(config) - if wsConfig.WSKeyPath == "" { wsConfig.WSKeyPath = "/api/ws" } @@ -196,8 +225,18 @@ func (ft *FFTokens) Init(ctx context.Context, name string, config config.Section return nil } -func (ft *FFTokens) RegisterListener(listener tokens.Callbacks) { - ft.callbacks.listeners = append(ft.callbacks.listeners, listener) +func (ft *FFTokens) RegisterListener(namespace string, listener tokens.Callbacks) error { + ft.callbacks.listeners[namespace] = listener + + res, err := ft.client.R().SetContext(ft.ctx). + SetBody(&tokenInit{ + Namespace: namespace, + }). + Post("/api/v1/init") + if err != nil || !res.IsSuccess() { + return wrapError(ft.ctx, nil, res, err) + } + return nil } func (ft *FFTokens) Start() error { @@ -230,10 +269,11 @@ func (ft *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject) func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSONObject) (err error) { tokenType := data.GetString("type") poolLocator := data.GetString("poolLocator") - standard := data.GetString("standard") // optional - symbol := data.GetString("symbol") // optional - decimals := data.GetInt64("decimals") // optional - info := data.GetObject("info") // optional + standard := data.GetString("standard") // optional + symbol := data.GetString("symbol") // optional + decimals := data.GetInt64("decimals") // optional + info := data.GetObject("info") // optional + namespace := data.GetString("namespace") // optional // All blockchain items below are optional blockchainEvent := data.GetObject("blockchain") @@ -296,7 +336,7 @@ func (ft *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSON } // If there's an error dispatching the event, we must return the error and shutdown - return ft.callbacks.TokenPoolCreated(ft, pool) + return ft.callbacks.TokenPoolCreated(namespace, ft, pool) } func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t core.TokenTransferType, data fftypes.JSONObject) (err error) { @@ -308,6 +348,7 @@ func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t core.TokenTransfe value := data.GetString("amount") tokenIndex := data.GetString("tokenIndex") // optional uri := data.GetString("uri") // optional + namespace := data.GetString("namespace") // optional blockchainEvent := data.GetObject("blockchain") blockchainID := blockchainEvent.GetString("id") @@ -384,7 +425,7 @@ func (ft *FFTokens) handleTokenTransfer(ctx context.Context, t core.TokenTransfe } // If there's an error dispatching the event, we must return the error and shutdown - return ft.callbacks.TokensTransferred(ft, transfer) + return ft.callbacks.TokensTransferred(namespace, ft, transfer) } func (ft *FFTokens) handleTokenApproval(ctx context.Context, data fftypes.JSONObject) (err error) { @@ -394,7 +435,8 @@ func (ft *FFTokens) handleTokenApproval(ctx context.Context, data fftypes.JSONOb poolLocator := data.GetString("poolLocator") operatorAddress := data.GetString("operator") approved := data.GetBool("approved") - info := data.GetObject("info") // optional + info := data.GetObject("info") // optional + namespace := data.GetString("namespace") // optional blockchainEvent := data.GetObject("blockchain") blockchainID := blockchainEvent.GetString("id") @@ -458,7 +500,54 @@ func (ft *FFTokens) handleTokenApproval(ctx context.Context, data fftypes.JSONOb }, } - return ft.callbacks.TokensApproved(ft, approval) + return ft.callbacks.TokensApproved(namespace, ft, approval) +} + +func (ft *FFTokens) handleMessage(ctx context.Context, msgBytes []byte) (err error) { + l := log.L(ctx) + + var msg wsEvent + if err = json.Unmarshal(msgBytes, &msg); err != nil { + l.Errorf("Message cannot be parsed as JSON: %s\n%s", err, string(msgBytes)) + return nil // Swallow this and move on + } + + l.Debugf("Received %s event %s", msg.Event, msg.ID) + switch msg.Event { + case messageReceipt: + ft.handleReceipt(ctx, msg.Data) + case messageBatch: + for _, msg := range msg.Data.GetObjectArray("events") { + if err = ft.handleMessage(ctx, []byte(msg.String())); err != nil { + break + } + } + case messageTokenPool: + err = ft.handleTokenPoolCreate(ctx, msg.Data) + case messageTokenMint: + err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeMint, msg.Data) + case messageTokenBurn: + err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeBurn, msg.Data) + case messageTokenTransfer: + err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeTransfer, msg.Data) + case messageTokenApproval: + err = ft.handleTokenApproval(ctx, msg.Data) + default: + l.Errorf("Message unexpected: %s", msg.Event) + } + + if err == nil && msg.Event != messageReceipt && msg.ID != "" { + l.Debugf("Sending ack %s", msg.ID) + ack, _ := json.Marshal(fftypes.JSONObject{ + "event": "ack", + "data": fftypes.JSONObject{ + "id": msg.ID, + }, + }) + err = ft.wsconn.Send(ctx, ack) + } + + return err } func (ft *FFTokens) eventLoop() { @@ -475,43 +564,7 @@ func (ft *FFTokens) eventLoop() { l.Debugf("Event loop exiting (receive channel closed)") return } - - var msg wsEvent - err := json.Unmarshal(msgBytes, &msg) - if err != nil { - l.Errorf("Message cannot be parsed as JSON: %s\n%s", err, string(msgBytes)) - continue // Swallow this and move on - } - l.Debugf("Received %s event %s", msg.Event, msg.ID) - switch msg.Event { - case messageReceipt: - ft.handleReceipt(ctx, msg.Data) - case messageTokenPool: - err = ft.handleTokenPoolCreate(ctx, msg.Data) - case messageTokenMint: - err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeMint, msg.Data) - case messageTokenBurn: - err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeBurn, msg.Data) - case messageTokenTransfer: - err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeTransfer, msg.Data) - case messageTokenApproval: - err = ft.handleTokenApproval(ctx, msg.Data) - default: - l.Errorf("Message unexpected: %s", msg.Event) - } - - if err == nil && msg.Event != messageReceipt && msg.ID != "" { - l.Debugf("Sending ack %s", msg.ID) - ack, _ := json.Marshal(fftypes.JSONObject{ - "event": "ack", - "data": fftypes.JSONObject{ - "id": msg.ID, - }, - }) - err = ft.wsconn.Send(ctx, ack) - } - - if err != nil { + if err := ft.handleMessage(ctx, msgBytes); err != nil { l.Errorf("Event loop exiting: %s", err) return } @@ -571,6 +624,7 @@ func (ft *FFTokens) ActivateTokenPool(ctx context.Context, nsOpID string, pool * res, err := ft.client.R().SetContext(ctx). SetBody(&activatePool{ RequestID: nsOpID, + Namespace: pool.Namespace, PoolLocator: pool.Locator, Config: pool.Config, }). diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index 71f4756c67..452b5c17a5 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -237,6 +237,8 @@ func TestCreateTokenPoolSynchronous(t *testing.T) { Symbol: "symbol", } + httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/init", httpURL), + httpmock.NewStringResponder(204, "")) httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/createpool", httpURL), func(req *http.Request) (*http.Response, error) { body := make(fftypes.JSONObject) @@ -260,7 +262,9 @@ func TestCreateTokenPoolSynchronous(t *testing.T) { }) mcb := &tokenmocks.Callbacks{} - h.RegisterListener(mcb) + err := h.RegisterListener("ns1", mcb) + assert.NoError(t, err) + mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool { return p.PoolLocator == "F1" && p.Type == core.TokenTypeFungible && *p.TX.ID == *pool.TX.ID })).Return(nil) @@ -323,8 +327,9 @@ func TestActivateTokenPool(t *testing.T) { "address": "0x12345", } pool := &core.TokenPool{ - Locator: "N1", - Config: poolConfig, + Namespace: "ns1", + Locator: "N1", + Config: poolConfig, } httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/activatepool", httpURL), @@ -333,6 +338,7 @@ func TestActivateTokenPool(t *testing.T) { err := json.NewDecoder(req.Body).Decode(&body) assert.NoError(t, err) assert.Equal(t, fftypes.JSONObject{ + "namespace": "ns1", "requestId": "ns1:" + opID.String(), "poolLocator": "N1", "config": poolConfig, @@ -384,16 +390,20 @@ func TestActivateTokenPoolSynchronous(t *testing.T) { "foo": "bar", } pool := &core.TokenPool{ - Locator: "N1", - Config: poolConfig, + Namespace: "ns1", + Locator: "N1", + Config: poolConfig, } + httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/init", httpURL), + httpmock.NewStringResponder(204, "")) httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/activatepool", httpURL), func(req *http.Request) (*http.Response, error) { body := make(fftypes.JSONObject) err := json.NewDecoder(req.Body).Decode(&body) assert.NoError(t, err) assert.Equal(t, fftypes.JSONObject{ + "namespace": "ns1", "requestId": "ns1:" + opID.String(), "poolLocator": "N1", "config": poolConfig, @@ -414,7 +424,7 @@ func TestActivateTokenPoolSynchronous(t *testing.T) { }) mcb := &tokenmocks.Callbacks{} - h.RegisterListener(mcb) + h.RegisterListener("ns1", mcb) mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool { return p.PoolLocator == "F1" && p.Type == core.TokenTypeFungible && p.TX.ID == nil && p.Event.ProtocolID == "" })).Return(nil) @@ -434,16 +444,20 @@ func TestActivateTokenPoolSynchronousBadResponse(t *testing.T) { "foo": "bar", } pool := &core.TokenPool{ - Locator: "N1", - Config: poolConfig, + Namespace: "ns1", + Locator: "N1", + Config: poolConfig, } + httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/init", httpURL), + httpmock.NewStringResponder(204, "")) httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/activatepool", httpURL), func(req *http.Request) (*http.Response, error) { body := make(fftypes.JSONObject) err := json.NewDecoder(req.Body).Decode(&body) assert.NoError(t, err) assert.Equal(t, fftypes.JSONObject{ + "namespace": "ns1", "requestId": "ns1:" + opID.String(), "poolLocator": "N1", "config": poolConfig, @@ -460,7 +474,7 @@ func TestActivateTokenPoolSynchronousBadResponse(t *testing.T) { }) mcb := &tokenmocks.Callbacks{} - h.RegisterListener(mcb) + h.RegisterListener("ns1", mcb) mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool { return p.PoolLocator == "F1" && p.Type == core.TokenTypeFungible && p.TX.ID == nil })).Return(nil) @@ -480,8 +494,9 @@ func TestActivateTokenPoolNoContent(t *testing.T) { "foo": "bar", } pool := &core.TokenPool{ - Locator: "N1", - Config: poolConfig, + Namespace: "ns1", + Locator: "N1", + Config: poolConfig, } httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/activatepool", httpURL), @@ -490,6 +505,7 @@ func TestActivateTokenPoolNoContent(t *testing.T) { err := json.NewDecoder(req.Body).Decode(&body) assert.NoError(t, err) assert.Equal(t, fftypes.JSONObject{ + "namespace": "ns1", "requestId": "ns1:" + opID.String(), "poolLocator": "N1", "config": poolConfig, @@ -794,7 +810,7 @@ func TestReceiptEvents(t *testing.T) { assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} - h.RegisterListener(mcb) + h.RegisterListener("ns1", mcb) opID := fftypes.NewUUID() // receipt: bad ID - passed through @@ -838,7 +854,7 @@ func TestPoolEvents(t *testing.T) { assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} - h.RegisterListener(mcb) + h.RegisterListener("ns1", mcb) txID := fftypes.NewUUID() // token-pool: missing data @@ -882,6 +898,7 @@ func TestPoolEvents(t *testing.T) { "event": "token-pool", "data": fftypes.JSONObject{ "id": "000000000010/000020/000030/000040", + "namespace": "ns1", "type": "fungible", "poolLocator": "F1", "signer": "0x0", @@ -897,25 +914,30 @@ func TestPoolEvents(t *testing.T) { msg = <-toServer assert.Equal(t, `{"data":{"id":"8"},"event":"ack"}`, string(msg)) - // token-pool: callback fail + // token-pool: batch + callback fail mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool { return p.PoolLocator == "F1" && p.Type == core.TokenTypeFungible && txID.Equals(p.TX.ID) && p.Event.ProtocolID == "000000000010/000020/000030" })).Return(fmt.Errorf("pop")).Once() fromServer <- fftypes.JSONObject{ "id": "9", - "event": "token-pool", + "event": "batch", "data": fftypes.JSONObject{ - "id": "000000000010/000020/000030/000040", - "type": "fungible", - "poolLocator": "F1", - "signer": "0x0", - "data": fftypes.JSONObject{"tx": txID.String()}.String(), - "blockchain": fftypes.JSONObject{ - "id": "000000000010/000020/000030", - "info": fftypes.JSONObject{ - "transactionHash": "0xffffeeee", + "events": fftypes.JSONObjectArray{{ + "event": "token-pool", + "data": fftypes.JSONObject{ + "id": "000000000010/000020/000030/000040", + "type": "fungible", + "poolLocator": "F1", + "signer": "0x0", + "data": fftypes.JSONObject{"tx": txID.String()}.String(), + "blockchain": fftypes.JSONObject{ + "id": "000000000010/000020/000030", + "info": fftypes.JSONObject{ + "transactionHash": "0xffffeeee", + }, + }, }, - }, + }}, }, }.String() } @@ -928,7 +950,7 @@ func TestTransferEvents(t *testing.T) { assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} - h.RegisterListener(mcb) + h.RegisterListener("ns1", mcb) txID := fftypes.NewUUID() // token-mint: missing data @@ -970,6 +992,7 @@ func TestTransferEvents(t *testing.T) { "event": "token-mint", "data": fftypes.JSONObject{ "id": "000000000010/000020/000030/000040", + "namespace": "ns1", "poolLocator": "F1", "signer": "0x0", "to": "0x0", @@ -1146,7 +1169,7 @@ func TestApprovalEvents(t *testing.T) { assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} - h.RegisterListener(mcb) + h.RegisterListener("ns1", mcb) txID := fftypes.NewUUID() // token-approval: success @@ -1158,6 +1181,7 @@ func TestApprovalEvents(t *testing.T) { "event": "token-approval", "data": fftypes.JSONObject{ "id": "000000000010/000020/000030/000040", + "namespace": "ns1", "subject": "a:b", "poolLocator": "F1", "signer": "0x0", diff --git a/mocks/orchestratormocks/orchestrator.go b/mocks/orchestratormocks/orchestrator.go index 8084bb5247..9a5b495766 100644 --- a/mocks/orchestratormocks/orchestrator.go +++ b/mocks/orchestratormocks/orchestrator.go @@ -20,8 +20,6 @@ import ( events "github.com/hyperledger/firefly/internal/events" - metrics "github.com/hyperledger/firefly/internal/metrics" - mock "github.com/stretchr/testify/mock" networkmap "github.com/hyperledger/firefly/internal/networkmap" @@ -1196,22 +1194,6 @@ func (_m *Orchestrator) Init(ctx context.Context, cancelCtx context.CancelFunc) return r0 } -// Metrics provides a mock function with given fields: -func (_m *Orchestrator) Metrics() metrics.Manager { - ret := _m.Called() - - var r0 metrics.Manager - if rf, ok := ret.Get(0).(func() metrics.Manager); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(metrics.Manager) - } - } - - return r0 -} - // NetworkMap provides a mock function with given fields: func (_m *Orchestrator) NetworkMap() networkmap.Manager { ret := _m.Called() diff --git a/mocks/tokenmocks/plugin.go b/mocks/tokenmocks/plugin.go index a53733786e..a1a3d3548e 100644 --- a/mocks/tokenmocks/plugin.go +++ b/mocks/tokenmocks/plugin.go @@ -138,9 +138,18 @@ func (_m *Plugin) Name() string { return r0 } -// RegisterListener provides a mock function with given fields: listener -func (_m *Plugin) RegisterListener(listener tokens.Callbacks) { - _m.Called(listener) +// RegisterListener provides a mock function with given fields: namespace, listener +func (_m *Plugin) RegisterListener(namespace string, listener tokens.Callbacks) error { + ret := _m.Called(namespace, listener) + + var r0 error + if rf, ok := ret.Get(0).(func(string, tokens.Callbacks) error); ok { + r0 = rf(namespace, listener) + } else { + r0 = ret.Error(0) + } + + return r0 } // Start provides a mock function with given fields: diff --git a/pkg/tokens/plugin.go b/pkg/tokens/plugin.go index 0363da50ed..8be3b561e9 100644 --- a/pkg/tokens/plugin.go +++ b/pkg/tokens/plugin.go @@ -36,7 +36,7 @@ type Plugin interface { Init(ctx context.Context, name string, config config.Section) error // RegisterListener registers a listener to receive callbacks - RegisterListener(listener Callbacks) + RegisterListener(namespace string, listener Callbacks) error // Blockchain interface must not deliver any events until start is called Start() error