diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 1f2ac337b7..7565cd73f4 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -187,14 +187,10 @@ func (bm *batchManager) getProcessor(txType fftypes.TransactionType, msgType fft return processor, nil } -func (bm *batchManager) assembleMessageData(batchType fftypes.BatchType, msg *fftypes.Message) (retData fftypes.DataArray, err error) { - var cro []data.CacheReadOption - if batchType == fftypes.BatchTypeBroadcast { - cro = append(cro, data.CRORequirePublicBlobRefs) - } +func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (retData fftypes.DataArray, err error) { var foundAll = false err = bm.retry.Do(bm.ctx, fmt.Sprintf("assemble message %s data", msg.Header.ID), func(attempt int) (retry bool, err error) { - retData, foundAll, err = bm.data.GetMessageDataCached(bm.ctx, msg, cro...) + retData, foundAll, err = bm.data.GetMessageDataCached(bm.ctx, msg) // continual retry for persistence error (distinct from not-found) return true, err }) @@ -246,7 +242,7 @@ func (bm *batchManager) messageSequencer() { continue } - data, err := bm.assembleMessageData(processor.conf.DispatcherOptions.BatchType, msg) + data, err := bm.assembleMessageData(msg) if err != nil { l.Errorf("Failed to retrieve message data for %s: %s", msg.Header.ID, err) continue diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index 90eed813d1..27bbc328e1 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -23,7 +23,6 @@ import ( "time" "github.com/hyperledger/firefly/internal/config" - "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/mocks/databasemocks" @@ -352,7 +351,7 @@ func TestMessageSequencerMissingMessageData(t *testing.T) { }). Once() mdi.On("GetMessages", mock.Anything, mock.Anything, mock.Anything).Return([]*fftypes.Message{}, nil, nil) - mdm.On("GetMessageDataCached", mock.Anything, mock.Anything, data.CRORequirePublicBlobRefs).Return(fftypes.DataArray{}, false, nil) + mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(fftypes.DataArray{}, false, nil) bm.(*batchManager).messageSequencer() @@ -541,7 +540,7 @@ func TestAssembleMessageDataNilData(t *testing.T) { bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) bm.Close() mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, nil) - _, err := bm.(*batchManager).assembleMessageData(fftypes.BatchTypePrivate, &fftypes.Message{ + _, err := bm.(*batchManager).assembleMessageData(&fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), }, @@ -558,7 +557,7 @@ func TestGetMessageDataFail(t *testing.T) { bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, fmt.Errorf("pop")) bm.Close() - _, _ = bm.(*batchManager).assembleMessageData(fftypes.BatchTypePrivate, &fftypes.Message{ + _, _ = bm.(*batchManager).assembleMessageData(&fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), }, @@ -575,9 +574,9 @@ func TestGetMessageNotFound(t *testing.T) { mni := &sysmessagingmocks.LocalNodeInfo{} txHelper := txcommon.NewTransactionHelper(mdi, mdm) bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper) - mdm.On("GetMessageDataCached", mock.Anything, mock.Anything, data.CRORequirePublicBlobRefs).Return(nil, false, nil) + mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, nil) bm.Close() - _, err := bm.(*batchManager).assembleMessageData(fftypes.BatchTypeBroadcast, &fftypes.Message{ + _, err := bm.(*batchManager).assembleMessageData(&fftypes.Message{ Header: fftypes.MessageHeader{ ID: fftypes.NewUUID(), }, diff --git a/internal/broadcast/definition.go b/internal/broadcast/definition.go index 7778f99eed..c466fecea5 100644 --- a/internal/broadcast/definition.go +++ b/internal/broadcast/definition.go @@ -87,10 +87,8 @@ func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, ns st }, }, }, - ResolvedData: data.Resolved{ - NewData: fftypes.DataArray{d}, - AllData: fftypes.DataArray{d}, - }, + NewData: fftypes.DataArray{d}, + AllData: fftypes.DataArray{d}, } // Broadcast the message diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 942211cf02..82674a4ec9 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -122,6 +122,12 @@ func (bm *broadcastManager) Name() string { } func (bm *broadcastManager) dispatchBatch(ctx context.Context, state *batch.DispatchState) error { + + // Ensure all the blobs are published + if err := bm.publishBlobs(ctx, state.Payload.Data); err != nil { + return err + } + // The completed SharedStorage upload op := fftypes.NewOperation( bm.sharedstorage, @@ -144,21 +150,32 @@ func (bm *broadcastManager) dispatchBatch(ctx context.Context, state *batch.Disp return bm.batchpin.SubmitPinnedBatch(ctx, &state.Persisted, state.Pins) } -func (bm *broadcastManager) publishBlobs(ctx context.Context, newMsg *data.NewMessage) error { - for _, d := range newMsg.ResolvedData.DataToPublish { - // Stream from the local data exchange ... - reader, err := bm.exchange.DownloadBLOB(ctx, d.Blob.PayloadRef) - if err != nil { - return i18n.WrapError(ctx, err, i18n.MsgDownloadBlobFailed, d.Blob.PayloadRef) - } - defer reader.Close() - - // ... to the shared storage - d.Data.Blob.Public, err = bm.sharedstorage.PublishData(ctx, reader) - if err != nil { - return err +func (bm *broadcastManager) publishBlobs(ctx context.Context, data fftypes.DataArray) error { + for _, d := range data { + // We only need to send a blob if there is one, and it's not been uploaded to the shared storage + if d.Blob != nil && d.Blob.Hash != nil && d.Blob.Public == "" { + blob, err := bm.database.GetBlobMatchingHash(ctx, d.Blob.Hash) + if err != nil { + return err + } + if blob == nil { + return i18n.NewError(ctx, i18n.MsgBlobNotFound, d.Blob.Hash) + } + + // Stream from the local data exchange ... + reader, err := bm.exchange.DownloadBLOB(ctx, blob.PayloadRef) + if err != nil { + return i18n.WrapError(ctx, err, i18n.MsgDownloadBlobFailed, blob.PayloadRef) + } + defer reader.Close() + + // ... to the shared storage + d.Blob.Public, err = bm.sharedstorage.PublishData(ctx, reader) + if err != nil { + return err + } + log.L(ctx).Infof("Published blob with hash '%s' for data '%s' to shared storage: '%s'", d.Blob.Hash, d.ID, d.Blob.Public) } - log.L(ctx).Infof("Published blob with hash '%s' for data '%s' to shared storage: '%s'", d.Blob.Hash, d.Data.ID, d.Data.Blob.Public) } return nil diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index 818e4fb019..6e95c3c948 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -121,10 +121,8 @@ func TestBroadcastMessageGood(t *testing.T) { }, }, }, - ResolvedData: data.Resolved{ - AllData: fftypes.DataArray{ - {ID: dataID, Hash: dataHash}, - }, + AllData: fftypes.DataArray{ + {ID: dataID, Hash: dataHash}, }, } @@ -170,6 +168,31 @@ func TestBroadcastMessageBad(t *testing.T) { } +func TestDispatchBatchBlobsFaill(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + + blobHash := fftypes.NewRandB32() + state := &batch.DispatchState{ + Payload: fftypes.BatchPayload{ + Data: []*fftypes.Data{ + {ID: fftypes.NewUUID(), Blob: &fftypes.BlobRef{ + Hash: blobHash, + }}, + }, + }, + Pins: []*fftypes.Bytes32{fftypes.NewRandB32()}, + } + + mdi := bm.database.(*databasemocks.Plugin) + mdi.On("GetBlobMatchingHash", bm.ctx, blobHash).Return(nil, fmt.Errorf("pop")) + + err := bm.dispatchBatch(bm.ctx, state) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + func TestDispatchBatchInsertOpFail(t *testing.T) { bm, cancel := newTestBroadcast(t) defer cancel() @@ -276,81 +299,173 @@ func TestDispatchBatchSubmitBroadcastFail(t *testing.T) { mom.AssertExpectations(t) } +func TestPublishBlobsPublishOk(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdx := bm.exchange.(*dataexchangemocks.Plugin) + mps := bm.sharedstorage.(*sharedstoragemocks.Plugin) + mdi := bm.database.(*databasemocks.Plugin) + + blob := &fftypes.Blob{ + Hash: fftypes.NewRandB32(), + PayloadRef: "blob/1", + } + + var capturedReader io.ReadCloser + ctx := context.Background() + mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(blob, nil) + mdx.On("DownloadBLOB", ctx, "blob/1").Return(ioutil.NopCloser(bytes.NewReader([]byte(`some data`))), nil) + mps.On("PublishData", ctx, mock.MatchedBy(func(reader io.ReadCloser) bool { + capturedReader = reader + return true + })).Return("payload-ref1", nil) + + data := &fftypes.Data{ + ID: fftypes.NewUUID(), + Blob: &fftypes.BlobRef{ + Hash: blob.Hash, + }, + } + + err := bm.publishBlobs(ctx, fftypes.DataArray{data}) + assert.NoError(t, err) + assert.Equal(t, "payload-ref1", data.Blob.Public) + + b, err := ioutil.ReadAll(capturedReader) + assert.NoError(t, err) + assert.Equal(t, "some data", string(b)) + + mdi.AssertExpectations(t) + mdx.AssertExpectations(t) + mps.AssertExpectations(t) + +} + func TestPublishBlobsPublishFail(t *testing.T) { bm, cancel := newTestBroadcast(t) defer cancel() mdx := bm.exchange.(*dataexchangemocks.Plugin) mps := bm.sharedstorage.(*sharedstoragemocks.Plugin) - mim := bm.identity.(*identitymanagermocks.Manager) + mdi := bm.database.(*databasemocks.Plugin) - blobHash := fftypes.NewRandB32() + blob := &fftypes.Blob{ + Hash: fftypes.NewRandB32(), + PayloadRef: "blob/1", + } dataID := fftypes.NewUUID() + var capturedReader io.ReadCloser ctx := context.Background() + mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(blob, nil) mdx.On("DownloadBLOB", ctx, "blob/1").Return(ioutil.NopCloser(bytes.NewReader([]byte(`some data`))), nil) mps.On("PublishData", ctx, mock.MatchedBy(func(reader io.ReadCloser) bool { - b, err := ioutil.ReadAll(reader) - assert.NoError(t, err) - assert.Equal(t, "some data", string(b)) + capturedReader = reader return true })).Return("", fmt.Errorf("pop")) - mim.On("ResolveInputIdentity", ctx, mock.Anything).Return(nil) - - err := bm.publishBlobs(ctx, &data.NewMessage{ - ResolvedData: data.Resolved{ - DataToPublish: []*fftypes.DataAndBlob{ - { - Data: &fftypes.Data{ - ID: dataID, - Blob: &fftypes.BlobRef{ - Hash: blobHash, - }, - }, - Blob: &fftypes.Blob{ - Hash: blobHash, - PayloadRef: "blob/1", - }, - }, + + err := bm.publishBlobs(ctx, fftypes.DataArray{ + { + ID: dataID, + Blob: &fftypes.BlobRef{ + Hash: blob.Hash, }, }, }) assert.EqualError(t, err, "pop") + b, err := ioutil.ReadAll(capturedReader) + assert.NoError(t, err) + assert.Equal(t, "some data", string(b)) + + mdi.AssertExpectations(t) + mdx.AssertExpectations(t) + mps.AssertExpectations(t) + } func TestPublishBlobsDownloadFail(t *testing.T) { bm, cancel := newTestBroadcast(t) defer cancel() - mdi := bm.database.(*databasemocks.Plugin) mdx := bm.exchange.(*dataexchangemocks.Plugin) - mim := bm.identity.(*identitymanagermocks.Manager) + mdi := bm.database.(*databasemocks.Plugin) - blobHash := fftypes.NewRandB32() + blob := &fftypes.Blob{ + Hash: fftypes.NewRandB32(), + PayloadRef: "blob/1", + } dataID := fftypes.NewUUID() ctx := context.Background() + mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(blob, nil) mdx.On("DownloadBLOB", ctx, "blob/1").Return(nil, fmt.Errorf("pop")) - mim.On("ResolveInputIdentity", ctx, mock.Anything).Return(nil) - - err := bm.publishBlobs(ctx, &data.NewMessage{ - ResolvedData: data.Resolved{ - DataToPublish: []*fftypes.DataAndBlob{ - { - Data: &fftypes.Data{ - ID: dataID, - Blob: &fftypes.BlobRef{ - Hash: blobHash, - }, - }, - Blob: &fftypes.Blob{ - Hash: blobHash, - PayloadRef: "blob/1", - }, - }, + + err := bm.publishBlobs(ctx, fftypes.DataArray{ + { + ID: dataID, + Blob: &fftypes.BlobRef{ + Hash: blob.Hash, }, }, }) assert.Regexp(t, "FF10240", err) mdi.AssertExpectations(t) + mdx.AssertExpectations(t) + +} + +func TestPublishBlobsGetBlobFail(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + + blob := &fftypes.Blob{ + Hash: fftypes.NewRandB32(), + PayloadRef: "blob/1", + } + dataID := fftypes.NewUUID() + + ctx := context.Background() + mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(nil, fmt.Errorf("pop")) + + err := bm.publishBlobs(ctx, fftypes.DataArray{ + { + ID: dataID, + Blob: &fftypes.BlobRef{ + Hash: blob.Hash, + }, + }, + }) + assert.Regexp(t, "pop", err) + + mdi.AssertExpectations(t) + +} + +func TestPublishBlobsGetBlobNotFound(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + + blob := &fftypes.Blob{ + Hash: fftypes.NewRandB32(), + PayloadRef: "blob/1", + } + dataID := fftypes.NewUUID() + + ctx := context.Background() + mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(nil, nil) + + err := bm.publishBlobs(ctx, fftypes.DataArray{ + { + ID: dataID, + Blob: &fftypes.BlobRef{ + Hash: blob.Hash, + }, + }, + }) + assert.Regexp(t, "FF10239", err) + + mdi.AssertExpectations(t) + } diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index d12d4e4375..eceb37a18c 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -105,13 +105,6 @@ func (s *broadcastSender) resolveAndSend(ctx context.Context, method sendMethod) if msgSizeEstimate > s.mgr.maxBatchPayloadLength { return i18n.NewError(ctx, i18n.MsgTooLargeBroadcast, float64(msgSizeEstimate)/1024, float64(s.mgr.maxBatchPayloadLength)/1024) } - - // Perform deferred processing - if len(s.msg.ResolvedData.DataToPublish) > 0 { - if err := s.mgr.publishBlobs(ctx, s.msg); err != nil { - return err - } - } s.resolved = true } return s.sendInternal(ctx, method) @@ -128,7 +121,7 @@ func (s *broadcastSender) resolve(ctx context.Context) error { } // The data manager is responsible for the heavy lifting of storing/validating all our in-line data elements - err := s.mgr.data.ResolveInlineDataBroadcast(ctx, s.msg) + err := s.mgr.data.ResolveInlineData(ctx, s.msg) return err } @@ -154,7 +147,7 @@ func (s *broadcastSender) sendInternal(ctx context.Context, method sendMethod) ( if err := s.mgr.data.WriteNewMessage(ctx, s.msg); err != nil { return err } - log.L(ctx).Infof("Sent broadcast message %s:%s sequence=%d datacount=%d", msg.Header.Namespace, msg.Header.ID, msg.Sequence, len(s.msg.ResolvedData.AllData)) + log.L(ctx).Infof("Sent broadcast message %s:%s sequence=%d datacount=%d", msg.Header.Namespace, msg.Header.ID, msg.Sequence, len(s.msg.AllData)) return err } diff --git a/internal/broadcast/message_test.go b/internal/broadcast/message_test.go index 397fce10e4..949ca7a82c 100644 --- a/internal/broadcast/message_test.go +++ b/internal/broadcast/message_test.go @@ -17,19 +17,14 @@ package broadcast import ( - "bytes" "context" "fmt" - "io" - "io/ioutil" "testing" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/syncasync" - "github.com/hyperledger/firefly/mocks/dataexchangemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/identitymanagermocks" - "github.com/hyperledger/firefly/mocks/sharedstoragemocks" "github.com/hyperledger/firefly/mocks/syncasyncmocks" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/assert" @@ -43,7 +38,7 @@ func TestBroadcastMessageOk(t *testing.T) { mim := bm.identity.(*identitymanagermocks.Manager) ctx := context.Background() - mdm.On("ResolveInlineDataBroadcast", ctx, mock.Anything).Return(nil) + mdm.On("ResolveInlineData", ctx, mock.Anything).Return(nil) mdm.On("WriteNewMessage", mock.Anything, mock.Anything, mock.Anything).Return(nil) mim.On("ResolveInputSigningIdentity", ctx, "ns1", mock.Anything).Return(nil) @@ -75,7 +70,7 @@ func TestBroadcastMessageWaitConfirmOk(t *testing.T) { mim := bm.identity.(*identitymanagermocks.Manager) ctx := context.Background() - mdm.On("ResolveInlineDataBroadcast", ctx, mock.Anything).Return(nil) + mdm.On("ResolveInlineData", ctx, mock.Anything).Return(nil) mim.On("ResolveInputSigningIdentity", ctx, "ns1", mock.Anything).Return(nil) replyMsg := &fftypes.Message{ @@ -113,78 +108,6 @@ func TestBroadcastMessageWaitConfirmOk(t *testing.T) { mdm.AssertExpectations(t) } -func TestBroadcastMessageWithBlobsOk(t *testing.T) { - bm, cancel := newTestBroadcast(t) - defer cancel() - mdm := bm.data.(*datamocks.Manager) - mdx := bm.exchange.(*dataexchangemocks.Plugin) - mps := bm.sharedstorage.(*sharedstoragemocks.Plugin) - mim := bm.identity.(*identitymanagermocks.Manager) - - blobHash := fftypes.NewRandB32() - dataID := fftypes.NewUUID() - - ctx := context.Background() - mdm.On("ResolveInlineDataBroadcast", ctx, mock.Anything). - Run(func(args mock.Arguments) { - newMsg := args[1].(*data.NewMessage) - newMsg.ResolvedData.AllData = fftypes.DataArray{ - {ID: dataID, Hash: fftypes.NewRandB32()}, - } - newMsg.ResolvedData.DataToPublish = []*fftypes.DataAndBlob{ - { - Data: &fftypes.Data{ - ID: dataID, - Blob: &fftypes.BlobRef{ - Hash: blobHash, - }, - }, - Blob: &fftypes.Blob{ - Hash: blobHash, - PayloadRef: "blob/1", - }, - }, - } - }). - Return(nil) - mdx.On("DownloadBLOB", ctx, "blob/1").Return(ioutil.NopCloser(bytes.NewReader([]byte(`some data`))), nil) - var readStr string - mps.On("PublishData", ctx, mock.MatchedBy(func(reader io.ReadCloser) bool { - if readStr == "" { // called again in AssertExpectationa - b, err := ioutil.ReadAll(reader) - assert.NoError(t, err) - readStr = string(b) - } - assert.Equal(t, "some data", readStr) - return true - })).Return("payload-ref", nil).Once() - mdm.On("WriteNewMessage", ctx, mock.Anything, mock.Anything).Return(nil) - mim.On("ResolveInputSigningIdentity", ctx, "ns1", mock.Anything).Return(nil) - - msg, err := bm.BroadcastMessage(ctx, "ns1", &fftypes.MessageInOut{ - Message: fftypes.Message{ - Header: fftypes.MessageHeader{ - SignerRef: fftypes.SignerRef{ - Author: "did:firefly:org/abcd", - Key: "0x12345", - }, - }, - }, - InlineData: fftypes.InlineData{ - {Blob: &fftypes.BlobRef{ - Hash: blobHash, - }}, - }, - }, false) - assert.NoError(t, err) - assert.Equal(t, "ns1", msg.Header.Namespace) - - mdx.AssertExpectations(t) - mps.AssertExpectations(t) - mdm.AssertExpectations(t) - mim.AssertExpectations(t) -} - func TestBroadcastMessageTooLarge(t *testing.T) { bm, cancel := newTestBroadcast(t) bm.maxBatchPayloadLength = 1000000 @@ -193,7 +116,7 @@ func TestBroadcastMessageTooLarge(t *testing.T) { mim := bm.identity.(*identitymanagermocks.Manager) ctx := context.Background() - mdm.On("ResolveInlineDataBroadcast", ctx, mock.Anything).Run( + mdm.On("ResolveInlineData", ctx, mock.Anything).Run( func(args mock.Arguments) { newMsg := args[1].(*data.NewMessage) newMsg.Message.Data = fftypes.DataRefs{ @@ -228,7 +151,7 @@ func TestBroadcastMessageBadInput(t *testing.T) { mim := bm.identity.(*identitymanagermocks.Manager) ctx := context.Background() - mdm.On("ResolveInlineDataBroadcast", ctx, mock.Anything).Return(fmt.Errorf("pop")) + mdm.On("ResolveInlineData", ctx, mock.Anything).Return(fmt.Errorf("pop")) mim.On("ResolveInputSigningIdentity", ctx, "ns1", mock.Anything).Return(nil) _, err := bm.BroadcastMessage(ctx, "ns1", &fftypes.MessageInOut{ @@ -259,64 +182,6 @@ func TestBroadcastMessageBadIdentity(t *testing.T) { mim.AssertExpectations(t) } -func TestPublishBlobsSendMessageFail(t *testing.T) { - bm, cancel := newTestBroadcast(t) - defer cancel() - mdm := bm.data.(*datamocks.Manager) - mdx := bm.exchange.(*dataexchangemocks.Plugin) - mim := bm.identity.(*identitymanagermocks.Manager) - - blobHash := fftypes.NewRandB32() - dataID := fftypes.NewUUID() - - ctx := context.Background() - mim.On("ResolveInputSigningIdentity", ctx, "ns1", mock.Anything).Return(nil) - mdm.On("ResolveInlineDataBroadcast", ctx, mock.Anything). - Run(func(args mock.Arguments) { - newMsg := args[1].(*data.NewMessage) - newMsg.ResolvedData.AllData = fftypes.DataArray{ - {ID: dataID, Hash: fftypes.NewRandB32()}, - } - newMsg.ResolvedData.DataToPublish = []*fftypes.DataAndBlob{ - { - Data: &fftypes.Data{ - ID: dataID, - Blob: &fftypes.BlobRef{ - Hash: blobHash, - }, - }, - Blob: &fftypes.Blob{ - Hash: blobHash, - PayloadRef: "blob/1", - }, - }, - } - }). - Return(nil) - mdx.On("DownloadBLOB", ctx, "blob/1").Return(nil, fmt.Errorf("pop")) - - _, err := bm.BroadcastMessage(ctx, "ns1", &fftypes.MessageInOut{ - Message: fftypes.Message{ - Header: fftypes.MessageHeader{ - SignerRef: fftypes.SignerRef{ - Author: "did:firefly:org/abcd", - Key: "0x12345", - }, - }, - }, - InlineData: fftypes.InlineData{ - {Blob: &fftypes.BlobRef{ - Hash: blobHash, - }}, - }, - }, false) - assert.Regexp(t, "FF10240", err) - - mdm.AssertExpectations(t) - mdx.AssertExpectations(t) - mim.AssertExpectations(t) -} - func TestBroadcastPrepare(t *testing.T) { bm, cancel := newTestBroadcast(t) defer cancel() @@ -324,7 +189,7 @@ func TestBroadcastPrepare(t *testing.T) { mim := bm.identity.(*identitymanagermocks.Manager) ctx := context.Background() - mdm.On("ResolveInlineDataBroadcast", ctx, mock.Anything).Return(nil) + mdm.On("ResolveInlineData", ctx, mock.Anything).Return(nil) mim.On("ResolveInputSigningIdentity", ctx, "ns1", mock.Anything).Return(nil) msg := &fftypes.MessageInOut{ diff --git a/internal/broadcast/operations.go b/internal/broadcast/operations.go index 74c7fe2b90..571910f864 100644 --- a/internal/broadcast/operations.go +++ b/internal/broadcast/operations.go @@ -22,6 +22,7 @@ import ( "encoding/json" "github.com/hyperledger/firefly/internal/i18n" + "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) @@ -78,6 +79,7 @@ func (bm *broadcastManager) RunOperation(ctx context.Context, op *fftypes.Prepar if err != nil { return false, err } + log.L(ctx).Infof("Published batch '%s' to shared storage: '%s'", data.Batch.ID, payloadRef) // Update the batch to store the payloadRef data.Batch.PayloadRef = payloadRef diff --git a/internal/data/data_manager.go b/internal/data/data_manager.go index 12b6ff2850..309afec8ba 100644 --- a/internal/data/data_manager.go +++ b/internal/data/data_manager.go @@ -39,8 +39,7 @@ type Manager interface { GetMessageDataCached(ctx context.Context, msg *fftypes.Message, options ...CacheReadOption) (data fftypes.DataArray, foundAll bool, err error) UpdateMessageCache(msg *fftypes.Message, data fftypes.DataArray) UpdateMessageIfCached(ctx context.Context, msg *fftypes.Message) - ResolveInlineDataPrivate(ctx context.Context, msg *NewMessage) error - ResolveInlineDataBroadcast(ctx context.Context, msg *NewMessage) error + ResolveInlineData(ctx context.Context, msg *NewMessage) error WriteNewMessage(ctx context.Context, newMsg *NewMessage) error VerifyNamespaceExists(ctx context.Context, ns string) error @@ -374,7 +373,7 @@ func (dm *dataManager) checkValidation(ctx context.Context, ns string, validator return nil } -func (dm *dataManager) validateInputData(ctx context.Context, ns string, inData *fftypes.DataRefOrValue) (data *fftypes.Data, blob *fftypes.Blob, err error) { +func (dm *dataManager) validateInputData(ctx context.Context, ns string, inData *fftypes.DataRefOrValue) (data *fftypes.Data, err error) { validator := inData.Validator datatype := inData.Datatype @@ -382,11 +381,12 @@ func (dm *dataManager) validateInputData(ctx context.Context, ns string, inData blobRef := inData.Blob if err := dm.checkValidation(ctx, ns, validator, datatype, value); err != nil { - return nil, nil, err + return nil, err } - if blob, err = dm.resolveBlob(ctx, blobRef); err != nil { - return nil, nil, err + blob, err := dm.resolveBlob(ctx, blobRef) + if err != nil { + return nil, err } // Ok, we're good to generate the full data payload and save it @@ -399,13 +399,13 @@ func (dm *dataManager) validateInputData(ctx context.Context, ns string, inData } err = data.Seal(ctx, blob) if err != nil { - return nil, nil, err + return nil, err } - return data, blob, nil + return data, nil } func (dm *dataManager) UploadJSON(ctx context.Context, ns string, inData *fftypes.DataRefOrValue) (*fftypes.Data, error) { - data, _, err := dm.validateInputData(ctx, ns, inData) + data, err := dm.validateInputData(ctx, ns, inData) if err != nil { return nil, err } @@ -415,34 +415,21 @@ func (dm *dataManager) UploadJSON(ctx context.Context, ns string, inData *fftype return data, err } -func (dm *dataManager) ResolveInlineDataPrivate(ctx context.Context, newMessage *NewMessage) error { - return dm.resolveInlineData(ctx, newMessage, false) -} - -// ResolveInlineDataBroadcast ensures the data object are stored, and returns a list of any data that does not currently -// have a shared storage reference, and hence must be published to sharedstorage before a broadcast message can be sent. -// We deliberately do NOT perform those publishes inside of this action, as we expect to be in a RunAsGroup (trnasaction) -// at this point, and hence expensive things like a multi-megabyte upload should be decoupled by our caller. -func (dm *dataManager) ResolveInlineDataBroadcast(ctx context.Context, newMessage *NewMessage) error { - return dm.resolveInlineData(ctx, newMessage, true) -} - -func (dm *dataManager) resolveInlineData(ctx context.Context, newMessage *NewMessage, broadcast bool) (err error) { +// ResolveInlineData processes an input message that is going to be stored, to see which of the data +// elements are new, and which are existing. It verifies everything that points to an existing +// reference, and returns a list of what data is new separately - so that it can be stored by the +// message writer when the sending code is ready. +func (dm *dataManager) ResolveInlineData(ctx context.Context, newMessage *NewMessage) (err error) { if newMessage.Message == nil { return i18n.NewError(ctx, i18n.MsgNilOrNullObject) } - r := &newMessage.ResolvedData inData := newMessage.Message.InlineData msg := newMessage.Message - r.AllData = make(fftypes.DataArray, len(newMessage.Message.InlineData)) - if broadcast { - r.DataToPublish = make([]*fftypes.DataAndBlob, 0, len(inData)) - } + newMessage.AllData = make(fftypes.DataArray, len(newMessage.Message.InlineData)) for i, dataOrValue := range inData { var d *fftypes.Data - var blob *fftypes.Blob switch { case dataOrValue.ID != nil: // If an ID is supplied, then it must be a reference to existing data @@ -453,31 +440,23 @@ func (dm *dataManager) resolveInlineData(ctx context.Context, newMessage *NewMes if d == nil { return i18n.NewError(ctx, i18n.MsgDataReferenceUnresolvable, i) } - if blob, err = dm.resolveBlob(ctx, d.Blob); err != nil { + if _, err = dm.resolveBlob(ctx, d.Blob); err != nil { return err } case dataOrValue.Value != nil || dataOrValue.Blob != nil: // We've got a Value, so we can validate + store it - if d, blob, err = dm.validateInputData(ctx, msg.Header.Namespace, dataOrValue); err != nil { + if d, err = dm.validateInputData(ctx, msg.Header.Namespace, dataOrValue); err != nil { return err } - r.NewData = append(r.NewData, d) + newMessage.NewData = append(newMessage.NewData, d) default: // We have nothing - this must be a mistake return i18n.NewError(ctx, i18n.MsgDataMissing, i) } - r.AllData[i] = d - - // If the data is being resolved for public broadcast, and there is a blob attachment, that blob - // needs to be published by our calller - if broadcast && blob != nil && d.Blob.Public == "" { - r.DataToPublish = append(r.DataToPublish, &fftypes.DataAndBlob{ - Data: d, - Blob: blob, - }) - } + newMessage.AllData[i] = d + } - newMessage.Message.Data = r.AllData.Refs() + newMessage.Message.Data = newMessage.AllData.Refs() return nil } @@ -529,7 +508,7 @@ func (dm *dataManager) WriteNewMessage(ctx context.Context, newMsg *NewMessage) if err != nil { return err } - dm.UpdateMessageCache(&newMsg.Message.Message, newMsg.ResolvedData.AllData) + dm.UpdateMessageCache(&newMsg.Message.Message, newMsg.AllData) return nil } diff --git a/internal/data/data_manager_test.go b/internal/data/data_manager_test.go index 0c652c4276..8be4529e84 100644 --- a/internal/data/data_manager_test.go +++ b/internal/data/data_manager_test.go @@ -166,12 +166,12 @@ func TestWriteNewMessageE2E(t *testing.T) { {Value: fftypes.JSONAnyPtr(`"message 2 - data C"`)}, } - err = dm.ResolveInlineDataPrivate(ctx, newMsg1) + err = dm.ResolveInlineData(ctx, newMsg1) assert.NoError(t, err) - err = dm.ResolveInlineDataPrivate(ctx, newMsg2) + err = dm.ResolveInlineData(ctx, newMsg2) assert.NoError(t, err) - allData := append(append(fftypes.DataArray{}, newMsg1.ResolvedData.NewData...), newMsg2.ResolvedData.NewData...) + allData := append(append(fftypes.DataArray{}, newMsg1.NewData...), newMsg2.NewData...) assert.Len(t, allData, 4) mdi.On("InsertMessages", mock.Anything, mock.MatchedBy(func(msgs []*fftypes.Message) bool { @@ -189,10 +189,10 @@ func TestWriteNewMessageE2E(t *testing.T) { dataByID[*data.ID] = true } return len(dataArray) == 4 && - dataByID[*newMsg1.ResolvedData.AllData[1].ID] && - dataByID[*newMsg1.ResolvedData.AllData[2].ID] && - dataByID[*newMsg2.ResolvedData.AllData[0].ID] && - dataByID[*newMsg2.ResolvedData.AllData[1].ID] + dataByID[*newMsg1.AllData[1].ID] && + dataByID[*newMsg1.AllData[2].ID] && + dataByID[*newMsg2.AllData[0].ID] && + dataByID[*newMsg2.AllData[1].ID] })).Return(nil).Once() results := make(chan error) @@ -397,9 +397,9 @@ func TestResolveInlineDataEmpty(t *testing.T) { }, } - err := dm.ResolveInlineDataPrivate(ctx, newMsg) + err := dm.ResolveInlineData(ctx, newMsg) assert.NoError(t, err) - assert.Empty(t, newMsg.ResolvedData.AllData) + assert.Empty(t, newMsg.AllData) assert.Empty(t, newMsg.Message.Data) } @@ -417,17 +417,16 @@ func TestResolveInlineDataRefIDOnlyOK(t *testing.T) { Hash: dataHash, }, nil) - err := dm.ResolveInlineDataPrivate(ctx, newMsg) + err := dm.ResolveInlineData(ctx, newMsg) assert.NoError(t, err) - assert.Len(t, newMsg.ResolvedData.AllData, 1) + assert.Len(t, newMsg.AllData, 1) assert.Len(t, newMsg.Message.Data, 1) - assert.Equal(t, dataID, newMsg.ResolvedData.AllData[0].ID) - assert.Equal(t, dataHash, newMsg.ResolvedData.AllData[0].Hash) - assert.Empty(t, newMsg.ResolvedData.NewData) - assert.Empty(t, newMsg.ResolvedData.DataToPublish) + assert.Equal(t, dataID, newMsg.AllData[0].ID) + assert.Equal(t, dataHash, newMsg.AllData[0].Hash) + assert.Empty(t, newMsg.NewData) } -func TestResolveInlineDataBroadcastDataToPublish(t *testing.T) { +func TestResolveInlineDataDataToPublish(t *testing.T) { dm, ctx, cancel := newTestDataManager(t) defer cancel() mdi := dm.database.(*databasemocks.Plugin) @@ -448,20 +447,16 @@ func TestResolveInlineDataBroadcastDataToPublish(t *testing.T) { PayloadRef: "blob/1", }, nil) - err := dm.ResolveInlineDataBroadcast(ctx, newMsg) + err := dm.ResolveInlineData(ctx, newMsg) assert.NoError(t, err) - assert.Len(t, newMsg.ResolvedData.AllData, 1) + assert.Len(t, newMsg.AllData, 1) assert.Len(t, newMsg.Message.Data, 1) - assert.Empty(t, newMsg.ResolvedData.NewData) - assert.Len(t, newMsg.ResolvedData.DataToPublish, 1) - assert.Equal(t, dataID, newMsg.ResolvedData.AllData[0].ID) - assert.Equal(t, dataHash, newMsg.ResolvedData.AllData[0].Hash) - assert.Len(t, newMsg.ResolvedData.DataToPublish, 1) - assert.Equal(t, newMsg.ResolvedData.AllData[0].ID, newMsg.ResolvedData.DataToPublish[0].Data.ID) - assert.Equal(t, "blob/1", newMsg.ResolvedData.DataToPublish[0].Blob.PayloadRef) + assert.Empty(t, newMsg.NewData) + assert.Equal(t, dataID, newMsg.AllData[0].ID) + assert.Equal(t, dataHash, newMsg.AllData[0].Hash) } -func TestResolveInlineDataBroadcastResolveBlobFail(t *testing.T) { +func TestResolveInlineDataResolveBlobFail(t *testing.T) { dm, ctx, cancel := newTestDataManager(t) defer cancel() mdi := dm.database.(*databasemocks.Plugin) @@ -479,7 +474,7 @@ func TestResolveInlineDataBroadcastResolveBlobFail(t *testing.T) { }, nil) mdi.On("GetBlobMatchingHash", ctx, blobHash).Return(nil, fmt.Errorf("pop")) - err := dm.ResolveInlineDataBroadcast(ctx, newMsg) + err := dm.ResolveInlineData(ctx, newMsg) assert.EqualError(t, err, "pop") } @@ -496,7 +491,7 @@ func TestResolveInlineDataRefBadNamespace(t *testing.T) { Hash: dataHash, }, nil) - err := dm.ResolveInlineDataPrivate(ctx, newMsg) + err := dm.ResolveInlineData(ctx, newMsg) assert.Regexp(t, "FF10204", err) } @@ -513,7 +508,7 @@ func TestResolveInlineDataRefBadHash(t *testing.T) { Hash: dataHash, }, nil) - err := dm.ResolveInlineDataPrivate(ctx, newMsg) + err := dm.ResolveInlineData(ctx, newMsg) assert.Regexp(t, "FF10204", err) } @@ -521,7 +516,7 @@ func TestResolveInlineDataNilMsg(t *testing.T) { dm, ctx, cancel := newTestDataManager(t) defer cancel() - err := dm.ResolveInlineDataPrivate(ctx, &NewMessage{}) + err := dm.ResolveInlineData(ctx, &NewMessage{}) assert.Regexp(t, "FF10368", err) } @@ -534,7 +529,7 @@ func TestResolveInlineDataRefLookkupFail(t *testing.T) { mdi.On("GetDataByID", ctx, dataID, true).Return(nil, fmt.Errorf("pop")) - err := dm.ResolveInlineDataPrivate(ctx, newMsg) + err := dm.ResolveInlineData(ctx, newMsg) assert.EqualError(t, err, "pop") } @@ -550,13 +545,13 @@ func TestResolveInlineDataValueNoValidatorOK(t *testing.T) { {Value: fftypes.JSONAnyPtr(`{"some":"json"}`)}, } - err := dm.ResolveInlineDataPrivate(ctx, newMsg) + err := dm.ResolveInlineData(ctx, newMsg) assert.NoError(t, err) - assert.Len(t, newMsg.ResolvedData.AllData, 1) - assert.Len(t, newMsg.ResolvedData.NewData, 1) + assert.Len(t, newMsg.AllData, 1) + assert.Len(t, newMsg.NewData, 1) assert.Len(t, newMsg.Message.Data, 1) - assert.NotNil(t, newMsg.ResolvedData.AllData[0].ID) - assert.NotNil(t, newMsg.ResolvedData.AllData[0].Hash) + assert.NotNil(t, newMsg.AllData[0].ID) + assert.NotNil(t, newMsg.AllData[0].Hash) } func TestResolveInlineDataValueWithValidation(t *testing.T) { @@ -592,12 +587,12 @@ func TestResolveInlineDataValueWithValidation(t *testing.T) { }, } - err := dm.ResolveInlineDataPrivate(ctx, newMsg) + err := dm.ResolveInlineData(ctx, newMsg) assert.NoError(t, err) - assert.Len(t, newMsg.ResolvedData.AllData, 1) - assert.Len(t, newMsg.ResolvedData.NewData, 1) - assert.NotNil(t, newMsg.ResolvedData.AllData[0].ID) - assert.NotNil(t, newMsg.ResolvedData.AllData[0].Hash) + assert.Len(t, newMsg.AllData, 1) + assert.Len(t, newMsg.NewData, 1) + assert.NotNil(t, newMsg.AllData[0].ID) + assert.NotNil(t, newMsg.AllData[0].Hash) newMsg.Message.InlineData = fftypes.InlineData{ { @@ -608,7 +603,7 @@ func TestResolveInlineDataValueWithValidation(t *testing.T) { Value: fftypes.JSONAnyPtr(`{"not_allowed":"value"}`), }, } - err = dm.ResolveInlineDataPrivate(ctx, newMsg) + err = dm.ResolveInlineData(ctx, newMsg) assert.Regexp(t, "FF10198", err) } @@ -621,7 +616,7 @@ func TestResolveInlineDataNoRefOrValue(t *testing.T) { { /* missing */ }, } - err := dm.ResolveInlineDataPrivate(ctx, newMsg) + err := dm.ResolveInlineData(ctx, newMsg) assert.Regexp(t, "FF10205", err) } @@ -654,7 +649,7 @@ func TestValidateAndStoreLoadNilRef(t *testing.T) { dm, ctx, cancel := newTestDataManager(t) defer cancel() - _, _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ + _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ Validator: fftypes.ValidatorTypeJSON, Datatype: nil, }) @@ -667,7 +662,7 @@ func TestValidateAndStoreLoadValidatorUnknown(t *testing.T) { defer cancel() mdi := dm.database.(*databasemocks.Plugin) mdi.On("GetDatatypeByName", mock.Anything, "ns1", "customer", "0.0.1").Return(nil, nil) - _, _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ + _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ Validator: "wrong!", Datatype: &fftypes.DatatypeRef{ Name: "customer", @@ -684,7 +679,7 @@ func TestValidateAndStoreLoadBadRef(t *testing.T) { defer cancel() mdi := dm.database.(*databasemocks.Plugin) mdi.On("GetDatatypeByName", mock.Anything, "ns1", "customer", "0.0.1").Return(nil, nil) - _, _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ + _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ Datatype: &fftypes.DatatypeRef{ // Missing name }, @@ -698,7 +693,7 @@ func TestValidateAndStoreNotFound(t *testing.T) { defer cancel() mdi := dm.database.(*databasemocks.Plugin) mdi.On("GetDatatypeByName", mock.Anything, "ns1", "customer", "0.0.1").Return(nil, nil) - _, _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ + _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ Datatype: &fftypes.DatatypeRef{ Name: "customer", Version: "0.0.1", @@ -714,7 +709,7 @@ func TestValidateAndStoreBlobError(t *testing.T) { mdi := dm.database.(*databasemocks.Plugin) blobHash := fftypes.NewRandB32() mdi.On("GetBlobMatchingHash", mock.Anything, blobHash).Return(nil, fmt.Errorf("pop")) - _, _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ + _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ Blob: &fftypes.BlobRef{ Hash: blobHash, }, @@ -729,7 +724,7 @@ func TestValidateAndStoreBlobNotFound(t *testing.T) { mdi := dm.database.(*databasemocks.Plugin) blobHash := fftypes.NewRandB32() mdi.On("GetBlobMatchingHash", mock.Anything, blobHash).Return(nil, nil) - _, _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ + _, err := dm.validateInputData(ctx, "ns1", &fftypes.DataRefOrValue{ Blob: &fftypes.BlobRef{ Hash: blobHash, }, diff --git a/internal/data/message_writer.go b/internal/data/message_writer.go index 98c92f4912..7bcfef51f4 100644 --- a/internal/data/message_writer.go +++ b/internal/data/message_writer.go @@ -27,14 +27,9 @@ import ( ) type NewMessage struct { - Message *fftypes.MessageInOut - ResolvedData Resolved -} - -type Resolved struct { - AllData fftypes.DataArray - NewData fftypes.DataArray - DataToPublish []*fftypes.DataAndBlob + Message *fftypes.MessageInOut + AllData fftypes.DataArray + NewData fftypes.DataArray } // writeRequest is a combination of a message and a list of data that is new and needs to be @@ -100,7 +95,7 @@ func (mw *messageWriter) WriteNewMessage(ctx context.Context, newMsg *NewMessage } nmi := &writeRequest{ newMessage: &newMsg.Message.Message, - newData: newMsg.ResolvedData.NewData, + newData: newMsg.NewData, result: make(chan error), } select { @@ -112,7 +107,7 @@ func (mw *messageWriter) WriteNewMessage(ctx context.Context, newMsg *NewMessage } // Otherwise do it in-line on this context return mw.database.RunAsGroup(ctx, func(ctx context.Context) error { - return mw.writeMessages(ctx, []*fftypes.Message{&newMsg.Message.Message}, newMsg.ResolvedData.NewData) + return mw.writeMessages(ctx, []*fftypes.Message{&newMsg.Message.Message}, newMsg.NewData) }) } diff --git a/internal/data/message_writer_test.go b/internal/data/message_writer_test.go index 0f8bbea2e4..59ffb007b7 100644 --- a/internal/data/message_writer_test.go +++ b/internal/data/message_writer_test.go @@ -91,9 +91,7 @@ func TestWriteNewMessageSyncFallback(t *testing.T) { err := mw.WriteNewMessage(customCtx, &NewMessage{ Message: msg1, - ResolvedData: Resolved{ - NewData: fftypes.DataArray{data1}, - }, + NewData: fftypes.DataArray{data1}, }) assert.NoError(t, err) diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 539ca72265..a8f3144ecb 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -337,11 +337,13 @@ func (ag *aggregator) checkOnchainConsistency(ctx context.Context, msg *fftypes. func (ag *aggregator) processMessage(ctx context.Context, manifest *fftypes.BatchManifest, pin *fftypes.Pin, msgBaseIndex int64, msgEntry *fftypes.MessageManifestEntry, state *batchState) (err error) { l := log.L(ctx) - var cros []data.CacheReadOption + var cro data.CacheReadOption if pin.Masked { - cros = []data.CacheReadOption{data.CRORequirePins} + cro = data.CRORequirePins + } else { + cro = data.CRORequirePublicBlobRefs } - msg, data, dataAvailable, err := ag.data.GetMessageWithDataCached(ctx, msgEntry.ID, cros...) + msg, data, dataAvailable, err := ag.data.GetMessageWithDataCached(ctx, msgEntry.ID, cro) if err != nil { return err } @@ -534,14 +536,14 @@ func (ag *aggregator) resolveBlobs(ctx context.Context, data fftypes.DataArray) return false, err } if blob != nil { - l.Debugf("Blob '%s' downloaded from shared storage to local DX with ref '%s'", blob.Hash, blob.PayloadRef) + l.Debugf("Blob '%s' for data %s downloaded from shared storage to local DX with ref '%s'", blob.Hash, d.ID, blob.PayloadRef) continue } } // If we've reached here, the data isn't available yet. // This isn't an error, we just need to wait for it to arrive. - l.Debugf("Blob '%s' not available", d.Blob.Hash) + l.Debugf("Blob '%s' not available for data %s", d.Blob.Hash, d.ID) return false, nil } diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index fe338608bb..eefec9d436 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -428,7 +428,7 @@ func TestAggregationBroadcast(t *testing.T) { // Do not resolve any pins earlier mdi.On("GetPins", mock.Anything, mock.Anything).Return([]*fftypes.Pin{}, nil, nil) // Validate the message is ok - mdm.On("GetMessageWithDataCached", ag.ctx, batch.Payload.Messages[0].Header.ID).Return(batch.Payload.Messages[0], fftypes.DataArray{}, true, nil) + mdm.On("GetMessageWithDataCached", ag.ctx, batch.Payload.Messages[0].Header.ID, data.CRORequirePublicBlobRefs).Return(batch.Payload.Messages[0], fftypes.DataArray{}, true, nil) mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(true, nil) // Insert the confirmed event mdi.On("InsertEvent", ag.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { @@ -521,7 +521,7 @@ func TestAggregationMigratedBroadcast(t *testing.T) { // Do not resolve any pins earlier mdi.On("GetPins", mock.Anything, mock.Anything).Return([]*fftypes.Pin{}, nil, nil) // Validate the message is ok - mdm.On("GetMessageWithDataCached", ag.ctx, batch.Payload.Messages[0].Header.ID).Return(batch.Payload.Messages[0], fftypes.DataArray{}, true, nil) + mdm.On("GetMessageWithDataCached", ag.ctx, batch.Payload.Messages[0].Header.ID, data.CRORequirePublicBlobRefs).Return(batch.Payload.Messages[0], fftypes.DataArray{}, true, nil) mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(true, nil) // Insert the confirmed event mdi.On("InsertEvent", ag.ctx, mock.MatchedBy(func(e *fftypes.Event) bool { @@ -841,7 +841,7 @@ func TestProcessSkipDupMsg(t *testing.T) { mdi.On("UpdateOffset", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) mdm := ag.data.(*datamocks.Manager) - mdm.On("GetMessageWithDataCached", ag.ctx, mock.Anything).Return(batch.Payload.Messages[0], nil, true, nil) + mdm.On("GetMessageWithDataCached", ag.ctx, mock.Anything, data.CRORequirePublicBlobRefs).Return(batch.Payload.Messages[0], nil, true, nil) err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: batchID, Index: 0, Hash: fftypes.NewRandB32()}, @@ -879,7 +879,7 @@ func TestProcessMsgFailGetPins(t *testing.T) { mdi.On("GetPins", mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) mdm := ag.data.(*datamocks.Manager) - mdm.On("GetMessageWithDataCached", ag.ctx, mock.Anything).Return(batch.Payload.Messages[0], nil, true, nil) + mdm.On("GetMessageWithDataCached", ag.ctx, mock.Anything, data.CRORequirePublicBlobRefs).Return(batch.Payload.Messages[0], nil, true, nil) err := ag.processPins(ag.ctx, []*fftypes.Pin{ {Sequence: 12345, Batch: batchID, Index: 0, Hash: fftypes.NewRandB32()}, @@ -1010,7 +1010,7 @@ func TestProcessMsgFailDispatch(t *testing.T) { } mdm := ag.data.(*datamocks.Manager) - mdm.On("GetMessageWithDataCached", ag.ctx, mock.Anything).Return(msg, nil, true, nil) + mdm.On("GetMessageWithDataCached", ag.ctx, mock.Anything, data.CRORequirePublicBlobRefs).Return(msg, nil, true, nil) mim := ag.identity.(*identitymanagermocks.Manager) mim.On("FindIdentityForVerifier", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) @@ -1572,8 +1572,8 @@ func TestDispatchBroadcastQueuesLaterDispatch(t *testing.T) { mim.On("FindIdentityForVerifier", ag.ctx, mock.Anything, mock.Anything, mock.Anything).Return(org1, nil) mdm := ag.data.(*datamocks.Manager) - mdm.On("GetMessageWithDataCached", ag.ctx, msg1.Header.ID).Return(msg1, fftypes.DataArray{}, true, nil).Once() - mdm.On("GetMessageWithDataCached", ag.ctx, msg2.Header.ID).Return(msg2, fftypes.DataArray{}, true, nil).Once() + mdm.On("GetMessageWithDataCached", ag.ctx, msg1.Header.ID, data.CRORequirePublicBlobRefs).Return(msg1, fftypes.DataArray{}, true, nil).Once() + mdm.On("GetMessageWithDataCached", ag.ctx, msg2.Header.ID, data.CRORequirePublicBlobRefs).Return(msg2, fftypes.DataArray{}, true, nil).Once() mdi := ag.database.(*databasemocks.Plugin) mdi.On("GetPins", ag.ctx, mock.Anything).Return([]*fftypes.Pin{}, nil, nil) diff --git a/internal/privatemessaging/message.go b/internal/privatemessaging/message.go index 6f79d1a115..e2526c1ce6 100644 --- a/internal/privatemessaging/message.go +++ b/internal/privatemessaging/message.go @@ -141,7 +141,7 @@ func (s *messageSender) resolve(ctx context.Context) error { } // The data manager is responsible for the heavy lifting of storing/validating all our in-line data elements - err := s.mgr.data.ResolveInlineDataPrivate(ctx, s.msg) + err := s.mgr.data.ResolveInlineData(ctx, s.msg) return err } diff --git a/internal/privatemessaging/message_test.go b/internal/privatemessaging/message_test.go index 96fee24bb1..586ce6b568 100644 --- a/internal/privatemessaging/message_test.go +++ b/internal/privatemessaging/message_test.go @@ -85,7 +85,7 @@ func TestSendConfirmMessageE2EOk(t *testing.T) { mim.On("CachedIdentityLookupByID", pm.ctx, rootOrg.ID).Return(rootOrg, nil) mdm := pm.data.(*datamocks.Manager) - mdm.On("ResolveInlineDataPrivate", pm.ctx, mock.Anything).Return(nil) + mdm.On("ResolveInlineData", pm.ctx, mock.Anything).Return(nil) mdm.On("WriteNewMessage", pm.ctx, mock.Anything).Return(nil).Once() mdi := pm.database.(*databasemocks.Plugin) @@ -138,7 +138,7 @@ func TestSendUnpinnedMessageE2EOk(t *testing.T) { groupID := fftypes.NewRandB32() mdm := pm.data.(*datamocks.Manager) - mdm.On("ResolveInlineDataPrivate", pm.ctx, mock.Anything).Return(nil) + mdm.On("ResolveInlineData", pm.ctx, mock.Anything).Return(nil) mdm.On("WriteNewMessage", pm.ctx, mock.Anything).Return(nil).Once() mdi := pm.database.(*databasemocks.Plugin) @@ -235,7 +235,7 @@ func TestResolveAndSendBadInlineData(t *testing.T) { mdi.On("GetGroupByHash", pm.ctx, mock.Anything, mock.Anything).Return(&fftypes.Group{Hash: fftypes.NewRandB32()}, nil, nil).Once() mdm := pm.data.(*datamocks.Manager) - mdm.On("ResolveInlineDataPrivate", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")) + mdm.On("ResolveInlineData", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")) message := &messageSender{ mgr: pm, @@ -277,7 +277,7 @@ func TestSendUnpinnedMessageTooLarge(t *testing.T) { dataID := fftypes.NewUUID() groupID := fftypes.NewRandB32() mdm := pm.data.(*datamocks.Manager) - mdm.On("ResolveInlineDataPrivate", pm.ctx, mock.Anything).Run(func(args mock.Arguments) { + mdm.On("ResolveInlineData", pm.ctx, mock.Anything).Run(func(args mock.Arguments) { newMsg := args[1].(*data.NewMessage) newMsg.Message.Data = fftypes.DataRefs{ {ID: dataID, Hash: fftypes.NewRandB32(), ValueSize: 100001}, @@ -352,7 +352,7 @@ func TestMessagePrepare(t *testing.T) { mdi.On("GetGroupByHash", pm.ctx, mock.Anything, mock.Anything).Return(&fftypes.Group{Hash: fftypes.NewRandB32()}, nil, nil).Once() mdm := pm.data.(*datamocks.Manager) - mdm.On("ResolveInlineDataPrivate", pm.ctx, mock.Anything).Return(nil) + mdm.On("ResolveInlineData", pm.ctx, mock.Anything).Return(nil) message := pm.NewMessage("ns1", &fftypes.MessageInOut{ Message: fftypes.Message{ @@ -425,7 +425,7 @@ func TestSendUnpinnedMessageInsertFail(t *testing.T) { groupID := fftypes.NewRandB32() mdm := pm.data.(*datamocks.Manager) - mdm.On("ResolveInlineDataPrivate", pm.ctx, mock.Anything).Return(nil) + mdm.On("ResolveInlineData", pm.ctx, mock.Anything).Return(nil) mdm.On("WriteNewMessage", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")).Once() mdi := pm.database.(*databasemocks.Plugin) @@ -606,7 +606,7 @@ func TestRequestReplySuccess(t *testing.T) { Return(nil, nil) mdm := pm.data.(*datamocks.Manager) - mdm.On("ResolveInlineDataPrivate", pm.ctx, mock.Anything).Return(nil) + mdm.On("ResolveInlineData", pm.ctx, mock.Anything).Return(nil) mdm.On("WriteNewMessage", pm.ctx, mock.Anything).Return(nil).Once() groupID := fftypes.NewRandB32() diff --git a/internal/restclient/ffresty.go b/internal/restclient/ffresty.go index 36f93c80b2..1c03a9c3d9 100644 --- a/internal/restclient/ffresty.go +++ b/internal/restclient/ffresty.go @@ -31,6 +31,7 @@ import ( "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/sirupsen/logrus" ) type retryCtxKey struct{} @@ -52,7 +53,12 @@ func OnAfterResponse(c *resty.Client, resp *resty.Response) { rctx := resp.Request.Context() rc := rctx.Value(retryCtxKey{}).(*retryCtx) elapsed := float64(time.Since(rc.start)) / float64(time.Millisecond) - log.L(rctx).Infof("<== %s %s [%d] (%.2fms)", resp.Request.Method, resp.Request.URL, resp.StatusCode(), elapsed) + level := logrus.DebugLevel + status := resp.StatusCode() + if status >= 300 { + level = logrus.ErrorLevel + } + log.L(rctx).Logf(level, "<== %s %s [%d] (%.2fms)", resp.Request.Method, resp.Request.URL, status, elapsed) } // New creates a new Resty client, using static configuration (from the config file) @@ -117,7 +123,7 @@ func New(ctx context.Context, staticConfig config.Prefix) *resty.Client { rctx = log.WithLogger(rctx, l) req.SetContext(rctx) } - log.L(rctx).Infof("==> %s %s%s", req.Method, url, req.URL) + log.L(rctx).Debugf("==> %s %s%s", req.Method, url, req.URL) return nil }) diff --git a/mocks/datamocks/manager.go b/mocks/datamocks/manager.go index f2e35335d6..1a8594774e 100644 --- a/mocks/datamocks/manager.go +++ b/mocks/datamocks/manager.go @@ -32,11 +32,6 @@ func (_m *Manager) CheckDatatype(ctx context.Context, ns string, datatype *fftyp return r0 } -// Close provides a mock function with given fields: -func (_m *Manager) Close() { - _m.Called() -} - // CopyBlobPStoDX provides a mock function with given fields: ctx, _a1 func (_m *Manager) CopyBlobPStoDX(ctx context.Context, _a1 *fftypes.Data) (*fftypes.Blob, error) { ret := _m.Called(ctx, _a1) @@ -198,22 +193,8 @@ func (_m *Manager) HydrateBatch(ctx context.Context, persistedBatch *fftypes.Bat return r0, r1 } -// ResolveInlineDataBroadcast provides a mock function with given fields: ctx, msg -func (_m *Manager) ResolveInlineDataBroadcast(ctx context.Context, msg *data.NewMessage) error { - ret := _m.Called(ctx, msg) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *data.NewMessage) error); ok { - r0 = rf(ctx, msg) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ResolveInlineDataPrivate provides a mock function with given fields: ctx, msg -func (_m *Manager) ResolveInlineDataPrivate(ctx context.Context, msg *data.NewMessage) error { +// ResolveInlineData provides a mock function with given fields: ctx, msg +func (_m *Manager) ResolveInlineData(ctx context.Context, msg *data.NewMessage) error { ret := _m.Called(ctx, msg) var r0 error