Skip to content
19 changes: 12 additions & 7 deletions internal/assets/token_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type transferSender struct {
namespace string
transfer *fftypes.TokenTransferInput
resolved bool
msgSender sysmessaging.MessageSender
}

// sendMethod is the specific operation requested of the transferSender.
Expand Down Expand Up @@ -189,14 +190,14 @@ func (s *transferSender) resolveAndSend(ctx context.Context, method sendMethod)
return s.sendInternal(ctx, method)
}

func (s *transferSender) resolve(ctx context.Context) error {
func (s *transferSender) resolve(ctx context.Context) (err error) {
// Resolve the attached message
if s.transfer.Message != nil {
sender, err := s.buildTransferMessage(ctx, s.namespace, s.transfer.Message)
s.msgSender, err = s.buildTransferMessage(ctx, s.namespace, s.transfer.Message)
if err != nil {
return err
}
if err = sender.Prepare(ctx); err != nil {
if err = s.msgSender.Prepare(ctx); err != nil {
return err
}
s.transfer.TokenTransfer.Message = s.transfer.Message.Header.ID
Expand Down Expand Up @@ -255,16 +256,20 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er
return err
}

if s.transfer.Message != nil {
s.transfer.Message.State = fftypes.MessageStateStaged
err = s.mgr.database.UpsertMessage(ctx, &s.transfer.Message.Message, database.UpsertOptimizationNew)
}
return err
})
if err != nil {
return err
}

// Write the transfer message outside of any DB transaction, as it will use the background message writer.
if s.transfer.Message != nil {
s.transfer.Message.State = fftypes.MessageStateStaged
if err = s.msgSender.Send(ctx); err != nil {
return err
}
}

return s.mgr.operations.RunOperation(ctx, opTransfer(op, pool, &s.transfer.TokenTransfer))
}

Expand Down
70 changes: 61 additions & 9 deletions internal/assets/token_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,9 +800,7 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) {
mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil)
mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms)
mms.On("Prepare", context.Background()).Return(nil)
mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool {
return msg.State == fftypes.MessageStateStaged
}), database.UpsertOptimizationNew).Return(nil)
mms.On("Send", context.Background()).Return(nil)
mom.On("RunOperation", context.Background(), mock.MatchedBy(func(op *fftypes.PreparedOperation) bool {
data := op.Data.(transferData)
return op.Type == fftypes.OpTypeTokenTransfer && data.Pool == pool && data.Transfer == &transfer.TokenTransfer
Expand All @@ -821,6 +819,64 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) {
mom.AssertExpectations(t)
}

func TestTransferTokensWithBroadcastMessageSendFail(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()

msgID := fftypes.NewUUID()
hash := fftypes.NewRandB32()
transfer := &fftypes.TokenTransferInput{
TokenTransfer: fftypes.TokenTransfer{
From: "A",
To: "B",
Amount: *fftypes.NewFFBigInt(5),
},
Pool: "pool1",
Message: &fftypes.MessageInOut{
Message: fftypes.Message{
Header: fftypes.MessageHeader{
ID: msgID,
},
Hash: hash,
},
InlineData: fftypes.InlineData{
{
Value: fftypes.JSONAnyPtr("test data"),
},
},
},
}
pool := &fftypes.TokenPool{
State: fftypes.TokenPoolStateConfirmed,
}

mdi := am.database.(*databasemocks.Plugin)
mim := am.identity.(*identitymanagermocks.Manager)
mbm := am.broadcast.(*broadcastmocks.Manager)
mms := &sysmessagingmocks.MessageSender{}
mth := am.txHelper.(*txcommonmocks.Helper)
mom := am.operations.(*operationmocks.Manager)
mim.On("NormalizeSigningKey", context.Background(), "", identity.KeyNormalizationBlockchainPlugin).Return("0x12345", nil)
mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(pool, nil)
mth.On("SubmitNewTransaction", context.Background(), "ns1", fftypes.TransactionTypeTokenTransfer).Return(fftypes.NewUUID(), nil)
mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil)
mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms)
mms.On("Prepare", context.Background()).Return(nil)
mms.On("Send", context.Background()).Return(fmt.Errorf("pop"))

_, err := am.TransferTokens(context.Background(), "ns1", transfer, false)
assert.Regexp(t, "pop", err)
assert.Equal(t, *msgID, *transfer.TokenTransfer.Message)
assert.Equal(t, *hash, *transfer.TokenTransfer.MessageHash)

mbm.AssertExpectations(t)
mim.AssertExpectations(t)
mdi.AssertExpectations(t)
mms.AssertExpectations(t)
mth.AssertExpectations(t)
mom.AssertExpectations(t)
}

func TestTransferTokensWithBroadcastPrepareFail(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()
Expand Down Expand Up @@ -900,9 +956,7 @@ func TestTransferTokensWithPrivateMessage(t *testing.T) {
mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil)
mpm.On("NewMessage", "ns1", transfer.Message).Return(mms)
mms.On("Prepare", context.Background()).Return(nil)
mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool {
return msg.State == fftypes.MessageStateStaged
}), database.UpsertOptimizationNew).Return(nil)
mms.On("Send", context.Background()).Return(nil)
mom.On("RunOperation", context.Background(), mock.MatchedBy(func(op *fftypes.PreparedOperation) bool {
data := op.Data.(transferData)
return op.Type == fftypes.OpTypeTokenTransfer && data.Pool == pool && data.Transfer == &transfer.TokenTransfer
Expand Down Expand Up @@ -1047,9 +1101,7 @@ func TestTransferTokensWithBroadcastConfirm(t *testing.T) {
mth.On("SubmitNewTransaction", context.Background(), "ns1", fftypes.TransactionTypeTokenTransfer).Return(fftypes.NewUUID(), nil)
mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms)
mms.On("Prepare", context.Background()).Return(nil)
mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool {
return msg.State == fftypes.MessageStateStaged
}), database.UpsertOptimizationNew).Return(nil)
mms.On("Send", context.Background()).Return(nil)
msa.On("WaitForMessage", context.Background(), "ns1", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
send := args[3].(syncasync.RequestSender)
Expand Down
36 changes: 8 additions & 28 deletions internal/broadcast/datatype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,11 @@ func TestBroadcastDatatypeBadValue(t *testing.T) {
func TestBroadcastUpsertFail(t *testing.T) {
bm, cancel := newTestBroadcast(t)
defer cancel()
mdi := bm.database.(*databasemocks.Plugin)
mdm := bm.data.(*datamocks.Manager)
mim := bm.identity.(*identitymanagermocks.Manager)

mim.On("ResolveInputSigningIdentity", mock.Anything, "ns1", mock.Anything).Return(nil)
mdi.On("UpsertData", mock.Anything, mock.Anything, database.UpsertOptimizationNew).Return(fmt.Errorf("pop"))
mdm.On("WriteNewMessage", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
mdm.On("VerifyNamespaceExists", mock.Anything, "ns1").Return(nil)
mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil)

Expand All @@ -89,6 +88,9 @@ func TestBroadcastUpsertFail(t *testing.T) {
Value: fftypes.JSONAnyPtr(`{"some": "data"}`),
}, false)
assert.EqualError(t, err, "pop")

mim.AssertExpectations(t)
mdm.AssertExpectations(t)
}

func TestBroadcastDatatypeInvalid(t *testing.T) {
Expand All @@ -112,41 +114,16 @@ func TestBroadcastDatatypeInvalid(t *testing.T) {
assert.EqualError(t, err, "pop")
}

func TestBroadcastBroadcastFail(t *testing.T) {
bm, cancel := newTestBroadcast(t)
defer cancel()
mdi := bm.database.(*databasemocks.Plugin)
mdm := bm.data.(*datamocks.Manager)
mim := bm.identity.(*identitymanagermocks.Manager)

mim.On("ResolveInputSigningIdentity", mock.Anything, "ns1", mock.Anything).Return(nil)
mdi.On("UpsertData", mock.Anything, mock.Anything, database.UpsertOptimizationNew).Return(nil)
mdm.On("VerifyNamespaceExists", mock.Anything, "ns1").Return(nil)
mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil)
mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationNew).Return(fmt.Errorf("pop"))

_, err := bm.BroadcastDatatype(context.Background(), "ns1", &fftypes.Datatype{
Namespace: "ns1",
Name: "ent1",
Version: "0.0.1",
Value: fftypes.JSONAnyPtr(`{"some": "data"}`),
}, false)
assert.EqualError(t, err, "pop")
}

func TestBroadcastOk(t *testing.T) {
bm, cancel := newTestBroadcast(t)
defer cancel()
mdi := bm.database.(*databasemocks.Plugin)
mdm := bm.data.(*datamocks.Manager)
mim := bm.identity.(*identitymanagermocks.Manager)

mim.On("ResolveInputSigningIdentity", mock.Anything, "ns1", mock.Anything).Return(nil)
mdi.On("UpsertData", mock.Anything, mock.Anything, database.UpsertOptimizationNew).Return(nil)
mdm.On("VerifyNamespaceExists", mock.Anything, "ns1").Return(nil)
mdm.On("CheckDatatype", mock.Anything, "ns1", mock.Anything).Return(nil)
mdm.On("UpdateMessageCache", mock.Anything, mock.Anything).Return()
mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationNew).Return(nil)
mdm.On("WriteNewMessage", mock.Anything, mock.Anything, mock.Anything).Return(nil)

_, err := bm.BroadcastDatatype(context.Background(), "ns1", &fftypes.Datatype{
Namespace: "ns1",
Expand All @@ -155,4 +132,7 @@ func TestBroadcastOk(t *testing.T) {
Value: fftypes.JSONAnyPtr(`{"some": "data"}`),
}, false)
assert.NoError(t, err)

mdm.AssertExpectations(t)
mim.AssertExpectations(t)
}
50 changes: 25 additions & 25 deletions internal/broadcast/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"context"
"encoding/json"

"github.com/hyperledger/firefly/internal/data"
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/identity"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
)

Expand Down Expand Up @@ -52,59 +52,59 @@ func (bm *broadcastManager) BroadcastIdentityClaim(ctx context.Context, ns strin
return bm.broadcastDefinitionCommon(ctx, ns, def, signingIdentity, tag, waitConfirm)
}

func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, ns string, def fftypes.Definition, signingIdentity *fftypes.SignerRef, tag string, waitConfirm bool) (msg *fftypes.Message, err error) {
func (bm *broadcastManager) broadcastDefinitionCommon(ctx context.Context, ns string, def fftypes.Definition, signingIdentity *fftypes.SignerRef, tag string, waitConfirm bool) (*fftypes.Message, error) {

// Serialize it into a data object, as a piece of data we can write to a message
data := &fftypes.Data{
d := &fftypes.Data{
Validator: fftypes.ValidatorTypeSystemDefinition,
ID: fftypes.NewUUID(),
Namespace: ns,
Created: fftypes.Now(),
}
b, err := json.Marshal(&def)
if err == nil {
data.Value = fftypes.JSONAnyPtrBytes(b)
err = data.Seal(ctx, nil)
d.Value = fftypes.JSONAnyPtrBytes(b)
err = d.Seal(ctx, nil)
}
if err != nil {
return nil, i18n.WrapError(ctx, err, i18n.MsgSerializationFailed)
}

// Write as data to the local store
if err = bm.database.UpsertData(ctx, data, database.UpsertOptimizationNew); err != nil {
return nil, err
}

// Create a broadcast message referring to the data
in := &fftypes.MessageInOut{
Message: fftypes.Message{
Header: fftypes.MessageHeader{
Namespace: ns,
Type: fftypes.MessageTypeDefinition,
SignerRef: *signingIdentity,
Topics: fftypes.FFStringArray{def.Topic()},
Tag: tag,
TxType: fftypes.TransactionTypeBatchPin,
},
Data: fftypes.DataRefs{
{ID: data.ID, Hash: data.Hash},
newMsg := &data.NewMessage{
Message: &fftypes.MessageInOut{
Message: fftypes.Message{
Header: fftypes.MessageHeader{
Namespace: ns,
Type: fftypes.MessageTypeDefinition,
SignerRef: *signingIdentity,
Topics: fftypes.FFStringArray{def.Topic()},
Tag: tag,
TxType: fftypes.TransactionTypeBatchPin,
},
Data: fftypes.DataRefs{
{ID: d.ID, Hash: d.Hash, ValueSize: d.ValueSize},
},
},
},
ResolvedData: data.Resolved{
NewData: fftypes.DataArray{d},
AllData: fftypes.DataArray{d},
},
}

// Broadcast the message
sender := broadcastSender{
mgr: bm,
namespace: ns,
msg: in,
msg: newMsg,
resolved: true,
data: fftypes.DataArray{data},
}
sender.setDefaults()
if waitConfirm {
err = sender.SendAndWait(ctx)
} else {
err = sender.Send(ctx)
}
return &in.Message, err
return &newMsg.Message.Message, err
}
Loading