Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ type assetManager struct {
keyNormalization int
}

func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin, mm metrics.Manager, om operations.Manager) (Manager, error) {
func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin, mm metrics.Manager, om operations.Manager, txHelper txcommon.Helper) (Manager, error) {
if di == nil || im == nil || sa == nil || bm == nil || pm == nil || ti == nil || mm == nil || om == nil {
return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError)
}
am := &assetManager{
ctx: ctx,
database: di,
txHelper: txcommon.NewTransactionHelper(di),
txHelper: txHelper,
identity: im,
data: dm,
syncasync: sa,
Expand Down
9 changes: 6 additions & 3 deletions internal/assets/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/mocks/broadcastmocks"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/datamocks"
Expand Down Expand Up @@ -47,11 +48,12 @@ func newTestAssets(t *testing.T) (*assetManager, func()) {
mti := &tokenmocks.Plugin{}
mm := &metricsmocks.Manager{}
mom := &operationmocks.Manager{}
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
mti.On("Name").Return("ut_tokens").Maybe()
mm.On("IsMetricsEnabled").Return(false)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)
ctx, cancel := context.WithCancel(context.Background())
a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom)
a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom, txHelper)
rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe()
rag.RunFn = func(a mock.Arguments) {
rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))}
Expand All @@ -73,12 +75,13 @@ func newTestAssetsWithMetrics(t *testing.T) (*assetManager, func()) {
mti := &tokenmocks.Plugin{}
mm := &metricsmocks.Manager{}
mom := &operationmocks.Manager{}
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
mti.On("Name").Return("ut_tokens").Maybe()
mm.On("IsMetricsEnabled").Return(true)
mm.On("TransferSubmitted", mock.Anything)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)
ctx, cancel := context.WithCancel(context.Background())
a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom)
a, err := NewAssetManager(ctx, mdi, mim, mdm, msa, mbm, mpm, map[string]tokens.Plugin{"magic-tokens": mti}, mm, mom, txHelper)
rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything).Maybe()
rag.RunFn = func(a mock.Arguments) {
rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))}
Expand All @@ -90,7 +93,7 @@ func newTestAssetsWithMetrics(t *testing.T) (*assetManager, func()) {
}

func TestInitFail(t *testing.T) {
_, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil, nil)
_, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
assert.Regexp(t, "FF10128", err)
}

Expand Down
22 changes: 13 additions & 9 deletions internal/batch/batch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ import (
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/retry"
"github.com/hyperledger/firefly/internal/sysmessaging"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
)

func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager) (Manager, error) {
func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager, txHelper txcommon.Helper) (Manager, error) {
if di == nil || dm == nil {
return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError)
}
Expand All @@ -44,6 +45,7 @@ func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di data
ni: ni,
database: di,
data: dm,
txHelper: txHelper,
readOffset: -1, // On restart we trawl for all ready messages
readPageSize: uint64(readPageSize),
messagePollTimeout: config.GetDuration(config.BatchManagerReadPollTimeout),
Expand Down Expand Up @@ -85,6 +87,7 @@ type batchManager struct {
ni sysmessaging.LocalNodeInfo
database database.Plugin
data data.Manager
txHelper txcommon.Helper
dispatcherMux sync.Mutex
dispatchers map[string]*dispatcher
newMessages chan int64
Expand Down Expand Up @@ -170,21 +173,18 @@ func (bm *batchManager) getProcessor(txType fftypes.TransactionType, msgType fft
dispatch: dispatcher.handler,
},
bm.retry,
bm.txHelper,
)
dispatcher.processors[name] = processor
}
log.L(bm.ctx).Debugf("Created new processor: %s", name)
return processor, nil
}

func (bm *batchManager) assembleMessageData(batchType fftypes.BatchType, msg *fftypes.Message) (retData fftypes.DataArray, err error) {
var cro []data.CacheReadOption
if batchType == fftypes.BatchTypeBroadcast {
cro = append(cro, data.CRORequirePublicBlobRefs)
}
func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (retData fftypes.DataArray, err error) {
var foundAll = false
err = bm.retry.Do(bm.ctx, fmt.Sprintf("assemble message %s data", msg.Header.ID), func(attempt int) (retry bool, err error) {
retData, foundAll, err = bm.data.GetMessageDataCached(bm.ctx, msg, cro...)
retData, foundAll, err = bm.data.GetMessageDataCached(bm.ctx, msg)
// continual retry for persistence error (distinct from not-found)
return true, err
})
Expand Down Expand Up @@ -236,7 +236,7 @@ func (bm *batchManager) messageSequencer() {
continue
}

data, err := bm.assembleMessageData(processor.conf.DispatcherOptions.BatchType, msg)
data, err := bm.assembleMessageData(msg)
if err != nil {
l.Errorf("Failed to retrieve message data for %s: %s", msg.Header.ID, err)
continue
Expand Down Expand Up @@ -343,10 +343,14 @@ func (bm *batchManager) getProcessors() []*batchProcessor {
bm.dispatcherMux.Lock()
defer bm.dispatcherMux.Unlock()

exists := make(map[*batchProcessor]bool)
var processors []*batchProcessor
for _, d := range bm.dispatchers {
for _, p := range d.processors {
processors = append(processors, p)
if !exists[p] {
processors = append(processors, p)
exists[p] = true
}
}
}
return processors
Expand Down
56 changes: 35 additions & 21 deletions internal/batch/batch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"time"

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/data"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/datamocks"
"github.com/hyperledger/firefly/mocks/sysmessagingmocks"
Expand All @@ -41,6 +41,7 @@ func TestE2EDispatchBroadcast(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID())
readyForDispatch := make(chan bool)
waitForDispatch := make(chan *DispatchState)
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestE2EDispatchBroadcast(t *testing.T) {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm)
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
bm := bmi.(*batchManager)

bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast}, handler, DispatcherOptions{
Expand Down Expand Up @@ -155,6 +156,7 @@ func TestE2EDispatchPrivateUnpinned(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID())
readyForDispatch := make(chan bool)
waitForDispatch := make(chan *DispatchState)
Expand Down Expand Up @@ -187,7 +189,7 @@ func TestE2EDispatchPrivateUnpinned(t *testing.T) {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm)
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
bm := bmi.(*batchManager)

bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypePrivate}, handler, DispatcherOptions{
Expand Down Expand Up @@ -268,8 +270,9 @@ func TestDispatchUnknownType(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
ctx, cancel := context.WithCancel(context.Background())
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm)
bmi, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
bm := bmi.(*batchManager)

msg := &fftypes.Message{}
Expand All @@ -285,7 +288,7 @@ func TestDispatchUnknownType(t *testing.T) {
}

func TestInitFailNoPersistence(t *testing.T) {
_, err := NewBatchManager(context.Background(), nil, nil, nil)
_, err := NewBatchManager(context.Background(), nil, nil, nil, nil)
assert.Error(t, err)
}

Expand All @@ -294,7 +297,8 @@ func TestGetInvalidBatchTypeMsg(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
defer bm.Close()
_, err := bm.(*batchManager).getProcessor(fftypes.BatchTypeBroadcast, "wrong", nil, "ns1", &fftypes.SignerRef{})
assert.Regexp(t, "FF10126", err)
Expand All @@ -304,8 +308,9 @@ func TestMessageSequencerCancelledContext(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
mdi.On("GetMessages", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop"))
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
defer bm.Close()
ctx, cancel := context.WithCancel(context.Background())
cancel()
Expand All @@ -318,7 +323,8 @@ func TestMessageSequencerMissingMessageData(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeNone, []fftypes.MessageType{fftypes.MessageTypeBroadcast},
func(c context.Context, state *DispatchState) error {
return nil
Expand All @@ -345,7 +351,7 @@ func TestMessageSequencerMissingMessageData(t *testing.T) {
}).
Once()
mdi.On("GetMessages", mock.Anything, mock.Anything, mock.Anything).Return([]*fftypes.Message{}, nil, nil)
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything, data.CRORequirePublicBlobRefs).Return(fftypes.DataArray{}, false, nil)
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(fftypes.DataArray{}, false, nil)

bm.(*batchManager).messageSequencer()

Expand All @@ -359,9 +365,10 @@ func TestMessageSequencerUpdateMessagesFail(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID())
ctx, cancelCtx := context.WithCancel(context.Background())
bm, _ := NewBatchManager(ctx, mni, mdi, mdm)
bm, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast},
func(c context.Context, state *DispatchState) error {
return nil
Expand Down Expand Up @@ -414,8 +421,9 @@ func TestMessageSequencerDispatchFail(t *testing.T) {
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID())
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
ctx, cancelCtx := context.WithCancel(context.Background())
bm, _ := NewBatchManager(ctx, mni, mdi, mdm)
bm, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast},
func(c context.Context, state *DispatchState) error {
cancelCtx()
Expand Down Expand Up @@ -454,7 +462,8 @@ func TestMessageSequencerUpdateBatchFail(t *testing.T) {
mni := &sysmessagingmocks.LocalNodeInfo{}
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID())
ctx, cancelCtx := context.WithCancel(context.Background())
bm, _ := NewBatchManager(ctx, mni, mdi, mdm)
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
bm, _ := NewBatchManager(ctx, mni, mdi, mdm, txHelper)
bm.RegisterDispatcher("utdispatcher", fftypes.TransactionTypeBatchPin, []fftypes.MessageType{fftypes.MessageTypeBroadcast},
func(c context.Context, state *DispatchState) error {
return nil
Expand Down Expand Up @@ -504,7 +513,8 @@ func TestWaitForPollTimeout(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
bm.(*batchManager).messagePollTimeout = 1 * time.Microsecond
bm.(*batchManager).waitForNewMessages()
}
Expand All @@ -513,7 +523,8 @@ func TestWaitForNewMessage(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
bmi, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
bm := bmi.(*batchManager)
bm.readOffset = 22222
bm.NewMessages() <- 12345
Expand All @@ -525,10 +536,11 @@ func TestAssembleMessageDataNilData(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
bm.Close()
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, nil)
_, err := bm.(*batchManager).assembleMessageData(fftypes.BatchTypePrivate, &fftypes.Message{
_, err := bm.(*batchManager).assembleMessageData(&fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
},
Expand All @@ -541,10 +553,11 @@ func TestGetMessageDataFail(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, fmt.Errorf("pop"))
bm.Close()
_, _ = bm.(*batchManager).assembleMessageData(fftypes.BatchTypePrivate, &fftypes.Message{
_, _ = bm.(*batchManager).assembleMessageData(&fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
},
Expand All @@ -559,10 +572,11 @@ func TestGetMessageNotFound(t *testing.T) {
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mni := &sysmessagingmocks.LocalNodeInfo{}
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm)
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything, data.CRORequirePublicBlobRefs).Return(nil, false, nil)
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
bm, _ := NewBatchManager(context.Background(), mni, mdi, mdm, txHelper)
mdm.On("GetMessageDataCached", mock.Anything, mock.Anything).Return(nil, false, nil)
bm.Close()
_, err := bm.(*batchManager).assembleMessageData(fftypes.BatchTypeBroadcast, &fftypes.Message{
_, err := bm.(*batchManager).assembleMessageData(&fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
},
Expand Down
4 changes: 2 additions & 2 deletions internal/batch/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type DispatchState struct {

const batchSizeEstimateBase = int64(512)

func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager, conf *batchProcessorConf, baseRetryConf *retry.Retry) *batchProcessor {
func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di database.Plugin, dm data.Manager, conf *batchProcessorConf, baseRetryConf *retry.Retry, txHelper txcommon.Helper) *batchProcessor {
pCtx := log.WithLogField(log.WithLogField(ctx, "d", conf.dispatcherName), "p", conf.name)
pCtx, cancelCtx := context.WithCancel(pCtx)
bp := &batchProcessor{
Expand All @@ -110,7 +110,7 @@ func newBatchProcessor(ctx context.Context, ni sysmessaging.LocalNodeInfo, di da
ni: ni,
database: di,
data: dm,
txHelper: txcommon.NewTransactionHelper(di),
txHelper: txHelper,
newWork: make(chan *batchWork, conf.BatchMaxSize),
quescing: make(chan bool, 1),
done: make(chan struct{}),
Expand Down
4 changes: 3 additions & 1 deletion internal/batch/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/retry"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/datamocks"
"github.com/hyperledger/firefly/mocks/sysmessagingmocks"
Expand All @@ -36,6 +37,7 @@ func newTestBatchProcessor(dispatch DispatchHandler) (*databasemocks.Plugin, *ba
mdi := &databasemocks.Plugin{}
mni := &sysmessagingmocks.LocalNodeInfo{}
mdm := &datamocks.Manager{}
txHelper := txcommon.NewTransactionHelper(mdi, mdm)
mni.On("GetNodeUUID", mock.Anything).Return(fftypes.NewUUID()).Maybe()
bp := newBatchProcessor(context.Background(), mni, mdi, mdm, &batchProcessorConf{
namespace: "ns1",
Expand All @@ -51,7 +53,7 @@ func newTestBatchProcessor(dispatch DispatchHandler) (*databasemocks.Plugin, *ba
}, &retry.Retry{
InitialDelay: 1 * time.Microsecond,
MaximumDelay: 1 * time.Microsecond,
})
}, txHelper)
bp.txHelper = &txcommonmocks.Helper{}
return mdi, bp
}
Expand Down
6 changes: 2 additions & 4 deletions internal/broadcast/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,8 @@ func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, ns st
},
},
},
ResolvedData: data.Resolved{
NewData: fftypes.DataArray{d},
AllData: fftypes.DataArray{d},
},
NewData: fftypes.DataArray{d},
AllData: fftypes.DataArray{d},
}

// Broadcast the message
Expand Down
Loading