From f78cbe52e82453b217b304bd1010cab72f36b098 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Thu, 10 Feb 2022 15:03:11 -0500 Subject: [PATCH 1/6] Add beforeConnect callback to websocket client Signed-off-by: Andrew Richardson --- internal/blockchain/ethereum/ethereum.go | 2 +- internal/blockchain/fabric/fabric.go | 2 +- internal/dataexchange/ffdx/ffdx.go | 2 +- internal/events/websockets/websockets_test.go | 2 +- internal/tokens/fftokens/fftokens.go | 2 +- pkg/wsclient/wsclient.go | 21 ++++++++++++++++--- pkg/wsclient/wsclient_test.go | 21 +++++++++++++------ 7 files changed, 38 insertions(+), 14 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index de99bbbf53..dd6663b9f1 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -139,7 +139,7 @@ func (e *Ethereum) Init(ctx context.Context, prefix config.Prefix, callbacks blo wsConfig.WSKeyPath = "/ws" } - e.wsconn, err = wsclient.New(ctx, wsConfig, e.afterConnect) + e.wsconn, err = wsclient.New(ctx, wsConfig, nil, e.afterConnect) if err != nil { return err } diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index 59a27f571c..23a9e42ede 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -189,7 +189,7 @@ func (f *Fabric) Init(ctx context.Context, prefix config.Prefix, callbacks block wsConfig.WSKeyPath = "/ws" } - f.wsconn, err = wsclient.New(ctx, wsConfig, f.afterConnect) + f.wsconn, err = wsclient.New(ctx, wsConfig, nil, f.afterConnect) if err != nil { return err } diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index dc3b696348..ede610f507 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -119,7 +119,7 @@ func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, callbacks dataexc wsConfig := wsconfig.GenerateConfigFromPrefix(prefix) - h.wsconn, err = wsclient.New(ctx, wsConfig, nil) + h.wsconn, err = wsclient.New(ctx, wsConfig, nil, nil) if err != nil { return err } diff --git a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go index 59eb97f9a0..10db21d4fc 100644 --- a/internal/events/websockets/websockets_test.go +++ b/internal/events/websockets/websockets_test.go @@ -62,7 +62,7 @@ func newTestWebsockets(t *testing.T, cbs *eventsmocks.Callbacks, queryParams ... clientPrefix.Set(restclient.HTTPConfigURL, fmt.Sprintf("http://%s%s", svr.Listener.Addr(), qs)) wsConfig := wsconfig.GenerateConfigFromPrefix(clientPrefix) - wsc, err := wsclient.New(ctx, wsConfig, nil) + wsc, err := wsclient.New(ctx, wsConfig, nil, nil) assert.NoError(t, err) err = wsc.Connect() assert.NoError(t, err) diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index ce24ef958b..f8b4609fea 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -132,7 +132,7 @@ func (ft *FFTokens) Init(ctx context.Context, name string, prefix config.Prefix, wsConfig.WSKeyPath = "/api/ws" } - ft.wsconn, err = wsclient.New(ctx, wsConfig, nil) + ft.wsconn, err = wsclient.New(ctx, wsConfig, nil, nil) if err != nil { return err } diff --git a/pkg/wsclient/wsclient.go b/pkg/wsclient/wsclient.go index a8c23afafd..08071a837c 100644 --- a/pkg/wsclient/wsclient.go +++ b/pkg/wsclient/wsclient.go @@ -69,6 +69,7 @@ type wsClient struct { send chan []byte sendDone chan []byte closing chan struct{} + beforeConnect WSPreConnectHandler afterConnect WSPostConnectHandler heartbeatInterval time.Duration heartbeatMux sync.Mutex @@ -76,10 +77,13 @@ type wsClient struct { lastPingCompleted time.Time } +// WSPreConnectHandler will be called before every connect/reconnect. Any error returned will prevent the websocket from connecting. +type WSPreConnectHandler func(ctx context.Context) error + // WSPostConnectHandler will be called after every connect/reconnect. Can send data over ws, but must not block listening for data on the ws. type WSPostConnectHandler func(ctx context.Context, w WSClient) error -func New(ctx context.Context, config *WSConfig, afterConnect WSPostConnectHandler) (WSClient, error) { +func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandler, afterConnect WSPostConnectHandler) (WSClient, error) { wsURL, err := buildWSUrl(ctx, config) if err != nil { @@ -102,6 +106,7 @@ func New(ctx context.Context, config *WSConfig, afterConnect WSPostConnectHandle receive: make(chan []byte), send: make(chan []byte), closing: make(chan struct{}), + beforeConnect: beforeConnect, afterConnect: afterConnect, heartbeatInterval: config.HeartbeatInterval, } @@ -204,6 +209,15 @@ func (w *wsClient) connect(initial bool) error { if w.closed { return false, i18n.NewError(w.ctx, i18n.MsgWSClosing) } + + retry = !initial || attempt < w.initialRetryAttempts + if w.beforeConnect != nil { + if err = w.beforeConnect(w.ctx); err != nil { + l.Warnf("WS %s connect attempt %d failed in beforeConnect", w.url, attempt) + return retry, err + } + } + var res *http.Response w.wsconn, res, err = w.wsdialer.Dial(w.url, w.headers) if err != nil { @@ -215,8 +229,9 @@ func (w *wsClient) connect(initial bool) error { status = res.StatusCode } l.Warnf("WS %s connect attempt %d failed [%d]: %s", w.url, attempt, status, string(b)) - return !initial || attempt > w.initialRetryAttempts, i18n.WrapError(w.ctx, err, i18n.MsgWSConnectFailed) + return retry, i18n.WrapError(w.ctx, err, i18n.MsgWSConnectFailed) } + w.pongReceivedOrReset(false) w.wsconn.SetPongHandler(w.pongHandler) l.Infof("WS %s connected", w.url) @@ -309,7 +324,7 @@ func (w *wsClient) receiveReconnectLoop() { l := log.L(w.ctx) defer close(w.receive) for !w.closed { - // Start the sender, letting it close without blocking sending a notifiation on the sendDone + // Start the sender, letting it close without blocking sending a notification on the sendDone w.sendDone = make(chan []byte, 1) receiverDone := make(chan struct{}) go w.sendLoop(receiverDone) diff --git a/pkg/wsclient/wsclient_test.go b/pkg/wsclient/wsclient_test.go index 1440f8f92b..b7ffb69f8e 100644 --- a/pkg/wsclient/wsclient_test.go +++ b/pkg/wsclient/wsclient_test.go @@ -39,6 +39,14 @@ func TestWSClientE2E(t *testing.T) { }) defer close() + first := true + beforeConnect := func(ctx context.Context) error { + if first { + first = false + return fmt.Errorf("first run fails") + } + return nil + } afterConnect := func(ctx context.Context, w WSClient) error { return w.Send(ctx, []byte(`after connect message`)) } @@ -49,8 +57,9 @@ func TestWSClientE2E(t *testing.T) { wsConfig.HTTPURL = url wsConfig.WSKeyPath = "/test" wsConfig.HeartbeatInterval = 50 * time.Millisecond + wsConfig.InitialConnectAttempts = 2 - wsc, err := New(context.Background(), wsConfig, afterConnect) + wsc, err := New(context.Background(), wsConfig, beforeConnect, afterConnect) assert.NoError(t, err) // Change the settings and connect @@ -90,7 +99,7 @@ func TestWSClientBadURL(t *testing.T) { wsConfig := generateConfig() wsConfig.HTTPURL = ":::" - _, err := New(context.Background(), wsConfig, nil) + _, err := New(context.Background(), wsConfig, nil, nil) assert.Regexp(t, "FF10162", err) } @@ -134,7 +143,7 @@ func TestWSFailStartupHttp500(t *testing.T) { wsConfig.InitialDelay = 1 wsConfig.InitialConnectAttempts = 1 - w, _ := New(context.Background(), wsConfig, nil) + w, _ := New(context.Background(), wsConfig, nil, nil) err := w.Connect() assert.Regexp(t, "FF10161", err) } @@ -153,7 +162,7 @@ func TestWSFailStartupConnect(t *testing.T) { wsConfig.InitialDelay = 1 wsConfig.InitialConnectAttempts = 1 - w, _ := New(context.Background(), wsConfig, nil) + w, _ := New(context.Background(), wsConfig, nil, nil) err := w.Connect() assert.Regexp(t, "FF10161", err) } @@ -163,7 +172,7 @@ func TestWSSendClosed(t *testing.T) { wsConfig := generateConfig() wsConfig.HTTPURL = "http://test:12345" - w, err := New(context.Background(), wsConfig, nil) + w, err := New(context.Background(), wsConfig, nil, nil) assert.NoError(t, err) w.Close() @@ -308,7 +317,7 @@ func TestHeartbeatSendFailed(t *testing.T) { _, _, url, close := NewTestWSServer(func(req *http.Request) {}) defer close() - wsc, err := New(context.Background(), &WSConfig{HTTPURL: url}, func(ctx context.Context, w WSClient) error { return nil }) + wsc, err := New(context.Background(), &WSConfig{HTTPURL: url}, nil, func(ctx context.Context, w WSClient) error { return nil }) assert.NoError(t, err) defer wsc.Close() From 97cdc53229133e826a9dd8855c12746bdf3bf11f Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Thu, 10 Feb 2022 15:43:20 -0500 Subject: [PATCH 2/6] Add "reInitEnabled" config to DX Signed-off-by: Andrew Richardson --- internal/dataexchange/ffdx/config.go | 3 ++ internal/dataexchange/ffdx/ffdx.go | 25 +++++++++++++- internal/dataexchange/ffdx/ffdx_test.go | 43 +++++++++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/internal/dataexchange/ffdx/config.go b/internal/dataexchange/ffdx/config.go index a284c5746c..af506d4917 100644 --- a/internal/dataexchange/ffdx/config.go +++ b/internal/dataexchange/ffdx/config.go @@ -24,9 +24,12 @@ import ( const ( // DataExchangeManifestEnabled determines whether to require+validate a manifest from other DX instances in the network. Must be supported by the connector DataExchangeManifestEnabled = "manifestEnabled" + // DataExchangeReInitEnabled instructs FireFly to always post all current nodes to the /init API before connecting or reconnecting to the connector + DataExchangeReInitEnabled = "reInitEnabled" ) func (h *FFDX) InitPrefix(prefix config.Prefix) { wsconfig.InitPrefix(prefix) prefix.AddKnownKey(DataExchangeManifestEnabled, false) + prefix.AddKnownKey(DataExchangeReInitEnabled, false) } diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index ede610f507..d60b2e1e8f 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -41,6 +41,7 @@ type FFDX struct { callbacks dataexchange.Callbacks client *resty.Client wsconn wsclient.WSClient + reinit bool } type wsEvent struct { @@ -100,6 +101,10 @@ type wsAck struct { Manifest string `json:"manifest,omitempty"` // FireFly core determined that DX should propagate opaquely to TransferResult, if this DX supports delivery acknowledgements. } +type dxStatus struct { + Status string `json:"status"` +} + func (h *FFDX) Name() string { return "ffdx" } @@ -108,6 +113,8 @@ func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, callbacks dataexc h.ctx = log.WithLogField(ctx, "dx", "ffdx") h.callbacks = callbacks + h.reinit = prefix.GetBool(DataExchangeReInitEnabled) + if prefix.GetString(restclient.HTTPConfigURL) == "" { return i18n.NewError(ctx, i18n.MsgMissingPluginConfig, "url", "dataexchange.ffdx") } @@ -119,7 +126,7 @@ func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, callbacks dataexc wsConfig := wsconfig.GenerateConfigFromPrefix(prefix) - h.wsconn, err = wsclient.New(ctx, wsConfig, nil, nil) + h.wsconn, err = wsclient.New(ctx, wsConfig, h.beforeConnect, nil) if err != nil { return err } @@ -135,6 +142,22 @@ func (h *FFDX) Capabilities() *dataexchange.Capabilities { return h.capabilities } +func (h *FFDX) beforeConnect(ctx context.Context) error { + if h.reinit { + var status dxStatus + res, err := h.client.R().SetContext(ctx). + SetResult(&status). + Post("/api/v1/init") + if err != nil || !res.IsSuccess() { + return restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) + } + if status.Status != "ready" { + return fmt.Errorf("DX returned non-ready status: %s", status.Status) + } + } + return nil +} + func (h *FFDX) GetEndpointInfo(ctx context.Context) (peerID string, endpoint fftypes.JSONObject, err error) { res, err := h.client.R().SetContext(ctx). SetResult(&endpoint). diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index 07b773fbb6..2408fa22de 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -501,3 +501,46 @@ func TestEventLoopClosedContext(t *testing.T) { wsm.On("Send", mock.Anything, mock.Anything).Return(nil) h.eventLoop() // we're simply looking for it exiting } + +func TestWebsocketWithReinit(t *testing.T) { + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + _, _, wsURL, cancel := wsclient.NewTestWSServer(nil) + defer cancel() + + u, _ := url.Parse(wsURL) + u.Scheme = "http" + httpURL := u.String() + h := &FFDX{} + + config.Reset() + h.InitPrefix(utConfPrefix) + utConfPrefix.Set(restclient.HTTPConfigURL, httpURL) + utConfPrefix.Set(restclient.HTTPCustomClient, mockedClient) + utConfPrefix.Set(DataExchangeReInitEnabled, true) + + first := true + httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/init", httpURL), + func(r *http.Request) (*http.Response, error) { + if first { + first = false + return httpmock.NewJsonResponse(200, fftypes.JSONObject{ + "status": "notready", + }) + } + return httpmock.NewJsonResponse(200, fftypes.JSONObject{ + "status": "ready", + }) + }) + + h.InitPrefix(utConfPrefix) + err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{}) + assert.NoError(t, err) + + err = h.Start() + assert.NoError(t, err) + + assert.Equal(t, 2, httpmock.GetTotalCallCount()) +} From 5852da18eef1137e2a13cbe30c84cc4f94ab1f0e Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Thu, 10 Feb 2022 16:12:06 -0500 Subject: [PATCH 3/6] Alter DX peer methods to pass DXInfo type Signed-off-by: Andrew Richardson --- internal/dataexchange/ffdx/ffdx.go | 15 ++++---- internal/dataexchange/ffdx/ffdx_test.go | 20 ++++++----- .../definition_handler_network_node.go | 2 +- .../definition_handler_network_node_test.go | 11 +++--- internal/networkmap/register_node.go | 4 +-- internal/networkmap/register_node_test.go | 20 ++++++++--- mocks/dataexchangemocks/plugin.go | 35 +++++++------------ pkg/dataexchange/plugin.go | 4 +-- 8 files changed, 61 insertions(+), 50 deletions(-) diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index d60b2e1e8f..89d5fd7281 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -158,20 +158,21 @@ func (h *FFDX) beforeConnect(ctx context.Context) error { return nil } -func (h *FFDX) GetEndpointInfo(ctx context.Context) (peerID string, endpoint fftypes.JSONObject, err error) { +func (h *FFDX) GetEndpointInfo(ctx context.Context) (peer fftypes.DXInfo, err error) { res, err := h.client.R().SetContext(ctx). - SetResult(&endpoint). + SetResult(&peer.Endpoint). Get("/api/v1/id") if err != nil || !res.IsSuccess() { - return peerID, endpoint, restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) + return peer, restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) } - return endpoint.GetString("id"), endpoint, nil + peer.Peer = peer.Endpoint.GetString("id") + return peer, nil } -func (h *FFDX) AddPeer(ctx context.Context, peerID string, endpoint fftypes.JSONObject) (err error) { +func (h *FFDX) AddPeer(ctx context.Context, peer fftypes.DXInfo) (err error) { res, err := h.client.R().SetContext(ctx). - SetBody(endpoint). - Put(fmt.Sprintf("/api/v1/peers/%s", peerID)) + SetBody(peer.Endpoint). + Put(fmt.Sprintf("/api/v1/peers/%s", peer.Peer)) if err != nil || !res.IsSuccess() { return restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) } diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index 2408fa22de..27ce6bdc0a 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -94,14 +94,14 @@ func TestGetEndpointInfo(t *testing.T) { "cert": "cert data...", })) - peerID, endpoint, err := h.GetEndpointInfo(context.Background()) + peer, err := h.GetEndpointInfo(context.Background()) assert.NoError(t, err) - assert.Equal(t, "peer1", peerID) + assert.Equal(t, "peer1", peer.Peer) assert.Equal(t, fftypes.JSONObject{ "id": "peer1", "endpoint": "https://peer1.example.com", "cert": "cert data...", - }, endpoint) + }, peer.Endpoint) } func TestGetEndpointInfoError(t *testing.T) { @@ -111,7 +111,7 @@ func TestGetEndpointInfoError(t *testing.T) { httpmock.RegisterResponder("GET", fmt.Sprintf("%s/api/v1/id", httpURL), httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{})) - _, _, err := h.GetEndpointInfo(context.Background()) + _, err := h.GetEndpointInfo(context.Background()) assert.Regexp(t, "FF10229", err) } @@ -123,13 +123,14 @@ func TestAddPeer(t *testing.T) { httpmock.RegisterResponder("PUT", fmt.Sprintf("%s/api/v1/peers/peer1", httpURL), httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{})) - err := h.AddPeer(context.Background(), "peer1", - fftypes.JSONObject{ + err := h.AddPeer(context.Background(), fftypes.DXInfo{ + Peer: "peer1", + Endpoint: fftypes.JSONObject{ "id": "peer1", "endpoint": "https://peer1.example.com", "cert": "cert...", }, - ) + }) assert.NoError(t, err) } @@ -140,7 +141,10 @@ func TestAddPeerError(t *testing.T) { httpmock.RegisterResponder("PUT", fmt.Sprintf("%s/api/v1/peers/peer1", httpURL), httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{})) - err := h.AddPeer(context.Background(), "peer1", fftypes.JSONObject{}) + err := h.AddPeer(context.Background(), fftypes.DXInfo{ + Peer: "peer1", + Endpoint: fftypes.JSONObject{}, + }) assert.Regexp(t, "FF10229", err) } diff --git a/internal/definitions/definition_handler_network_node.go b/internal/definitions/definition_handler_network_node.go index d6ea0b01b4..b8fe4bf59d 100644 --- a/internal/definitions/definition_handler_network_node.go +++ b/internal/definitions/definition_handler_network_node.go @@ -73,7 +73,7 @@ func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *ffty return ActionConfirm, &DefinitionBatchActions{ PreFinalize: func(ctx context.Context) error { // Tell the data exchange about this node. Treat these errors like database errors - and return for retry processing - return dh.exchange.AddPeer(ctx, node.DX.Peer, node.DX.Endpoint) + return dh.exchange.AddPeer(ctx, node.DX) }, }, nil } diff --git a/internal/definitions/definition_handler_network_node_test.go b/internal/definitions/definition_handler_network_node_test.go index 5dd611b63d..f172c27b8c 100644 --- a/internal/definitions/definition_handler_network_node_test.go +++ b/internal/definitions/definition_handler_network_node_test.go @@ -54,8 +54,8 @@ func TestHandleDefinitionBroadcastNodeOk(t *testing.T) { mdi.On("GetNodeByID", mock.Anything, node.ID).Return(nil, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) - mdx.On("AddPeer", mock.Anything, "peer1", node.DX.Endpoint).Return(nil) - action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ + mdx.On("AddPeer", mock.Anything, node.DX).Return(nil) + action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -68,6 +68,9 @@ func TestHandleDefinitionBroadcastNodeOk(t *testing.T) { assert.Equal(t, ActionConfirm, action) assert.NoError(t, err) + err = ba.PreFinalize(context.Background()) + assert.NoError(t, err) + mdi.AssertExpectations(t) } @@ -136,7 +139,7 @@ func TestHandleDefinitionBroadcastNodeAddPeerFail(t *testing.T) { mdi.On("GetNodeByID", mock.Anything, node.ID).Return(nil, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) - mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(fmt.Errorf("pop")) + mdx.On("AddPeer", mock.Anything, node.DX).Return(fmt.Errorf("pop")) action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", @@ -217,7 +220,7 @@ func TestHandleDefinitionBroadcastNodeDupOK(t *testing.T) { mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(&fftypes.Node{Owner: "0x23456"}, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) - mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(nil) + mdx.On("AddPeer", mock.Anything, node.DX).Return(nil) action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", diff --git a/internal/networkmap/register_node.go b/internal/networkmap/register_node.go index 01dc631d81..64e02be401 100644 --- a/internal/networkmap/register_node.go +++ b/internal/networkmap/register_node.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -49,7 +49,7 @@ func (nm *networkMap) RegisterNode(ctx context.Context, waitConfirm bool) (node return nil, nil, i18n.NewError(ctx, i18n.MsgNodeAndOrgIDMustBeSet) } - node.DX.Peer, node.DX.Endpoint, err = nm.exchange.GetEndpointInfo(ctx) + node.DX, err = nm.exchange.GetEndpointInfo(ctx) if err != nil { return nil, nil, err } diff --git a/internal/networkmap/register_node_test.go b/internal/networkmap/register_node_test.go index 9fb9f21893..fa1af98eab 100644 --- a/internal/networkmap/register_node_test.go +++ b/internal/networkmap/register_node_test.go @@ -49,7 +49,10 @@ func TestRegisterNodeOk(t *testing.T) { mim.On("ResolveSigningKey", nm.ctx, "0x23456").Return("0x23456", nil) mdx := nm.exchange.(*dataexchangemocks.Plugin) - mdx.On("GetEndpointInfo", nm.ctx).Return("peer1", fftypes.JSONObject{"endpoint": "details"}, nil) + mdx.On("GetEndpointInfo", nm.ctx).Return(fftypes.DXInfo{ + Peer: "peer1", + Endpoint: fftypes.JSONObject{"endpoint": "details"}, + }, nil) mockMsg := &fftypes.Message{Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}} mbm := nm.broadcast.(*broadcastmocks.Manager) @@ -147,7 +150,10 @@ func TestRegisterNodeParentNotFound(t *testing.T) { mim.On("ResolveSigningKey", nm.ctx, "0x23456").Return("0x23456", nil) mdx := nm.exchange.(*dataexchangemocks.Plugin) - mdx.On("GetEndpointInfo", nm.ctx).Return("peer1", fftypes.JSONObject{"endpoint": "details"}, nil) + mdx.On("GetEndpointInfo", nm.ctx).Return(fftypes.DXInfo{ + Peer: "peer1", + Endpoint: fftypes.JSONObject{"endpoint": "details"}, + }, nil) _, _, err := nm.RegisterNode(nm.ctx, false) assert.Regexp(t, "FF10214", err) @@ -167,7 +173,10 @@ func TestRegisterNodeParentBadNode(t *testing.T) { mim.On("ResolveSigningKey", nm.ctx, "0x23456").Return("0x23456", nil) mdx := nm.exchange.(*dataexchangemocks.Plugin) - mdx.On("GetEndpointInfo", nm.ctx).Return("peer1", fftypes.JSONObject{"endpoint": "details"}, nil) + mdx.On("GetEndpointInfo", nm.ctx).Return(fftypes.DXInfo{ + Peer: "peer1", + Endpoint: fftypes.JSONObject{"endpoint": "details"}, + }, nil) _, _, err := nm.RegisterNode(nm.ctx, false) assert.Regexp(t, "FF10188", err) @@ -184,7 +193,10 @@ func TestRegisterNodeParentDXEndpointFail(t *testing.T) { config.Set(config.NodeName, "node1") mdx := nm.exchange.(*dataexchangemocks.Plugin) - mdx.On("GetEndpointInfo", nm.ctx).Return("", nil, fmt.Errorf("pop")) + mdx.On("GetEndpointInfo", nm.ctx).Return(fftypes.DXInfo{ + Peer: "", + Endpoint: nil, + }, fmt.Errorf("pop")) mim := nm.identity.(*identitymanagermocks.Manager) mim.On("ResolveSigningKey", nm.ctx, "0x23456").Return("0x23456", nil) diff --git a/mocks/dataexchangemocks/plugin.go b/mocks/dataexchangemocks/plugin.go index b179c52f34..344cfb2d5f 100644 --- a/mocks/dataexchangemocks/plugin.go +++ b/mocks/dataexchangemocks/plugin.go @@ -21,13 +21,13 @@ type Plugin struct { mock.Mock } -// AddPeer provides a mock function with given fields: ctx, peerID, endpoint -func (_m *Plugin) AddPeer(ctx context.Context, peerID string, endpoint fftypes.JSONObject) error { - ret := _m.Called(ctx, peerID, endpoint) +// AddPeer provides a mock function with given fields: ctx, peer +func (_m *Plugin) AddPeer(ctx context.Context, peer fftypes.DXInfo) error { + ret := _m.Called(ctx, peer) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, fftypes.JSONObject) error); ok { - r0 = rf(ctx, peerID, endpoint) + if rf, ok := ret.Get(0).(func(context.Context, fftypes.DXInfo) error); ok { + r0 = rf(ctx, peer) } else { r0 = ret.Error(0) } @@ -105,33 +105,24 @@ func (_m *Plugin) DownloadBLOB(ctx context.Context, payloadRef string) (io.ReadC } // GetEndpointInfo provides a mock function with given fields: ctx -func (_m *Plugin) GetEndpointInfo(ctx context.Context) (string, fftypes.JSONObject, error) { +func (_m *Plugin) GetEndpointInfo(ctx context.Context) (fftypes.DXInfo, error) { ret := _m.Called(ctx) - var r0 string - if rf, ok := ret.Get(0).(func(context.Context) string); ok { + var r0 fftypes.DXInfo + if rf, ok := ret.Get(0).(func(context.Context) fftypes.DXInfo); ok { r0 = rf(ctx) } else { - r0 = ret.Get(0).(string) + r0 = ret.Get(0).(fftypes.DXInfo) } - var r1 fftypes.JSONObject - if rf, ok := ret.Get(1).(func(context.Context) fftypes.JSONObject); ok { + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(fftypes.JSONObject) - } - } - - var r2 error - if rf, ok := ret.Get(2).(func(context.Context) error); ok { - r2 = rf(ctx) - } else { - r2 = ret.Error(2) + r1 = ret.Error(1) } - return r0, r1, r2 + return r0, r1 } // Init provides a mock function with given fields: ctx, prefix, callbacks diff --git a/pkg/dataexchange/plugin.go b/pkg/dataexchange/plugin.go index 539b33157a..0ef92e725d 100644 --- a/pkg/dataexchange/plugin.go +++ b/pkg/dataexchange/plugin.go @@ -70,10 +70,10 @@ type Plugin interface { Capabilities() *Capabilities // GetEndpointInfo returns the information about the local endpoint - GetEndpointInfo(ctx context.Context) (peerID string, endpoint fftypes.JSONObject, err error) + GetEndpointInfo(ctx context.Context) (peer fftypes.DXInfo, err error) // AddPeer translates the configuration published by another peer, into a reference string that is used between DX and FireFly to refer to the peer - AddPeer(ctx context.Context, peerID string, endpoint fftypes.JSONObject) (err error) + AddPeer(ctx context.Context, peer fftypes.DXInfo) (err error) // UploadBLOB streams a blob to storage, and returns the hash to confirm the hash calculated in Core matches the hash calculated in the plugin UploadBLOB(ctx context.Context, ns string, id fftypes.UUID, content io.Reader) (payloadRef string, hash *fftypes.Bytes32, size int64, err error) From 0c03325f52cee1d9c20cabca09f4bd9d22890475 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Thu, 10 Feb 2022 16:44:49 -0500 Subject: [PATCH 4/6] Pass list of nodes when performing DX init Signed-off-by: Andrew Richardson --- internal/dataexchange/ffdx/ffdx.go | 9 ++++- internal/dataexchange/ffdx/ffdx_test.go | 20 +++++++--- internal/orchestrator/orchestrator.go | 43 +++++++++++++-------- internal/orchestrator/orchestrator_test.go | 44 ++++++++++++++++++---- mocks/dataexchangemocks/plugin.go | 10 ++--- pkg/dataexchange/plugin.go | 3 +- 6 files changed, 92 insertions(+), 37 deletions(-) diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index 89d5fd7281..0016eb8c95 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -42,6 +42,7 @@ type FFDX struct { client *resty.Client wsconn wsclient.WSClient reinit bool + nodes []fftypes.DXInfo } type wsEvent struct { @@ -109,8 +110,8 @@ func (h *FFDX) Name() string { return "ffdx" } -func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, callbacks dataexchange.Callbacks) (err error) { - h.ctx = log.WithLogField(ctx, "dx", "ffdx") +func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, nodes []fftypes.DXInfo, callbacks dataexchange.Callbacks) (err error) { + h.ctx = log.WithLogField(ctx, "dx", "https") h.callbacks = callbacks h.reinit = prefix.GetBool(DataExchangeReInitEnabled) @@ -119,6 +120,8 @@ func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, callbacks dataexc return i18n.NewError(ctx, i18n.MsgMissingPluginConfig, "url", "dataexchange.ffdx") } + h.nodes = nodes + h.client = restclient.New(h.ctx, prefix) h.capabilities = &dataexchange.Capabilities{ Manifest: prefix.GetBool(DataExchangeManifestEnabled), @@ -146,6 +149,7 @@ func (h *FFDX) beforeConnect(ctx context.Context) error { if h.reinit { var status dxStatus res, err := h.client.R().SetContext(ctx). + SetBody(h.nodes). SetResult(&status). Post("/api/v1/init") if err != nil || !res.IsSuccess() { @@ -166,6 +170,7 @@ func (h *FFDX) GetEndpointInfo(ctx context.Context) (peer fftypes.DXInfo, err er return peer, restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) } peer.Peer = peer.Endpoint.GetString("id") + h.nodes = append(h.nodes, peer) return peer, nil } diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index 27ce6bdc0a..cfea31ecb5 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -19,6 +19,7 @@ package ffdx import ( "bytes" "context" + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -54,8 +55,9 @@ func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpU utConfPrefix.Set(restclient.HTTPCustomClient, mockedClient) h = &FFDX{} + nodes := make([]fftypes.DXInfo, 0) h.InitPrefix(utConfPrefix) - err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{}) + err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{}) assert.NoError(t, err) assert.Equal(t, "ffdx", h.Name()) assert.NotNil(t, h.Capabilities()) @@ -68,17 +70,19 @@ func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpU func TestInitBadURL(t *testing.T) { config.Reset() h := &FFDX{} + nodes := make([]fftypes.DXInfo, 0) h.InitPrefix(utConfPrefix) utConfPrefix.Set(restclient.HTTPConfigURL, "::::////") - err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{}) + err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{}) assert.Regexp(t, "FF10162", err) } func TestInitMissingURL(t *testing.T) { config.Reset() h := &FFDX{} + nodes := make([]fftypes.DXInfo, 0) h.InitPrefix(utConfPrefix) - err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{}) + err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{}) assert.Regexp(t, "FF10138", err) } @@ -518,6 +522,7 @@ func TestWebsocketWithReinit(t *testing.T) { u.Scheme = "http" httpURL := u.String() h := &FFDX{} + nodes := []fftypes.DXInfo{{}} config.Reset() h.InitPrefix(utConfPrefix) @@ -527,7 +532,12 @@ func TestWebsocketWithReinit(t *testing.T) { first := true httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/init", httpURL), - func(r *http.Request) (*http.Response, error) { + func(req *http.Request) (*http.Response, error) { + var reqNodes []fftypes.DXInfo + err := json.NewDecoder(req.Body).Decode(&reqNodes) + assert.NoError(t, err) + assert.Equal(t, 1, len(reqNodes)) + if first { first = false return httpmock.NewJsonResponse(200, fftypes.JSONObject{ @@ -540,7 +550,7 @@ func TestWebsocketWithReinit(t *testing.T) { }) h.InitPrefix(utConfPrefix) - err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{}) + err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{}) assert.NoError(t, err) err = h.Start() diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 43200e67ca..3fb681ea0a 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -266,7 +266,6 @@ func (or *orchestrator) Contracts() contracts.Manager { } func (or *orchestrator) initDatabaseCheckPreinit(ctx context.Context) (err error) { - if or.database == nil { diType := config.GetString(config.DatabaseType) if or.database, err = difactory.GetPlugin(ctx, diType); err != nil { @@ -290,6 +289,33 @@ func (or *orchestrator) initDatabaseCheckPreinit(ctx context.Context) (err error return config.MergeConfig(configRecords) } +func (or *orchestrator) initDataExchange(ctx context.Context) (err error) { + dxPlugin := config.GetString(config.DataexchangeType) + if or.dataexchange == nil { + pluginName := dxPlugin + if pluginName == "https" { + // Migration path for old plugin name + // TODO: eventually make this fatal + log.L(ctx).Warnf("Your data exchange config uses the old plugin name 'https' - this plugin has been renamed to 'ffdx'") + pluginName = "ffdx" + } + if or.dataexchange, err = dxfactory.GetPlugin(ctx, pluginName); err != nil { + return err + } + } + + nodes, _, err := or.database.GetNodes(ctx, database.NodeQueryFactory.NewFilter(ctx).And()) + if err != nil { + return err + } + nodeInfo := make([]fftypes.DXInfo, len(nodes)) + for i, node := range nodes { + nodeInfo[i] = node.DX + } + + return or.dataexchange.Init(ctx, dataexchangeConfig.SubPrefix(dxPlugin), nodeInfo, &or.bc) +} + func (or *orchestrator) initPlugins(ctx context.Context) (err error) { if err = or.initDatabaseCheckPreinit(ctx); err != nil { @@ -328,20 +354,7 @@ func (or *orchestrator) initPlugins(ctx context.Context) (err error) { return err } - dxPlugin := config.GetString(config.DataexchangeType) - if or.dataexchange == nil { - pluginName := dxPlugin - if pluginName == "https" { - // Migration path for old plugin name - // TODO: eventually make this fatal - log.L(ctx).Warnf("Your data exchange config uses the old plugin name 'https' - this plugin has been renamed to 'ffdx'") - pluginName = "ffdx" - } - if or.dataexchange, err = dxfactory.GetPlugin(ctx, pluginName); err != nil { - return err - } - } - if err = or.dataexchange.Init(ctx, dataexchangeConfig.SubPrefix(dxPlugin), &or.bc); err != nil { + if err = or.initDataExchange(ctx); err != nil { return err } diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 1e830c37d1..dedc270210 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -272,7 +272,8 @@ func TestBadDataExchangeInitFail(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) ctx, cancelCtx := context.WithCancel(context.Background()) err := or.Init(ctx, cancelCtx) assert.EqualError(t, err, "pop") @@ -289,6 +290,7 @@ func TestDataExchangePluginOldName(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) ctx, cancelCtx := context.WithCancel(context.Background()) err := or.Init(ctx, cancelCtx) @@ -307,7 +309,8 @@ func TestBadTokensPlugin(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -328,7 +331,8 @@ func TestBadTokensPluginNoConnector(t *testing.T) { or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mbi.On("VerifyIdentitySyntax", mock.Anything, mock.Anything, mock.Anything).Return("", nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -348,7 +352,8 @@ func TestBadTokensPluginNoName(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -368,7 +373,8 @@ func TestBadTokensPluginInvalidName(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -389,7 +395,8 @@ func TestBadTokensPluginNoType(t *testing.T) { or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mbi.On("VerifyIdentitySyntax", mock.Anything, mock.Anything, mock.Anything).Return("", nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -411,7 +418,8 @@ func TestGoodTokensPlugin(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) or.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) @@ -601,7 +609,8 @@ func TestInitOK(t *testing.T) { or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) or.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -620,3 +629,22 @@ func TestInitOK(t *testing.T) { assert.Equal(t, or.mam, or.Assets()) assert.Equal(t, or.mcm, or.Contracts()) } + +func TestInitDataExchangeGetNodesFail(t *testing.T) { + or := newTestOrchestrator() + + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + + err := or.initDataExchange(or.ctx) + assert.EqualError(t, err, "pop") +} + +func TestInitDataExchangeWithNodes(t *testing.T) { + or := newTestOrchestrator() + + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{{}}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + err := or.initDataExchange(or.ctx) + assert.NoError(t, err) +} diff --git a/mocks/dataexchangemocks/plugin.go b/mocks/dataexchangemocks/plugin.go index 344cfb2d5f..48f4e5fe13 100644 --- a/mocks/dataexchangemocks/plugin.go +++ b/mocks/dataexchangemocks/plugin.go @@ -125,13 +125,13 @@ func (_m *Plugin) GetEndpointInfo(ctx context.Context) (fftypes.DXInfo, error) { return r0, r1 } -// Init provides a mock function with given fields: ctx, prefix, callbacks -func (_m *Plugin) Init(ctx context.Context, prefix config.Prefix, callbacks dataexchange.Callbacks) error { - ret := _m.Called(ctx, prefix, callbacks) +// Init provides a mock function with given fields: ctx, prefix, nodes, callbacks +func (_m *Plugin) Init(ctx context.Context, prefix config.Prefix, nodes []fftypes.DXInfo, callbacks dataexchange.Callbacks) error { + ret := _m.Called(ctx, prefix, nodes, callbacks) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, config.Prefix, dataexchange.Callbacks) error); ok { - r0 = rf(ctx, prefix, callbacks) + if rf, ok := ret.Get(0).(func(context.Context, config.Prefix, []fftypes.DXInfo, dataexchange.Callbacks) error); ok { + r0 = rf(ctx, prefix, nodes, callbacks) } else { r0 = ret.Error(0) } diff --git a/pkg/dataexchange/plugin.go b/pkg/dataexchange/plugin.go index 0ef92e725d..aedf4fde84 100644 --- a/pkg/dataexchange/plugin.go +++ b/pkg/dataexchange/plugin.go @@ -60,8 +60,7 @@ type Plugin interface { InitPrefix(prefix config.Prefix) // Init initializes the plugin, with configuration - // Returns the supported featureset of the interface - Init(ctx context.Context, prefix config.Prefix, callbacks Callbacks) error + Init(ctx context.Context, prefix config.Prefix, nodes []fftypes.DXInfo, callbacks Callbacks) error // Data exchange interface must not deliver any events until start is called Start() error From 0dd54d47115a1684b24f6d9b22aab1f0f06f6b2a Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 11 Feb 2022 13:52:03 -0500 Subject: [PATCH 5/6] Use name "initEnabled" instead of "reInitEnabled" Signed-off-by: Andrew Richardson --- internal/dataexchange/ffdx/config.go | 6 +++--- internal/dataexchange/ffdx/ffdx.go | 6 +++--- internal/dataexchange/ffdx/ffdx_test.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/dataexchange/ffdx/config.go b/internal/dataexchange/ffdx/config.go index af506d4917..8ebc0a7fbc 100644 --- a/internal/dataexchange/ffdx/config.go +++ b/internal/dataexchange/ffdx/config.go @@ -24,12 +24,12 @@ import ( const ( // DataExchangeManifestEnabled determines whether to require+validate a manifest from other DX instances in the network. Must be supported by the connector DataExchangeManifestEnabled = "manifestEnabled" - // DataExchangeReInitEnabled instructs FireFly to always post all current nodes to the /init API before connecting or reconnecting to the connector - DataExchangeReInitEnabled = "reInitEnabled" + // DataExchangeInitEnabled instructs FireFly to always post all current nodes to the /init API before connecting or reconnecting to the connector + DataExchangeInitEnabled = "initEnabled" ) func (h *FFDX) InitPrefix(prefix config.Prefix) { wsconfig.InitPrefix(prefix) prefix.AddKnownKey(DataExchangeManifestEnabled, false) - prefix.AddKnownKey(DataExchangeReInitEnabled, false) + prefix.AddKnownKey(DataExchangeInitEnabled, false) } diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index 0016eb8c95..dbdb00c3d1 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -41,7 +41,7 @@ type FFDX struct { callbacks dataexchange.Callbacks client *resty.Client wsconn wsclient.WSClient - reinit bool + needsInit bool nodes []fftypes.DXInfo } @@ -114,7 +114,7 @@ func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, nodes []fftypes.D h.ctx = log.WithLogField(ctx, "dx", "https") h.callbacks = callbacks - h.reinit = prefix.GetBool(DataExchangeReInitEnabled) + h.needsInit = prefix.GetBool(DataExchangeInitEnabled) if prefix.GetString(restclient.HTTPConfigURL) == "" { return i18n.NewError(ctx, i18n.MsgMissingPluginConfig, "url", "dataexchange.ffdx") @@ -146,7 +146,7 @@ func (h *FFDX) Capabilities() *dataexchange.Capabilities { } func (h *FFDX) beforeConnect(ctx context.Context) error { - if h.reinit { + if h.needsInit { var status dxStatus res, err := h.client.R().SetContext(ctx). SetBody(h.nodes). diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index cfea31ecb5..b0cbc9a312 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -528,7 +528,7 @@ func TestWebsocketWithReinit(t *testing.T) { h.InitPrefix(utConfPrefix) utConfPrefix.Set(restclient.HTTPConfigURL, httpURL) utConfPrefix.Set(restclient.HTTPCustomClient, mockedClient) - utConfPrefix.Set(DataExchangeReInitEnabled, true) + utConfPrefix.Set(DataExchangeInitEnabled, true) first := true httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/init", httpURL), From 7b837b12287c663cc48fec118231547eaaf0f38d Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Fri, 11 Feb 2022 16:06:26 -0500 Subject: [PATCH 6/6] If DX "init" is enabled, block certain requests until init succeeds Only requests that may rely on an updated list of peers are blocked. Signed-off-by: Andrew Richardson --- internal/dataexchange/ffdx/ffdx.go | 34 +++++++++++++++++++++++++ internal/dataexchange/ffdx/ffdx_test.go | 26 ++++++++++++++++--- internal/i18n/en_translations.go | 1 + 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index dbdb00c3d1..531fccf53b 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -23,6 +23,7 @@ import ( "io" "net/http" "strconv" + "sync" "github.com/go-resty/resty/v2" "github.com/hyperledger/firefly/internal/config" @@ -42,6 +43,8 @@ type FFDX struct { client *resty.Client wsconn wsclient.WSClient needsInit bool + initialized bool + initMutex sync.Mutex nodes []fftypes.DXInfo } @@ -146,7 +149,11 @@ func (h *FFDX) Capabilities() *dataexchange.Capabilities { } func (h *FFDX) beforeConnect(ctx context.Context) error { + h.initMutex.Lock() + defer h.initMutex.Unlock() + if h.needsInit { + h.initialized = false var status dxStatus res, err := h.client.R().SetContext(ctx). SetBody(h.nodes). @@ -159,10 +166,25 @@ func (h *FFDX) beforeConnect(ctx context.Context) error { return fmt.Errorf("DX returned non-ready status: %s", status.Status) } } + h.initialized = true + return nil +} + +func (h *FFDX) checkInitialized(ctx context.Context) error { + h.initMutex.Lock() + defer h.initMutex.Unlock() + + if !h.initialized { + return i18n.NewError(ctx, i18n.MsgDXNotInitialized) + } return nil } func (h *FFDX) GetEndpointInfo(ctx context.Context) (peer fftypes.DXInfo, err error) { + if err := h.checkInitialized(ctx); err != nil { + return peer, err + } + res, err := h.client.R().SetContext(ctx). SetResult(&peer.Endpoint). Get("/api/v1/id") @@ -175,6 +197,10 @@ func (h *FFDX) GetEndpointInfo(ctx context.Context) (peer fftypes.DXInfo, err er } func (h *FFDX) AddPeer(ctx context.Context, peer fftypes.DXInfo) (err error) { + if err := h.checkInitialized(ctx); err != nil { + return err + } + res, err := h.client.R().SetContext(ctx). SetBody(peer.Endpoint). Put(fmt.Sprintf("/api/v1/peers/%s", peer.Peer)) @@ -215,6 +241,10 @@ func (h *FFDX) DownloadBLOB(ctx context.Context, payloadRef string) (content io. } func (h *FFDX) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID string, data []byte) (err error) { + if err := h.checkInitialized(ctx); err != nil { + return err + } + var responseData responseWithRequestID res, err := h.client.R().SetContext(ctx). SetBody(&sendMessage{ @@ -231,6 +261,10 @@ func (h *FFDX) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID strin } func (h *FFDX) TransferBLOB(ctx context.Context, opID *fftypes.UUID, peerID, payloadRef string) (err error) { + if err := h.checkInitialized(ctx); err != nil { + return err + } + var responseData responseWithRequestID res, err := h.client.R().SetContext(ctx). SetBody(&transferBlob{ diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index b0cbc9a312..9515e241b9 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -54,7 +54,7 @@ func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpU utConfPrefix.Set(restclient.HTTPConfigURL, httpURL) utConfPrefix.Set(restclient.HTTPCustomClient, mockedClient) - h = &FFDX{} + h = &FFDX{initialized: true} nodes := make([]fftypes.DXInfo, 0) h.InitPrefix(utConfPrefix) err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{}) @@ -87,7 +87,6 @@ func TestInitMissingURL(t *testing.T) { } func TestGetEndpointInfo(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) defer done() @@ -120,7 +119,6 @@ func TestGetEndpointInfoError(t *testing.T) { } func TestAddPeer(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) defer done() @@ -538,6 +536,8 @@ func TestWebsocketWithReinit(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(reqNodes)) + assert.False(t, h.initialized) + if first { first = false return httpmock.NewJsonResponse(200, fftypes.JSONObject{ @@ -557,4 +557,24 @@ func TestWebsocketWithReinit(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 2, httpmock.GetTotalCallCount()) + assert.True(t, h.initialized) +} + +func TestDXUninitialized(t *testing.T) { + h, _, _, _, done := newTestFFDX(t) + defer done() + + h.initialized = false + + _, err := h.GetEndpointInfo(context.Background()) + assert.Regexp(t, "FF10342", err) + + err = h.AddPeer(context.Background(), fftypes.DXInfo{}) + assert.Regexp(t, "FF10342", err) + + err = h.TransferBLOB(context.Background(), fftypes.NewUUID(), "peer1", "ns1/id1") + assert.Regexp(t, "FF10342", err) + + err = h.SendMessage(context.Background(), fftypes.NewUUID(), "peer1", []byte(`some data`)) + assert.Regexp(t, "FF10342", err) } diff --git a/internal/i18n/en_translations.go b/internal/i18n/en_translations.go index b36cb1e9d7..7bf0121fe6 100644 --- a/internal/i18n/en_translations.go +++ b/internal/i18n/en_translations.go @@ -259,4 +259,5 @@ var ( MsgAddressResolveFailed = ffm("FF10339", "Failed to resolve signing key string '%s': %s", 500) MsgAddressResolveBadStatus = ffm("FF10340", "Failed to resolve signing key string '%s' [%d]: %s", 500) MsgAddressResolveBadResData = ffm("FF10341", "Failed to resolve signing key string '%s' - invalid address returned '%s': %s", 500) + MsgDXNotInitialized = ffm("FF10342", "Data exchange is initializing") )