Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
BEGIN;
ALTER TABLE operations ADD COLUMN backend_id VARCHAR(256);
CREATE INDEX operations_backend ON operations(backend_id);
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
BEGIN;
DROP INDEX operations_backend;
ALTER TABLE operations DROP COLUMN backend_id;
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE operations ADD COLUMN backend_id VARCHAR(256);
CREATE INDEX operations_backend ON operations(backend_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP INDEX operations_backend;
ALTER TABLE operations DROP COLUMN backend_id;
13 changes: 0 additions & 13 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4630,8 +4630,6 @@ paths:
application/json:
schema:
properties:
backendId:
type: string
created: {}
error:
type: string
Expand Down Expand Up @@ -5292,11 +5290,6 @@ paths:
schema:
default: 120s
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: backendid
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: created
Expand Down Expand Up @@ -5392,8 +5385,6 @@ paths:
application/json:
schema:
properties:
backendId:
type: string
created: {}
error:
type: string
Expand Down Expand Up @@ -5458,8 +5449,6 @@ paths:
application/json:
schema:
properties:
backendId:
type: string
created: {}
error:
type: string
Expand Down Expand Up @@ -8032,8 +8021,6 @@ paths:
schema:
items:
properties:
backendId:
type: string
created: {}
error:
type: string
Expand Down
2 changes: 0 additions & 2 deletions internal/assets/token_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (am *assetManager) createTokenPoolInternal(ctx context.Context, pool *fftyp
plugin,
pool.Namespace,
txid,
"",
fftypes.OpTypeTokenCreatePool)
txcommon.AddTokenPoolCreateInputs(op, pool)

Expand Down Expand Up @@ -109,7 +108,6 @@ func (am *assetManager) ActivateTokenPool(ctx context.Context, pool *fftypes.Tok
plugin,
pool.Namespace,
pool.TX.ID,
"",
fftypes.OpTypeTokenActivatePool)
if err := am.database.InsertOperation(ctx, op); err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion internal/assets/token_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er
plugin,
s.namespace,
txid,
"",
fftypes.OpTypeTokenTransfer)
if err = txcommon.AddTokenTransferInputs(op, &s.transfer.TokenTransfer); err == nil {
err = s.mgr.database.InsertOperation(ctx, op)
Expand Down
1 change: 0 additions & 1 deletion internal/batchpin/batchpin.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func (bp *batchPinSubmitter) SubmitPinnedBatch(ctx context.Context, batch *fftyp
bp.blockchain,
batch.Namespace,
batch.Payload.TX.ID,
"",
fftypes.OpTypeBlockchainBatchPin)
if err := bp.database.InsertOperation(ctx, op); err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion internal/broadcast/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func (bm *broadcastManager) submitTXAndUpdateDB(ctx context.Context, batch *ffty
bm.publicstorage,
batch.Namespace,
batch.Payload.TX.ID,
batch.PayloadRef,
fftypes.OpTypePublicStorageBatchBroadcast)
op.Status = fftypes.OpStatusSucceeded // Note we performed the action synchronously above
err = bm.database.InsertOperation(ctx, op)
Expand Down
1 change: 0 additions & 1 deletion internal/broadcast/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ func TestSubmitTXAndUpdateDBSucceed(t *testing.T) {
op := mdi.Calls[1].Arguments[1].(*fftypes.Operation)
assert.Equal(t, *batch.Payload.TX.ID, *op.Transaction)
assert.Equal(t, "ut_publicstorage", op.Plugin)
assert.Equal(t, "ipfs_id", op.BackendID)
assert.Equal(t, fftypes.OpTypePublicStorageBatchBroadcast, op.Type)

}
Expand Down
1 change: 0 additions & 1 deletion internal/contracts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ func (cm *contractManager) InvokeContract(ctx context.Context, ns string, req *f
cm.blockchain,
ns,
txid,
"",
fftypes.OpTypeBlockchainInvoke)
op.Input = req.Input
return cm.database.InsertOperation(ctx, op)
Expand Down
12 changes: 4 additions & 8 deletions internal/database/sqlcommon/operation_sql.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -35,18 +35,16 @@ var (
"optype",
"opstatus",
"plugin",
"backend_id",
"created",
"updated",
"error",
"input",
"output",
}
opFilterFieldMap = map[string]string{
"tx": "tx_id",
"type": "optype",
"status": "opstatus",
"backendid": "backend_id",
"tx": "tx_id",
"type": "optype",
"status": "opstatus",
}
)

Expand All @@ -67,7 +65,6 @@ func (s *SQLCommon) InsertOperation(ctx context.Context, operation *fftypes.Oper
string(operation.Type),
string(operation.Status),
operation.Plugin,
operation.BackendID,
operation.Created,
operation.Updated,
operation.Error,
Expand All @@ -93,7 +90,6 @@ func (s *SQLCommon) opResult(ctx context.Context, row *sql.Rows) (*fftypes.Opera
&op.Type,
&op.Status,
&op.Plugin,
&op.BackendID,
&op.Created,
&op.Updated,
&op.Error,
Expand Down
4 changes: 1 addition & 3 deletions internal/database/sqlcommon/operation_sql_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -43,7 +43,6 @@ func TestOperationE2EWithDB(t *testing.T) {
Transaction: fftypes.NewUUID(),
Status: fftypes.OpStatusFailed,
Plugin: "ethereum",
BackendID: fftypes.NewRandB32().String(),
Error: "pop",
Input: fftypes.JSONObject{"some": "input-info"},
Output: fftypes.JSONObject{"some": "output-info"},
Expand All @@ -70,7 +69,6 @@ func TestOperationE2EWithDB(t *testing.T) {
fb.Eq("status", operation.Status),
fb.Eq("error", operation.Error),
fb.Eq("plugin", operation.Plugin),
fb.Eq("backendid", operation.BackendID),
fb.Gt("created", 0),
fb.Gt("updated", 0),
)
Expand Down
38 changes: 21 additions & 17 deletions internal/dataexchange/dxhttps/dxhttps.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,17 @@ type HTTPS struct {
}

type wsEvent struct {
Type msgType `json:"type"`
Sender string `json:"sender"`
Recipient string `json:"recipient"`
RequestID string `json:"requestID"`
Path string `json:"path"`
Message string `json:"message"`
Hash string `json:"hash"`
Size int64 `json:"size"`
Error string `json:"error"`
Manifest string `json:"manifest"`
Info string `json:"info"`
Type msgType `json:"type"`
Sender string `json:"sender"`
Recipient string `json:"recipient"`
RequestID string `json:"requestId"`
Path string `json:"path"`
Message string `json:"message"`
Hash string `json:"hash"`
Size int64 `json:"size"`
Error string `json:"error"`
Manifest string `json:"manifest"`
Info fftypes.JSONObject `json:"info"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peterbroadhurst @gabriel-indik I wanted to call special attention to this change. I don't actually see any dataexchange events that include "info" in their payload, so I don't think it breaks anything. Was this just added as a possible future plug point?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@awrichar the Info field is actually expected. After reviewing the latest changes I realized this is something that is going to be coming from Data Exchange. This is, instead of having to deal with each individual type of response Data Exchange produces, the information will always be encapsulated under the "info" JSON object.

}

const (
Expand Down Expand Up @@ -86,11 +86,13 @@ type uploadBlob struct {
type sendMessage struct {
Message string `json:"message"`
Recipient string `json:"recipient"`
RequestID string `json:"requestId"`
}

type transferBlob struct {
Path string `json:"path"`
Recipient string `json:"recipient"`
RequestID string `json:"requestId"`
}

type wsAck struct {
Expand Down Expand Up @@ -183,34 +185,36 @@ func (h *HTTPS) DownloadBLOB(ctx context.Context, payloadRef string) (content io
return res.RawBody(), nil
}

func (h *HTTPS) SendMessage(ctx context.Context, peerID string, data []byte) (trackingID string, err error) {
func (h *HTTPS) SendMessage(ctx context.Context, opID *fftypes.UUID, peerID string, data []byte) (err error) {
var responseData responseWithRequestID
res, err := h.client.R().SetContext(ctx).
SetBody(&sendMessage{
Message: string(data),
Recipient: peerID,
RequestID: opID.String(),
}).
SetResult(&responseData).
Post("/api/v1/messages")
if err != nil || !res.IsSuccess() {
return "", restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr)
return restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr)
}
return responseData.RequestID, nil
return nil
}

func (h *HTTPS) TransferBLOB(ctx context.Context, peerID, payloadRef string) (trackingID string, err error) {
func (h *HTTPS) TransferBLOB(ctx context.Context, opID *fftypes.UUID, peerID, payloadRef string) (err error) {
var responseData responseWithRequestID
res, err := h.client.R().SetContext(ctx).
SetBody(&transferBlob{
Path: fmt.Sprintf("/%s", payloadRef),
Recipient: peerID,
RequestID: opID.String(),
}).
SetResult(&responseData).
Post("/api/v1/transfers")
if err != nil || !res.IsSuccess() {
return "", restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr)
return restclient.WrapRestErr(ctx, res, err, i18n.MsgDXRESTErr)
}
return responseData.RequestID, nil
return nil
}

func (h *HTTPS) CheckBLOBReceived(ctx context.Context, peerID, ns string, id fftypes.UUID) (hash *fftypes.Bytes32, size int64, err error) {
Expand Down
22 changes: 8 additions & 14 deletions internal/dataexchange/dxhttps/dxhttps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,10 @@ func TestSendMessage(t *testing.T) {
defer done()

httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/messages", httpURL),
httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{
"requestID": "abcd1234",
}))
httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{}))

trackingID, err := h.SendMessage(context.Background(), "peer1", []byte(`some data`))
err := h.SendMessage(context.Background(), fftypes.NewUUID(), "peer1", []byte(`some data`))
assert.NoError(t, err)
assert.Equal(t, "abcd1234", trackingID)
}

func TestSendMessageError(t *testing.T) {
Expand All @@ -363,7 +360,7 @@ func TestSendMessageError(t *testing.T) {
httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/message", httpURL),
httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{}))

_, err := h.SendMessage(context.Background(), "peer1", []byte(`some data`))
err := h.SendMessage(context.Background(), fftypes.NewUUID(), "peer1", []byte(`some data`))
assert.Regexp(t, "FF10229", err)
}

Expand All @@ -373,13 +370,10 @@ func TestTransferBLOB(t *testing.T) {
defer done()

httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/transfers", httpURL),
httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{
"requestID": "abcd1234",
}))
httpmock.NewJsonResponderOrPanic(200, fftypes.JSONObject{}))

trackingID, err := h.TransferBLOB(context.Background(), "peer1", "ns1/id1")
err := h.TransferBLOB(context.Background(), fftypes.NewUUID(), "peer1", "ns1/id1")
assert.NoError(t, err)
assert.Equal(t, "abcd1234", trackingID)
}

func TestTransferBLOBError(t *testing.T) {
Expand All @@ -389,7 +383,7 @@ func TestTransferBLOBError(t *testing.T) {
httpmock.RegisterResponder("POST", fmt.Sprintf("%s/api/v1/transfers", httpURL),
httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{}))

_, err := h.TransferBLOB(context.Background(), "peer1", "ns1/id1")
err := h.TransferBLOB(context.Background(), fftypes.NewUUID(), "peer1", "ns1/id1")
assert.Regexp(t, "FF10229", err)
}

Expand All @@ -416,9 +410,9 @@ func TestEvents(t *testing.T) {
assert.Equal(t, `{"action":"commit"}`, string(msg))

mcb.On("TransferResult", "tx12345", fftypes.OpStatusSucceeded, mock.MatchedBy(func(ts fftypes.TransportStatusUpdate) bool {
return ts.Manifest == `{"manifest":true}` && ts.Info == `{"signatures":"and stuff"}`
return ts.Manifest == `{"manifest":true}` && ts.Info.String() == `{"signatures":"and stuff"}`
})).Return(nil)
fromServer <- `{"type":"message-delivered","requestID":"tx12345","info":"{\"signatures\":\"and stuff\"}","manifest":"{\"manifest\":true}"}`
fromServer <- `{"type":"message-delivered","requestID":"tx12345","info":{"signatures":"and stuff"},"manifest":"{\"manifest\":true}"}`
msg = <-toServer
assert.Equal(t, `{"action":"commit"}`, string(msg))

Expand Down
6 changes: 3 additions & 3 deletions internal/events/dx_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (em *eventManager) TransferResult(dx dataexchange.Plugin, trackingID string
var operations []*fftypes.Operation
fb := database.OperationQueryFactory.NewFilter(em.ctx)
filter := fb.And(
fb.Eq("backendid", trackingID),
fb.Eq("id", trackingID),
fb.Eq("plugin", dx.Name()),
)
operations, _, err = em.database.GetOperations(em.ctx, filter)
Expand All @@ -256,7 +256,7 @@ func (em *eventManager) TransferResult(dx dataexchange.Plugin, trackingID string
return true, i18n.NewError(em.ctx, i18n.Msg404NotFound)
}

// The maniest should exactly match that stored into the operation input, if supported
// The manifest should exactly match that stored into the operation input, if supported
op := operations[0]
if status == fftypes.OpStatusSucceeded && dx.Capabilities().Manifest {
expectedManifest := op.Input.GetString("manifest")
Expand All @@ -272,7 +272,7 @@ func (em *eventManager) TransferResult(dx dataexchange.Plugin, trackingID string
update := database.OperationQueryFactory.NewUpdate(em.ctx).
Set("status", status).
Set("error", update.Error).
Set("output", update.Info) // We don't need the manifest to be kept here, as it's already in the input
Set("output", update.Info) // Note that we don't need the manifest to be kept here, as it's already in the input
if err := em.database.UpdateOperation(em.ctx, op.ID, update); err != nil {
return true, err // this is always retryable
}
Expand Down
Loading