Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions internal/batch/batch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,10 @@ func (bm *batchManager) getProcessor(txType fftypes.TransactionType, msgType fft
return processor, nil
}

func (bm *batchManager) assembleMessageData(batchType fftypes.BatchType, msg *fftypes.Message) (retData fftypes.DataArray, err error) {
var cro []data.CacheReadOption
if batchType == fftypes.BatchTypeBroadcast {
cro = append(cro, data.CRORequirePublicBlobRefs)
}
func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (retData fftypes.DataArray, err error) {
var foundAll = false
err = bm.retry.Do(bm.ctx, fmt.Sprintf("assemble message %s data", msg.Header.ID), func(attempt int) (retry bool, err error) {
retData, foundAll, err = bm.data.GetMessageDataCached(bm.ctx, msg, cro...)
retData, foundAll, err = bm.data.GetMessageDataCached(bm.ctx, msg)
// continual retry for persistence error (distinct from not-found)
return true, err
})
Expand Down Expand Up @@ -246,7 +242,7 @@ func (bm *batchManager) messageSequencer() {
continue
}

data, err := bm.assembleMessageData(processor.conf.DispatcherOptions.BatchType, msg)
data, err := bm.assembleMessageData(msg)
if err != nil {
l.Errorf("Failed to retrieve message data for %s: %s", msg.Header.ID, err)
continue
Expand Down
11 changes: 5 additions & 6 deletions internal/batch/batch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/data"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/mocks/databasemocks"
Expand Down Expand Up @@ -352,7 +351,7 @@ func TestMessageSequencerMissingMessageData(t *testing.T) {
}).
Once()
mdi.On("GetMessages", mock.Anything, mock.Anything, mock.Anything).Return([]*fftypes.Message{}, nil, nil)
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything, data.CRORequirePublicBlobRefs).Return(fftypes.DataArray{}, false, nil)
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(fftypes.DataArray{}, false, nil)

bm.(*batchManager).messageSequencer()

Expand Down Expand Up @@ -541,7 +540,7 @@ func TestAssembleMessageDataNilData(t *testing.T) {
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
bm.Close()
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, nil)
_, err := bm.(*batchManager).assembleMessageData(fftypes.BatchTypePrivate, &fftypes.Message{
_, err := bm.(*batchManager).assembleMessageData(&fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
},
Expand All @@ -558,7 +557,7 @@ func TestGetMessageDataFail(t *testing.T) {
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, fmt.Errorf("pop"))
bm.Close()
_, _ = bm.(*batchManager).assembleMessageData(fftypes.BatchTypePrivate, &fftypes.Message{
_, _ = bm.(*batchManager).assembleMessageData(&fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
},
Expand All @@ -575,9 +574,9 @@ func TestGetMessageNotFound(t *testing.T) {
mni := &sysmessagingmocks.LocalNodeInfo{}
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything, data.CRORequirePublicBlobRefs).Return(nil, false, nil)
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, nil)
bm.Close()
_, err := bm.(*batchManager).assembleMessageData(fftypes.BatchTypeBroadcast, &fftypes.Message{
_, err := bm.(*batchManager).assembleMessageData(&fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
},
Expand Down
6 changes: 2 additions & 4 deletions internal/broadcast/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,8 @@ func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, ns st
},
},
},
ResolvedData: data.Resolved{
NewData: fftypes.DataArray{d},
AllData: fftypes.DataArray{d},
},
NewData: fftypes.DataArray{d},
AllData: fftypes.DataArray{d},
}

// Broadcast the message
Expand Down
45 changes: 31 additions & 14 deletions internal/broadcast/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ func (bm *broadcastManager) Name() string {
}

func (bm *broadcastManager) dispatchBatch(ctx context.Context, state *batch.DispatchState) error {

// Ensure all the blobs are published
if err := bm.publishBlobs(ctx, state.Payload.Data); err != nil {
return err
}

// The completed SharedStorage upload
op := fftypes.NewOperation(
bm.sharedstorage,
Expand All @@ -144,21 +150,32 @@ func (bm *broadcastManager) dispatchBatch(ctx context.Context, state *batch.Disp
return bm.batchpin.SubmitPinnedBatch(ctx, &state.Persisted, state.Pins)
}

func (bm *broadcastManager) publishBlobs(ctx context.Context, newMsg *data.NewMessage) error {
for _, d := range newMsg.ResolvedData.DataToPublish {
// Stream from the local data exchange ...
reader, err := bm.exchange.DownloadBLOB(ctx, d.Blob.PayloadRef)
if err != nil {
return i18n.WrapError(ctx, err, i18n.MsgDownloadBlobFailed, d.Blob.PayloadRef)
}
defer reader.Close()

// ... to the shared storage
d.Data.Blob.Public, err = bm.sharedstorage.PublishData(ctx, reader)
if err != nil {
return err
func (bm *broadcastManager) publishBlobs(ctx context.Context, data fftypes.DataArray) error {
for _, d := range data {
// We only need to send a blob if there is one, and it's not been uploaded to the shared storage
if d.Blob != nil && d.Blob.Hash != nil && d.Blob.Public == "" {
blob, err := bm.database.GetBlobMatchingHash(ctx, d.Blob.Hash)
if err != nil {
return err
}
if blob == nil {
return i18n.NewError(ctx, i18n.MsgBlobNotFound, d.Blob.Hash)
}

// Stream from the local data exchange ...
reader, err := bm.exchange.DownloadBLOB(ctx, blob.PayloadRef)
if err != nil {
return i18n.WrapError(ctx, err, i18n.MsgDownloadBlobFailed, blob.PayloadRef)
}
defer reader.Close()

// ... to the shared storage
d.Blob.Public, err = bm.sharedstorage.PublishData(ctx, reader)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the part where I got stuck before. How does this public ref 1) get stored in the database, and 2) not cause the data hash to change? Hasn't the batch been sealed and stored before we reach here?

Copy link
Contributor

@awrichar awrichar Mar 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After refreshing myself on the other recent batching changes, it looks like the answer to (2) is that the hash calculation changed to not include the public ref as part of the hash. That's good.

Still can't figure out (1), ie where this data item is updated in the database with the public reference received here. It's definitely updated when it's received again in the event aggregator, but I'd expect it's updated prior to that as well... I'm likely just missing something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point. It's not currently at the point the batch is written to the comms layer. As you observe, it is when it comes back in the other end at the end of this PR - but I actually optimized that away in a later change.

I will add it back in, as it's only a performance hit for blobs. But if it's ok will do that as a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 17b9f90 in #599

if err != nil {
return err
}
log.L(ctx).Infof("Published blob with hash '%s' for data '%s' to shared storage: '%s'", d.Blob.Hash, d.ID, d.Blob.Public)
}
log.L(ctx).Infof("Published blob with hash '%s' for data '%s' to shared storage: '%s'", d.Blob.Hash, d.Data.ID, d.Data.Blob.Public)
}

return nil
Expand Down
Loading