diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 41314f1936..f24e6a1eca 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -183,7 +183,7 @@ func (e *Ethereum) Init(ctx context.Context, prefix config.Prefix, callbacks blo wsConfig.WSKeyPath = "/ws" } - e.wsconn, err = wsclient.New(ctx, wsConfig, e.afterConnect) + e.wsconn, err = wsclient.New(ctx, wsConfig, nil, e.afterConnect) if err != nil { return err } diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index 59a27f571c..23a9e42ede 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -189,7 +189,7 @@ func (f *Fabric) Init(ctx context.Context, prefix config.Prefix, callbacks block wsConfig.WSKeyPath = "/ws" } - f.wsconn, err = wsclient.New(ctx, wsConfig, f.afterConnect) + f.wsconn, err = wsclient.New(ctx, wsConfig, nil, f.afterConnect) if err != nil { return err } diff --git a/internal/dataexchange/ffdx/config.go b/internal/dataexchange/ffdx/config.go index a284c5746c..8ebc0a7fbc 100644 --- a/internal/dataexchange/ffdx/config.go +++ b/internal/dataexchange/ffdx/config.go @@ -24,9 +24,12 @@ import ( const ( // DataExchangeManifestEnabled determines whether to require+validate a manifest from other DX instances in the network. Must be supported by the connector DataExchangeManifestEnabled = "manifestEnabled" + // DataExchangeInitEnabled instructs FireFly to always post all current nodes to the /init API before connecting or reconnecting to the connector + DataExchangeInitEnabled = "initEnabled" ) func (h *FFDX) InitPrefix(prefix config.Prefix) { wsconfig.InitPrefix(prefix) prefix.AddKnownKey(DataExchangeManifestEnabled, false) + prefix.AddKnownKey(DataExchangeInitEnabled, false) } diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index dc3b696348..531fccf53b 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -23,6 +23,7 @@ import ( "io" "net/http" "strconv" + "sync" "github.com/go-resty/resty/v2" "github.com/hyperledger/firefly/internal/config" @@ -41,6 +42,10 @@ type FFDX struct { callbacks dataexchange.Callbacks client *resty.Client wsconn wsclient.WSClient + needsInit bool + initialized bool + initMutex sync.Mutex + nodes []fftypes.DXInfo } type wsEvent struct { @@ -100,18 +105,26 @@ type wsAck struct { Manifest string `json:"manifest,omitempty"` // FireFly core determined that DX should propagate opaquely to TransferResult, if this DX supports delivery acknowledgements. } +type dxStatus struct { + Status string `json:"status"` +} + func (h *FFDX) Name() string { return "ffdx" } -func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, callbacks dataexchange.Callbacks) (err error) { - h.ctx = log.WithLogField(ctx, "dx", "ffdx") +func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, nodes []fftypes.DXInfo, callbacks dataexchange.Callbacks) (err error) { + h.ctx = log.WithLogField(ctx, "dx", "https") h.callbacks = callbacks + h.needsInit = prefix.GetBool(DataExchangeInitEnabled) + if prefix.GetString(restclient.HTTPConfigURL) == "" { return i18n.NewError(ctx, i18n.MsgMissingPluginConfig, "url", "dataexchange.ffdx") } + h.nodes = nodes + h.client = restclient.New(h.ctx, prefix) h.capabilities = &dataexchange.Capabilities{ Manifest: prefix.GetBool(DataExchangeManifestEnabled), @@ -119,7 +132,7 @@ func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, callbacks dataexc wsConfig := wsconfig.GenerateConfigFromPrefix(prefix) - h.wsconn, err = wsclient.New(ctx, wsConfig, nil) + h.wsconn, err = wsclient.New(ctx, wsConfig, h.beforeConnect, nil) if err != nil { return err } @@ -135,20 +148,62 @@ func (h *FFDX) Capabilities() *dataexchange.Capabilities { return h.capabilities } -func (h *FFDX) GetEndpointInfo(ctx context.Context) (peerID string, endpoint fftypes.JSONObject, err error) { +func (h *FFDX) beforeConnect(ctx context.Context) error { + h.initMutex.Lock() + defer h.initMutex.Unlock() + + if h.needsInit { + h.initialized = false + var status dxStatus + res, err := h.client.R().SetContext(ctx). + SetBody(h.nodes). + SetResult(&status). + Post("/api/v1/init") + if err != nil || !res.IsSuccess() { + return restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) + } + if status.Status != "ready" { + return fmt.Errorf("DX returned non-ready status: %s", status.Status) + } + } + h.initialized = true + return nil +} + +func (h *FFDX) checkInitialized(ctx context.Context) error { + h.initMutex.Lock() + defer h.initMutex.Unlock() + + if !h.initialized { + return i18n.NewError(ctx, i18n.MsgDXNotInitialized) + } + return nil +} + +func (h *FFDX) GetEndpointInfo(ctx context.Context) (peer fftypes.DXInfo, err error) { + if err := h.checkInitialized(ctx); err != nil { + return peer, err + } + res, err := h.client.R().SetContext(ctx). - SetResult(&endpoint). + SetResult(&peer.Endpoint). Get("/api/v1/id") if err != nil || !res.IsSuccess() { - return peerID, endpoint, restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) + return peer, restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) } - return endpoint.GetString("id"), endpoint, nil + peer.Peer = peer.Endpoint.GetString("id") + h.nodes = append(h.nodes, peer) + return peer, nil } -func (h *FFDX) AddPeer(ctx context.Context, peerID string, endpoint fftypes.JSONObject) (err error) { +func (h *FFDX) AddPeer(ctx context.Context, peer fftypes.DXInfo) (err error) { + if err := h.checkInitialized(ctx); err != nil { + return err + } + res, err := h.client.R().SetContext(ctx). - SetBody(endpoint). - Put(fmt.Sprintf("/api/v1/peers/%s", peerID)) + SetBody(peer.Endpoint). + Put(fmt.Sprintf("/api/v1/peers/%s", peer.Peer)) if err != nil || !res.IsSuccess() { return restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) } @@ -186,6 +241,10 @@ func (h *FFDX) DownloadBLOB(ctx context.Context, payloadRef string) (content io. } func (h *FFDX) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID string, data []byte) (err error) { + if err := h.checkInitialized(ctx); err != nil { + return err + } + var responseData responseWithRequestID res, err := h.client.R().SetContext(ctx). SetBody(&sendMessage{ @@ -202,6 +261,10 @@ func (h *FFDX) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID strin } func (h *FFDX) TransferBLOB(ctx context.Context, opID *fftypes.UUID, peerID, payloadRef string) (err error) { + if err := h.checkInitialized(ctx); err != nil { + return err + } + var responseData responseWithRequestID res, err := h.client.R().SetContext(ctx). SetBody(&transferBlob{ diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index 07b773fbb6..9515e241b9 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -19,6 +19,7 @@ package ffdx import ( "bytes" "context" + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -53,9 +54,10 @@ func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpU utConfPrefix.Set(restclient.HTTPConfigURL, httpURL) utConfPrefix.Set(restclient.HTTPCustomClient, mockedClient) - h = &FFDX{} + h = &FFDX{initialized: true} + nodes := make([]fftypes.DXInfo, 0) h.InitPrefix(utConfPrefix) - err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{}) + err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{}) assert.NoError(t, err) assert.Equal(t, "ffdx", h.Name()) assert.NotNil(t, h.Capabilities()) @@ -68,22 +70,23 @@ func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpU func TestInitBadURL(t *testing.T) { config.Reset() h := &FFDX{} + nodes := make([]fftypes.DXInfo, 0) h.InitPrefix(utConfPrefix) utConfPrefix.Set(restclient.HTTPConfigURL, "::::////") - err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{}) + err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{}) assert.Regexp(t, "FF10162", err) } func TestInitMissingURL(t *testing.T) { config.Reset() h := &FFDX{} + nodes := make([]fftypes.DXInfo, 0) h.InitPrefix(utConfPrefix) - err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{}) + err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{}) assert.Regexp(t, "FF10138", err) } func TestGetEndpointInfo(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) defer done() @@ -94,14 +97,14 @@ func TestGetEndpointInfo(t *testing.T) { "cert": "cert data...", })) - peerID, endpoint, err := h.GetEndpointInfo(context.Background()) + peer, err := h.GetEndpointInfo(context.Background()) assert.NoError(t, err) - assert.Equal(t, "peer1", peerID) + assert.Equal(t, "peer1", peer.Peer) assert.Equal(t, fftypes.JSONObject{ "id": "peer1", "endpoint": "https://peer1.example.com", "cert": "cert data...", - }, endpoint) + }, peer.Endpoint) } func TestGetEndpointInfoError(t *testing.T) { @@ -111,25 +114,25 @@ func TestGetEndpointInfoError(t *testing.T) { httpmock.RegisterResponder("GET", fmt.Sprintf("%s/api/v1/id", httpURL), httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{})) - _, _, err := h.GetEndpointInfo(context.Background()) + _, err := h.GetEndpointInfo(context.Background()) assert.Regexp(t, "FF10229", err) } func TestAddPeer(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) defer done() httpmock.RegisterResponder("PUT", fmt.Sprintf("%s/api/v1/peers/peer1", httpURL), httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{})) - err := h.AddPeer(context.Background(), "peer1", - fftypes.JSONObject{ + err := h.AddPeer(context.Background(), fftypes.DXInfo{ + Peer: "peer1", + Endpoint: fftypes.JSONObject{ "id": "peer1", "endpoint": "https://peer1.example.com", "cert": "cert...", }, - ) + }) assert.NoError(t, err) } @@ -140,7 +143,10 @@ func TestAddPeerError(t *testing.T) { httpmock.RegisterResponder("PUT", fmt.Sprintf("%s/api/v1/peers/peer1", httpURL), httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{})) - err := h.AddPeer(context.Background(), "peer1", fftypes.JSONObject{}) + err := h.AddPeer(context.Background(), fftypes.DXInfo{ + Peer: "peer1", + Endpoint: fftypes.JSONObject{}, + }) assert.Regexp(t, "FF10229", err) } @@ -501,3 +507,74 @@ func TestEventLoopClosedContext(t *testing.T) { wsm.On("Send", mock.Anything, mock.Anything).Return(nil) h.eventLoop() // we're simply looking for it exiting } + +func TestWebsocketWithReinit(t *testing.T) { + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + _, _, wsURL, cancel := wsclient.NewTestWSServer(nil) + defer cancel() + + u, _ := url.Parse(wsURL) + u.Scheme = "http" + httpURL := u.String() + h := &FFDX{} + nodes := []fftypes.DXInfo{{}} + + config.Reset() + h.InitPrefix(utConfPrefix) + utConfPrefix.Set(restclient.HTTPConfigURL, httpURL) + utConfPrefix.Set(restclient.HTTPCustomClient, mockedClient) + utConfPrefix.Set(DataExchangeInitEnabled, true) + + first := true + httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/init", httpURL), + func(req *http.Request) (*http.Response, error) { + var reqNodes []fftypes.DXInfo + err := json.NewDecoder(req.Body).Decode(&reqNodes) + assert.NoError(t, err) + assert.Equal(t, 1, len(reqNodes)) + + assert.False(t, h.initialized) + + if first { + first = false + return httpmock.NewJsonResponse(200, fftypes.JSONObject{ + "status": "notready", + }) + } + return httpmock.NewJsonResponse(200, fftypes.JSONObject{ + "status": "ready", + }) + }) + + h.InitPrefix(utConfPrefix) + err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{}) + assert.NoError(t, err) + + err = h.Start() + assert.NoError(t, err) + + assert.Equal(t, 2, httpmock.GetTotalCallCount()) + assert.True(t, h.initialized) +} + +func TestDXUninitialized(t *testing.T) { + h, _, _, _, done := newTestFFDX(t) + defer done() + + h.initialized = false + + _, err := h.GetEndpointInfo(context.Background()) + assert.Regexp(t, "FF10342", err) + + err = h.AddPeer(context.Background(), fftypes.DXInfo{}) + assert.Regexp(t, "FF10342", err) + + err = h.TransferBLOB(context.Background(), fftypes.NewUUID(), "peer1", "ns1/id1") + assert.Regexp(t, "FF10342", err) + + err = h.SendMessage(context.Background(), fftypes.NewUUID(), "peer1", []byte(`some data`)) + assert.Regexp(t, "FF10342", err) +} diff --git a/internal/definitions/definition_handler_network_node.go b/internal/definitions/definition_handler_network_node.go index d6ea0b01b4..b8fe4bf59d 100644 --- a/internal/definitions/definition_handler_network_node.go +++ b/internal/definitions/definition_handler_network_node.go @@ -73,7 +73,7 @@ func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *ffty return ActionConfirm, &DefinitionBatchActions{ PreFinalize: func(ctx context.Context) error { // Tell the data exchange about this node. Treat these errors like database errors - and return for retry processing - return dh.exchange.AddPeer(ctx, node.DX.Peer, node.DX.Endpoint) + return dh.exchange.AddPeer(ctx, node.DX) }, }, nil } diff --git a/internal/definitions/definition_handler_network_node_test.go b/internal/definitions/definition_handler_network_node_test.go index 5dd611b63d..f172c27b8c 100644 --- a/internal/definitions/definition_handler_network_node_test.go +++ b/internal/definitions/definition_handler_network_node_test.go @@ -54,8 +54,8 @@ func TestHandleDefinitionBroadcastNodeOk(t *testing.T) { mdi.On("GetNodeByID", mock.Anything, node.ID).Return(nil, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) - mdx.On("AddPeer", mock.Anything, "peer1", node.DX.Endpoint).Return(nil) - action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ + mdx.On("AddPeer", mock.Anything, node.DX).Return(nil) + action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", Identity: fftypes.Identity{ @@ -68,6 +68,9 @@ func TestHandleDefinitionBroadcastNodeOk(t *testing.T) { assert.Equal(t, ActionConfirm, action) assert.NoError(t, err) + err = ba.PreFinalize(context.Background()) + assert.NoError(t, err) + mdi.AssertExpectations(t) } @@ -136,7 +139,7 @@ func TestHandleDefinitionBroadcastNodeAddPeerFail(t *testing.T) { mdi.On("GetNodeByID", mock.Anything, node.ID).Return(nil, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) - mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(fmt.Errorf("pop")) + mdx.On("AddPeer", mock.Anything, node.DX).Return(fmt.Errorf("pop")) action, ba, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", @@ -217,7 +220,7 @@ func TestHandleDefinitionBroadcastNodeDupOK(t *testing.T) { mdi.On("GetNode", mock.Anything, "0x23456", "node1").Return(&fftypes.Node{Owner: "0x23456"}, nil) mdi.On("UpsertNode", mock.Anything, mock.Anything, true).Return(nil) mdx := dh.exchange.(*dataexchangemocks.Plugin) - mdx.On("AddPeer", mock.Anything, "peer1", mock.Anything).Return(nil) + mdx.On("AddPeer", mock.Anything, node.DX).Return(nil) action, _, err := dh.HandleDefinitionBroadcast(context.Background(), &fftypes.Message{ Header: fftypes.MessageHeader{ Namespace: "ns1", diff --git a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go index 59eb97f9a0..10db21d4fc 100644 --- a/internal/events/websockets/websockets_test.go +++ b/internal/events/websockets/websockets_test.go @@ -62,7 +62,7 @@ func newTestWebsockets(t *testing.T, cbs *eventsmocks.Callbacks, queryParams ... clientPrefix.Set(restclient.HTTPConfigURL, fmt.Sprintf("http://%s%s", svr.Listener.Addr(), qs)) wsConfig := wsconfig.GenerateConfigFromPrefix(clientPrefix) - wsc, err := wsclient.New(ctx, wsConfig, nil) + wsc, err := wsclient.New(ctx, wsConfig, nil, nil) assert.NoError(t, err) err = wsc.Connect() assert.NoError(t, err) diff --git a/internal/i18n/en_translations.go b/internal/i18n/en_translations.go index da753d466e..26e15de9bc 100644 --- a/internal/i18n/en_translations.go +++ b/internal/i18n/en_translations.go @@ -259,6 +259,7 @@ var ( MsgAddressResolveFailed = ffm("FF10339", "Failed to resolve signing key string '%s': %s", 500) MsgAddressResolveBadStatus = ffm("FF10340", "Failed to resolve signing key string '%s' [%d]: %s", 500) MsgAddressResolveBadResData = ffm("FF10341", "Failed to resolve signing key string '%s' - invalid address returned '%s': %s", 500) + MsgDXNotInitialized = ffm("FF10342", "Data exchange is initializing") MsgInvalidTXTypeForMessage = ffm("FF10343", "Invalid transaction type for sending a message: %s", 400) MsgGroupRequired = ffm("FF10344", "Group must be set", 400) MsgDBLockFailed = ffm("FF10345", "Database lock failed") diff --git a/internal/networkmap/register_node.go b/internal/networkmap/register_node.go index 01dc631d81..64e02be401 100644 --- a/internal/networkmap/register_node.go +++ b/internal/networkmap/register_node.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -49,7 +49,7 @@ func (nm *networkMap) RegisterNode(ctx context.Context, waitConfirm bool) (node return nil, nil, i18n.NewError(ctx, i18n.MsgNodeAndOrgIDMustBeSet) } - node.DX.Peer, node.DX.Endpoint, err = nm.exchange.GetEndpointInfo(ctx) + node.DX, err = nm.exchange.GetEndpointInfo(ctx) if err != nil { return nil, nil, err } diff --git a/internal/networkmap/register_node_test.go b/internal/networkmap/register_node_test.go index 9fb9f21893..fa1af98eab 100644 --- a/internal/networkmap/register_node_test.go +++ b/internal/networkmap/register_node_test.go @@ -49,7 +49,10 @@ func TestRegisterNodeOk(t *testing.T) { mim.On("ResolveSigningKey", nm.ctx, "0x23456").Return("0x23456", nil) mdx := nm.exchange.(*dataexchangemocks.Plugin) - mdx.On("GetEndpointInfo", nm.ctx).Return("peer1", fftypes.JSONObject{"endpoint": "details"}, nil) + mdx.On("GetEndpointInfo", nm.ctx).Return(fftypes.DXInfo{ + Peer: "peer1", + Endpoint: fftypes.JSONObject{"endpoint": "details"}, + }, nil) mockMsg := &fftypes.Message{Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}} mbm := nm.broadcast.(*broadcastmocks.Manager) @@ -147,7 +150,10 @@ func TestRegisterNodeParentNotFound(t *testing.T) { mim.On("ResolveSigningKey", nm.ctx, "0x23456").Return("0x23456", nil) mdx := nm.exchange.(*dataexchangemocks.Plugin) - mdx.On("GetEndpointInfo", nm.ctx).Return("peer1", fftypes.JSONObject{"endpoint": "details"}, nil) + mdx.On("GetEndpointInfo", nm.ctx).Return(fftypes.DXInfo{ + Peer: "peer1", + Endpoint: fftypes.JSONObject{"endpoint": "details"}, + }, nil) _, _, err := nm.RegisterNode(nm.ctx, false) assert.Regexp(t, "FF10214", err) @@ -167,7 +173,10 @@ func TestRegisterNodeParentBadNode(t *testing.T) { mim.On("ResolveSigningKey", nm.ctx, "0x23456").Return("0x23456", nil) mdx := nm.exchange.(*dataexchangemocks.Plugin) - mdx.On("GetEndpointInfo", nm.ctx).Return("peer1", fftypes.JSONObject{"endpoint": "details"}, nil) + mdx.On("GetEndpointInfo", nm.ctx).Return(fftypes.DXInfo{ + Peer: "peer1", + Endpoint: fftypes.JSONObject{"endpoint": "details"}, + }, nil) _, _, err := nm.RegisterNode(nm.ctx, false) assert.Regexp(t, "FF10188", err) @@ -184,7 +193,10 @@ func TestRegisterNodeParentDXEndpointFail(t *testing.T) { config.Set(config.NodeName, "node1") mdx := nm.exchange.(*dataexchangemocks.Plugin) - mdx.On("GetEndpointInfo", nm.ctx).Return("", nil, fmt.Errorf("pop")) + mdx.On("GetEndpointInfo", nm.ctx).Return(fftypes.DXInfo{ + Peer: "", + Endpoint: nil, + }, fmt.Errorf("pop")) mim := nm.identity.(*identitymanagermocks.Manager) mim.On("ResolveSigningKey", nm.ctx, "0x23456").Return("0x23456", nil) diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index fd3867d090..b7fae5f0a6 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -281,7 +281,6 @@ func (or *orchestrator) Metrics() metrics.Manager { } func (or *orchestrator) initDatabaseCheckPreinit(ctx context.Context) (err error) { - if or.database == nil { diType := config.GetString(config.DatabaseType) if or.database, err = difactory.GetPlugin(ctx, diType); err != nil { @@ -305,6 +304,33 @@ func (or *orchestrator) initDatabaseCheckPreinit(ctx context.Context) (err error return config.MergeConfig(configRecords) } +func (or *orchestrator) initDataExchange(ctx context.Context) (err error) { + dxPlugin := config.GetString(config.DataexchangeType) + if or.dataexchange == nil { + pluginName := dxPlugin + if pluginName == "https" { + // Migration path for old plugin name + // TODO: eventually make this fatal + log.L(ctx).Warnf("Your data exchange config uses the old plugin name 'https' - this plugin has been renamed to 'ffdx'") + pluginName = "ffdx" + } + if or.dataexchange, err = dxfactory.GetPlugin(ctx, pluginName); err != nil { + return err + } + } + + nodes, _, err := or.database.GetNodes(ctx, database.NodeQueryFactory.NewFilter(ctx).And()) + if err != nil { + return err + } + nodeInfo := make([]fftypes.DXInfo, len(nodes)) + for i, node := range nodes { + nodeInfo[i] = node.DX + } + + return or.dataexchange.Init(ctx, dataexchangeConfig.SubPrefix(dxPlugin), nodeInfo, &or.bc) +} + func (or *orchestrator) initPlugins(ctx context.Context) (err error) { if err = or.initDatabaseCheckPreinit(ctx); err != nil { @@ -343,20 +369,7 @@ func (or *orchestrator) initPlugins(ctx context.Context) (err error) { return err } - dxPlugin := config.GetString(config.DataexchangeType) - if or.dataexchange == nil { - pluginName := dxPlugin - if pluginName == "https" { - // Migration path for old plugin name - // TODO: eventually make this fatal - log.L(ctx).Warnf("Your data exchange config uses the old plugin name 'https' - this plugin has been renamed to 'ffdx'") - pluginName = "ffdx" - } - if or.dataexchange, err = dxfactory.GetPlugin(ctx, pluginName); err != nil { - return err - } - } - if err = or.dataexchange.Init(ctx, dataexchangeConfig.SubPrefix(dxPlugin), &or.bc); err != nil { + if err = or.initDataExchange(ctx); err != nil { return err } diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 68f3a5d351..74f2f4456a 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -277,7 +277,8 @@ func TestBadDataExchangeInitFail(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) ctx, cancelCtx := context.WithCancel(context.Background()) err := or.Init(ctx, cancelCtx) assert.EqualError(t, err, "pop") @@ -294,6 +295,7 @@ func TestDataExchangePluginOldName(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) ctx, cancelCtx := context.WithCancel(context.Background()) err := or.Init(ctx, cancelCtx) @@ -312,7 +314,8 @@ func TestBadTokensPlugin(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -333,7 +336,8 @@ func TestBadTokensPluginNoConnector(t *testing.T) { or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mbi.On("VerifyIdentitySyntax", mock.Anything, mock.Anything, mock.Anything).Return("", nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -353,7 +357,8 @@ func TestBadTokensPluginNoName(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -373,7 +378,8 @@ func TestBadTokensPluginInvalidName(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -394,7 +400,8 @@ func TestBadTokensPluginNoType(t *testing.T) { or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mbi.On("VerifyIdentitySyntax", mock.Anything, mock.Anything, mock.Anything).Return("", nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -416,7 +423,8 @@ func TestGoodTokensPlugin(t *testing.T) { or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) or.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) @@ -613,7 +621,8 @@ func TestInitOK(t *testing.T) { or.mii.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mbi.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mps.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) - or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) or.mdi.On("GetNamespace", mock.Anything, mock.Anything).Return(nil, nil) or.mdi.On("UpsertNamespace", mock.Anything, mock.Anything, true).Return(nil) or.mti.On("Init", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -635,3 +644,22 @@ func TestInitOK(t *testing.T) { assert.Equal(t, or.mcm, or.Contracts()) assert.Equal(t, or.mmi, or.Metrics()) } + +func TestInitDataExchangeGetNodesFail(t *testing.T) { + or := newTestOrchestrator() + + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + + err := or.initDataExchange(or.ctx) + assert.EqualError(t, err, "pop") +} + +func TestInitDataExchangeWithNodes(t *testing.T) { + or := newTestOrchestrator() + + or.mdi.On("GetNodes", mock.Anything, mock.Anything).Return([]*fftypes.Node{{}}, nil, nil) + or.mdx.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + err := or.initDataExchange(or.ctx) + assert.NoError(t, err) +} diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index ce24ef958b..f8b4609fea 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -132,7 +132,7 @@ func (ft *FFTokens) Init(ctx context.Context, name string, prefix config.Prefix, wsConfig.WSKeyPath = "/api/ws" } - ft.wsconn, err = wsclient.New(ctx, wsConfig, nil) + ft.wsconn, err = wsclient.New(ctx, wsConfig, nil, nil) if err != nil { return err } diff --git a/mocks/dataexchangemocks/plugin.go b/mocks/dataexchangemocks/plugin.go index b179c52f34..48f4e5fe13 100644 --- a/mocks/dataexchangemocks/plugin.go +++ b/mocks/dataexchangemocks/plugin.go @@ -21,13 +21,13 @@ type Plugin struct { mock.Mock } -// AddPeer provides a mock function with given fields: ctx, peerID, endpoint -func (_m *Plugin) AddPeer(ctx context.Context, peerID string, endpoint fftypes.JSONObject) error { - ret := _m.Called(ctx, peerID, endpoint) +// AddPeer provides a mock function with given fields: ctx, peer +func (_m *Plugin) AddPeer(ctx context.Context, peer fftypes.DXInfo) error { + ret := _m.Called(ctx, peer) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, fftypes.JSONObject) error); ok { - r0 = rf(ctx, peerID, endpoint) + if rf, ok := ret.Get(0).(func(context.Context, fftypes.DXInfo) error); ok { + r0 = rf(ctx, peer) } else { r0 = ret.Error(0) } @@ -105,42 +105,33 @@ func (_m *Plugin) DownloadBLOB(ctx context.Context, payloadRef string) (io.ReadC } // GetEndpointInfo provides a mock function with given fields: ctx -func (_m *Plugin) GetEndpointInfo(ctx context.Context) (string, fftypes.JSONObject, error) { +func (_m *Plugin) GetEndpointInfo(ctx context.Context) (fftypes.DXInfo, error) { ret := _m.Called(ctx) - var r0 string - if rf, ok := ret.Get(0).(func(context.Context) string); ok { + var r0 fftypes.DXInfo + if rf, ok := ret.Get(0).(func(context.Context) fftypes.DXInfo); ok { r0 = rf(ctx) } else { - r0 = ret.Get(0).(string) + r0 = ret.Get(0).(fftypes.DXInfo) } - var r1 fftypes.JSONObject - if rf, ok := ret.Get(1).(func(context.Context) fftypes.JSONObject); ok { + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(fftypes.JSONObject) - } - } - - var r2 error - if rf, ok := ret.Get(2).(func(context.Context) error); ok { - r2 = rf(ctx) - } else { - r2 = ret.Error(2) + r1 = ret.Error(1) } - return r0, r1, r2 + return r0, r1 } -// Init provides a mock function with given fields: ctx, prefix, callbacks -func (_m *Plugin) Init(ctx context.Context, prefix config.Prefix, callbacks dataexchange.Callbacks) error { - ret := _m.Called(ctx, prefix, callbacks) +// Init provides a mock function with given fields: ctx, prefix, nodes, callbacks +func (_m *Plugin) Init(ctx context.Context, prefix config.Prefix, nodes []fftypes.DXInfo, callbacks dataexchange.Callbacks) error { + ret := _m.Called(ctx, prefix, nodes, callbacks) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, config.Prefix, dataexchange.Callbacks) error); ok { - r0 = rf(ctx, prefix, callbacks) + if rf, ok := ret.Get(0).(func(context.Context, config.Prefix, []fftypes.DXInfo, dataexchange.Callbacks) error); ok { + r0 = rf(ctx, prefix, nodes, callbacks) } else { r0 = ret.Error(0) } diff --git a/pkg/dataexchange/plugin.go b/pkg/dataexchange/plugin.go index 539b33157a..aedf4fde84 100644 --- a/pkg/dataexchange/plugin.go +++ b/pkg/dataexchange/plugin.go @@ -60,8 +60,7 @@ type Plugin interface { InitPrefix(prefix config.Prefix) // Init initializes the plugin, with configuration - // Returns the supported featureset of the interface - Init(ctx context.Context, prefix config.Prefix, callbacks Callbacks) error + Init(ctx context.Context, prefix config.Prefix, nodes []fftypes.DXInfo, callbacks Callbacks) error // Data exchange interface must not deliver any events until start is called Start() error @@ -70,10 +69,10 @@ type Plugin interface { Capabilities() *Capabilities // GetEndpointInfo returns the information about the local endpoint - GetEndpointInfo(ctx context.Context) (peerID string, endpoint fftypes.JSONObject, err error) + GetEndpointInfo(ctx context.Context) (peer fftypes.DXInfo, err error) // AddPeer translates the configuration published by another peer, into a reference string that is used between DX and FireFly to refer to the peer - AddPeer(ctx context.Context, peerID string, endpoint fftypes.JSONObject) (err error) + AddPeer(ctx context.Context, peer fftypes.DXInfo) (err error) // UploadBLOB streams a blob to storage, and returns the hash to confirm the hash calculated in Core matches the hash calculated in the plugin UploadBLOB(ctx context.Context, ns string, id fftypes.UUID, content io.Reader) (payloadRef string, hash *fftypes.Bytes32, size int64, err error) diff --git a/pkg/wsclient/wsclient.go b/pkg/wsclient/wsclient.go index a8c23afafd..08071a837c 100644 --- a/pkg/wsclient/wsclient.go +++ b/pkg/wsclient/wsclient.go @@ -69,6 +69,7 @@ type wsClient struct { send chan []byte sendDone chan []byte closing chan struct{} + beforeConnect WSPreConnectHandler afterConnect WSPostConnectHandler heartbeatInterval time.Duration heartbeatMux sync.Mutex @@ -76,10 +77,13 @@ type wsClient struct { lastPingCompleted time.Time } +// WSPreConnectHandler will be called before every connect/reconnect. Any error returned will prevent the websocket from connecting. +type WSPreConnectHandler func(ctx context.Context) error + // WSPostConnectHandler will be called after every connect/reconnect. Can send data over ws, but must not block listening for data on the ws. type WSPostConnectHandler func(ctx context.Context, w WSClient) error -func New(ctx context.Context, config *WSConfig, afterConnect WSPostConnectHandler) (WSClient, error) { +func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandler, afterConnect WSPostConnectHandler) (WSClient, error) { wsURL, err := buildWSUrl(ctx, config) if err != nil { @@ -102,6 +106,7 @@ func New(ctx context.Context, config *WSConfig, afterConnect WSPostConnectHandle receive: make(chan []byte), send: make(chan []byte), closing: make(chan struct{}), + beforeConnect: beforeConnect, afterConnect: afterConnect, heartbeatInterval: config.HeartbeatInterval, } @@ -204,6 +209,15 @@ func (w *wsClient) connect(initial bool) error { if w.closed { return false, i18n.NewError(w.ctx, i18n.MsgWSClosing) } + + retry = !initial || attempt < w.initialRetryAttempts + if w.beforeConnect != nil { + if err = w.beforeConnect(w.ctx); err != nil { + l.Warnf("WS %s connect attempt %d failed in beforeConnect", w.url, attempt) + return retry, err + } + } + var res *http.Response w.wsconn, res, err = w.wsdialer.Dial(w.url, w.headers) if err != nil { @@ -215,8 +229,9 @@ func (w *wsClient) connect(initial bool) error { status = res.StatusCode } l.Warnf("WS %s connect attempt %d failed [%d]: %s", w.url, attempt, status, string(b)) - return !initial || attempt > w.initialRetryAttempts, i18n.WrapError(w.ctx, err, i18n.MsgWSConnectFailed) + return retry, i18n.WrapError(w.ctx, err, i18n.MsgWSConnectFailed) } + w.pongReceivedOrReset(false) w.wsconn.SetPongHandler(w.pongHandler) l.Infof("WS %s connected", w.url) @@ -309,7 +324,7 @@ func (w *wsClient) receiveReconnectLoop() { l := log.L(w.ctx) defer close(w.receive) for !w.closed { - // Start the sender, letting it close without blocking sending a notifiation on the sendDone + // Start the sender, letting it close without blocking sending a notification on the sendDone w.sendDone = make(chan []byte, 1) receiverDone := make(chan struct{}) go w.sendLoop(receiverDone) diff --git a/pkg/wsclient/wsclient_test.go b/pkg/wsclient/wsclient_test.go index 1440f8f92b..b7ffb69f8e 100644 --- a/pkg/wsclient/wsclient_test.go +++ b/pkg/wsclient/wsclient_test.go @@ -39,6 +39,14 @@ func TestWSClientE2E(t *testing.T) { }) defer close() + first := true + beforeConnect := func(ctx context.Context) error { + if first { + first = false + return fmt.Errorf("first run fails") + } + return nil + } afterConnect := func(ctx context.Context, w WSClient) error { return w.Send(ctx, []byte(`after connect message`)) } @@ -49,8 +57,9 @@ func TestWSClientE2E(t *testing.T) { wsConfig.HTTPURL = url wsConfig.WSKeyPath = "/test" wsConfig.HeartbeatInterval = 50 * time.Millisecond + wsConfig.InitialConnectAttempts = 2 - wsc, err := New(context.Background(), wsConfig, afterConnect) + wsc, err := New(context.Background(), wsConfig, beforeConnect, afterConnect) assert.NoError(t, err) // Change the settings and connect @@ -90,7 +99,7 @@ func TestWSClientBadURL(t *testing.T) { wsConfig := generateConfig() wsConfig.HTTPURL = ":::" - _, err := New(context.Background(), wsConfig, nil) + _, err := New(context.Background(), wsConfig, nil, nil) assert.Regexp(t, "FF10162", err) } @@ -134,7 +143,7 @@ func TestWSFailStartupHttp500(t *testing.T) { wsConfig.InitialDelay = 1 wsConfig.InitialConnectAttempts = 1 - w, _ := New(context.Background(), wsConfig, nil) + w, _ := New(context.Background(), wsConfig, nil, nil) err := w.Connect() assert.Regexp(t, "FF10161", err) } @@ -153,7 +162,7 @@ func TestWSFailStartupConnect(t *testing.T) { wsConfig.InitialDelay = 1 wsConfig.InitialConnectAttempts = 1 - w, _ := New(context.Background(), wsConfig, nil) + w, _ := New(context.Background(), wsConfig, nil, nil) err := w.Connect() assert.Regexp(t, "FF10161", err) } @@ -163,7 +172,7 @@ func TestWSSendClosed(t *testing.T) { wsConfig := generateConfig() wsConfig.HTTPURL = "http://test:12345" - w, err := New(context.Background(), wsConfig, nil) + w, err := New(context.Background(), wsConfig, nil, nil) assert.NoError(t, err) w.Close() @@ -308,7 +317,7 @@ func TestHeartbeatSendFailed(t *testing.T) { _, _, url, close := NewTestWSServer(func(req *http.Request) {}) defer close() - wsc, err := New(context.Background(), &WSConfig{HTTPURL: url}, func(ctx context.Context, w WSClient) error { return nil }) + wsc, err := New(context.Background(), &WSConfig{HTTPURL: url}, nil, func(ctx context.Context, w WSClient) error { return nil }) assert.NoError(t, err) defer wsc.Close()