Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions internal/dataexchange/dxfactory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@ import (
"context"

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/dataexchange/dxhttps"
"github.com/hyperledger/firefly/internal/dataexchange/ffdx"
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/pkg/dataexchange"
)

var pluginsByName = map[string]func() dataexchange.Plugin{
(*dxhttps.HTTPS)(nil).Name(): func() dataexchange.Plugin { return &dxhttps.HTTPS{} },
(*ffdx.FFDX)(nil).Name(): func() dataexchange.Plugin { return &ffdx.FFDX{} },
}

func InitPrefix(prefix config.Prefix) {
for name, plugin := range pluginsByName {
plugin().InitPrefix(prefix.SubPrefix(name))

// Migration path for old plugin name
// TODO: remove this
if name == "ffdx" {
plugin().InitPrefix(prefix.SubPrefix("https"))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package dxhttps
package ffdx

import (
"github.com/hyperledger/firefly/internal/config"
Expand All @@ -26,7 +26,7 @@ const (
DataExchangeManifestEnabled = "manifestEnabled"
)

func (h *HTTPS) InitPrefix(prefix config.Prefix) {
func (h *FFDX) InitPrefix(prefix config.Prefix) {
wsconfig.InitPrefix(prefix)
prefix.AddKnownKey(DataExchangeManifestEnabled, false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package dxhttps
package ffdx

import (
"context"
Expand All @@ -35,7 +35,7 @@ import (
"github.com/hyperledger/firefly/pkg/wsclient"
)

type HTTPS struct {
type FFDX struct {
ctx context.Context
capabilities *dataexchange.Capabilities
callbacks dataexchange.Callbacks
Expand Down Expand Up @@ -100,16 +100,16 @@ type wsAck struct {
Manifest string `json:"manifest,omitempty"` // FireFly core determined that DX should propagate opaquely to TransferResult, if this DX supports delivery acknowledgements.
}

func (h *HTTPS) Name() string {
return "https"
func (h *FFDX) Name() string {
return "ffdx"
}

func (h *HTTPS) Init(ctx context.Context, prefix config.Prefix, callbacks dataexchange.Callbacks) (err error) {
h.ctx = log.WithLogField(ctx, "dx", "https")
func (h *FFDX) Init(ctx context.Context, prefix config.Prefix, callbacks dataexchange.Callbacks) (err error) {
h.ctx = log.WithLogField(ctx, "dx", "ffdx")
h.callbacks = callbacks

if prefix.GetString(restclient.HTTPConfigURL) == "" {
return i18n.NewError(ctx, i18n.MsgMissingPluginConfig, "url", "dataexchange.https")
return i18n.NewError(ctx, i18n.MsgMissingPluginConfig, "url", "dataexchange.ffdx")
}

h.client = restclient.New(h.ctx, prefix)
Expand All @@ -127,15 +127,15 @@ func (h *HTTPS) Init(ctx context.Context, prefix config.Prefix, callbacks dataex
return nil
}

func (h *HTTPS) Start() error {
func (h *FFDX) Start() error {
return h.wsconn.Connect()
}

func (h *HTTPS) Capabilities() *dataexchange.Capabilities {
func (h *FFDX) Capabilities() *dataexchange.Capabilities {
return h.capabilities
}

func (h *HTTPS) GetEndpointInfo(ctx context.Context) (peerID string, endpoint fftypes.JSONObject, err error) {
func (h *FFDX) GetEndpointInfo(ctx context.Context) (peerID string, endpoint fftypes.JSONObject, err error) {
res, err := h.client.R().SetContext(ctx).
SetResult(&endpoint).
Get("/api/v1/id")
Expand All @@ -145,7 +145,7 @@ func (h *HTTPS) GetEndpointInfo(ctx context.Context) (peerID string, endpoint ff
return endpoint.GetString("id"), endpoint, nil
}

func (h *HTTPS) AddPeer(ctx context.Context, peerID string, endpoint fftypes.JSONObject) (err error) {
func (h *FFDX) AddPeer(ctx context.Context, peerID string, endpoint fftypes.JSONObject) (err error) {
res, err := h.client.R().SetContext(ctx).
SetBody(endpoint).
Put(fmt.Sprintf("/api/v1/peers/%s", peerID))
Expand All @@ -155,7 +155,7 @@ func (h *HTTPS) AddPeer(ctx context.Context, peerID string, endpoint fftypes.JSO
return nil
}

func (h *HTTPS) UploadBLOB(ctx context.Context, ns string, id fftypes.UUID, content io.Reader) (payloadRef string, hash *fftypes.Bytes32, size int64, err error) {
func (h *FFDX) UploadBLOB(ctx context.Context, ns string, id fftypes.UUID, content io.Reader) (payloadRef string, hash *fftypes.Bytes32, size int64, err error) {
payloadRef = fmt.Sprintf("%s/%s", ns, &id)
var upload uploadBlob
res, err := h.client.R().SetContext(ctx).
Expand All @@ -172,7 +172,7 @@ func (h *HTTPS) UploadBLOB(ctx context.Context, ns string, id fftypes.UUID, cont
return payloadRef, hash, upload.Size, nil
}

func (h *HTTPS) DownloadBLOB(ctx context.Context, payloadRef string) (content io.ReadCloser, err error) {
func (h *FFDX) DownloadBLOB(ctx context.Context, payloadRef string) (content io.ReadCloser, err error) {
res, err := h.client.R().SetContext(ctx).
SetDoNotParseResponse(true).
Get(fmt.Sprintf("/api/v1/blobs/%s", payloadRef))
Expand All @@ -185,7 +185,7 @@ func (h *HTTPS) DownloadBLOB(ctx context.Context, payloadRef string) (content io
return res.RawBody(), nil
}

func (h *HTTPS) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID string, data []byte) (err error) {
func (h *FFDX) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID string, data []byte) (err error) {
var responseData responseWithRequestID
res, err := h.client.R().SetContext(ctx).
SetBody(&sendMessage{
Expand All @@ -201,7 +201,7 @@ func (h *HTTPS) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID stri
return nil
}

func (h *HTTPS) TransferBLOB(ctx context.Context, opID *fftypes.UUID, peerID, payloadRef string) (err error) {
func (h *FFDX) TransferBLOB(ctx context.Context, opID *fftypes.UUID, peerID, payloadRef string) (err error) {
var responseData responseWithRequestID
res, err := h.client.R().SetContext(ctx).
SetBody(&transferBlob{
Expand All @@ -217,7 +217,7 @@ func (h *HTTPS) TransferBLOB(ctx context.Context, opID *fftypes.UUID, peerID, pa
return nil
}

func (h *HTTPS) CheckBLOBReceived(ctx context.Context, peerID, ns string, id fftypes.UUID) (hash *fftypes.Bytes32, size int64, err error) {
func (h *FFDX) CheckBLOBReceived(ctx context.Context, peerID, ns string, id fftypes.UUID) (hash *fftypes.Bytes32, size int64, err error) {
var responseData responseWithRequestID
res, err := h.client.R().SetContext(ctx).
SetResult(&responseData).
Expand All @@ -241,7 +241,7 @@ func (h *HTTPS) CheckBLOBReceived(ctx context.Context, peerID, ns string, id fft
return hash, size, nil
}

func (h *HTTPS) eventLoop() {
func (h *FFDX) eventLoop() {
defer h.wsconn.Close()
l := log.L(h.ctx).WithField("role", "event-loop")
ctx := log.WithLogger(h.ctx, l)
Expand Down
Loading