From a016829f056ea766d609492873f6e76d4d983063 Mon Sep 17 00:00:00 2001 From: Gabriel Indik Date: Thu, 17 Feb 2022 11:38:42 -0500 Subject: [PATCH] Updates to DX Signed-off-by: Gabriel Indik --- internal/dataexchange/ffdx/ffdx.go | 45 ++++++++-- internal/dataexchange/ffdx/ffdx_test.go | 86 ++++++++++++++----- internal/events/dx_callbacks.go | 26 ++++-- internal/events/dx_callbacks_test.go | 37 +++++++- internal/i18n/en_translations.go | 3 + internal/privatemessaging/privatemessaging.go | 3 + pkg/fftypes/transport_wrapper.go | 1 + 7 files changed, 162 insertions(+), 39 deletions(-) diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index 2300530306..a88768ba7d 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -70,12 +70,14 @@ const ( type msgType string const ( - messageReceived msgType = "message-received" - messageDelivered msgType = "message-delivered" - messageFailed msgType = "message-failed" - blobReceived msgType = "blob-received" - blobDelivered msgType = "blob-delivered" - blobFailed msgType = "blob-failed" + messageReceived msgType = "message-received" + messageDelivered msgType = "message-delivered" + messageAcknowledged msgType = "message-acknowledged" + messageFailed msgType = "message-failed" + blobReceived msgType = "blob-received" + blobDelivered msgType = "blob-delivered" + blobAcknowledged msgType = "blob-acknowledged" + blobFailed msgType = "blob-failed" ) type responseWithRequestID struct { @@ -345,8 +347,19 @@ func (h *FFDX) eventLoop() { var manifest string switch msg.Type { case messageFailed: - err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{Error: msg.Error}) + err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ + Error: msg.Error, + Info: msg.Info, + }) case messageDelivered: + status := fftypes.OpStatusSucceeded + if h.capabilities.Manifest { + status = fftypes.OpStatusPending + } + err = h.callbacks.TransferResult(msg.RequestID, status, fftypes.TransportStatusUpdate{ + Info: msg.Info, + }) + case messageAcknowledged: err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusSucceeded, fftypes.TransportStatusUpdate{ Manifest: msg.Manifest, Info: msg.Info, @@ -354,9 +367,18 @@ func (h *FFDX) eventLoop() { case messageReceived: manifest, err = h.callbacks.MessageReceived(msg.Sender, []byte(msg.Message)) case blobFailed: - err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{Error: msg.Error}) + err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ + Error: msg.Error, + Info: msg.Info, + }) case blobDelivered: - err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusSucceeded, fftypes.TransportStatusUpdate{}) + status := fftypes.OpStatusSucceeded + if h.capabilities.Manifest { + status = fftypes.OpStatusPending + } + err = h.callbacks.TransferResult(msg.RequestID, status, fftypes.TransportStatusUpdate{ + Info: msg.Info, + }) case blobReceived: var hash *fftypes.Bytes32 hash, err = fftypes.ParseBytes32(ctx, msg.Hash) @@ -366,6 +388,11 @@ func (h *FFDX) eventLoop() { } else { err = h.callbacks.BLOBReceived(msg.Sender, *hash, msg.Size, msg.Path) } + case blobAcknowledged: + err = h.callbacks.TransferResult(msg.RequestID, fftypes.OpStatusSucceeded, fftypes.TransportStatusUpdate{ + Hash: msg.Hash, + Info: msg.Info, + }) default: l.Errorf("Message unexpected: %s", msg.Type) } diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index 66204c8516..1a659cf09c 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -39,7 +39,7 @@ import ( var utConfPrefix = config.NewPluginConfig("ffdx_unit_tests") -func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpURL string, done func()) { +func newTestFFDX(t *testing.T, manifestEnabled bool) (h *FFDX, toServer, fromServer chan string, httpURL string, done func()) { mockedClient := &http.Client{} httpmock.ActivateNonDefault(mockedClient) @@ -53,10 +53,12 @@ func newTestFFDX(t *testing.T) (h *FFDX, toServer, fromServer chan string, httpU h.InitPrefix(utConfPrefix) utConfPrefix.Set(restclient.HTTPConfigURL, httpURL) utConfPrefix.Set(restclient.HTTPCustomClient, mockedClient) + utConfPrefix.Set(DataExchangeManifestEnabled, manifestEnabled) h = &FFDX{initialized: true} nodes := make([]fftypes.DXInfo, 0) h.InitPrefix(utConfPrefix) + err := h.Init(context.Background(), utConfPrefix, nodes, &dataexchangemocks.Callbacks{}) assert.NoError(t, err) assert.Equal(t, "ffdx", h.Name()) @@ -87,7 +89,7 @@ func TestInitMissingURL(t *testing.T) { } func TestGetEndpointInfo(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() httpmock.RegisterResponder("GET", fmt.Sprintf("%s/api/v1/id", httpURL), @@ -108,7 +110,7 @@ func TestGetEndpointInfo(t *testing.T) { } func TestGetEndpointInfoError(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() httpmock.RegisterResponder("GET", fmt.Sprintf("%s/api/v1/id", httpURL), @@ -119,7 +121,7 @@ func TestGetEndpointInfoError(t *testing.T) { } func TestAddPeer(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() httpmock.RegisterResponder("PUT", fmt.Sprintf("%s/api/v1/peers/peer1", httpURL), @@ -137,7 +139,7 @@ func TestAddPeer(t *testing.T) { } func TestAddPeerError(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() httpmock.RegisterResponder("PUT", fmt.Sprintf("%s/api/v1/peers/peer1", httpURL), @@ -152,7 +154,7 @@ func TestAddPeerError(t *testing.T) { func TestUploadBLOB(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() u := fftypes.NewUUID() @@ -182,7 +184,7 @@ func TestUploadBLOB(t *testing.T) { func TestUploadBLOBBadHash(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() u := fftypes.NewUUID() @@ -203,7 +205,7 @@ func TestUploadBLOBBadHash(t *testing.T) { } func TestUploadBLOBError(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() u := fftypes.NewUUID() @@ -216,7 +218,7 @@ func TestUploadBLOBError(t *testing.T) { func TestCheckBLOBReceivedOk(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() u := fftypes.NewUUID() @@ -241,7 +243,7 @@ func TestCheckBLOBReceivedOk(t *testing.T) { func TestCheckBLOBReceivedBadHash(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() u := fftypes.NewUUID() @@ -262,7 +264,7 @@ func TestCheckBLOBReceivedBadHash(t *testing.T) { func TestCheckBLOBReceivedBadSize(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() u := fftypes.NewUUID() @@ -285,7 +287,7 @@ func TestCheckBLOBReceivedBadSize(t *testing.T) { func TestCheckBLOBReceivedNotFound(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() u := fftypes.NewUUID() @@ -304,7 +306,7 @@ func TestCheckBLOBReceivedNotFound(t *testing.T) { func TestCheckBLOBReceivedError(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() u := fftypes.NewUUID() @@ -322,7 +324,7 @@ func TestCheckBLOBReceivedError(t *testing.T) { func TestDownloadBLOB(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() u := fftypes.NewUUID() @@ -337,7 +339,7 @@ func TestDownloadBLOB(t *testing.T) { } func TestDownloadBLOBError(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() httpmock.RegisterResponder("GET", fmt.Sprintf("%s/api/v1/blobs/bad", httpURL), @@ -349,7 +351,7 @@ func TestDownloadBLOBError(t *testing.T) { func TestSendMessage(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/messages", httpURL), @@ -360,7 +362,7 @@ func TestSendMessage(t *testing.T) { } func TestSendMessageError(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/message", httpURL), @@ -372,7 +374,7 @@ func TestSendMessageError(t *testing.T) { func TestTransferBLOB(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/transfers", httpURL), @@ -383,7 +385,7 @@ func TestTransferBLOB(t *testing.T) { } func TestTransferBLOBError(t *testing.T) { - h, _, _, httpURL, done := newTestFFDX(t) + h, _, _, httpURL, done := newTestFFDX(t, false) defer done() httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/transfers", httpURL), @@ -395,7 +397,7 @@ func TestTransferBLOBError(t *testing.T) { func TestEvents(t *testing.T) { - h, toServer, fromServer, _, done := newTestFFDX(t) + h, toServer, fromServer, _, done := newTestFFDX(t, false) defer done() err := h.Start() @@ -415,10 +417,15 @@ func TestEvents(t *testing.T) { msg = <-toServer assert.Equal(t, `{"action":"commit"}`, string(msg)) + mcb.On("TransferResult", "tx12345", fftypes.OpStatusSucceeded, mock.Anything).Return(nil) + fromServer <- `{"type":"message-delivered","requestID":"tx12345"}` + msg = <-toServer + assert.Equal(t, `{"action":"commit"}`, string(msg)) + mcb.On("TransferResult", "tx12345", fftypes.OpStatusSucceeded, mock.MatchedBy(func(ts fftypes.TransportStatusUpdate) bool { return ts.Manifest == `{"manifest":true}` && ts.Info.String() == `{"signatures":"and stuff"}` })).Return(nil) - fromServer <- `{"type":"message-delivered","requestID":"tx12345","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}` + fromServer <- `{"type":"message-acknowledged","requestID":"tx12345","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}` msg = <-toServer assert.Equal(t, `{"action":"commit"}`, string(msg)) @@ -456,6 +463,41 @@ func TestEvents(t *testing.T) { msg = <-toServer assert.Equal(t, `{"action":"commit"}`, string(msg)) + mcb.On("TransferResult", "tx12345", fftypes.OpStatusSucceeded, mock.MatchedBy(func(ts fftypes.TransportStatusUpdate) bool { + return ts.Manifest == `{"manifest":true}` && ts.Info.String() == `{"signatures":"and stuff"}` + })).Return(nil) + fromServer <- `{"type":"blob-acknowledged","requestID":"tx12345","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}` + msg = <-toServer + assert.Equal(t, `{"action":"commit"}`, string(msg)) + + mcb.AssertExpectations(t) +} + +func TestEventsWithManifest(t *testing.T) { + + h, toServer, fromServer, _, done := newTestFFDX(t, true) + defer done() + + err := h.Start() + assert.NoError(t, err) + + fromServer <- `!}` // ignored + fromServer <- `{}` // ignored + msg := <-toServer + assert.Equal(t, `{"action":"commit"}`, string(msg)) + + mcb := h.callbacks.(*dataexchangemocks.Callbacks) + + mcb.On("TransferResult", "tx12345", fftypes.OpStatusPending, mock.Anything).Return(nil) + fromServer <- `{"type":"message-delivered","requestID":"tx12345"}` + msg = <-toServer + assert.Equal(t, `{"action":"commit"}`, string(msg)) + + mcb.On("TransferResult", "tx12345", fftypes.OpStatusPending, mock.Anything).Return(nil) + fromServer <- `{"type":"blob-delivered","requestID":"tx12345"}` + msg = <-toServer + assert.Equal(t, `{"action":"commit"}`, string(msg)) + mcb.AssertExpectations(t) } @@ -564,7 +606,7 @@ func TestWebsocketWithReinit(t *testing.T) { } func TestDXUninitialized(t *testing.T) { - h, _, _, _, done := newTestFFDX(t) + h, _, _, _, done := newTestFFDX(t, false) defer done() h.initialized = false diff --git a/internal/events/dx_callbacks.go b/internal/events/dx_callbacks.go index 5d32065d68..6661c2cb0e 100644 --- a/internal/events/dx_callbacks.go +++ b/internal/events/dx_callbacks.go @@ -292,13 +292,25 @@ func (em *eventManager) TransferResult(dx dataexchange.Plugin, trackingID string // The maniest should exactly match that stored into the operation input, if supported op := operations[0] if status == fftypes.OpStatusSucceeded && dx.Capabilities().Manifest { - expectedManifest := op.Input.GetString("manifest") - if update.Manifest != expectedManifest { - // Log and map to failure for user to see that the receiver did not provide a matching acknowledgement - mismatchErr := i18n.NewError(em.ctx, i18n.MsgManifestMismatch, status, update.Manifest) - log.L(em.ctx).Errorf("%s transfer %s: %s", dx.Name(), trackingID, mismatchErr.Error()) - update.Error = mismatchErr.Error() - status = fftypes.OpStatusFailed + switch op.Type { + case fftypes.OpTypeDataExchangeBatchSend: + expectedManifest := op.Input.GetString("manifest") + if update.Manifest != expectedManifest { + // Log and map to failure for user to see that the receiver did not provide a matching acknowledgement + mismatchErr := i18n.NewError(em.ctx, i18n.MsgManifestMismatch, status, update.Manifest) + log.L(em.ctx).Errorf("%s transfer %s: %s", dx.Name(), trackingID, mismatchErr.Error()) + update.Error = mismatchErr.Error() + status = fftypes.OpStatusFailed + } + case fftypes.OpTypeDataExchangeBlobSend: + expectedHash := op.Input.GetString("hash") + if update.Hash != expectedHash { + // Log and map to failure for user to see that the receiver did not provide a matching hash + mismatchErr := i18n.NewError(em.ctx, i18n.MsgBlobHashMismatch, expectedHash, update.Hash) + log.L(em.ctx).Errorf("%s transfer %s: %s", dx.Name(), trackingID, mismatchErr.Error()) + update.Error = mismatchErr.Error() + status = fftypes.OpStatusFailed + } } } diff --git a/internal/events/dx_callbacks_test.go b/internal/events/dx_callbacks_test.go index ef9b30f4e7..0223a07373 100644 --- a/internal/events/dx_callbacks_test.go +++ b/internal/events/dx_callbacks_test.go @@ -450,7 +450,8 @@ func TestTransferResultManifestMismatch(t *testing.T) { id := fftypes.NewUUID() mdi.On("GetOperations", mock.Anything, mock.Anything).Return([]*fftypes.Operation{ { - ID: id, + ID: id, + Type: "dataexchange_batch_send", Input: fftypes.JSONObject{ "manifest": "Bob", }, @@ -475,6 +476,40 @@ func TestTransferResultManifestMismatch(t *testing.T) { } +func TestTransferResultHashtMismatch(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + + mdi := em.database.(*databasemocks.Plugin) + id := fftypes.NewUUID() + mdi.On("GetOperations", mock.Anything, mock.Anything).Return([]*fftypes.Operation{ + { + ID: id, + Type: "dataexchange_blob_send", + Input: fftypes.JSONObject{ + "hash": "Bob", + }, + }, + }, nil, nil) + mdi.On("ResolveOperation", mock.Anything, id, fftypes.OpStatusFailed, mock.MatchedBy(func(errorMsg string) bool { + return strings.Contains(errorMsg, "FF10348") + }), fftypes.JSONObject{ + "extra": "info", + }).Return(nil) + + mdx := &dataexchangemocks.Plugin{} + mdx.On("Name").Return("utdx") + mdx.On("Capabilities").Return(&dataexchange.Capabilities{ + Manifest: true, + }) + err := em.TransferResult(mdx, id.String(), fftypes.OpStatusSucceeded, fftypes.TransportStatusUpdate{ + Info: fftypes.JSONObject{"extra": "info"}, + Hash: "Sally", + }) + assert.NoError(t, err) + +} + func TestTransferResultNotCorrelated(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() diff --git a/internal/i18n/en_translations.go b/internal/i18n/en_translations.go index 26e15de9bc..d747754992 100644 --- a/internal/i18n/en_translations.go +++ b/internal/i18n/en_translations.go @@ -263,4 +263,7 @@ var ( MsgInvalidTXTypeForMessage = ffm("FF10343", "Invalid transaction type for sending a message: %s", 400) MsgGroupRequired = ffm("FF10344", "Group must be set", 400) MsgDBLockFailed = ffm("FF10345", "Database lock failed") + MsgFFIGenerationFailed = ffm("FF10346", "Error generating smart contract interface: %s", 400) + MsgFFIGenerationUnsupported = ffm("FF10347", "Smart contract interface generation is not supported by this blockchain plugin", 400) + MsgBlobHashMismatch = ffm("FF10348", "Blob hash mismatch sent=%s received=%s", 400) ) diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index ea80bd3c8c..7717d4d180 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -187,6 +187,9 @@ func (pm *privateMessaging) transferBlobs(ctx context.Context, data []*fftypes.D d.Namespace, txid, fftypes.OpTypeDataExchangeBlobSend) + op.Input = fftypes.JSONObject{ + "hash": d.Blob.Hash, + } if err = pm.database.InsertOperation(ctx, op); err != nil { return err } diff --git a/pkg/fftypes/transport_wrapper.go b/pkg/fftypes/transport_wrapper.go index 882617ef5d..4823aedce2 100644 --- a/pkg/fftypes/transport_wrapper.go +++ b/pkg/fftypes/transport_wrapper.go @@ -33,4 +33,5 @@ type TransportStatusUpdate struct { Error string `json:"error,omitempty"` Manifest string `json:"manifest,omitempty"` Info JSONObject `json:"info,omitempty"` + Hash string `json:"hash,omitempty"` }