Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (e *Ethereum) Init(ctx context.Context, prefix config.Prefix, callbacks blo
wsConfig.WSKeyPath = "/ws"
}

e.wsconn, err = wsclient.New(ctx, wsConfig, e.afterConnect)
e.wsconn, err = wsclient.New(ctx, wsConfig, nil, e.afterConnect)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (f *Fabric) Init(ctx context.Context, prefix config.Prefix, callbacks block
wsConfig.WSKeyPath = "/ws"
}

f.wsconn, err = wsclient.New(ctx, wsConfig, f.afterConnect)
f.wsconn, err = wsclient.New(ctx, wsConfig, nil, f.afterConnect)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions internal/dataexchange/ffdx/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ import (
const (
// DataExchangeManifestEnabled determines whether to require+validate a manifest from other DX instances in the network. Must be supported by the connector
DataExchangeManifestEnabled = "manifestEnabled"
// DataExchangeInitEnabled instructs FireFly to always post all current nodes to the /init API before connecting or reconnecting to the connector
DataExchangeInitEnabled = "initEnabled"
)

func (h *FFDX) InitPrefix(prefix config.Prefix) {
wsconfig.InitPrefix(prefix)
prefix.AddKnownKey(DataExchangeManifestEnabled, false)
prefix.AddKnownKey(DataExchangeInitEnabled, false)
}
83 changes: 73 additions & 10 deletions internal/dataexchange/ffdx/ffdx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"net/http"
"strconv"
"sync"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly/internal/config"
Expand All @@ -41,6 +42,10 @@ type FFDX struct {
callbacks dataexchange.Callbacks
client *resty.Client
wsconn wsclient.WSClient
needsInit bool
initialized bool
initMutex sync.Mutex
nodes []fftypes.DXInfo
}

type wsEvent struct {
Expand Down Expand Up @@ -100,26 +105,34 @@ type wsAck struct {
Manifest string `json:"manifest,omitempty"` // FireFly core determined that DX should propagate opaquely to TransferResult, if this DX supports delivery acknowledgements.
}

type dxStatus struct {
Status string `json:"status"`
}

func (h *FFDX) Name() string {
return "ffdx"
}

func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, callbacks dataexchange.Callbacks) (err error) {
h.ctx = log.WithLogField(ctx, "dx", "ffdx")
func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, nodes []fftypes.DXInfo, callbacks dataexchange.Callbacks) (err error) {
h.ctx = log.WithLogField(ctx, "dx", "https")
h.callbacks = callbacks

h.needsInit = prefix.GetBool(DataExchangeInitEnabled)

if prefix.GetString(restclient.HTTPConfigURL) == "" {
return i18n.NewError(ctx, i18n.MsgMissingPluginConfig, "url", "dataexchange.ffdx")
}

h.nodes = nodes

h.client = restclient.New(h.ctx, prefix)
h.capabilities = &dataexchange.Capabilities{
Manifest: prefix.GetBool(DataExchangeManifestEnabled),
}

wsConfig := wsconfig.GenerateConfigFromPrefix(prefix)

h.wsconn, err = wsclient.New(ctx, wsConfig, nil)
h.wsconn, err = wsclient.New(ctx, wsConfig, h.beforeConnect, nil)
if err != nil {
return err
}
Expand All @@ -135,20 +148,62 @@ func (h *FFDX) Capabilities() *dataexchange.Capabilities {
return h.capabilities
}

func (h *FFDX) GetEndpointInfo(ctx context.Context) (peerID string, endpoint fftypes.JSONObject, err error) {
func (h *FFDX) beforeConnect(ctx context.Context) error {
h.initMutex.Lock()
defer h.initMutex.Unlock()

if h.needsInit {
h.initialized = false
var status dxStatus
res, err := h.client.R().SetContext(ctx).
SetBody(h.nodes).
SetResult(&status).
Post("/api/v1/init")
if err != nil || !res.IsSuccess() {
return restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr)
}
if status.Status != "ready" {
return fmt.Errorf("DX returned non-ready status: %s", status.Status)
}
}
h.initialized = true
return nil
}

func (h *FFDX) checkInitialized(ctx context.Context) error {
h.initMutex.Lock()
defer h.initMutex.Unlock()

if !h.initialized {
return i18n.NewError(ctx, i18n.MsgDXNotInitialized)
}
return nil
}

func (h *FFDX) GetEndpointInfo(ctx context.Context) (peer fftypes.DXInfo, err error) {
if err := h.checkInitialized(ctx); err != nil {
return peer, err
}

res, err := h.client.R().SetContext(ctx).
SetResult(&endpoint).
SetResult(&peer.Endpoint).
Get("/api/v1/id")
if err != nil || !res.IsSuccess() {
return peerID, endpoint, restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr)
return peer, restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr)
}
return endpoint.GetString("id"), endpoint, nil
peer.Peer = peer.Endpoint.GetString("id")
h.nodes = append(h.nodes, peer)
return peer, nil
}

func (h *FFDX) AddPeer(ctx context.Context, peerID string, endpoint fftypes.JSONObject) (err error) {
func (h *FFDX) AddPeer(ctx context.Context, peer fftypes.DXInfo) (err error) {
if err := h.checkInitialized(ctx); err != nil {
return err
}

res, err := h.client.R().SetContext(ctx).
SetBody(endpoint).
Put(fmt.Sprintf("/api/v1/peers/%s", peerID))
SetBody(peer.Endpoint).
Put(fmt.Sprintf("/api/v1/peers/%s", peer.Peer))
if err != nil || !res.IsSuccess() {
return restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr)
}
Expand Down Expand Up @@ -186,6 +241,10 @@ func (h *FFDX) DownloadBLOB(ctx context.Context, payloadRef string) (content io.
}

func (h *FFDX) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID string, data []byte) (err error) {
if err := h.checkInitialized(ctx); err != nil {
return err
}

var responseData responseWithRequestID
res, err := h.client.R().SetContext(ctx).
SetBody(&sendMessage{
Expand All @@ -202,6 +261,10 @@ func (h *FFDX) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID strin
}

func (h *FFDX) TransferBLOB(ctx context.Context, opID *fftypes.UUID, peerID, payloadRef string) (err error) {
if err := h.checkInitialized(ctx); err != nil {
return err
}

var responseData responseWithRequestID
res, err := h.client.R().SetContext(ctx).
SetBody(&transferBlob{
Expand Down
105 changes: 91 additions & 14 deletions internal/dataexchange/ffdx/ffdx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ffdx
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -53,9 +54,10 @@ func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpU
utConfPrefix.Set(restclient.HTTPConfigURL, httpURL)
utConfPrefix.Set(restclient.HTTPCustomClient, mockedClient)

h = &FFDX{}
h = &FFDX{initialized: true}
nodes := make([]fftypes.DXInfo, 0)
h.InitPrefix(utConfPrefix)
err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{})
err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{})
assert.NoError(t, err)
assert.Equal(t, "ffdx", h.Name())
assert.NotNil(t, h.Capabilities())
Expand All @@ -68,22 +70,23 @@ func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpU
func TestInitBadURL(t *testing.T) {
config.Reset()
h := &FFDX{}
nodes := make([]fftypes.DXInfo, 0)
h.InitPrefix(utConfPrefix)
utConfPrefix.Set(restclient.HTTPConfigURL, "::::////")
err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{})
err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{})
assert.Regexp(t, "FF10162", err)
}

func TestInitMissingURL(t *testing.T) {
config.Reset()
h := &FFDX{}
nodes := make([]fftypes.DXInfo, 0)
h.InitPrefix(utConfPrefix)
err := h.Init(context.Background(), utConfPrefix, &dataexchangemocks.Callbacks{})
err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{})
assert.Regexp(t, "FF10138", err)
}

func TestGetEndpointInfo(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
defer done()

Expand All @@ -94,14 +97,14 @@ func TestGetEndpointInfo(t *testing.T) {
"cert": "cert data...",
}))

peerID, endpoint, err := h.GetEndpointInfo(context.Background())
peer, err := h.GetEndpointInfo(context.Background())
assert.NoError(t, err)
assert.Equal(t, "peer1", peerID)
assert.Equal(t, "peer1", peer.Peer)
assert.Equal(t, fftypes.JSONObject{
"id": "peer1",
"endpoint": "https://peer1.example.com",
"cert": "cert data...",
}, endpoint)
}, peer.Endpoint)
}

func TestGetEndpointInfoError(t *testing.T) {
Expand All @@ -111,25 +114,25 @@ func TestGetEndpointInfoError(t *testing.T) {
httpmock.RegisterResponder("GET", fmt.Sprintf("%s/api/v1/id", httpURL),
httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{}))

_, _, err := h.GetEndpointInfo(context.Background())
_, err := h.GetEndpointInfo(context.Background())
assert.Regexp(t, "FF10229", err)
}

func TestAddPeer(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
defer done()

httpmock.RegisterResponder("PUT", fmt.Sprintf("%s/api/v1/peers/peer1", httpURL),
httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{}))

err := h.AddPeer(context.Background(), "peer1",
fftypes.JSONObject{
err := h.AddPeer(context.Background(), fftypes.DXInfo{
Peer: "peer1",
Endpoint: fftypes.JSONObject{
"id": "peer1",
"endpoint": "https://peer1.example.com",
"cert": "cert...",
},
)
})
assert.NoError(t, err)
}

Expand All @@ -140,7 +143,10 @@ func TestAddPeerError(t *testing.T) {
httpmock.RegisterResponder("PUT", fmt.Sprintf("%s/api/v1/peers/peer1", httpURL),
httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{}))

err := h.AddPeer(context.Background(), "peer1", fftypes.JSONObject{})
err := h.AddPeer(context.Background(), fftypes.DXInfo{
Peer: "peer1",
Endpoint: fftypes.JSONObject{},
})
assert.Regexp(t, "FF10229", err)
}

Expand Down Expand Up @@ -501,3 +507,74 @@ func TestEventLoopClosedContext(t *testing.T) {
wsm.On("Send", mock.Anything, mock.Anything).Return(nil)
h.eventLoop() // we're simply looking for it exiting
}

func TestWebsocketWithReinit(t *testing.T) {
mockedClient := &http.Client{}
httpmock.ActivateNonDefault(mockedClient)
defer httpmock.DeactivateAndReset()

_, _, wsURL, cancel := wsclient.NewTestWSServer(nil)
defer cancel()

u, _ := url.Parse(wsURL)
u.Scheme = "http"
httpURL := u.String()
h := &FFDX{}
nodes := []fftypes.DXInfo{{}}

config.Reset()
h.InitPrefix(utConfPrefix)
utConfPrefix.Set(restclient.HTTPConfigURL, httpURL)
utConfPrefix.Set(restclient.HTTPCustomClient, mockedClient)
utConfPrefix.Set(DataExchangeInitEnabled, true)

first := true
httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/init", httpURL),
func(req *http.Request) (*http.Response, error) {
var reqNodes []fftypes.DXInfo
err := json.NewDecoder(req.Body).Decode(&reqNodes)
assert.NoError(t, err)
assert.Equal(t, 1, len(reqNodes))

assert.False(t, h.initialized)

if first {
first = false
return httpmock.NewJsonResponse(200, fftypes.JSONObject{
"status": "notready",
})
}
return httpmock.NewJsonResponse(200, fftypes.JSONObject{
"status": "ready",
})
})

h.InitPrefix(utConfPrefix)
err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{})
assert.NoError(t, err)

err = h.Start()
assert.NoError(t, err)

assert.Equal(t, 2, httpmock.GetTotalCallCount())
assert.True(t, h.initialized)
}

func TestDXUninitialized(t *testing.T) {
h, _, _, _, done := newTestFFDX(t)
defer done()

h.initialized = false

_, err := h.GetEndpointInfo(context.Background())
assert.Regexp(t, "FF10342", err)

err = h.AddPeer(context.Background(), fftypes.DXInfo{})
assert.Regexp(t, "FF10342", err)

err = h.TransferBLOB(context.Background(), fftypes.NewUUID(), "peer1", "ns1/id1")
assert.Regexp(t, "FF10342", err)

err = h.SendMessage(context.Background(), fftypes.NewUUID(), "peer1", []byte(`some data`))
assert.Regexp(t, "FF10342", err)
}
2 changes: 1 addition & 1 deletion internal/definitions/definition_handler_network_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (dh *definitionHandlers) handleNodeBroadcast(ctx context.Context, msg *ffty
return ActionConfirm, &DefinitionBatchActions{
PreFinalize: func(ctx context.Context) error {
// Tell the data exchange about this node. Treat these errors like database errors - and return for retry processing
return dh.exchange.AddPeer(ctx, node.DX.Peer, node.DX.Endpoint)
return dh.exchange.AddPeer(ctx, node.DX)
},
}, nil
}
Loading