Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 36 additions & 9 deletions internal/dataexchange/ffdx/ffdx.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ const (
type msgType string

const (
messageReceived msgType = "message-received"
messageDelivered msgType = "message-delivered"
messageFailed msgType = "message-failed"
blobReceived msgType = "blob-received"
blobDelivered msgType = "blob-delivered"
blobFailed msgType = "blob-failed"
messageReceived msgType = "message-received"
messageDelivered msgType = "message-delivered"
messageAcknowledged msgType = "message-acknowledged"
messageFailed msgType = "message-failed"
blobReceived msgType = "blob-received"
blobDelivered msgType = "blob-delivered"
blobAcknowledged msgType = "blob-acknowledged"
blobFailed msgType = "blob-failed"
)

type responseWithRequestID struct {
Expand Down Expand Up @@ -345,18 +347,38 @@ func (h *FFDX) eventLoop() {
var manifest string
switch msg.Type {
case messageFailed:
err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{Error: msg.Error})
err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{
Error: msg.Error,
Info: msg.Info,
})
case messageDelivered:
status := fftypes.OpStatusSucceeded
if h.capabilities.Manifest {
status = fftypes.OpStatusPending
}
err = h.callbacks.TransferResult(msg.RequestID, status, fftypes.TransportStatusUpdate{
Info: msg.Info,
})
case messageAcknowledged:
err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusSucceeded, fftypes.TransportStatusUpdate{
Manifest: msg.Manifest,
Info: msg.Info,
})
case messageReceived:
manifest, err = h.callbacks.MessageReceived(msg.Sender, []byte(msg.Message))
case blobFailed:
err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{Error: msg.Error})
err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{
Error: msg.Error,
Info: msg.Info,
})
case blobDelivered:
err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusSucceeded, fftypes.TransportStatusUpdate{})
status := fftypes.OpStatusSucceeded
if h.capabilities.Manifest {
status = fftypes.OpStatusPending
}
err = h.callbacks.TransferResult(msg.RequestID, status, fftypes.TransportStatusUpdate{
Info: msg.Info,
})
case blobReceived:
var hash *fftypes.Bytes32
hash, err = fftypes.ParseBytes32(ctx, msg.Hash)
Expand All @@ -366,6 +388,11 @@ func (h *FFDX) eventLoop() {
} else {
err = h.callbacks.BLOBReceived(msg.Sender, *hash, msg.Size, msg.Path)
}
case blobAcknowledged:
err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusSucceeded, fftypes.TransportStatusUpdate{
Hash: msg.Hash,
Info: msg.Info,
})
default:
l.Errorf("Message unexpected: %s", msg.Type)
}
Expand Down
86 changes: 64 additions & 22 deletions internal/dataexchange/ffdx/ffdx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

var utConfPrefix = config.NewPluginConfig("ffdx_unit_tests")

func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpURL string, done func()) {
func newTestFFDX(t *testing.T, manifestEnabled bool) (h *FFDX, toServer, fromServer chan string, httpURL string, done func()) {
mockedClient := &http.Client{}
httpmock.ActivateNonDefault(mockedClient)

Expand All @@ -53,10 +53,12 @@ func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpU
h.InitPrefix(utConfPrefix)
utConfPrefix.Set(restclient.HTTPConfigURL, httpURL)
utConfPrefix.Set(restclient.HTTPCustomClient, mockedClient)
utConfPrefix.Set(DataExchangeManifestEnabled, manifestEnabled)

h = &FFDX{initialized: true}
nodes := make([]fftypes.DXInfo, 0)
h.InitPrefix(utConfPrefix)

err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{})
assert.NoError(t, err)
assert.Equal(t, "ffdx", h.Name())
Expand Down Expand Up @@ -87,7 +89,7 @@ func TestInitMissingURL(t *testing.T) {
}

func TestGetEndpointInfo(t *testing.T) {
h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

httpmock.RegisterResponder("GET", fmt.Sprintf("%s/api/v1/id", httpURL),
Expand All @@ -108,7 +110,7 @@ func TestGetEndpointInfo(t *testing.T) {
}

func TestGetEndpointInfoError(t *testing.T) {
h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

httpmock.RegisterResponder("GET", fmt.Sprintf("%s/api/v1/id", httpURL),
Expand All @@ -119,7 +121,7 @@ func TestGetEndpointInfoError(t *testing.T) {
}

func TestAddPeer(t *testing.T) {
h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

httpmock.RegisterResponder("PUT", fmt.Sprintf("%s/api/v1/peers/peer1", httpURL),
Expand All @@ -137,7 +139,7 @@ func TestAddPeer(t *testing.T) {
}

func TestAddPeerError(t *testing.T) {
h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

httpmock.RegisterResponder("PUT", fmt.Sprintf("%s/api/v1/peers/peer1", httpURL),
Expand All @@ -152,7 +154,7 @@ func TestAddPeerError(t *testing.T) {

func TestUploadBLOB(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

u := fftypes.NewUUID()
Expand Down Expand Up @@ -182,7 +184,7 @@ func TestUploadBLOB(t *testing.T) {

func TestUploadBLOBBadHash(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

u := fftypes.NewUUID()
Expand All @@ -203,7 +205,7 @@ func TestUploadBLOBBadHash(t *testing.T) {
}

func TestUploadBLOBError(t *testing.T) {
h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

u := fftypes.NewUUID()
Expand All @@ -216,7 +218,7 @@ func TestUploadBLOBError(t *testing.T) {

func TestCheckBLOBReceivedOk(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

u := fftypes.NewUUID()
Expand All @@ -241,7 +243,7 @@ func TestCheckBLOBReceivedOk(t *testing.T) {

func TestCheckBLOBReceivedBadHash(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

u := fftypes.NewUUID()
Expand All @@ -262,7 +264,7 @@ func TestCheckBLOBReceivedBadHash(t *testing.T) {

func TestCheckBLOBReceivedBadSize(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

u := fftypes.NewUUID()
Expand All @@ -285,7 +287,7 @@ func TestCheckBLOBReceivedBadSize(t *testing.T) {

func TestCheckBLOBReceivedNotFound(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

u := fftypes.NewUUID()
Expand All @@ -304,7 +306,7 @@ func TestCheckBLOBReceivedNotFound(t *testing.T) {

func TestCheckBLOBReceivedError(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

u := fftypes.NewUUID()
Expand All @@ -322,7 +324,7 @@ func TestCheckBLOBReceivedError(t *testing.T) {

func TestDownloadBLOB(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

u := fftypes.NewUUID()
Expand All @@ -337,7 +339,7 @@ func TestDownloadBLOB(t *testing.T) {
}

func TestDownloadBLOBError(t *testing.T) {
h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

httpmock.RegisterResponder("GET", fmt.Sprintf("%s/api/v1/blobs/bad", httpURL),
Expand All @@ -349,7 +351,7 @@ func TestDownloadBLOBError(t *testing.T) {

func TestSendMessage(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/messages", httpURL),
Expand All @@ -360,7 +362,7 @@ func TestSendMessage(t *testing.T) {
}

func TestSendMessageError(t *testing.T) {
h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/message", httpURL),
Expand All @@ -372,7 +374,7 @@ func TestSendMessageError(t *testing.T) {

func TestTransferBLOB(t *testing.T) {

h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/transfers", httpURL),
Expand All @@ -383,7 +385,7 @@ func TestTransferBLOB(t *testing.T) {
}

func TestTransferBLOBError(t *testing.T) {
h, _, _, httpURL, done := newTestFFDX(t)
h, _, _, httpURL, done := newTestFFDX(t, false)
defer done()

httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/transfers", httpURL),
Expand All @@ -395,7 +397,7 @@ func TestTransferBLOBError(t *testing.T) {

func TestEvents(t *testing.T) {

h, toServer, fromServer, _, done := newTestFFDX(t)
h, toServer, fromServer, _, done := newTestFFDX(t, false)
defer done()

err := h.Start()
Expand All @@ -415,10 +417,15 @@ func TestEvents(t *testing.T) {
msg = <-toServer
assert.Equal(t, `{"action":"commit"}`, string(msg))

mcb.On("TransferResult", "tx12345", fftypes.OpStatusSucceeded, mock.Anything).Return(nil)
fromServer <- `{"type":"message-delivered","requestID":"tx12345"}`
msg = <-toServer
assert.Equal(t, `{"action":"commit"}`, string(msg))

mcb.On("TransferResult", "tx12345", fftypes.OpStatusSucceeded, mock.MatchedBy(func(ts fftypes.TransportStatusUpdate) bool {
return ts.Manifest == `{"manifest":true}` && ts.Info.String() == `{"signatures":"and stuff"}`
})).Return(nil)
fromServer <- `{"type":"message-delivered","requestID":"tx12345","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}`
fromServer <- `{"type":"message-acknowledged","requestID":"tx12345","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}`
msg = <-toServer
assert.Equal(t, `{"action":"commit"}`, string(msg))

Expand Down Expand Up @@ -456,6 +463,41 @@ func TestEvents(t *testing.T) {
msg = <-toServer
assert.Equal(t, `{"action":"commit"}`, string(msg))

mcb.On("TransferResult", "tx12345", fftypes.OpStatusSucceeded, mock.MatchedBy(func(ts fftypes.TransportStatusUpdate) bool {
return ts.Manifest == `{"manifest":true}` && ts.Info.String() == `{"signatures":"and stuff"}`
})).Return(nil)
fromServer <- `{"type":"blob-acknowledged","requestID":"tx12345","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}`
msg = <-toServer
assert.Equal(t, `{"action":"commit"}`, string(msg))

mcb.AssertExpectations(t)
}

func TestEventsWithManifest(t *testing.T) {

h, toServer, fromServer, _, done := newTestFFDX(t, true)
defer done()

err := h.Start()
assert.NoError(t, err)

fromServer <- `!}` // ignored
fromServer <- `{}` // ignored
msg := <-toServer
assert.Equal(t, `{"action":"commit"}`, string(msg))

mcb := h.callbacks.(*dataexchangemocks.Callbacks)

mcb.On("TransferResult", "tx12345", fftypes.OpStatusPending, mock.Anything).Return(nil)
fromServer <- `{"type":"message-delivered","requestID":"tx12345"}`
msg = <-toServer
assert.Equal(t, `{"action":"commit"}`, string(msg))

mcb.On("TransferResult", "tx12345", fftypes.OpStatusPending, mock.Anything).Return(nil)
fromServer <- `{"type":"blob-delivered","requestID":"tx12345"}`
msg = <-toServer
assert.Equal(t, `{"action":"commit"}`, string(msg))

mcb.AssertExpectations(t)
}

Expand Down Expand Up @@ -564,7 +606,7 @@ func TestWebsocketWithReinit(t *testing.T) {
}

func TestDXUninitialized(t *testing.T) {
h, _, _, _, done := newTestFFDX(t)
h, _, _, _, done := newTestFFDX(t, false)
defer done()

h.initialized = false
Expand Down
26 changes: 19 additions & 7 deletions internal/events/dx_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,25 @@ func (em *eventManager) TransferResult(dx dataexchange.Plugin, trackingID string
// The maniest should exactly match that stored into the operation input, if supported
op := operations[0]
if status == fftypes.OpStatusSucceeded && dx.Capabilities().Manifest {
expectedManifest := op.Input.GetString("manifest")
if update.Manifest != expectedManifest {
// Log and map to failure for user to see that the receiver did not provide a matching acknowledgement
mismatchErr := i18n.NewError(em.ctx, i18n.MsgManifestMismatch, status, update.Manifest)
log.L(em.ctx).Errorf("%s transfer %s: %s", dx.Name(), trackingID, mismatchErr.Error())
update.Error = mismatchErr.Error()
status = fftypes.OpStatusFailed
switch op.Type {
case fftypes.OpTypeDataExchangeBatchSend:
expectedManifest := op.Input.GetString("manifest")
if update.Manifest != expectedManifest {
// Log and map to failure for user to see that the receiver did not provide a matching acknowledgement
mismatchErr := i18n.NewError(em.ctx, i18n.MsgManifestMismatch, status, update.Manifest)
log.L(em.ctx).Errorf("%s transfer %s: %s", dx.Name(), trackingID, mismatchErr.Error())
update.Error = mismatchErr.Error()
status = fftypes.OpStatusFailed
}
case fftypes.OpTypeDataExchangeBlobSend:
expectedHash := op.Input.GetString("hash")
if update.Hash != expectedHash {
// Log and map to failure for user to see that the receiver did not provide a matching hash
mismatchErr := i18n.NewError(em.ctx, i18n.MsgBlobHashMismatch, expectedHash, update.Hash)
log.L(em.ctx).Errorf("%s transfer %s: %s", dx.Name(), trackingID, mismatchErr.Error())
update.Error = mismatchErr.Error()
status = fftypes.OpStatusFailed
}
}
}

Expand Down
Loading