From bec9fd4f79db67eed635710ae8fca8a566be134c Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 8 Feb 2022 12:03:54 -0500 Subject: [PATCH 1/3] Pass requestId to DX and remove backend_id from Operation Follow the convention of all other plugins by passing a request ID into the connector, rather than reading one from the connector response. This will be enforced as a requirement for future connectors, so backend_id is removed from the operations table. Signed-off-by: Andrew Richardson --- ...000060_remove_operation_backendid.down.sql | 4 ++ .../000060_remove_operation_backendid.up.sql | 4 ++ ...000060_remove_operation_backendid.down.sql | 2 + .../000060_remove_operation_backendid.up.sql | 2 + docs/swagger/swagger.yaml | 13 ------ internal/assets/token_pool.go | 2 - internal/assets/token_transfer.go | 1 - internal/batchpin/batchpin.go | 1 - internal/broadcast/manager.go | 1 - internal/broadcast/manager_test.go | 1 - internal/contracts/manager.go | 1 - internal/database/sqlcommon/operation_sql.go | 12 ++--- .../database/sqlcommon/operation_sql_test.go | 4 +- internal/dataexchange/dxhttps/dxhttps.go | 18 +++++--- internal/dataexchange/dxhttps/dxhttps_test.go | 18 +++----- internal/events/dx_callbacks.go | 13 ++++-- internal/events/dx_callbacks_test.go | 15 +++--- internal/privatemessaging/message_test.go | 6 +-- internal/privatemessaging/privatemessaging.go | 26 +++++------ .../privatemessaging/privatemessaging_test.go | 25 +++++----- mocks/dataexchangemocks/plugin.go | 46 +++++++------------ pkg/database/plugin.go | 1 - pkg/dataexchange/plugin.go | 4 +- pkg/fftypes/operation.go | 4 +- pkg/fftypes/operation_test.go | 3 +- 25 files changed, 99 insertions(+), 128 deletions(-) create mode 100644 db/migrations/postgres/000060_remove_operation_backendid.down.sql create mode 100644 db/migrations/postgres/000060_remove_operation_backendid.up.sql create mode 100644 db/migrations/sqlite/000060_remove_operation_backendid.down.sql create mode 100644 db/migrations/sqlite/000060_remove_operation_backendid.up.sql diff --git a/db/migrations/postgres/000060_remove_operation_backendid.down.sql b/db/migrations/postgres/000060_remove_operation_backendid.down.sql new file mode 100644 index 0000000000..49bcbe7bde --- /dev/null +++ b/db/migrations/postgres/000060_remove_operation_backendid.down.sql @@ -0,0 +1,4 @@ +BEGIN; +ALTER TABLE operations ADD COLUMN backend_id VARCHAR(256); +CREATE INDEX operations_backend ON operations(backend_id); +COMMIT; diff --git a/db/migrations/postgres/000060_remove_operation_backendid.up.sql b/db/migrations/postgres/000060_remove_operation_backendid.up.sql new file mode 100644 index 0000000000..fcd0c645f6 --- /dev/null +++ b/db/migrations/postgres/000060_remove_operation_backendid.up.sql @@ -0,0 +1,4 @@ +BEGIN; +DROP INDEX operations_backend; +ALTER TABLE operations DROP COLUMN backend_id; +COMMIT; diff --git a/db/migrations/sqlite/000060_remove_operation_backendid.down.sql b/db/migrations/sqlite/000060_remove_operation_backendid.down.sql new file mode 100644 index 0000000000..238a522057 --- /dev/null +++ b/db/migrations/sqlite/000060_remove_operation_backendid.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE operations ADD COLUMN backend_id VARCHAR(256); +CREATE INDEX operations_backend ON operations(backend_id); diff --git a/db/migrations/sqlite/000060_remove_operation_backendid.up.sql b/db/migrations/sqlite/000060_remove_operation_backendid.up.sql new file mode 100644 index 0000000000..4d25cccfdc --- /dev/null +++ b/db/migrations/sqlite/000060_remove_operation_backendid.up.sql @@ -0,0 +1,2 @@ +DROP INDEX operations_backend; +ALTER TABLE operations DROP COLUMN backend_id; diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 35f99af619..d4a400f936 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -4630,8 +4630,6 @@ paths: application/json: schema: properties: - backendId: - type: string created: {} error: type: string @@ -5292,11 +5290,6 @@ paths: schema: default: 120s type: string - - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' - in: query - name: backendid - schema: - type: string - description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^' in: query name: created @@ -5392,8 +5385,6 @@ paths: application/json: schema: properties: - backendId: - type: string created: {} error: type: string @@ -5458,8 +5449,6 @@ paths: application/json: schema: properties: - backendId: - type: string created: {} error: type: string @@ -8032,8 +8021,6 @@ paths: schema: items: properties: - backendId: - type: string created: {} error: type: string diff --git a/internal/assets/token_pool.go b/internal/assets/token_pool.go index c4ae5386ce..37480afd93 100644 --- a/internal/assets/token_pool.go +++ b/internal/assets/token_pool.go @@ -80,7 +80,6 @@ func (am *assetManager) createTokenPoolInternal(ctx context.Context, pool *fftyp plugin, pool.Namespace, txid, - "", fftypes.OpTypeTokenCreatePool) txcommon.AddTokenPoolCreateInputs(op, pool) @@ -109,7 +108,6 @@ func (am *assetManager) ActivateTokenPool(ctx context.Context, pool *fftypes.Tok plugin, pool.Namespace, pool.TX.ID, - "", fftypes.OpTypeTokenActivatePool) if err := am.database.InsertOperation(ctx, op); err != nil { return err diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index f11c4bc354..b6a7bc6bad 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -243,7 +243,6 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er plugin, s.namespace, txid, - "", fftypes.OpTypeTokenTransfer) if err = txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer); err == nil { err = s.mgr.database.InsertOperation(ctx, op) diff --git a/internal/batchpin/batchpin.go b/internal/batchpin/batchpin.go index 780cd16145..0300f89306 100644 --- a/internal/batchpin/batchpin.go +++ b/internal/batchpin/batchpin.go @@ -54,7 +54,6 @@ func (bp *batchPinSubmitter) SubmitPinnedBatch(ctx context.Context, batch *fftyp bp.blockchain, batch.Namespace, batch.Payload.TX.ID, - "", fftypes.OpTypeBlockchainBatchPin) if err := bp.database.InsertOperation(ctx, op); err != nil { return err diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 725b307e9d..19f86b3ef3 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -128,7 +128,6 @@ func (bm *broadcastManager) submitTXAndUpdateDB(ctx context.Context, batch *ffty bm.publicstorage, batch.Namespace, batch.Payload.TX.ID, - batch.PayloadRef, fftypes.OpTypePublicStorageBatchBroadcast) op.Status = fftypes.OpStatusSucceeded // Note we performed the action synchronously above err = bm.database.InsertOperation(ctx, op) diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index ad393f3026..acbfdbc20e 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -253,7 +253,6 @@ func TestSubmitTXAndUpdateDBSucceed(t *testing.T) { op := mdi.Calls[1].Arguments[1].(*fftypes.Operation) assert.Equal(t, *batch.Payload.TX.ID, *op.Transaction) assert.Equal(t, "ut_publicstorage", op.Plugin) - assert.Equal(t, "ipfs_id", op.BackendID) assert.Equal(t, fftypes.OpTypePublicStorageBatchBroadcast, op.Type) } diff --git a/internal/contracts/manager.go b/internal/contracts/manager.go index acdda7b01e..6e71491e36 100644 --- a/internal/contracts/manager.go +++ b/internal/contracts/manager.go @@ -187,7 +187,6 @@ func (cm *contractManager) InvokeContract(ctx context.Context, ns string, req *f cm.blockchain, ns, txid, - "", fftypes.OpTypeBlockchainInvoke) op.Input = req.Input return cm.database.InsertOperation(ctx, op) diff --git a/internal/database/sqlcommon/operation_sql.go b/internal/database/sqlcommon/operation_sql.go index c69617d286..93112ae9b4 100644 --- a/internal/database/sqlcommon/operation_sql.go +++ b/internal/database/sqlcommon/operation_sql.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -35,7 +35,6 @@ var ( "optype", "opstatus", "plugin", - "backend_id", "created", "updated", "error", @@ -43,10 +42,9 @@ var ( "output", } opFilterFieldMap = map[string]string{ - "tx": "tx_id", - "type": "optype", - "status": "opstatus", - "backendid": "backend_id", + "tx": "tx_id", + "type": "optype", + "status": "opstatus", } ) @@ -67,7 +65,6 @@ func (s *SQLCommon) InsertOperation(ctx context.Context, operation *fftypes.Oper string(operation.Type), string(operation.Status), operation.Plugin, - operation.BackendID, operation.Created, operation.Updated, operation.Error, @@ -93,7 +90,6 @@ func (s *SQLCommon) opResult(ctx context.Context, row *sql.Rows) (*fftypes.Opera &op.Type, &op.Status, &op.Plugin, - &op.BackendID, &op.Created, &op.Updated, &op.Error, diff --git a/internal/database/sqlcommon/operation_sql_test.go b/internal/database/sqlcommon/operation_sql_test.go index e644677b37..f3c1b0ef60 100644 --- a/internal/database/sqlcommon/operation_sql_test.go +++ b/internal/database/sqlcommon/operation_sql_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -43,7 +43,6 @@ func TestOperationE2EWithDB(t *testing.T) { Transaction: fftypes.NewUUID(), Status: fftypes.OpStatusFailed, Plugin: "ethereum", - BackendID: fftypes.NewRandB32().String(), Error: "pop", Input: fftypes.JSONObject{"some": "input-info"}, Output: fftypes.JSONObject{"some": "output-info"}, @@ -70,7 +69,6 @@ func TestOperationE2EWithDB(t *testing.T) { fb.Eq("status", operation.Status), fb.Eq("error", operation.Error), fb.Eq("plugin", operation.Plugin), - fb.Eq("backendid", operation.BackendID), fb.Gt("created", 0), fb.Gt("updated", 0), ) diff --git a/internal/dataexchange/dxhttps/dxhttps.go b/internal/dataexchange/dxhttps/dxhttps.go index a37f6f72f3..c38f5fb46e 100644 --- a/internal/dataexchange/dxhttps/dxhttps.go +++ b/internal/dataexchange/dxhttps/dxhttps.go @@ -47,7 +47,7 @@ type wsEvent struct { Type msgType `json:"type"` Sender string `json:"sender"` Recipient string `json:"recipient"` - RequestID string `json:"requestID"` + RequestID string `json:"requestId"` Path string `json:"path"` Message string `json:"message"` Hash string `json:"hash"` @@ -86,11 +86,13 @@ type uploadBlob struct { type sendMessage struct { Message string `json:"message"` Recipient string `json:"recipient"` + RequestID string `json:"requestId"` } type transferBlob struct { Path string `json:"path"` Recipient string `json:"recipient"` + RequestID string `json:"requestId"` } type wsAck struct { @@ -183,34 +185,36 @@ func (h *HTTPS) DownloadBLOB(ctx context.Context, payloadRef string) (content io return res.RawBody(), nil } -func (h *HTTPS) SendMessage(ctx context.Context, peerID string, data []byte) (trackingID string, err error) { +func (h *HTTPS) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID string, data []byte) (err error) { var responseData responseWithRequestID res, err := h.client.R().SetContext(ctx). SetBody(&sendMessage{ Message: string(data), Recipient: peerID, + RequestID: opID.String(), }). SetResult(&responseData). Post("/api/v1/messages") if err != nil || !res.IsSuccess() { - return "", restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) + return restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) } - return responseData.RequestID, nil + return nil } -func (h *HTTPS) TransferBLOB(ctx context.Context, peerID, payloadRef string) (trackingID string, err error) { +func (h *HTTPS) TransferBLOB(ctx context.Context, opID *fftypes.UUID, peerID, payloadRef string) (err error) { var responseData responseWithRequestID res, err := h.client.R().SetContext(ctx). SetBody(&transferBlob{ Path: fmt.Sprintf("/%s", payloadRef), Recipient: peerID, + RequestID: opID.String(), }). SetResult(&responseData). Post("/api/v1/transfers") if err != nil || !res.IsSuccess() { - return "", restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) + return restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr) } - return responseData.RequestID, nil + return nil } func (h *HTTPS) CheckBLOBReceived(ctx context.Context, peerID, ns string, id fftypes.UUID) (hash *fftypes.Bytes32, size int64, err error) { diff --git a/internal/dataexchange/dxhttps/dxhttps_test.go b/internal/dataexchange/dxhttps/dxhttps_test.go index 7caf43e2f4..90b3d9d290 100644 --- a/internal/dataexchange/dxhttps/dxhttps_test.go +++ b/internal/dataexchange/dxhttps/dxhttps_test.go @@ -347,13 +347,10 @@ func TestSendMessage(t *testing.T) { defer done() httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/messages", httpURL), - httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{ - "requestID": "abcd1234", - })) + httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{})) - trackingID, err := h.SendMessage(context.Background(), "peer1", []byte(`some data`)) + err := h.SendMessage(context.Background(), fftypes.NewUUID(), "peer1", []byte(`some data`)) assert.NoError(t, err) - assert.Equal(t, "abcd1234", trackingID) } func TestSendMessageError(t *testing.T) { @@ -363,7 +360,7 @@ func TestSendMessageError(t *testing.T) { httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/message", httpURL), httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{})) - _, err := h.SendMessage(context.Background(), "peer1", []byte(`some data`)) + err := h.SendMessage(context.Background(), fftypes.NewUUID(), "peer1", []byte(`some data`)) assert.Regexp(t, "FF10229", err) } @@ -373,13 +370,10 @@ func TestTransferBLOB(t *testing.T) { defer done() httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/transfers", httpURL), - httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{ - "requestID": "abcd1234", - })) + httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{})) - trackingID, err := h.TransferBLOB(context.Background(), "peer1", "ns1/id1") + err := h.TransferBLOB(context.Background(), fftypes.NewUUID(), "peer1", "ns1/id1") assert.NoError(t, err) - assert.Equal(t, "abcd1234", trackingID) } func TestTransferBLOBError(t *testing.T) { @@ -389,7 +383,7 @@ func TestTransferBLOBError(t *testing.T) { httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/transfers", httpURL), httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{})) - _, err := h.TransferBLOB(context.Background(), "peer1", "ns1/id1") + err := h.TransferBLOB(context.Background(), fftypes.NewUUID(), "peer1", "ns1/id1") assert.Regexp(t, "FF10229", err) } diff --git a/internal/events/dx_callbacks.go b/internal/events/dx_callbacks.go index aac7a7600b..a9176c2e78 100644 --- a/internal/events/dx_callbacks.go +++ b/internal/events/dx_callbacks.go @@ -238,7 +238,7 @@ func (em *eventManager) TransferResult(dx dataexchange.Plugin, trackingID string var operations []*fftypes.Operation fb := database.OperationQueryFactory.NewFilter(em.ctx) filter := fb.And( - fb.Eq("backendid", trackingID), + fb.Eq("id", trackingID), fb.Eq("plugin", dx.Name()), ) operations, _, err = em.database.GetOperations(em.ctx, filter) @@ -256,7 +256,7 @@ func (em *eventManager) TransferResult(dx dataexchange.Plugin, trackingID string return true, i18n.NewError(em.ctx, i18n.Msg404NotFound) } - // The maniest should exactly match that stored into the operation input, if supported + // The manifest should exactly match that stored into the operation input, if supported op := operations[0] if status == fftypes.OpStatusSucceeded && dx.Capabilities().Manifest { expectedManifest := op.Input.GetString("manifest") @@ -269,10 +269,17 @@ func (em *eventManager) TransferResult(dx dataexchange.Plugin, trackingID string } } + // Save any interesting outputs + // Note that we don't need the manifest to be kept here, as it's already in the input + output := fftypes.JSONObject{} + if update.Info != "" { + output["info"] = update.Info + } + update := database.OperationQueryFactory.NewUpdate(em.ctx). Set("status", status). Set("error", update.Error). - Set("output", update.Info) // We don't need the manifest to be kept here, as it's already in the input + Set("output", output) if err := em.database.UpdateOperation(em.ctx, op.ID, update); err != nil { return true, err // this is always retryable } diff --git a/internal/events/dx_callbacks_test.go b/internal/events/dx_callbacks_test.go index db2d2c3070..00ffee7340 100644 --- a/internal/events/dx_callbacks_test.go +++ b/internal/events/dx_callbacks_test.go @@ -489,15 +489,14 @@ func TestTransferResultOk(t *testing.T) { id := fftypes.NewUUID() mdi.On("GetOperations", mock.Anything, mock.Anything).Return([]*fftypes.Operation{ { - ID: id, - BackendID: "tracking12345", + ID: id, }, }, nil, nil) mdi.On("UpdateOperation", mock.Anything, id, mock.Anything).Return(nil) mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") - err := em.TransferResult(mdx, "tracking12345", fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ + err := em.TransferResult(mdx, id.String(), fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ Error: "error info", Info: `{"extra": "info"}`, }) @@ -513,8 +512,7 @@ func TestTransferResultManifestMismatch(t *testing.T) { id := fftypes.NewUUID() mdi.On("GetOperations", mock.Anything, mock.Anything).Return([]*fftypes.Operation{ { - ID: id, - BackendID: "tracking12345", + ID: id, Input: fftypes.JSONObject{ "manifest": "Bob", }, @@ -527,7 +525,7 @@ func TestTransferResultManifestMismatch(t *testing.T) { mdx.On("Capabilities").Return(&dataexchange.Capabilities{ Manifest: true, }) - err := em.TransferResult(mdx, "tracking12345", fftypes.OpStatusSucceeded, fftypes.TransportStatusUpdate{ + err := em.TransferResult(mdx, id.String(), fftypes.OpStatusSucceeded, fftypes.TransportStatusUpdate{ Info: `{"extra": "info"}`, Manifest: "Sally", }) @@ -596,15 +594,14 @@ func TestTransferUpdateFail(t *testing.T) { id := fftypes.NewUUID() mdi.On("GetOperations", mock.Anything, mock.Anything).Return([]*fftypes.Operation{ { - ID: id, - BackendID: "tracking12345", + ID: id, }, }, nil, nil) mdi.On("UpdateOperation", mock.Anything, id, mock.Anything).Return(fmt.Errorf("pop")) mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") - err := em.TransferResult(mdx, "tracking12345", fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ + err := em.TransferResult(mdx, id.String(), fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ Error: "error info", Info: `{"extra": "info"}`, }) diff --git a/internal/privatemessaging/message_test.go b/internal/privatemessaging/message_test.go index 0b2214861f..b83179e5bc 100644 --- a/internal/privatemessaging/message_test.go +++ b/internal/privatemessaging/message_test.go @@ -139,7 +139,7 @@ func TestSendUnpinnedMessageE2EOk(t *testing.T) { mdi.On("InsertEvent", pm.ctx, mock.Anything).Return(nil).Once() mdx := pm.exchange.(*dataexchangemocks.Plugin) - mdx.On("SendMessage", pm.ctx, "peer2-remote", mock.Anything).Return("tracking1", nil).Once() + mdx.On("SendMessage", pm.ctx, mock.Anything, "peer2-remote", mock.Anything).Return(nil).Once() msg, err := pm.SendMessage(pm.ctx, "ns1", &fftypes.MessageInOut{ Message: fftypes.Message{ @@ -661,7 +661,7 @@ func TestSendUnpinnedMessageResolveGroupFail(t *testing.T) { mdi.On("UpsertMessage", pm.ctx, mock.Anything, database.UpsertOptimizationNew).Return(nil).Once() mdx := pm.exchange.(*dataexchangemocks.Plugin) - mdx.On("SendMessage", pm.ctx, "peer2-remote", mock.Anything).Return("tracking1", nil).Once() + mdx.On("SendMessage", pm.ctx, mock.Anything, "peer2-remote", mock.Anything).Return(nil).Once() _, err := pm.SendMessage(pm.ctx, "ns1", &fftypes.MessageInOut{ Message: fftypes.Message{ @@ -728,7 +728,7 @@ func TestSendUnpinnedMessageEventFail(t *testing.T) { mdi.On("InsertEvent", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")).Once() mdx := pm.exchange.(*dataexchangemocks.Plugin) - mdx.On("SendMessage", pm.ctx, "peer2-remote", mock.Anything).Return("tracking1", nil).Once() + mdx.On("SendMessage", pm.ctx, mock.Anything, "peer2-remote", mock.Anything).Return(nil).Once() _, err := pm.SendMessage(pm.ctx, "ns1", &fftypes.MessageInOut{ Message: fftypes.Message{ diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index b93b666749..98a87f03da 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -152,21 +152,21 @@ func (pm *privateMessaging) transferBlobs(ctx context.Context, data []*fftypes.D return i18n.NewError(ctx, i18n.MsgBlobNotFound, d.Blob) } - trackingID, err := pm.exchange.TransferBLOB(ctx, node.DX.Peer, blob.PayloadRef) - if err != nil { - return err - } - + opID := fftypes.NewUUID() if txid != nil { op := fftypes.NewOperation( pm.exchange, d.Namespace, txid, - trackingID, fftypes.OpTypeDataExchangeBlobSend) if err = pm.database.InsertOperation(ctx, op); err != nil { return err } + opID = op.ID + } + + if err := pm.exchange.TransferBLOB(ctx, opID, node.DX.Peer, blob.PayloadRef); err != nil { + return err } } } @@ -202,18 +202,12 @@ func (pm *privateMessaging) sendData(ctx context.Context, mType string, mID *fft return err } - // Send the payload itself - trackingID, err := pm.exchange.SendMessage(ctx, node.DX.Peer, payload) - if err != nil { - return err - } - + opID := fftypes.NewUUID() if txid != nil { op := fftypes.NewOperation( pm.exchange, ns, txid, - trackingID, fftypes.OpTypeDataExchangeBatchSend) op.Input = fftypes.JSONObject{ "manifest": tw.Manifest().String(), @@ -221,8 +215,14 @@ func (pm *privateMessaging) sendData(ctx context.Context, mType string, mID *fft if err = pm.database.InsertOperation(ctx, op); err != nil { return err } + opID = op.ID } + // Send the payload itself + err := pm.exchange.SendMessage(ctx, opID, node.DX.Peer, payload) + if err != nil { + return err + } } return nil diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index 776794fc57..95093e76aa 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -129,22 +129,22 @@ func TestDispatchBatchWithBlobs(t *testing.T) { Hash: blob1, PayloadRef: "/blob/1", }, nil) - mdx.On("TransferBLOB", pm.ctx, "node1", "/blob/1").Return("tracking1", nil) + mdx.On("TransferBLOB", pm.ctx, mock.Anything, "node1", "/blob/1").Return(nil).Once() mdi.On("InsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { - return op.BackendID == "tracking1" && op.Type == fftypes.OpTypeDataExchangeBlobSend + return op.Type == fftypes.OpTypeDataExchangeBlobSend })).Return(nil, nil) - mdx.On("TransferBLOB", pm.ctx, "node2", "/blob/1").Return("tracking2", nil) + mdx.On("TransferBLOB", pm.ctx, mock.Anything, "node2", "/blob/1").Return(nil).Once() mdi.On("InsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { - return op.BackendID == "tracking2" && op.Type == fftypes.OpTypeDataExchangeBlobSend + return op.Type == fftypes.OpTypeDataExchangeBlobSend })).Return(nil, nil) - mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything).Return("tracking3", nil).Once() + mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() mdi.On("InsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { - return op.BackendID == "tracking3" && op.Type == fftypes.OpTypeDataExchangeBatchSend + return op.Type == fftypes.OpTypeDataExchangeBatchSend })).Return(nil, nil) - mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything).Return("tracking4", nil).Once() + mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() mdi.On("InsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { - return op.BackendID == "tracking4" && op.Type == fftypes.OpTypeDataExchangeBatchSend + return op.Type == fftypes.OpTypeDataExchangeBatchSend })).Return(nil, nil) mbp.On("SubmitPinnedBatch", pm.ctx, mock.Anything, mock.Anything).Return(nil) @@ -258,7 +258,7 @@ func TestSendImmediateFail(t *testing.T) { mim.On("GetLocalOrgKey", pm.ctx).Return("localorg", nil) mdx := pm.exchange.(*dataexchangemocks.Plugin) - mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything).Return("", fmt.Errorf("pop")) + mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) err := pm.sendAndSubmitBatch(pm.ctx, &fftypes.Batch{ Identity: fftypes.Identity{ @@ -283,7 +283,7 @@ func TestSendSubmitInsertOperationFail(t *testing.T) { mim.On("GetLocalOrgKey", pm.ctx).Return("localorgkey", nil) mdx := pm.exchange.(*dataexchangemocks.Plugin) - mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything).Return("tracking1", nil) + mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) mdi := pm.database.(*databasemocks.Plugin) mdi.On("InsertOperation", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")) @@ -376,7 +376,8 @@ func TestTransferBlobsFail(t *testing.T) { mdi := pm.database.(*databasemocks.Plugin) mdi.On("GetBlobMatchingHash", pm.ctx, mock.Anything).Return(&fftypes.Blob{PayloadRef: "blob/1"}, nil) mdx := pm.exchange.(*dataexchangemocks.Plugin) - mdx.On("TransferBLOB", pm.ctx, "peer1", "blob/1").Return("", fmt.Errorf("pop")) + mdx.On("TransferBLOB", pm.ctx, mock.Anything, "peer1", "blob/1").Return(fmt.Errorf("pop")) + mdi.On("InsertOperation", pm.ctx, mock.Anything).Return(nil) err := pm.transferBlobs(pm.ctx, []*fftypes.Data{ {ID: fftypes.NewUUID(), Hash: fftypes.NewRandB32(), Blob: &fftypes.BlobRef{Hash: fftypes.NewRandB32()}}, @@ -392,7 +393,7 @@ func TestTransferBlobsOpInsertFail(t *testing.T) { mdx := pm.exchange.(*dataexchangemocks.Plugin) mdi.On("GetBlobMatchingHash", pm.ctx, mock.Anything).Return(&fftypes.Blob{PayloadRef: "blob/1"}, nil) - mdx.On("TransferBLOB", pm.ctx, "peer1", "blob/1").Return("tracking1", nil) + mdx.On("TransferBLOB", pm.ctx, mock.Anything, "peer1", "blob/1").Return(nil) mdi.On("InsertOperation", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")) err := pm.transferBlobs(pm.ctx, []*fftypes.Data{ diff --git a/mocks/dataexchangemocks/plugin.go b/mocks/dataexchangemocks/plugin.go index 6e320d6011..b179c52f34 100644 --- a/mocks/dataexchangemocks/plugin.go +++ b/mocks/dataexchangemocks/plugin.go @@ -167,25 +167,18 @@ func (_m *Plugin) Name() string { return r0 } -// SendMessage provides a mock function with given fields: ctx, peerID, data -func (_m *Plugin) SendMessage(ctx context.Context, peerID string, data []byte) (string, error) { - ret := _m.Called(ctx, peerID, data) +// SendMessage provides a mock function with given fields: ctx, opID, peerID, data +func (_m *Plugin) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID string, data []byte) error { + ret := _m.Called(ctx, opID, peerID, data) - var r0 string - if rf, ok := ret.Get(0).(func(context.Context, string, []byte) string); ok { - r0 = rf(ctx, peerID, data) - } else { - r0 = ret.Get(0).(string) - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, []byte) error); ok { - r1 = rf(ctx, peerID, data) + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.UUID, string, []byte) error); ok { + r0 = rf(ctx, opID, peerID, data) } else { - r1 = ret.Error(1) + r0 = ret.Error(0) } - return r0, r1 + return r0 } // Start provides a mock function with given fields: @@ -202,25 +195,18 @@ func (_m *Plugin) Start() error { return r0 } -// TransferBLOB provides a mock function with given fields: ctx, peerID, payloadRef -func (_m *Plugin) TransferBLOB(ctx context.Context, peerID string, payloadRef string) (string, error) { - ret := _m.Called(ctx, peerID, payloadRef) +// TransferBLOB provides a mock function with given fields: ctx, opID, peerID, payloadRef +func (_m *Plugin) TransferBLOB(ctx context.Context, opID *fftypes.UUID, peerID string, payloadRef string) error { + ret := _m.Called(ctx, opID, peerID, payloadRef) - var r0 string - if rf, ok := ret.Get(0).(func(context.Context, string, string) string); ok { - r0 = rf(ctx, peerID, payloadRef) - } else { - r0 = ret.Get(0).(string) - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, peerID, payloadRef) + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.UUID, string, string) error); ok { + r0 = rf(ctx, opID, peerID, payloadRef) } else { - r1 = ret.Error(1) + r0 = ret.Error(0) } - return r0, r1 + return r0 } // UploadBLOB provides a mock function with given fields: ctx, ns, id, content diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 307eaa0154..58e57da03c 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -744,7 +744,6 @@ var OperationQueryFactory = &queryFields{ "plugin": &StringField{}, "input": &JSONField{}, "output": &JSONField{}, - "backendid": &StringField{}, "created": &TimeField{}, "updated": &TimeField{}, } diff --git a/pkg/dataexchange/plugin.go b/pkg/dataexchange/plugin.go index fe50d3f0f6..539b33157a 100644 --- a/pkg/dataexchange/plugin.go +++ b/pkg/dataexchange/plugin.go @@ -86,10 +86,10 @@ type Plugin interface { // SendMessage sends an in-line package of data to another network node. // Should return as quickly as possible for parallelsim, then report completion asynchronously via the operation ID - SendMessage(ctx context.Context, peerID string, data []byte) (trackingID string, err error) + SendMessage(ctx context.Context, opID *fftypes.UUID, peerID string, data []byte) (err error) // TransferBLOB initiates a transfer of a previoiusly stored blob to another node - TransferBLOB(ctx context.Context, peerID string, payloadRef string) (trackingID string, err error) + TransferBLOB(ctx context.Context, opID *fftypes.UUID, peerID string, payloadRef string) (err error) } // Callbacks is the interface provided to the data exchange plugin, to allow it to pass events back to firefly. diff --git a/pkg/fftypes/operation.go b/pkg/fftypes/operation.go index 4932cc018c..265e8735d9 100644 --- a/pkg/fftypes/operation.go +++ b/pkg/fftypes/operation.go @@ -57,12 +57,11 @@ type Named interface { } // NewOperation creates a new operation in a transaction -func NewOperation(plugin Named, namespace string, tx *UUID, backendID string, opType OpType) *Operation { +func NewOperation(plugin Named, namespace string, tx *UUID, opType OpType) *Operation { return &Operation{ ID: NewUUID(), Namespace: namespace, Plugin: plugin.Name(), - BackendID: backendID, Transaction: tx, Type: opType, Status: OpStatusPending, @@ -79,7 +78,6 @@ type Operation struct { Status OpStatus `json:"status"` Error string `json:"error,omitempty"` Plugin string `json:"plugin"` - BackendID string `json:"backendId"` Input JSONObject `json:"input,omitempty"` Output JSONObject `json:"output,omitempty"` Created *FFTime `json:"created,omitempty"` diff --git a/pkg/fftypes/operation_test.go b/pkg/fftypes/operation_test.go index b94ab7bcc5..82b01b0f0e 100644 --- a/pkg/fftypes/operation_test.go +++ b/pkg/fftypes/operation_test.go @@ -29,13 +29,12 @@ func (f *fakePlugin) Name() string { return "fake" } func TestNewPendingMessageOp(t *testing.T) { txID := NewUUID() - op := NewOperation(&fakePlugin{}, "ns1", txID, "testBackend", OpTypePublicStorageBatchBroadcast) + op := NewOperation(&fakePlugin{}, "ns1", txID, OpTypePublicStorageBatchBroadcast) assert.Equal(t, Operation{ ID: op.ID, Namespace: "ns1", Transaction: txID, Plugin: "fake", - BackendID: "testBackend", Type: OpTypePublicStorageBatchBroadcast, Status: OpStatusPending, Created: op.Created, From b848a169ff1e00775df2e40a70428d014011f5fc Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 8 Feb 2022 15:06:34 -0500 Subject: [PATCH 2/3] Change DX "info" parameter to be a JSON object instead of a string Signed-off-by: Andrew Richardson --- internal/dataexchange/dxhttps/dxhttps.go | 22 +++++++++---------- internal/dataexchange/dxhttps/dxhttps_test.go | 4 ++-- internal/events/dx_callbacks.go | 9 +------- internal/events/dx_callbacks_test.go | 12 +++++----- internal/orchestrator/bound_callbacks_test.go | 2 +- pkg/fftypes/transport_wrapper.go | 6 ++--- 6 files changed, 24 insertions(+), 31 deletions(-) diff --git a/internal/dataexchange/dxhttps/dxhttps.go b/internal/dataexchange/dxhttps/dxhttps.go index c38f5fb46e..29b8190b51 100644 --- a/internal/dataexchange/dxhttps/dxhttps.go +++ b/internal/dataexchange/dxhttps/dxhttps.go @@ -44,17 +44,17 @@ type HTTPS struct { } type wsEvent struct { - Type msgType `json:"type"` - Sender string `json:"sender"` - Recipient string `json:"recipient"` - RequestID string `json:"requestId"` - Path string `json:"path"` - Message string `json:"message"` - Hash string `json:"hash"` - Size int64 `json:"size"` - Error string `json:"error"` - Manifest string `json:"manifest"` - Info string `json:"info"` + Type msgType `json:"type"` + Sender string `json:"sender"` + Recipient string `json:"recipient"` + RequestID string `json:"requestId"` + Path string `json:"path"` + Message string `json:"message"` + Hash string `json:"hash"` + Size int64 `json:"size"` + Error string `json:"error"` + Manifest string `json:"manifest"` + Info fftypes.JSONObject `json:"info"` } const ( diff --git a/internal/dataexchange/dxhttps/dxhttps_test.go b/internal/dataexchange/dxhttps/dxhttps_test.go index 90b3d9d290..a3b3d8a580 100644 --- a/internal/dataexchange/dxhttps/dxhttps_test.go +++ b/internal/dataexchange/dxhttps/dxhttps_test.go @@ -410,9 +410,9 @@ func TestEvents(t *testing.T) { assert.Equal(t, `{"action":"commit"}`, string(msg)) mcb.On("TransferResult", "tx12345", fftypes.OpStatusSucceeded, mock.MatchedBy(func(ts fftypes.TransportStatusUpdate) bool { - return ts.Manifest == `{"manifest":true}` && ts.Info == `{"signatures":"and stuff"}` + return ts.Manifest == `{"manifest":true}` && ts.Info.String() == `{"signatures":"and stuff"}` })).Return(nil) - fromServer <- `{"type":"message-delivered","requestID":"tx12345","info":"{\"signatures\":\"and stuff\"}","manifest":"{\"manifest\":true}"}` + fromServer <- `{"type":"message-delivered","requestID":"tx12345","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}` msg = <-toServer assert.Equal(t, `{"action":"commit"}`, string(msg)) diff --git a/internal/events/dx_callbacks.go b/internal/events/dx_callbacks.go index a9176c2e78..5dabb4907a 100644 --- a/internal/events/dx_callbacks.go +++ b/internal/events/dx_callbacks.go @@ -269,17 +269,10 @@ func (em *eventManager) TransferResult(dx dataexchange.Plugin, trackingID string } } - // Save any interesting outputs - // Note that we don't need the manifest to be kept here, as it's already in the input - output := fftypes.JSONObject{} - if update.Info != "" { - output["info"] = update.Info - } - update := database.OperationQueryFactory.NewUpdate(em.ctx). Set("status", status). Set("error", update.Error). - Set("output", output) + Set("output", update.Info) // Note that we don't need the manifest to be kept here, as it's already in the input if err := em.database.UpdateOperation(em.ctx, op.ID, update); err != nil { return true, err // this is always retryable } diff --git a/internal/events/dx_callbacks_test.go b/internal/events/dx_callbacks_test.go index 00ffee7340..3a298cf999 100644 --- a/internal/events/dx_callbacks_test.go +++ b/internal/events/dx_callbacks_test.go @@ -498,7 +498,7 @@ func TestTransferResultOk(t *testing.T) { mdx.On("Name").Return("utdx") err := em.TransferResult(mdx, id.String(), fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ Error: "error info", - Info: `{"extra": "info"}`, + Info: fftypes.JSONObject{"extra": "info"}, }) assert.NoError(t, err) @@ -526,7 +526,7 @@ func TestTransferResultManifestMismatch(t *testing.T) { Manifest: true, }) err := em.TransferResult(mdx, id.String(), fftypes.OpStatusSucceeded, fftypes.TransportStatusUpdate{ - Info: `{"extra": "info"}`, + Info: fftypes.JSONObject{"extra": "info"}, Manifest: "Sally", }) assert.NoError(t, err) @@ -546,7 +546,7 @@ func TestTransferResultNotCorrelated(t *testing.T) { mdx.On("Name").Return("utdx") err := em.TransferResult(mdx, "tracking12345", fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ Error: "error info", - Info: `{"extra": "info"}`, + Info: fftypes.JSONObject{"extra": "info"}, }) assert.NoError(t, err) @@ -563,7 +563,7 @@ func TestTransferResultNotFound(t *testing.T) { mdx.On("Name").Return("utdx") err := em.TransferResult(mdx, "tracking12345", fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ Error: "error info", - Info: `{"extra": "info"}`, + Info: fftypes.JSONObject{"extra": "info"}, }) assert.NoError(t, err) @@ -580,7 +580,7 @@ func TestTransferGetOpFail(t *testing.T) { mdx.On("Name").Return("utdx") err := em.TransferResult(mdx, "tracking12345", fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ Error: "error info", - Info: `{"extra": "info"}`, + Info: fftypes.JSONObject{"extra": "info"}, }) assert.Regexp(t, "FF10158", err) @@ -603,7 +603,7 @@ func TestTransferUpdateFail(t *testing.T) { mdx.On("Name").Return("utdx") err := em.TransferResult(mdx, id.String(), fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ Error: "error info", - Info: `{"extra": "info"}`, + Info: fftypes.JSONObject{"extra": "info"}, }) assert.Regexp(t, "FF10158", err) diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index fd530631cd..06a5502e4a 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -59,7 +59,7 @@ func TestBoundCallbacks(t *testing.T) { mei.On("TransferResult", mdx, "tracking12345", fftypes.OpStatusFailed, mock.Anything).Return(fmt.Errorf("pop")) err = bc.TransferResult("tracking12345", fftypes.OpStatusFailed, fftypes.TransportStatusUpdate{ - Error: "error info", Info: info.String(), + Error: "error info", Info: info, }) assert.EqualError(t, err, "pop") diff --git a/pkg/fftypes/transport_wrapper.go b/pkg/fftypes/transport_wrapper.go index 3d989d0c71..87d95300fb 100644 --- a/pkg/fftypes/transport_wrapper.go +++ b/pkg/fftypes/transport_wrapper.go @@ -60,7 +60,7 @@ func (tw *TransportWrapper) Manifest() *Manifest { } type TransportStatusUpdate struct { - Error string `json:"error,omitempty"` - Manifest string `json:"manifest,omitempty"` - Info string `json:"info,omitempty"` + Error string `json:"error,omitempty"` + Manifest string `json:"manifest,omitempty"` + Info JSONObject `json:"info,omitempty"` } From 2319930e6010764ff0c054e30fcd69a904049b0c Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Wed, 9 Feb 2022 07:37:04 -0500 Subject: [PATCH 3/3] Update DX version in manifest Signed-off-by: Andrew Richardson --- manifest.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/manifest.json b/manifest.json index 7e493f394d..5b6291f3a2 100644 --- a/manifest.json +++ b/manifest.json @@ -11,8 +11,8 @@ }, "dataexchange-https": { "image": "ghcr.io/hyperledger/firefly-dataexchange-https", - "tag": "v0.10.3", - "sha": "e166c0ff8de3f56e18e7f62cada833b8f3fc8eea476628d6aade0f012c394b40" + "tag": "v0.10.3-20220209-6", + "sha": "a94776c7f89c27548149e080627fe3c55ad528835ecea0131b1c1ae96981398e" }, "tokens-erc1155": { "image": "ghcr.io/hyperledger/firefly-tokens-erc1155",